runner.go 8.78 KB
Newer Older
Erick Hitter's avatar
Erick Hitter committed
1
2
3
package main

import (
4
	"encoding/json"
Erick Hitter's avatar
Erick Hitter committed
5
6
7
	"flag"
	"fmt"
	"log"
Erick Hitter's avatar
Erick Hitter committed
8
	"math/rand"
Erick Hitter's avatar
Erick Hitter committed
9
10
	"os"
	"os/exec"
11
	"os/signal"
Erick Hitter's avatar
Erick Hitter committed
12
	"path/filepath"
13
	"sync/atomic"
Erick Hitter's avatar
Erick Hitter committed
14
	"syscall"
Erick Hitter's avatar
Erick Hitter committed
15
16
17
	"time"
)

Erick Hitter's avatar
Erick Hitter committed
18
// siteInfo is the record decoded by getInstanceInfo from the JSON output of
// `wp cron-control orchestrate runner-only get-info`.
type siteInfo struct {
	// Multisite is compared against 1 by getSites to select the multisite path.
	Multisite int
	// Siteurl is used by getSites as the only site URL on a single-site install.
	Siteurl string
	// Disabled is handed to shouldGetSites: 0 means execution is enabled, and
	// values above 1 are compared against the current Unix time.
	Disabled int
}

Erick Hitter's avatar
Erick Hitter committed
24
25
// site identifies one WordPress site by URL. Values are either decoded from
// `wp site list --fields=url --format=json` or built by getSites for a
// single-site install.
type site struct {
	URL string
}

Erick Hitter's avatar
Erick Hitter committed
28
29
// event is one due cron event decoded by getSiteEvents from the JSON output of
// `wp cron-control orchestrate runner-only list-due-batch`.
type event struct {
	// URL is overwritten by queueSiteEvents with the URL of the site the event
	// was retrieved for, and passed back to WP-CLI as --url when it is run.
	URL string
	// Timestamp is compared against the current Unix time by runEvents; events
	// that are still in the future are skipped.
	Timestamp int
	// Action and Instance are forwarded verbatim as --action and --instance.
	Action   string
	Instance string
}

// Package-level configuration and counters. The configuration values are
// filled in from command-line flags by init.
var (
	// WP-CLI invocation settings, used by runWpCliCmd.
	wpCliPath string
	wpNetwork int
	wpPath    string

	// Number of goroutines retrieving events and running events respectively.
	numGetWorkers int
	numRunWorkers int

	// Seconds between site/event retrieval passes.
	getEventsInterval int

	// Seconds between heartbeat log lines.
	heartbeatInt int

	// Counters shared between goroutines; only accessed through sync/atomic.
	disabledLoopCount    uint64
	eventRunErrCount     uint64
	eventRunSuccessCount uint64

	// Logging destination and verbosity, configured by setUpLogger.
	logger  *log.Logger
	logDest string
	debug   bool
)

const (
	// getEventsBreak is how long an event-retrieval worker pauses after
	// handling each site.
	getEventsBreak = time.Second

	// runEventsBreak is how long an event worker pauses after each event it
	// attempts to run.
	runEventsBreak = 10 * time.Second
)

func init() {
	flag.StringVar(&wpCliPath, "cli", "/usr/local/bin/wp", "Path to WP-CLI binary")
Erick Hitter's avatar
Erick Hitter committed
61
	flag.IntVar(&wpNetwork, "network", 0, "WordPress network ID, `0` to disable")
Erick Hitter's avatar
Erick Hitter committed
62
	flag.StringVar(&wpPath, "wp", "/var/www/html", "Path to WordPress installation")
63
64
	flag.IntVar(&numGetWorkers, "workers-get", 1, "Number of workers to retrieve events")
	flag.IntVar(&numRunWorkers, "workers-run", 5, "Number of workers to run events")
65
	flag.IntVar(&getEventsInterval, "get-events-interval", 60, "Seconds between event retrieval")
66
	flag.IntVar(&heartbeatInt, "heartbeat", 60, "Heartbeat interval in seconds")
Erick Hitter's avatar
Erick Hitter committed
67
	flag.StringVar(&logDest, "log", "os.Stdout", "Log path, omit to log to Stdout")
68
	flag.BoolVar(&debug, "debug", false, "Include additional log data for debugging")
Erick Hitter's avatar
Erick Hitter committed
69
70
71
72
73
	flag.Parse()

	setUpLogger()

	// TODO: Should check for wp-config.php instead?
74
75
	validatePath(&wpCliPath, "WP-CLI path")
	validatePath(&wpPath, "WordPress path")
Erick Hitter's avatar
Erick Hitter committed
76
77
78
}

