runner.go 8.42 KB
Newer Older
Erick Hitter's avatar
Erick Hitter committed
1
2
3
package main

import (
4
	"encoding/json"
Erick Hitter's avatar
Erick Hitter committed
5
6
7
	"flag"
	"fmt"
	"log"
Erick Hitter's avatar
Erick Hitter committed
8
	"math/rand"
Erick Hitter's avatar
Erick Hitter committed
9
10
	"os"
	"os/exec"
11
	"os/signal"
Erick Hitter's avatar
Erick Hitter committed
12
	"path/filepath"
13
	"sync/atomic"
Erick Hitter's avatar
Erick Hitter committed
14
	"syscall"
Erick Hitter's avatar
Erick Hitter committed
15
16
17
	"time"
)

Erick Hitter's avatar
Erick Hitter committed
18
// siteInfo is one record of the JSON array returned by the
// `cron-control orchestrate runner-only get-info` WP-CLI command.
type siteInfo struct {
	// Multisite is 1 when the site list must be fetched with `wp site list`;
	// any other value makes the runner treat the install as a single site.
	Multisite int

	// Siteurl is the URL used for the single-site case.
	Siteurl string

	// Disabled is 0 when automatic execution is enabled. Values above 1 are
	// compared against the current Unix time by shouldGetSites.
	Disabled int
}

Erick Hitter's avatar
Erick Hitter committed
24
25
// site is one entry of the site list handed to the event retrievers.
type site struct {
	// URL is passed to WP-CLI as --url when listing the site's due events.
	URL string
}

Erick Hitter's avatar
Erick Hitter committed
28
29
// event is a single due cron event, decoded from the JSON emitted by the
// `list-due-batch` WP-CLI command and later executed by a run worker.
type event struct {
	// URL is the site the event belongs to; queueSiteEvents fills it in.
	URL string

	// Timestamp is compared against the current Unix time (seconds) before
	// the event is run, and is passed to WP-CLI as --timestamp.
	Timestamp int

	// Action is passed to WP-CLI as --action.
	Action string

	// Instance is passed to WP-CLI as --instance.
	Instance string
}

var (
	// WP-CLI invocation settings, populated from command-line flags.
	wpCliPath string
	wpNetwork int
	wpPath    string

	// Number of goroutines that fetch events and that run events.
	workersGetEvents int
	workersRunEvents int

	// Heartbeat interval in seconds; values of zero or less disable counting.
	heartbeatInt int

	// Counters accessed through sync/atomic from multiple goroutines.
	disabledLoopCount    uint64
	eventRunErrCount     uint64
	eventRunSuccessCount uint64

	// Logging configuration; logger is created by setUpLogger.
	logger  *log.Logger
	logDest string
	debug   bool
)

Erick Hitter's avatar
Erick Hitter committed
54
const (
	// getEventsLoop is the period at which the site list is re-fetched and
	// queued for event retrieval.
	getEventsLoop time.Duration = time.Minute

	// getEventsBreak is how long an event retriever pauses after each site.
	getEventsBreak time.Duration = time.Second

	// runEventsBreak is how long a run worker pauses after each executed event.
	runEventsBreak time.Duration = time.Second * 10
)

func init() {
	flag.StringVar(&wpCliPath, "cli", "/usr/local/bin/wp", "Path to WP-CLI binary")
Erick Hitter's avatar
Erick Hitter committed
60
	flag.IntVar(&wpNetwork, "network", 0, "WordPress network ID, `0` to disable")
Erick Hitter's avatar
Erick Hitter committed
61
	flag.StringVar(&wpPath, "wp", "/var/www/html", "Path to WordPress installation")
62
	flag.IntVar(&workersGetEvents, "workers-get", 1, "Number of workers to retrieve events")
Erick Hitter's avatar
Erick Hitter committed
63
	flag.IntVar(&workersRunEvents, "workers-run", 5, "Number of workers to run events")
64
	flag.IntVar(&heartbeatInt, "heartbeat", 60, "Heartbeat interval in seconds")
Erick Hitter's avatar
Erick Hitter committed
65
	flag.StringVar(&logDest, "log", "os.Stdout", "Log path, omit to log to Stdout")
66
	flag.BoolVar(&debug, "debug", false, "Include additional log data for debugging")
Erick Hitter's avatar
Erick Hitter committed
67
68
69
70
71
72
73
74
75
76
	flag.Parse()

	setUpLogger()

	// TODO: Should check for wp-config.php instead?
	validatePath(&wpCliPath)
	validatePath(&wpPath)
}

