class-events-store.php 16.7 KB
Newer Older
Erick Hitter's avatar
Erick Hitter committed
1
2
3
4
5
6
7
8
9
10
11
12
<?php

namespace Automattic\WP\Cron_Control;

class Events_Store extends Singleton {
	/**
	 * PLUGIN SETUP
	 */

	/**
	 * Class properties
	 */
13
	const TABLE_SUFFIX = 'a8c_cron_control_jobs';
14
15
16

	const DB_VERSION        = 1;
	const DB_VERSION_OPTION = 'a8c_cron_control_db_version';
17
	const TABLE_CREATE_LOCK = 'a8c_cron_control_creating_table';
18

19
20
21
22
23
24
25
	const STATUS_PENDING   = 'pending';
	const STATUS_RUNNING   = 'running';
	const STATUS_COMPLETED = 'complete';

	const CACHE_KEY = 'a8c_cron_ctrl_option';

	private $job_creation_suspended = false;
Erick Hitter's avatar
Erick Hitter committed
26
27
28
29

	/**
	 * Register hooks
	 */
30
31
	protected function class_init() {
		// Check that the table exists and is the correct version
32
		$this->prepare_tables();
33
34
35
36

		// Option interception
		add_filter( 'pre_option_cron', array( $this, 'get_option' ) );
		add_filter( 'pre_update_option_cron', array( $this, 'update_option' ), 10, 2 );
Erick Hitter's avatar
Erick Hitter committed
37
38
39

		// Disallow duplicates
		add_filter( 'schedule_event', array( $this, 'block_creation_if_job_exists' ) );
40
41
42
43
44
	}