func main() {
79
	logger.Printf("Starting with %d event-retreival worker(s) and %d event worker(s)", numGetWorkers, numRunWorkers)
80
	logger.Printf("Retrieving events every %d seconds", getEventsInterval)
81
	sig := make(chan os.Signal, 1)
Erick Hitter's avatar
Erick Hitter committed
82
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
Erick Hitter's avatar
Erick Hitter committed
83

Erick Hitter's avatar
Erick Hitter committed
84
85
	sites := make(chan site)
	events := make(chan event)
Erick Hitter's avatar
Erick Hitter committed
86

87
	go spawnEventRetrievers(sites, events)
Erick Hitter's avatar
Erick Hitter committed
88
	go spawnEventWorkers(events)
89
	go retrieveSitesPeriodically(sites)
90
	go heartbeat()
Erick Hitter's avatar
Erick Hitter committed
91

92
93
	caughtSig := <-sig
	logger.Printf("Stopping, got signal %s", caughtSig)
Erick Hitter's avatar
Erick Hitter committed
94
95
}

Erick Hitter's avatar
Erick Hitter committed
96
// spawnEventRetrievers starts numGetWorkers goroutines running
// queueSiteEvents. Each worker reads sites from the sites channel and writes
// the events it finds to queue. Worker IDs start at 1.
func spawnEventRetrievers(sites <-chan site, queue chan<- event) {
	for workerID := 1; workerID <= numGetWorkers; workerID++ {
		go queueSiteEvents(workerID, sites, queue)
	}
}

Erick Hitter's avatar
Erick Hitter committed
102
103
func spawnEventWorkers(queue <-chan event) {
	workerEvents := make(chan event)
Erick Hitter's avatar
Erick Hitter committed
104

105
	for w := 1; w <= numRunWorkers; w++ {
Erick Hitter's avatar
Erick Hitter committed
106
107
108
109
110
111
112
113
114
115
		go runEvents(w, workerEvents)
	}

	for event := range queue {
		workerEvents <- event
	}

	close(workerEvents)
}

Erick Hitter's avatar
Erick Hitter committed
116
// retrieveSitesPeriodically fetches the site list every getEventsInterval
// seconds and sends each site on the sites channel. It never returns. Because
// it is driven by time.Tick, the first retrieval happens one full interval
// after the call.
func retrieveSitesPeriodically(sites chan<- site) {
	loopInterval := time.Duration(getEventsInterval) * time.Second

	for range time.Tick(loopInterval) {
		siteList, err := getSites()
		if err != nil {
			// Nothing usable this round; try again on the next tick.
			continue
		}

		for _, s := range siteList {
			sites <- s
		}
	}
}

131
func heartbeat() {
132
133
134
135
136
	if heartbeatInt == 0 {
		logger.Println("heartbeat disabled")
		return
	}

137
	interval := time.Duration(heartbeatInt) * time.Second
Erick Hitter's avatar
Erick Hitter committed
138

139
	for range time.Tick(interval) {
140
141
142
143
		successCount, errCount := atomic.LoadUint64(&eventRunSuccessCount), atomic.LoadUint64(&eventRunErrCount)
		atomic.SwapUint64(&eventRunSuccessCount, 0)
		atomic.SwapUint64(&eventRunErrCount, 0)
		logger.Printf("<heartbeat eventsSucceededSinceLast=%d eventsErroredSinceLast=%d>", successCount, errCount)
Erick Hitter's avatar
Erick Hitter committed
144
145
146
	}
}