func main() {
77
78
	logger.Printf("Starting with %d event-retreival worker(s) and %d event worker(s)", workersGetEvents, workersRunEvents)
	sig := make(chan os.Signal, 1)
Erick Hitter's avatar
Erick Hitter committed
79
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
Erick Hitter's avatar
Erick Hitter committed
80

Erick Hitter's avatar
Erick Hitter committed
81
82
	sites := make(chan site)
	events := make(chan event)
Erick Hitter's avatar
Erick Hitter committed
83

84
	go spawnEventRetrievers(sites, events)
Erick Hitter's avatar
Erick Hitter committed
85
	go spawnEventWorkers(events)
86
	go retrieveSitesPeriodically(sites)
87
	go heartbeat()
Erick Hitter's avatar
Erick Hitter committed
88

89
90
	caughtSig := <-sig
	logger.Printf("Stopping, got signal %s", caughtSig)
Erick Hitter's avatar
Erick Hitter committed
91
92
}

Erick Hitter's avatar
Erick Hitter committed
93
// spawnEventRetrievers starts workersGetEvents goroutines, each running
// queueSiteEvents with a 1-based worker ID. It returns once they are started.
func spawnEventRetrievers(sites <-chan site, queue chan<- event) {
	for id := 1; id <= workersGetEvents; id++ {
		go queueSiteEvents(id, sites, queue)
	}
}

Erick Hitter's avatar
Erick Hitter committed
99
100
func spawnEventWorkers(queue <-chan event) {
	workerEvents := make(chan event)
Erick Hitter's avatar
Erick Hitter committed
101
102
103
104
105
106
107
108
109
110
111
112

	for w := 1; w <= workersRunEvents; w++ {
		go runEvents(w, workerEvents)
	}

	for event := range queue {
		workerEvents <- event
	}

	close(workerEvents)
}

Erick Hitter's avatar
Erick Hitter committed
113
// retrieveSitesPeriodically fetches the site list once per getEventsLoop tick
// and sends each site on sites. A failed fetch skips that tick. The function
// never returns.
func retrieveSitesPeriodically(sites chan<- site) {
	for range time.Tick(getEventsLoop) {
		siteList, err := getSites()
		if err != nil {
			// Skip this tick and try again on the next one.
			continue
		}

		for _, s := range siteList {
			sites <- s
		}
	}
}

126
func heartbeat() {
127
128
129
130
131
	if heartbeatInt == 0 {
		logger.Println("heartbeat disabled")
		return
	}

132
	interval := time.Duration(heartbeatInt) * time.Second
Erick Hitter's avatar
Erick Hitter committed
133

134
	for range time.Tick(interval) {
135
136
137
138
		successCount, errCount := atomic.LoadUint64(&eventRunSuccessCount), atomic.LoadUint64(&eventRunErrCount)
		atomic.SwapUint64(&eventRunSuccessCount, 0)
		atomic.SwapUint64(&eventRunErrCount, 0)
		logger.Printf("<heartbeat eventsSucceededSinceLast=%d eventsErroredSinceLast=%d>", successCount, errCount)
Erick Hitter's avatar
Erick Hitter committed
139
140
141
	}
}

