<?php

namespace Automattic\WP\Cron_Control;

class Events_Store extends Singleton {
	/**
	 * PLUGIN SETUP
	 */

	/**
	 * Identifiers shared by the methods of this class
	 */

	// Appended to `$wpdb->prefix` by get_table_name() to form the jobs table's name
	const TABLE_SUFFIX = 'a8c_cron_control_jobs';

	// Schema version, and the option prepare_tables() compares it against
	const DB_VERSION        = 1;
	const DB_VERSION_OPTION = 'a8c_cron_control_db_version';

	// Object-cache key prepare_tables() adds before it attempts to create the table
	const TABLE_CREATE_LOCK = 'a8c_cron_control_creating_table';

	// Values stored in the table's `status` column
	const STATUS_PENDING   = 'pending';
	const STATUS_RUNNING   = 'running';
	const STATUS_COMPLETED = 'complete';

	// Object-cache key holding the rebuilt `cron` option
	const CACHE_KEY = 'a8c_cron_ctrl_option';

	// While true, create_or_update_job() returns without writing anything
	private $job_creation_suspended = false;
Erick Hitter's avatar
Erick Hitter committed
26
27
28
29

	/**
	 * Prepare the table, then attach the filters this class answers
	 */
	protected function class_init() {
		// The table has to be in place before any hooked callback queries it
		$this->prepare_tables();

		// Hook name, handling method, and number of arguments the method receives
		$filters = array(
			// Option interception
			array( 'pre_option_cron', 'get_option', 1 ),
			array( 'pre_update_option_cron', 'update_option', 2 ),
			// Disallow duplicates
			array( 'schedule_event', 'block_creation_if_job_exists', 1 ),
		);

		foreach ( $filters as $filter ) {
			list( $hook, $method, $accepted_args ) = $filter;

			add_filter( $hook, array( $this, $method ), 10, $accepted_args );
		}
	}