Erick Hitter's avatar
Erick Hitter committed
147
func getSites() ([]site, error) {
148
	siteInfo, err := getInstanceInfo()
Erick Hitter's avatar
Erick Hitter committed
149
	if err != nil {
150
		siteInfo.Disabled = 1
Erick Hitter's avatar
Erick Hitter committed
151
152
	}

153
	if run := shouldGetSites(siteInfo.Disabled); false == run {
154
		return nil, err
155
156
	}

157
	if siteInfo.Multisite == 1 {
158
		sites, err := getMultisiteSites()
Erick Hitter's avatar
Erick Hitter committed
159
		if err != nil {
160
			sites = nil
Erick Hitter's avatar
Erick Hitter committed
161
162
163
164
		}

		return sites, err
	}
Erick Hitter's avatar
Erick Hitter committed
165
166
167
168
169
170

	// Mock for single site
	sites := make([]site, 0)
	sites = append(sites, site{URL: siteInfo.Siteurl})

	return sites, nil
Erick Hitter's avatar
Erick Hitter committed
171
172
}

Erick Hitter's avatar
Erick Hitter committed
173
func getInstanceInfo() (siteInfo, error) {
174
	raw, err := runWpCliCmd([]string{"cron-control", "orchestrate", "runner-only", "get-info", "--format=json"})
Erick Hitter's avatar
Reorg    
Erick Hitter committed
175
	if err != nil {
Erick Hitter's avatar
Erick Hitter committed
176
		return siteInfo{}, err
Erick Hitter's avatar
Reorg    
Erick Hitter committed
177
178
	}

Erick Hitter's avatar
Erick Hitter committed
179
	jsonRes := make([]siteInfo, 0)
Erick Hitter's avatar
Reorg    
Erick Hitter committed
180
	if err = json.Unmarshal([]byte(raw), &jsonRes); err != nil {
181
		if debug {
182
			logger.Println(fmt.Sprintf("%+v", err))
183
184
		}

Erick Hitter's avatar
Erick Hitter committed
185
		return siteInfo{}, err
Erick Hitter's avatar
Reorg    
Erick Hitter committed
186
187
188
189
190
	}

	return jsonRes[0], nil
}

Erick Hitter's avatar
Erick Hitter committed
191
func shouldGetSites(disabled int) bool {
192
193
	if disabled == 0 {
		atomic.SwapUint64(&disabledLoopCount, 0)
Erick Hitter's avatar
Erick Hitter committed
194
		return true
195
196
	}

Erick Hitter's avatar
Erick Hitter committed
197
198
	disabledCount, now := atomic.LoadUint64(&disabledLoopCount), time.Now()
	disabledSleep := time.Minute * 3 * time.Duration(disabledCount)
199
200
	disabledSleepSeconds := int64(disabledSleep) / 1000 / 1000 / 1000

Erick Hitter's avatar
Erick Hitter committed
201
	if disabled > 1 && (now.Unix()+disabledSleepSeconds) > int64(disabled) {
202
203
204
205
206
207
208
209
		atomic.SwapUint64(&disabledLoopCount, 0)
	} else if disabledSleep > time.Hour {
		atomic.SwapUint64(&disabledLoopCount, 0)
	} else {
		atomic.AddUint64(&disabledLoopCount, 1)
	}

	if disabledSleep > 0 {
210
		if debug {
Erick Hitter's avatar
Erick Hitter committed
211
			logger.Printf("Automatic execution disabled, sleeping for an additional %d minutes", disabledSleepSeconds/60)
212
213
		}

214
		time.Sleep(disabledSleep)
215
	} else if debug {
216
217
218
		logger.Println("Automatic execution disabled")
	}

Erick Hitter's avatar
Erick Hitter committed
219
	return false
220
221
}