Erick Hitter's avatar
Erick Hitter committed
142
func getSites() ([]site, error) {
143
	siteInfo, err := getInstanceInfo()
Erick Hitter's avatar
Erick Hitter committed
144
	if err != nil {
Erick Hitter's avatar
Erick Hitter committed
145
		return make([]site, 0), err
Erick Hitter's avatar
Erick Hitter committed
146
147
	}

148
	if run := shouldGetSites(siteInfo.Disabled); false == run {
Erick Hitter's avatar
Erick Hitter committed
149
		return make([]site, 0), err
150
151
	}

152
	if siteInfo.Multisite == 1 {
153
		sites, err := getMultisiteSites()
Erick Hitter's avatar
Erick Hitter committed
154
		if err != nil {
Erick Hitter's avatar
Erick Hitter committed
155
			sites = make([]site, 0)
Erick Hitter's avatar
Erick Hitter committed
156
157
158
159
		}

		return sites, err
	}
Erick Hitter's avatar
Erick Hitter committed
160
161
162
163
164
165

	// Mock for single site
	sites := make([]site, 0)
	sites = append(sites, site{URL: siteInfo.Siteurl})

	return sites, nil
Erick Hitter's avatar
Erick Hitter committed
166
167
}

Erick Hitter's avatar
Erick Hitter committed
168
func getInstanceInfo() (siteInfo, error) {
169
	raw, err := runWpCliCmd([]string{"cron-control", "orchestrate", "runner-only", "get-info", "--format=json"})
Erick Hitter's avatar
Reorg    
Erick Hitter committed
170
	if err != nil {
Erick Hitter's avatar
Erick Hitter committed
171
		return siteInfo{}, err
Erick Hitter's avatar
Reorg    
Erick Hitter committed
172
173
	}

Erick Hitter's avatar
Erick Hitter committed
174
	jsonRes := make([]siteInfo, 0)
Erick Hitter's avatar
Reorg    
Erick Hitter committed
175
	if err = json.Unmarshal([]byte(raw), &jsonRes); err != nil {
176
		if debug {
177
			logger.Println(fmt.Sprintf("%+v", err))
178
179
		}

Erick Hitter's avatar
Erick Hitter committed
180
		return siteInfo{}, err
Erick Hitter's avatar
Reorg    
Erick Hitter committed
181
182
183
184
185
	}

	return jsonRes[0], nil
}

Erick Hitter's avatar
Erick Hitter committed
186
func shouldGetSites(disabled int) bool {
187
188
	if disabled == 0 {
		atomic.SwapUint64(&disabledLoopCount, 0)
Erick Hitter's avatar
Erick Hitter committed
189
		return true
190
191
	}

Erick Hitter's avatar
Erick Hitter committed
192
193
	disabledCount, now := atomic.LoadUint64(&disabledLoopCount), time.Now()
	disabledSleep := time.Minute * 3 * time.Duration(disabledCount)
194
195
	disabledSleepSeconds := int64(disabledSleep) / 1000 / 1000 / 1000

Erick Hitter's avatar
Erick Hitter committed
196
	if disabled > 1 && (now.Unix()+disabledSleepSeconds) > int64(disabled) {
197
198
199
200
201
202
203
204
		atomic.SwapUint64(&disabledLoopCount, 0)
	} else if disabledSleep > time.Hour {
		atomic.SwapUint64(&disabledLoopCount, 0)
	} else {
		atomic.AddUint64(&disabledLoopCount, 1)
	}

	if disabledSleep > 0 {
205
		if debug {
Erick Hitter's avatar
Erick Hitter committed
206
			logger.Printf("Automatic execution disabled, sleeping for an additional %d minutes", disabledSleepSeconds/60)
207
208
		}

209
		time.Sleep(disabledSleep)
210
	} else if debug {
211
212
213
		logger.Println("Automatic execution disabled")
	}

Erick Hitter's avatar
Erick Hitter committed
214
	return false
215
216
}