	/**
	 * Name of the jobs table: the install's `$wpdb->prefix` followed by TABLE_SUFFIX
	 *
	 * @return string
	 */
	public function get_table_name() {
		global $wpdb;

		$prefix = $wpdb->prefix;

		return $prefix . self::TABLE_SUFFIX;
	}

Erick Hitter's avatar
Erick Hitter committed
51
52
53
54
55
56
57
58
59
60
61
	/**
	 * List the statuses an entry may hold: pending, running, and complete
	 *
	 * @return array
	 */
	public function get_allowed_statuses() {
		$statuses = array( self::STATUS_PENDING, self::STATUS_RUNNING, self::STATUS_COMPLETED );

		return $statuses;
	}

62
63
64
	/**
	 * Create or update the jobs table unless the stored schema version is already current
	 *
	 * The version option is written only after the table is confirmed to exist;
	 * otherwise the option is deleted, so a later call attempts creation again.
	 */
	protected function prepare_tables() {
		// Stored version matches, so the table is assumed to be in place
		$stored_version = (int) get_option( self::DB_VERSION_OPTION );

		if ( self::DB_VERSION === $stored_version ) {
			return;
		}

		// wp_cache_add() fails when the key already exists, i.e. another request
		// started creating the table within the last minute
		$lock_acquired = wp_cache_add( self::TABLE_CREATE_LOCK, 1, null, \MINUTE_IN_SECONDS );

		if ( false === $lock_acquired ) {
			return;
		}

		global $wpdb;

		// dbDelta() is only loaded by Core in upgrade contexts
		if ( ! function_exists( 'dbDelta' ) ) {
			require_once ABSPATH . '/wp-admin/includes/upgrade.php';
		}

		$table_name = $this->get_table_name();

		// Define schema and create the table
		$schema = "CREATE TABLE `{$table_name}` (
			`ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT,

			`timestamp` bigint(20) unsigned NOT NULL,
			`action` varchar(255) NOT NULL,
			`action_hashed` varchar(32) NOT NULL,
			`instance` varchar(32) NOT NULL,

			`args` longtext NOT NULL,
			`schedule` varchar(255) DEFAULT NULL,
			`interval` int unsigned DEFAULT 0,
			`status` varchar(32) NOT NULL DEFAULT 'pending',

			`created` datetime NOT NULL,
			`last_modified` datetime NOT NULL,

			PRIMARY KEY (`ID`),
			UNIQUE KEY `ts_action_instance` (`timestamp`, `action`, `instance`, `status`)
		) ENGINE=InnoDB;\n";

		dbDelta( $schema, true );

		// Only record the version once exactly one matching table is found
		$matching_tables = $wpdb->get_col( "SHOW TABLES LIKE '{$table_name}'" );

		if ( 1 !== count( $matching_tables ) ) {
			delete_option( self::DB_VERSION_OPTION );
			return;
		}

		update_option( self::DB_VERSION_OPTION, self::DB_VERSION, true );
	}
115
116
117
118
119
120

	/**
	 * PLUGIN FUNCTIONALITY
	 */

	/**
	 * Override cron option requests with data from custom table
	 *
	 * Registered on `pre_option_cron`. Serves the cached array when one exists;
	 * otherwise rebuilds it from pending entries, read in pages of 100 and capped
	 * at 15 pages, then caches the result for an hour.
	 *
	 * If the cap is reached, `a8c_cron_control_stopped_runaway_cron_option_rebuild`
	 * fires and the array built so far (up to 1,500 events) is cached and returned.
	 *
	 * @return array `version` plus timestamp => action => instance => array( schedule, args, interval )
	 */
	public function get_option() {
		// Use cached value when available
		$cached_option = wp_cache_get( self::CACHE_KEY, null, true );

		if ( false !== $cached_option ) {
			return $cached_option;
		}

		// Start building a new cron option
		$cron_array = array(
			'version' => 2, // Core versions the cron array; without this, events will continually requeue
		);

		// Get events to re-render as the cron option
		$quantity  = 100;
		$max_pages = 15;
		$page      = 1;

		while ( true ) {
			// Something's probably wrong if a site has more than 1,500 pending cron actions.
			// Checked before querying, so every page that is fetched is also used.
			if ( $page > $max_pages ) {
				do_action( 'a8c_cron_control_stopped_runaway_cron_option_rebuild' );
				break;
			}

			$jobs = $this->get_jobs( array(
				'status'   => self::STATUS_PENDING,
				'quantity' => $quantity,
				'page'     => $page,
			) );

			// Nothing more to add; also covers get_jobs() returning false
			if ( empty( $jobs ) ) {
				break;
			}

			// Loop through results and build the output Core expects
			foreach ( $jobs as $job ) {
				$timestamp = $job->timestamp;

				// If timestamp is invalid, event is removed to let its source fix it
				if ( $timestamp <= 0 ) {
					$this->mark_job_record_completed( $job->ID );
					continue;
				}

				$cron_array[ $timestamp ][ $job->action ][ $job->instance ] = array(
					'schedule' => $job->schedule,
					'args'     => $job->args,
					'interval' => isset( $job->interval ) ? $job->interval : 0,
				);
			}

			// No need to keep looping if there were fewer events than we asked for
			if ( count( $jobs ) < $quantity ) {
				break;
			}

			$page++;
		}

		// Re-sort the array just as Core does when events are scheduled
		// Ensures events are sorted chronologically
		uksort( $cron_array, 'strnatcasecmp' );

		// Cache the results, bearing in mind that they won't be used during unscheduling events
		wp_cache_set( self::CACHE_KEY, $cron_array, null, 1 * \HOUR_IN_SECONDS );

		return $cron_array;
	}

	/**
	 * Translate a write to the `cron` option into changes to the jobs table
	 *
	 * Registered on `pre_update_option_cron`. Events found only in `$new_value` are
	 * created (or update their pending entry); events found only in `$old_value`
	 * are marked completed. `$old_value` is returned so that, as the original
	 * author notes, the `cron` option itself won't be updated.
	 *
	 * @param mixed $new_value Cron array being saved
	 * @param mixed $old_value Cron array as it was before the save
	 * @return mixed `$old_value`, untouched
	 */
	public function update_option( $new_value, $old_value ) {
		// Work out both directions of the diff before touching the table
		$added_events   = $this->find_cron_array_differences( $new_value, $old_value );
		$removed_events = $this->find_cron_array_differences( $old_value, $new_value );

		// Add/update new events; cache flushing is deferred to the end
		foreach ( $added_events as $event ) {
			$existing_id = $this->job_exists( $event['timestamp'], $event['action'], $event['instance'], true );

			// job_exists() reports "no entry" as 0, create_or_update_job() expects null
			$update_id = ( 0 === $existing_id ) ? null : $existing_id;

			$this->create_or_update_job( $event['timestamp'], $event['action'], $event['args'], $update_id, false );
		}

		// Mark deleted entries for removal
		foreach ( $removed_events as $event ) {
			$this->mark_job_completed( $event['timestamp'], $event['action'], $event['instance'], false );
		}

		// One flush covers every change made above
		$this->flush_internal_caches();

		return $old_value;
	}

Erick Hitter's avatar
Erick Hitter committed
236
237
238
239
240
241
242
243
244
245
246
247
	/**
	 * When an entry exists, don't try to create it again
	 *
	 * Registered on `schedule_event`. Returning false rejects the event.
	 *
	 * @param object|false $job Event being scheduled, with `timestamp`, `hook`, and `args` properties,
	 *                          or a non-object if an earlier callback already rejected it
	 * @return object|false The value received, or false when a pending entry already exists
	 */
	public function block_creation_if_job_exists( $job ) {
		// Nothing to inspect if a previous callback already short-circuited scheduling
		if ( ! is_object( $job ) ) {
			return $job;
		}

		// Same hash create_or_update_job() stores in the `instance` column
		$instance = md5( maybe_serialize( $job->args ) );

		if ( $this->job_exists( $job->timestamp, $job->hook, $instance ) ) {
			return false;
		}

		return $job;
	}

248
249
250
251
252
	/**
	 * PLUGIN UTILITY METHODS
	 */

	/**
	 * Fetch one page of entries that have a given status
	 *
	 * Recognized keys in `$args`:
	 *  - `status`   Status to match; read unconditionally
	 *  - `quantity` Rows per page; replaced with 100 when missing or non-numeric
	 *  - `page`     1-based page number; when absent, rows are read from offset 0
	 *
	 * @param array $args
	 * @return array|false Formatted rows, or false when the query doesn't return an array
	 */
	public function get_jobs( $args ) {
		global $wpdb;

		$has_quantity = isset( $args['quantity'] ) && is_numeric( $args['quantity'] );

		if ( ! $has_quantity ) {
			$args['quantity'] = 100;
		}

		// Pages are 1-based for callers, offsets are 0-based
		$offset = 0;

		if ( isset( $args['page'] ) ) {
			$offset = max( 0, $args['page'] - 1 ) * $args['quantity'];
		}

		// Do not sort, otherwise index isn't used
		$sql  = $wpdb->prepare( "SELECT * FROM {$this->get_table_name()} WHERE status = %s LIMIT %d,%d;", $args['status'], $offset, $args['quantity'] );
		$rows = $wpdb->get_results( $sql, 'OBJECT' );

		if ( ! is_array( $rows ) ) {
			return false;
		}

		return array_map( array( $this, 'format_job' ), $rows );
	}

284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
	/**
	 * Retrieve a single event by ID, or by a combination of its timestamp, instance identifier, and either action or the action's hashed representation
	 *
	 * `$attrs['status']` may be any value from get_allowed_statuses() or 'any';
	 * when missing or unrecognized, only pending entries are matched.
	 *
	 * @param  array $attrs Array of event attributes to query by
	 * @return object|false Formatted row, or false for invalid input or when nothing matches
	 */
	public function get_job( $attrs ) {
		global $wpdb;

		// Validate basic inputs
		if ( ! is_array( $attrs ) || empty( $attrs ) ) {
			return false;
		}

		// Validate requested status
		$allowed_status   = $this->get_allowed_statuses();
		$allowed_status[] = 'any';

		if ( ! isset( $attrs['status'] ) || ! in_array( $attrs['status'], $allowed_status, true ) ) {
			$attrs['status'] = self::STATUS_PENDING;
		}

		// Validate attributes provided to query for a post
		if ( isset( $attrs['ID'] ) ) {
			$query = $wpdb->prepare( "SELECT * FROM {$this->get_table_name()} WHERE ID = %d", $attrs['ID'] );
		} else {
			// Need a timestamp, an instance, and either an action or its hashed representation
			if ( ! isset( $attrs['timestamp'] ) || ! isset( $attrs['instance'] ) ) {
				return false;
			}

			if ( isset( $attrs['action'] ) ) {
				$action_column = 'action';
				$action_value  = $attrs['action'];
			} elseif ( isset( $attrs['action_hashed'] ) ) {
				$action_column = 'action_hashed';
				$action_value  = $attrs['action_hashed'];
			} else {
				return false;
			}

			// Do not sort, otherwise index isn't used
			$query = $wpdb->prepare( "SELECT * FROM {$this->get_table_name()} WHERE timestamp = %d AND {$action_column} = %s AND instance = %s", $attrs['timestamp'], $action_value, $attrs['instance'] );
		}

		// Final query preparations
		// The status is whitelisted above, but is still bound through prepare()
		// so no caller-supplied value is ever interpolated into the SQL
		if ( 'any' !== $attrs['status'] ) {
			$query .= $wpdb->prepare( ' AND status = %s', $attrs['status'] );
		}

		$query .= ' LIMIT 1';

		// Query and format results
		$job = $wpdb->get_row( $query );

		if ( is_object( $job ) && ! is_wp_error( $job ) ) {
			return $this->format_job( $job );
		}

		return false;
	}

349
350
	/**
	 * Normalize a row's types: integer ID, timestamp, and interval; unserialized args; false for an empty schedule
	 *
	 * Anything that isn't a plain row object (including a WP_Error) is handed back unchanged.
	 *
	 * @param  object $job Job row from DB, in object form
	 * @return object
	 */
	private function format_job( $job ) {
		$is_row = is_object( $job ) && ! is_wp_error( $job );

		if ( ! $is_row ) {
			return $job;
		}

		// Values read from the row are cast to the integers the rest of the class compares against
		foreach ( array( 'ID', 'timestamp', 'interval' ) as $numeric_field ) {
			$job->{$numeric_field} = (int) $job->{$numeric_field};
		}

		$job->args = maybe_unserialize( $job->args );

		// NULL or empty schedules are exposed as false
		if ( empty( $job->schedule ) ) {
			$job->schedule = false;
		}

		return $job;
	}

	/**
	 * Determine whether a pending entry exists for a timestamp, action, and instance
	 *
	 * Uses a direct query to avoid stale caches that result in duplicate events
	 *
	 * @param int    $timestamp Unix timestamp event executes at
	 * @param string $action    Hook event fires
	 * @param string $instance  Value of the entry's `instance` column
	 * @param bool   $return_id Return the entry's ID (0 when absent) instead of a boolean
	 * @return bool|int
	 */
	public function job_exists( $timestamp, $action, $instance, $return_id = false ) {
		global $wpdb;

		$sql = $wpdb->prepare( "SELECT ID FROM {$this->get_table_name()} WHERE timestamp = %d AND action = %s AND instance = %s AND status = %s LIMIT 1;", $timestamp, $action, $instance, self::STATUS_PENDING );
		$ids = $wpdb->get_col( $sql );

		$found = ! empty( $ids );

		if ( ! $return_id ) {
			return $found;
		}

		return $found ? (int) array_shift( $ids ) : 0;
	}

	/**
	 * Insert a new entry for an event, or overwrite an existing entry identified by ID
	 *
	 * Does nothing while event creation is suspended.
	 *
	 * @param int      $timestamp    Unix timestamp event executes at
	 * @param string   $action       Hook event fires
	 * @param array    $args         Event data: `args` is always read; `schedule` and `interval` are stored when non-empty
	 * @param int|null $update_id    ID of existing entry to update, rather than creating a new entry
	 * @param bool     $flush_cache  Whether or not to flush internal caches after creating/updating the event
	 */
	public function create_or_update_job( $timestamp, $action, $args, $update_id = null, $flush_cache = true ) {
		// Don't create new jobs when manipulating jobs via the plugin's CLI commands
		if ( $this->job_creation_suspended ) {
			return;
		}

		global $wpdb;

		// The stored arguments and the instance hash derive from the same serialized string
		$serialized_args = maybe_serialize( $args['args'] );

		$job_post = array(
			'timestamp'     => $timestamp,
			'action'        => $action,
			'action_hashed' => md5( $action ),
			'instance'      => md5( $serialized_args ),
			'args'          => $serialized_args,
			'last_modified' => current_time( 'mysql', true ),
		);

		if ( ! empty( $args['schedule'] ) ) {
			$job_post['schedule'] = $args['schedule'];
		}

		if ( ! empty( $args['interval'] ) && is_numeric( $args['interval'] ) ) {
			$job_post['interval'] = (int) $args['interval'];
		}

		$table     = $this->get_table_name();
		$is_update = is_int( $update_id ) && $update_id > 0;

		if ( $is_update ) {
			// Existing entry is rewritten in place
			$wpdb->update( $table, $job_post, array( 'ID' => $update_id ) );
		} else {
			// Creation time is only recorded for new entries
			$job_post['created'] = current_time( 'mysql', true );

			$wpdb->insert( $table, $job_post );
		}

		// Delete internal cache
		// Should only be skipped during bulk operations
		if ( $flush_cache ) {
			$this->flush_internal_caches();
		}
	}

	/**
	 * Find an event's pending entry and mark it as completed
	 *
	 * Completed entries will be cleaned up by an internal job
	 *
	 * @param int    $timestamp    Unix timestamp event executes at
	 * @param string $action       Name of action used when the event is registered (unhashed)
	 * @param string $instance     md5 hash of the event's arguments array, which Core uses to index the `cron` option
	 * @param bool   $flush_cache  Whether or not to flush internal caches after creating/updating the event
	 * @return bool False when no pending entry matches, otherwise the result of mark_job_record_completed()
	 */
	public function mark_job_completed( $timestamp, $action, $instance, $flush_cache = true ) {
		// With $return_id set, job_exists() returns 0 when nothing matches
		$job_id = $this->job_exists( $timestamp, $action, $instance, true );

		return $job_id ? $this->mark_job_record_completed( $job_id, $flush_cache ) : false;
	}

	/**
	 * Switch an entry, identified by ID, to the "completed" status
	 *
	 * The entry's `instance` value is overwritten with a random number at the same time.
	 *
	 * @param int $job_id        ID of job's record
	 * @param bool $flush_cache  Whether or not to flush internal caches after creating/updating the event
	 * @return bool Result of the update, cast to boolean
	 */
	public function mark_job_record_completed( $job_id, $flush_cache = true ) {
		global $wpdb;

		$result = $wpdb->update(
			$this->get_table_name(),
			array(
				'status'   => self::STATUS_COMPLETED,
				'instance' => wp_rand( 1000000, 999999999 ), // Breaks unique constraint, and can be recreated from entry's remaining data
			),
			array( 'ID' => $job_id )
		);

		// Delete internal cache
		// Should only be skipped during bulk operations
		if ( $flush_cache ) {
			$this->flush_internal_caches();
		}

		return (bool) $result;
	}

486
	/**
	 * List the events present in one cron array but missing from another
	 *
	 * `$new` is flattened with collapse_events_array(); each resulting event is
	 * reported when `$old` has no entry at the same timestamp, action, and instance.
	 *
	 * @param array $new New cron array
	 * @param array $old Old cron array
	 *
	 * @return array List of arrays with `timestamp`, `action`, `instance`, and `args` keys
	 */
	private function find_cron_array_differences( $new, $old ) {
		$differences = array();

		foreach ( collapse_events_array( $new ) as $event ) {
			$event = (object) $event;

			// Present on both sides, so not a difference
			if ( isset( $old[ $event->timestamp ][ $event->action ][ $event->instance ] ) ) {
				continue;
			}

			$differences[] = array(
				'timestamp' => $event->timestamp,
				'action'    => $event->action,
				'instance'  => $event->instance,
				'args'      => $event->args,
			);
		}

		return $differences;
	}

Erick Hitter's avatar
Erick Hitter committed
515
516
517
518
519
520
521
	/**
	 * Delete the cached representation of the cron option
	 *
	 * Removes the entry stored under CACHE_KEY, so the next get_option() call
	 * rebuilds the array from the table.
	 *
	 * @return bool Whatever wp_cache_delete() returns
	 */
	public function flush_internal_caches() {
		return wp_cache_delete( self::CACHE_KEY );
	}

522
	/**
	 * Prevent event store from creating new entries
	 *
	 * Sets the flag that makes create_or_update_job() return without writing.
	 *
	 * Should be used sparingly, and followed by a call to resume_event_creation(), during bulk operations
	 */
	public function suspend_event_creation() {
		$this->job_creation_suspended = true;
	}

	/**
	 * Stop discarding events, once again storing them in the table
	 *
	 * Clears the flag set by suspend_event_creation().
	 */
	public function resume_event_creation() {
		$this->job_creation_suspended = false;
	}

	/**
	 * Delete every entry whose status is "complete"
	 *
	 * @param bool $count_first Count completed entries first and skip the delete when there are none;
	 *                          pass false when the caller has already counted
	 */
	public function purge_completed_events( $count_first = true ) {
		global $wpdb;

		// Without a count, assume there is something to delete
		$count = 1;

		if ( $count_first ) {
			// NOTE(review): `srtm` is not defined in this file; presumably a flag on a
			// replacement $wpdb that sends reads to the primary DB - confirm.
			// Tracked with an explicit boolean rather than isset(), so the original
			// value is restored even when that value is null.
			$has_srtm = property_exists( $wpdb, 'srtm' );

			if ( $has_srtm ) {
				$previous_srtm = $wpdb->srtm;
				$wpdb->srtm    = true;
			}

			$count = $this->count_events_by_status( self::STATUS_COMPLETED );

			if ( $has_srtm ) {
				$wpdb->srtm = $previous_srtm;
			}
		}

		if ( $count > 0 ) {
			$wpdb->delete( $this->get_table_name(), array( 'status' => self::STATUS_COMPLETED ) );
		}
	}
Erick Hitter's avatar
Erick Hitter committed
564
565
566
567
568
569
570
571
572
573

	/**
	 * Count the entries that currently hold a given status
	 *
	 * @param string $status One of the values from get_allowed_statuses()
	 * @return int|false Number of matching entries, or false for an unrecognized status
	 */
	public function count_events_by_status( $status ) {
		global $wpdb;

		$is_known_status = in_array( $status, $this->get_allowed_statuses(), true );

		if ( ! $is_known_status ) {
			return false;
		}

		$sql = $wpdb->prepare( "SELECT COUNT(action) FROM {$this->get_table_name()} WHERE status = %s", $status );

		return (int) $wpdb->get_var( $sql );
	}
Erick Hitter's avatar
Erick Hitter committed
580
581
582
}

// Runs as soon as this file is loaded. `instance()` is not defined in this class;
// presumably it is inherited from Singleton and triggers class_init() - confirm.
Events_Store::instance();