Erick Hitter's avatar
Erick Hitter committed
222
func getMultisiteSites() ([]site, error) {
Erick Hitter's avatar
Reorg    
Erick Hitter committed
223
224
	raw, err := runWpCliCmd([]string{"site", "list", "--fields=url", "--archived=false", "--deleted=false", "--spam=false", "--format=json"})
	if err != nil {
225
		return nil, err
Erick Hitter's avatar
Reorg    
Erick Hitter committed
226
227
	}

Erick Hitter's avatar
Erick Hitter committed
228
	jsonRes := make([]site, 0)
Erick Hitter's avatar
Reorg    
Erick Hitter committed
229
	if err = json.Unmarshal([]byte(raw), &jsonRes); err != nil {
230
		if debug {
231
			logger.Println(fmt.Sprintf("%+v", err))
232
233
		}

234
		return nil, err
Erick Hitter's avatar
Reorg    
Erick Hitter committed
235
236
	}

237
238
239
240
241
242
	// Shuffle site order so that none are favored
	for i := range jsonRes {
		j := rand.Intn(i + 1)
		jsonRes[i], jsonRes[j] = jsonRes[j], jsonRes[i]
	}

Erick Hitter's avatar
Reorg    
Erick Hitter committed
243
244
245
	return jsonRes, nil
}

Erick Hitter's avatar
Erick Hitter committed
246
func queueSiteEvents(workerID int, sites <-chan site, queue chan<- event) {
Erick Hitter's avatar
Erick Hitter committed
247
	for site := range sites {
248
		if debug {
Erick Hitter's avatar
Erick Hitter committed
249
			logger.Printf("getEvents-%d processing %s", workerID, site.URL)
250
		}
Erick Hitter's avatar
Erick Hitter committed
251

Erick Hitter's avatar
Erick Hitter committed
252
		events, err := getSiteEvents(site.URL)
253
254
255
		if err != nil {
			time.Sleep(getEventsBreak)
			continue
Erick Hitter's avatar
Erick Hitter committed
256
257
		}

Erick Hitter's avatar
Erick Hitter committed
258
259
		for _, event := range events {
			event.URL = site.URL
260
			queue <- event
Erick Hitter's avatar
Erick Hitter committed
261
262
263
264
265
266
		}

		time.Sleep(getEventsBreak)
	}
}

Erick Hitter's avatar
Erick Hitter committed
267
func getSiteEvents(site string) ([]event, error) {
268
	raw, err := runWpCliCmd([]string{"cron-control", "orchestrate", "runner-only", "list-due-batch", fmt.Sprintf("--url=%s", site), "--format=json"})
Erick Hitter's avatar
Erick Hitter committed
269
	if err != nil {
270
		return nil, err
Erick Hitter's avatar
Erick Hitter committed
271
272
	}

Erick Hitter's avatar
Erick Hitter committed
273
	siteEvents := make([]event, 0)
Erick Hitter's avatar
Erick Hitter committed
274
	if err = json.Unmarshal([]byte(raw), &siteEvents); err != nil {
275
		if debug {
276
			logger.Println(fmt.Sprintf("%+v", err))
277
278
		}

279
		return nil, err
Erick Hitter's avatar
Erick Hitter committed
280
281
282
283
284
	}

	return siteEvents, nil
}