Erick Hitter's avatar
Erick Hitter committed
217
func getMultisiteSites() ([]site, error) {
Erick Hitter's avatar
Reorg    
Erick Hitter committed
218
219
	raw, err := runWpCliCmd([]string{"site", "list", "--fields=url", "--archived=false", "--deleted=false", "--spam=false", "--format=json"})
	if err != nil {
Erick Hitter's avatar
Erick Hitter committed
220
		return make([]site, 0), err
Erick Hitter's avatar
Reorg    
Erick Hitter committed
221
222
	}

Erick Hitter's avatar
Erick Hitter committed
223
	jsonRes := make([]site, 0)
Erick Hitter's avatar
Reorg    
Erick Hitter committed
224
	if err = json.Unmarshal([]byte(raw), &jsonRes); err != nil {
225
		if debug {
226
			logger.Println(fmt.Sprintf("%+v", err))
227
228
		}

Erick Hitter's avatar
Erick Hitter committed
229
		return make([]site, 0), err
Erick Hitter's avatar
Reorg    
Erick Hitter committed
230
231
	}

232
233
234
235
236
237
	// Shuffle site order so that none are favored
	for i := range jsonRes {
		j := rand.Intn(i + 1)
		jsonRes[i], jsonRes[j] = jsonRes[j], jsonRes[i]
	}

Erick Hitter's avatar
Reorg    
Erick Hitter committed
238
239
240
	return jsonRes, nil
}

Erick Hitter's avatar
Erick Hitter committed
241
func queueSiteEvents(workerID int, sites <-chan site, queue chan<- event) {
Erick Hitter's avatar
Erick Hitter committed
242
	for site := range sites {
243
		if debug {
Erick Hitter's avatar
Erick Hitter committed
244
			logger.Printf("getEvents-%d processing %s", workerID, site.URL)
245
		}
Erick Hitter's avatar
Erick Hitter committed
246

Erick Hitter's avatar
Erick Hitter committed
247
		events, err := getSiteEvents(site.URL)
248
249
250
		if err != nil {
			time.Sleep(getEventsBreak)
			continue
Erick Hitter's avatar
Erick Hitter committed
251
252
		}

Erick Hitter's avatar
Erick Hitter committed
253
254
		for _, event := range events {
			event.URL = site.URL
255
			queue <- event
Erick Hitter's avatar
Erick Hitter committed
256
257
258
259
260
261
		}

		time.Sleep(getEventsBreak)
	}
}

Erick Hitter's avatar
Erick Hitter committed
262
func getSiteEvents(site string) ([]event, error) {
263
	raw, err := runWpCliCmd([]string{"cron-control", "orchestrate", "runner-only", "list-due-batch", fmt.Sprintf("--url=%s", site), "--format=json"})
Erick Hitter's avatar
Erick Hitter committed
264
	if err != nil {
Erick Hitter's avatar
Erick Hitter committed
265
		return make([]event, 0), err
Erick Hitter's avatar
Erick Hitter committed
266
267
	}

Erick Hitter's avatar
Erick Hitter committed
268
	siteEvents := make([]event, 0)
Erick Hitter's avatar
Erick Hitter committed
269
	if err = json.Unmarshal([]byte(raw), &siteEvents); err != nil {
270
		if debug {
271
			logger.Println(fmt.Sprintf("%+v", err))
272
273
		}

Erick Hitter's avatar
Erick Hitter committed
274
		return make([]event, 0), err
Erick Hitter's avatar
Erick Hitter committed
275
276
277
278
279
	}

	return siteEvents, nil
}