	/**
	 * Build appropriate table name for this install
	 */
45
	public function get_table_name() {
46
47
		global $wpdb;

48
		return $wpdb->prefix . self::TABLE_SUFFIX;
49
50
	}

Erick Hitter's avatar
Erick Hitter committed
51
52
53
54
55
56
57
58
59
60
61
	/**
	 * Build array of valid event statuses
	 */
	public function get_allowed_statuses() {
		return array(
			self::STATUS_PENDING,
			self::STATUS_RUNNING,
			self::STATUS_COMPLETED,
		);
	}

62
63
64
	/**
	 * Create the plugin's DB table when necessary
	 */
65
	protected function prepare_tables() {
66
67
68
69
70
		// Nothing to do
		if ( (int) get_option( self::DB_VERSION_OPTION ) === self::DB_VERSION ) {
			return;
		}

71
72
73
74
75
		// Limit chance of race conditions when creating table
		if ( false === wp_cache_add( self::TABLE_CREATE_LOCK, 1, null, 1 * \MINUTE_IN_SECONDS ) ) {
			return;
		}

76
77
78
79
80
81
82
83
		// Use Core's method of creating/updating tables
		if ( ! function_exists( 'dbDelta' ) ) {
			require_once ABSPATH . '/wp-admin/includes/upgrade.php';
		}

		global $wpdb;

		// Define schema and create the table
84
		$schema = "CREATE TABLE `{$this->get_table_name()}` (
85
			`ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
86
87
88

			`timestamp` bigint(20) unsigned NOT NULL,
			`action` varchar(255) NOT NULL,
89
			`action_hashed` varchar(32) NOT NULL,
90
91
92
93
			`instance` varchar(32) NOT NULL,

			`args` longtext NOT NULL,
			`schedule` varchar(255) DEFAULT NULL,
94
95
			`interval` int unsigned DEFAULT 0,
			`status` varchar(32) NOT NULL DEFAULT 'pending',
96
97
98
99

			`created` datetime NOT NULL,
			`last_modified` datetime NOT NULL,

Erick Hitter's avatar
Erick Hitter committed
100
			PRIMARY KEY (`ID`),
101
			UNIQUE KEY `ts_action_instance_status` (`timestamp`, `action` (191), `instance`, `status`)
102
103
104
105
106
107
108
109
110
111
112
113
114
		) ENGINE=InnoDB;\n";

		dbDelta( $schema, true );

		// Confirm that the table was created, and set the option to prevent further updates
		$table_count = count( $wpdb->get_col( "SHOW TABLES LIKE '{$this->get_table_name()}'" ) );

		if ( 1 === $table_count ) {
			update_option( self::DB_VERSION_OPTION, self::DB_VERSION, true );
		} else {
			delete_option( self::DB_VERSION_OPTION );
		}
	}
115
116
117
118
119
120

	/**
	 * PLUGIN FUNCTIONALITY
	 */

	/**
121
	 * Override cron option requests with data from custom table
122
123
	 */
	public function get_option() {
124
125
		// Use cached value when available
		$cached_option = wp_cache_get( self::CACHE_KEY, null, true );
126

127
128
		if ( false !== $cached_option ) {
			return $cached_option;
129
130
131
132
133
134
135
136
		}

		// Start building a new cron option
		$cron_array = array(
			'version' => 2, // Core versions the cron array; without this, events will continually requeue
		);

		// Get events to re-render as the cron option
137
138
		$page     = 1;
		$quantity = 100;
139
140
141
142

		do {
			$jobs_posts = $this->get_jobs( array(
				'status'   => self::STATUS_PENDING,
143
				'quantity' => $quantity,
144
				'page'     => $page++,
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
			) );

			// Nothing more to add
			if ( empty( $jobs_posts ) ) {
				break;
			}

			// Something's probably wrong if a site has more than 1,500 pending cron actions
			if ( $page > 15 ) {
				do_action( 'a8c_cron_control_stopped_runaway_cron_option_rebuild' );
				break;
			}

			// Loop through results and built output Core expects
			if ( ! empty( $jobs_posts ) ) {
				foreach ( $jobs_posts as $jobs_post ) {
					// Alias event timestamp
162
					$timestamp = $jobs_post->timestamp;
163
164
165
166
167
168
169
170
171
172
173
174
175
176

					// If timestamp is invalid, event is removed to let its source fix it
					if ( $timestamp <= 0 ) {
						$this->mark_job_record_completed( $jobs_post->ID );
						continue;
					}

					// Basic arguments to add a job to the array format Core expects
					$action   = $jobs_post->action;
					$instance = $jobs_post->instance;

					// Populate remaining job data
					$cron_array[ $timestamp ][ $action ][ $instance ] = array(
						'schedule' => $jobs_post->schedule,
177
						'args'     => $jobs_post->args,
178
						'interval' => 0,
179
180
181
					);

					if ( isset( $jobs_post->interval ) ) {
182
						$cron_array[ $timestamp ][ $action ][ $instance ]['interval'] = $jobs_post->interval;
183
					}
184
				}
185
			}
186
		} while( count( $jobs_posts ) >= $quantity );
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203

		// Re-sort the array just as Core does when events are scheduled
		// Ensures events are sorted chronologically
		uksort( $cron_array, 'strnatcasecmp' );

		// Cache the results, bearing in mind that they won't be used during unscheduling events
		wp_cache_set( self::CACHE_KEY, $cron_array, null, 1 * \HOUR_IN_SECONDS );

		return $cron_array;
	}

	/**
	 * Handle requests to update the cron option
	 *
	 * By returning $old_value, `cron` option won't be updated
	 */
	public function update_option( $new_value, $old_value ) {
204
205
206
		// Find changes to record
		$new_events     = $this->find_cron_array_differences( $new_value, $old_value );
		$deleted_events = $this->find_cron_array_differences( $old_value, $new_value );
207

208
209
		// Add/update new events
		foreach ( $new_events as $new_event ) {
210
			$job_id = $this->get_job_id( $new_event['timestamp'], $new_event['action'], $new_event['instance'] );
211

212
213
214
			if ( 0 === $job_id ) {
				$job_id = null;
			}
215

216
			$this->create_or_update_job( $new_event['timestamp'], $new_event['action'], $new_event['args'], $job_id, false );
217
218
		}

219
220
		// Mark deleted entries for removal
		foreach ( $deleted_events as $deleted_event ) {
221
			$this->mark_job_completed( $deleted_event['timestamp'], $deleted_event['action'], $deleted_event['instance'], false );
222
		}
223

224
225
		$this->flush_internal_caches();

226
		return $old_value;
227
228
	}

Erick Hitter's avatar
Erick Hitter committed
229
230
231
232
233
	/**
	 * When an entry exists, don't try to create it again
	 */
	public function block_creation_if_job_exists( $job ) {
		$instance = md5( maybe_serialize( $job->args ) );
234
		if ( 0 !== $this->get_job_id( $job->timestamp, $job->hook, $instance ) ) {
Erick Hitter's avatar
Erick Hitter committed
235
236
237
238
239
240
			return false;
		}

		return $job;
	}

241
242
243
244
245
	/**
	 * PLUGIN UTILITY METHODS
	 */

	/**
Erick Hitter's avatar
Erick Hitter committed
246
247
248
249
	 * Retrieve jobs given a set of parameters
	 *
	 * @param array $args
	 * @return array|false
250
	 */
251
	public function get_jobs( $args ) {
252
253
254
255
256
257
258
259
260
261
262
263
264
		global $wpdb;

		if ( ! isset( $args['quantity'] ) || ! is_numeric( $args['quantity'] ) ) {
			$args['quantity'] = 100;
		}

		if ( isset( $args['page'] ) ) {
			$page  = max( 0, $args['page'] - 1 );
			$offset = $page * $args['quantity'];
		} else {
			$offset = 0;
		}

Erick Hitter's avatar
Erick Hitter committed
265
266
		// Do not sort, otherwise index isn't used
		$jobs = $wpdb->get_results( $wpdb->prepare( "SELECT * FROM {$this->get_table_name()} WHERE status = %s LIMIT %d,%d;", $args['status'], $offset, $args['quantity'] ), 'OBJECT' );
267
268
269
270
271
272
273
274

		if ( is_array( $jobs ) ) {
			$jobs = array_map( array( $this, 'format_job' ), $jobs );
		} else {
			$jobs = false;
		}

		return $jobs;
275
276
	}

277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
	/**
	 * Retrieve a single event by ID, or by a combination of its timestamp, instance identifier, and either action or the action's hashed representation
	 *
	 * @param  array $attrs Array of event attributes to query by
	 * @return object|false
	 */
	public function get_job( $attrs ) {
		global $wpdb;

		// Validate basic inputs
		if ( ! is_array( $attrs ) || empty( $attrs ) ) {
			return false;
		}

		// Validate requested status
Erick Hitter's avatar
Erick Hitter committed
292
293
		$allowed_status = $this->get_allowed_statuses();
		$allowed_status[] = 'any';
294
295
296
297
298
299

		if ( ! isset( $attrs['status'] ) || ! in_array( $attrs['status'], $allowed_status, true ) ) {
			$attrs['status'] = self::STATUS_PENDING;
		}

		// Validate attributes provided to query for a post
300
301
302
		$query              = null;
		$query_replacements = array();

303
		if ( isset( $attrs['ID'] ) ) {
304
305
			$query                = "SELECT * FROM {$this->get_table_name()} WHERE ID = %d";
			$query_replacements[] = $attrs['ID'];
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
		} else {
			// Need a timestamp, an instance, and either an action or its hashed representation
			if ( ! isset( $attrs['timestamp'] ) || ! isset( $attrs['instance'] ) ) {
				return false;
			} elseif ( ! isset( $attrs['action'] ) && ! isset( $attrs['action_hashed'] ) ) {
				return false;
			}

			// Build query
			if ( isset( $attrs['action'] ) ) {
				$action_column = 'action';
				$action_value  = $attrs['action'];
			} else {
				$action_column = 'action_hashed';
				$action_value  = $attrs['action_hashed'];
			}

Erick Hitter's avatar
Erick Hitter committed
323
			// Do not sort, otherwise index isn't used
324
325
326
327
			$query                = "SELECT * FROM {$this->get_table_name()} WHERE timestamp = %d AND {$action_column} = %s AND instance = %s";
			$query_replacements[] = $attrs['timestamp'];
			$query_replacements[] = $action_value;
			$query_replacements[] = $attrs['instance'];
328
329
330
331
		}

		// Final query preparations
		if ( 'any' !== $attrs['status'] ) {
332
333
			$query                .= ' AND status = %s';
			$query_replacements[]  = $attrs['status'];
334
335
336
337
		}

		$query .= ' LIMIT 1';

338
339
		$query = $wpdb->prepare( $query, $query_replacements );

340
341
342
343
344
345
346
347
348
349
350
351
		// Query and format results
		$job = $wpdb->get_row( $query );

		if ( is_object( $job ) && ! is_wp_error( $job ) ) {
			$job = $this->format_job( $job );
		} else {
			$job = false;
		}

		return $job;
	}

352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
	/**
	 * Get ID for given event details
	 *
	 * Used in situations where performance matters, which is why it exists despite duplicating `get_job()`
	 * Queries outside of this class should use `get_job()`
	 *
	 * @param  int    $timestamp    Unix timestamp event executes at
	 * @param  string $action       Name of action used when the event is registered (unhashed)
	 * @param  string $instance     md5 hash of the event's arguments array, which Core uses to index the `cron` option
	 * @return int
	 */
	private function get_job_id( $timestamp, $action, $instance ) {
		global $wpdb;

		$job = $wpdb->get_col( $wpdb->prepare( "SELECT ID FROM {$this->get_table_name()} WHERE timestamp = %d AND action = %s AND instance = %s AND status = %s LIMIT 1;", $timestamp, $action, $instance, self::STATUS_PENDING ) );

		return empty( $job ) ? 0 : (int) array_shift( $job );
	}

371
372
	/**
	 * Standardize formatting and expand serialized data
373
374
375
	 *
	 * @param  object $job Job row from DB, in object form
	 * @return object
376
	 */
377
378
379
	private function format_job( $job ) {
		if ( ! is_object( $job ) || is_wp_error( $job ) ) {
			return $job;
380
381
		}

382
383
384
385
		$job->ID        = (int) $job->ID;
		$job->timestamp = (int) $job->timestamp;
		$job->interval  = (int) $job->interval;
		$job->args      = maybe_unserialize( $job->args );
386

387
388
389
390
		if ( empty( $job->schedule ) ) {
			$job->schedule = false;
		}

391
		return $job;
392
393
394
	}

	/**
395
	 * Create or update entry for a given job
396
397
398
399
400
401
	 *
	 * @param int    $timestamp    Unix timestamp event executes at
	 * @param string $action       Hook event fires
	 * @param array  $args         Array of event's schedule, arguments, and interval
	 * @param bool   $update_id    ID of existing entry to update, rather than creating a new entry
	 * @param bool   $flush_cache  Whether or not to flush internal caches after creating/updating the event
402
	 */
403
	public function create_or_update_job( $timestamp, $action, $args, $update_id = null, $flush_cache = true ) {
404
405
406
407
408
409
410
411
412
413
		// Don't create new jobs when manipulating jobs via the plugin's CLI commands
		if ( $this->job_creation_suspended ) {
			return;
		}

		global $wpdb;

		$job_post = array(
			'timestamp'     => $timestamp,
			'action'        => $action,
414
			'action_hashed' => md5( $action ),
415
			'instance'      => md5( maybe_serialize( $args['args'] ) ),
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
			'args'          => maybe_serialize( $args['args'] ),
			'last_modified' => current_time( 'mysql', true ),
		);

		if ( isset( $args['schedule'] ) && ! empty( $args['schedule'] ) ) {
			$job_post['schedule'] = $args['schedule'];
		}

		if ( isset( $args['interval'] ) && ! empty( $args['interval'] ) && is_numeric( $args['interval'] ) ) {
			$job_post['interval'] = (int) $args['interval'];
		}

		// Create the post, or update an existing entry to run again in the future
		if ( is_int( $update_id ) && $update_id > 0 ) {
			$wpdb->update( $this->get_table_name(), $job_post, array( 'ID' => $update_id, ) );
		} else {
Erick Hitter's avatar
Erick Hitter committed
432
			$job_post['created'] = current_time( 'mysql', true );
433
434
435
436

			$wpdb->insert( $this->get_table_name(), $job_post );
		}

437
438
439
440
441
		// Delete internal cache
		// Should only be skipped during bulk operations
		if ( $flush_cache ) {
			$this->flush_internal_caches();
		}
442
443
444
	}

	/**
445
	 * Mark an event's entry as completed
446
447
448
	 *
	 * Completed entries will be cleaned up by an internal job
	 *
449
450
451
452
	 * @param int    $timestamp    Unix timestamp event executes at
	 * @param string $action       Name of action used when the event is registered (unhashed)
	 * @param string $instance     md5 hash of the event's arguments array, which Core uses to index the `cron` option
	 * @param bool   $flush_cache  Whether or not to flush internal caches after creating/updating the event
453
454
	 * @return bool
	 */
455
	public function mark_job_completed( $timestamp, $action, $instance, $flush_cache = true ) {
456
		$job_id = $this->get_job_id( $timestamp, $action, $instance );
457
458
459
460
461

		if ( ! $job_id ) {
			return false;
		}

462
		return $this->mark_job_record_completed( $job_id, $flush_cache );
463
464
465
466
	}

	/**
	 * Set a job post to the "completed" status
467
468
469
470
	 *
	 * @param int $job_id        ID of job's record
	 * @param bool $flush_cache  Whether or not to flush internal caches after creating/updating the event
	 * @return bool
471
	 */
Erick Hitter's avatar
Erick Hitter committed
472
	public function mark_job_record_completed( $job_id, $flush_cache = true ) {
473
474
		global $wpdb;

475
476
477
478
479
480
481
482
		/**
		 * Constraint is broken to accommodate the following situation:
		 * 1. Event with specific timestamp is scheduled.
		 * 2. Event is unscheduled.
		 * 3. Event is rescheduled.
		 * 4. Event runs, or is unscheduled, but unique constraint prevents query from succeeding.
		 * 5. Event retains `pending` status and runs again. Repeat steps 4 and 5 until `a8c_cron_control_purge_completed_events` runs and removes the entry from step 2.
		 */
483
484
		$updates = array(
			'status'   => self::STATUS_COMPLETED,
485
			'instance' => mt_rand( 1000000, 999999999 ), // Breaks unique constraint, and can be recreated from entry's remaining data
486
487
488
		);

		$success = $wpdb->update( $this->get_table_name(), $updates, array( 'ID' => $job_id, ) );
489
490

		// Delete internal cache
491
		// Should only be skipped during bulk operations
492
		if ( $flush_cache ) {
Erick Hitter's avatar
Erick Hitter committed
493
			$this->flush_internal_caches();
494
495
		}

Erick Hitter's avatar
Erick Hitter committed
496
		return (bool) $success;
497
498
	}

499
	/**
500
	 * Compare two arrays and return collapsed representation of their differences
501
	 *
502
503
	 * @param array $new New cron array
	 * @param array $old Old cron array
504
	 *
505
	 * @return array
506
	 */
507
	private function find_cron_array_differences( $new, $old ) {
508
509
		$differences = array();

510
		$new = collapse_events_array( $new );
511

512
513
		foreach ( $new as $event ) {
			$event = (object) $event;
514

515
			if ( ! isset( $old[ $event->timestamp ][ $event->action ][ $event->instance ] ) ) {
516
				$differences[] = array(
517
518
519
520
					'timestamp' => $event->timestamp,
					'action'    => $event->action,
					'instance'  => $event->instance,
					'args'      => $event->args,
521
522
523
524
525
526
527
				);
			}
		}

		return $differences;
	}

Erick Hitter's avatar
Erick Hitter committed
528
529
530
531
532
533
534
	/**
	 * Delete the cached representation of the cron option
	 */
	public function flush_internal_caches() {
		return wp_cache_delete( self::CACHE_KEY );
	}

535
	/**
536
	 * Prevent event store from creating new entries
537
538
539
540
541
542
543
544
	 *
	 * Should be used sparingly, and followed by a call to resume_event_creation(), during bulk operations
	 */
	public function suspend_event_creation() {
		$this->job_creation_suspended = true;
	}

	/**
545
	 * Stop discarding events, once again storing them in the table
546
547
548
549
550
551
552
553
	 */
	public function resume_event_creation() {
		$this->job_creation_suspended = false;
	}

	/**
	 * Remove entries for non-recurring events that have been run
	 */
554
	public function purge_completed_events( $count_first = true ) {
555
556
		global $wpdb;

557
558
		// Skip count if already performed
		if ( $count_first ) {
559
560
561
562
563
			if ( property_exists( $wpdb, 'srtm' ) ) {
				$srtm = $wpdb->srtm;
				$wpdb->srtm = true;
			}

564
			$count = $this->count_events_by_status( self::STATUS_COMPLETED );
565
566
567
568

			if ( isset( $srtm ) ) {
				$wpdb->srtm = $srtm;
			}
569
570
571
572
573
574
575
		} else {
			$count = 1;
		}

		if ( $count > 0 ) {
			$wpdb->delete( $this->get_table_name(), array( 'status' => self::STATUS_COMPLETED, ) );
		}
576
	}
Erick Hitter's avatar
Erick Hitter committed
577
578
579
580
581
582
583
584
585
586

	/**
	 * Count number of events with a given status
	 *
	 * @param string $status
	 * @return int|false
	 */
	public function count_events_by_status( $status ) {
		global $wpdb;

Erick Hitter's avatar
Erick Hitter committed
587
		if ( ! in_array( $status, $this->get_allowed_statuses(), true ) ) {
Erick Hitter's avatar
Erick Hitter committed
588
589
590
			return false;
		}

591
		return (int) $wpdb->get_var( $wpdb->prepare( "SELECT COUNT(ID) FROM {$this->get_table_name()} WHERE status = %s", $status ) );
Erick Hitter's avatar
Erick Hitter committed
592
	}
Erick Hitter's avatar
Erick Hitter committed
593
594
595
}

Events_Store::instance();