Erick Hitter's avatar
Erick Hitter committed
285
func runEvents(workerID int, events <-chan event) {
Erick Hitter's avatar
Erick Hitter committed
286
	for event := range events {
287
288
		if now := time.Now(); event.Timestamp > int(now.Unix()) {
			if debug {
Erick Hitter's avatar
Erick Hitter committed
289
				logger.Printf("runEvents-%d skipping premature job %d|%s|%s for %s", workerID, event.Timestamp, event.Action, event.Instance, event.URL)
290
291
292
293
294
			}

			continue
		}

Erick Hitter's avatar
Erick Hitter committed
295
		subcommand := []string{"cron-control", "orchestrate", "runner-only", "run", fmt.Sprintf("--timestamp=%d", event.Timestamp), fmt.Sprintf("--action=%s", event.Action), fmt.Sprintf("--instance=%s", event.Instance), fmt.Sprintf("--url=%s", event.URL)}
Erick Hitter's avatar
Erick Hitter committed
296

297
		_, err := runWpCliCmd(subcommand)
Erick Hitter's avatar
Erick Hitter committed
298

299
300
301
302
303
304
		if err == nil {
			if heartbeatInt > 0 {
				atomic.AddUint64(&eventRunSuccessCount, 1)
			}

			if debug {
Erick Hitter's avatar
Erick Hitter committed
305
				logger.Printf("runEvents-%d finished job %d|%s|%s for %s", workerID, event.Timestamp, event.Action, event.Instance, event.URL)
306
307
308
			}
		} else if heartbeatInt > 0 {
			atomic.AddUint64(&eventRunErrCount, 1)
309
		}
Erick Hitter's avatar
Erick Hitter committed
310
311
312
313
314
315

		time.Sleep(runEventsBreak)
	}
}

func runWpCliCmd(subcommand []string) (string, error) {
316
	// `--quiet`` included to prevent WP-CLI commands from generating invalid JSON
Erick Hitter's avatar
Erick Hitter committed
317
	subcommand = append(subcommand, "--allow-root", "--quiet", fmt.Sprintf("--path=%s", wpPath))
Erick Hitter's avatar
Erick Hitter committed
318
319
320
	if wpNetwork > 0 {
		subcommand = append(subcommand, fmt.Sprintf("--network=%d", wpNetwork))
	}
Erick Hitter's avatar
Erick Hitter committed
321
322
323
324
325
326

	wpCli := exec.Command(wpCliPath, subcommand...)
	wpOut, err := wpCli.CombinedOutput()
	wpOutStr := string(wpOut)

	if err != nil {
327
328
		if debug {
			logger.Printf("%s - %s", err, wpOutStr)
329
			logger.Println(fmt.Sprintf("%+v", subcommand))
330
331
		}

Erick Hitter's avatar
Erick Hitter committed
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
		return wpOutStr, err
	}

	return wpOutStr, nil
}

func setUpLogger() {
	logOpts := log.Ldate | log.Ltime | log.LUTC | log.Lshortfile

	if logDest == "os.Stdout" {
		logger = log.New(os.Stdout, "DEBUG: ", logOpts)
	} else {
		path, err := filepath.Abs(logDest)
		if err != nil {
			logger.Fatal(err)
		}

Erick Hitter's avatar
Erick Hitter committed
349
		logFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
Erick Hitter's avatar
Erick Hitter committed
350
351
352
353
354
355
356
357
		if err != nil {
			log.Fatal(err)
		}

		logger = log.New(logFile, "", logOpts)
	}
}

358
func validatePath(path *string, label string) {
Erick Hitter's avatar
Erick Hitter committed
359
360
361
362
363
	if len(*path) > 1 {
		var err error
		*path, err = filepath.Abs(*path)

		if err != nil {
364
			fmt.Printf("Error for %s: %s\n", label, err.Error())
Erick Hitter's avatar
Erick Hitter committed
365
366
367
368
			os.Exit(3)
		}

		if _, err = os.Stat(*path); os.IsNotExist(err) {
369
			fmt.Printf("Error for %s: '%s' does not exist\n", label, *path)
370
			usage()
Erick Hitter's avatar
Erick Hitter committed
371
372
		}
	} else {
373
		fmt.Printf("Empty path provided for %s\n", label)
Erick Hitter's avatar
Erick Hitter committed
374
375
376
377
378
379
380
381
		usage()
	}
}

// usage prints the flag help text via flag.Usage and then terminates the
// process with exit status 3.
func usage() {
	const exitStatus = 3

	flag.Usage()
	os.Exit(exitStatus)
}