Erick Hitter's avatar
Erick Hitter committed
280
func runEvents(workerID int, events <-chan event) {
Erick Hitter's avatar
Erick Hitter committed
281
	for event := range events {
282
283
		if now := time.Now(); event.Timestamp > int(now.Unix()) {
			if debug {
Erick Hitter's avatar
Erick Hitter committed
284
				logger.Printf("runEvents-%d skipping premature job %d|%s|%s for %s", workerID, event.Timestamp, event.Action, event.Instance, event.URL)
285
286
287
288
289
			}

			continue
		}

Erick Hitter's avatar
Erick Hitter committed
290
		subcommand := []string{"cron-control", "orchestrate", "runner-only", "run", fmt.Sprintf("--timestamp=%d", event.Timestamp), fmt.Sprintf("--action=%s", event.Action), fmt.Sprintf("--instance=%s", event.Instance), fmt.Sprintf("--url=%s", event.URL)}
Erick Hitter's avatar
Erick Hitter committed
291

292
		_, err := runWpCliCmd(subcommand)
Erick Hitter's avatar
Erick Hitter committed
293

294
295
296
297
298
299
		if err == nil {
			if heartbeatInt > 0 {
				atomic.AddUint64(&eventRunSuccessCount, 1)
			}

			if debug {
Erick Hitter's avatar
Erick Hitter committed
300
				logger.Printf("runEvents-%d finished job %d|%s|%s for %s", workerID, event.Timestamp, event.Action, event.Instance, event.URL)
301
302
303
			}
		} else if heartbeatInt > 0 {
			atomic.AddUint64(&eventRunErrCount, 1)
304
		}
Erick Hitter's avatar
Erick Hitter committed
305
306
307
308
309
310
311

		time.Sleep(runEventsBreak)
	}
}

func runWpCliCmd(subcommand []string) (string, error) {
	subcommand = append(subcommand, "--allow-root", "--quiet", fmt.Sprintf("--path=%s", wpPath))
Erick Hitter's avatar
Erick Hitter committed
312
313
314
	if wpNetwork > 0 {
		subcommand = append(subcommand, fmt.Sprintf("--network=%d", wpNetwork))
	}
Erick Hitter's avatar
Erick Hitter committed
315
316
317
318
319
320

	wpCli := exec.Command(wpCliPath, subcommand...)
	wpOut, err := wpCli.CombinedOutput()
	wpOutStr := string(wpOut)

	if err != nil {
321
322
		if debug {
			logger.Printf("%s - %s", err, wpOutStr)
323
			logger.Println(fmt.Sprintf("%+v", subcommand))
324
325
		}

Erick Hitter's avatar
Erick Hitter committed
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
		return wpOutStr, err
	}

	return wpOutStr, nil
}

func setUpLogger() {
	logOpts := log.Ldate | log.Ltime | log.LUTC | log.Lshortfile

	if logDest == "os.Stdout" {
		logger = log.New(os.Stdout, "DEBUG: ", logOpts)
	} else {
		path, err := filepath.Abs(logDest)
		if err != nil {
			logger.Fatal(err)
		}

Erick Hitter's avatar
Erick Hitter committed
343
		logFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
Erick Hitter's avatar
Erick Hitter committed
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
		if err != nil {
			log.Fatal(err)
		}

		logger = log.New(logFile, "", logOpts)
	}
}

// validatePath rewrites *path to its absolute form and checks that it exists.
//
// A path of one character or less, or one that does not exist, prints the
// flag usage and exits via usage. A failure to make the path absolute prints
// the error and exits with status 3. Stat errors other than "does not exist"
// are not treated as failures.
func validatePath(path *string) {
	if len(*path) <= 1 {
		usage()
		return
	}

	absPath, absErr := filepath.Abs(*path)
	// Stored before the error check, as the caller's value is overwritten
	// even when resolution fails.
	*path = absPath

	if absErr != nil {
		fmt.Printf("Error: %s", absErr)
		os.Exit(3)
	}

	_, statErr := os.Stat(*path)
	if os.IsNotExist(statErr) {
		usage()
	}
}

// usage prints the flag help text and terminates the process with status 3.
func usage() {
	const exitStatus = 3

	flag.Usage()
	os.Exit(exitStatus)
}