Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
WP Plugins
Cron-Control
Commits
2ad567ed
Commit
2ad567ed
authored
Jul 14, 2017
by
Erick Hitter
Browse files
golint fixes
parent
77f5c0ef
Changes
1
Hide whitespace changes
Inline
Side-by-side
runner/runner.go
View file @
2ad567ed
...
...
@@ -14,18 +14,18 @@ import (
"time"
)
type
S
iteInfo
struct
{
type
s
iteInfo
struct
{
Multisite
int
Siteurl
string
Disabled
int
}
type
S
ite
struct
{
U
rl
string
type
s
ite
struct
{
U
RL
string
}
type
E
vent
struct
{
U
rl
string
type
e
vent
struct
{
U
RL
string
Timestamp
int
Action
string
Instance
string
...
...
@@ -50,7 +50,7 @@ var (
debug
bool
)
const
getEventsLoop
time
.
Duration
=
time
.
Minute
const
getEventsLoop
time
.
Duration
=
time
.
Minute
const
getEventsBreak
time
.
Duration
=
time
.
Second
const
runEventsBreak
time
.
Duration
=
time
.
Second
*
10
...
...
@@ -78,8 +78,8 @@ func main() {
signal
.
Notify
(
sig
,
os
.
Interrupt
)
signal
.
Notify
(
sig
,
os
.
Kill
)
sites
:=
make
(
chan
S
ite
)
events
:=
make
(
chan
E
vent
)
sites
:=
make
(
chan
s
ite
)
events
:=
make
(
chan
e
vent
)
go
spawnEventRetrievers
(
sites
,
events
)
go
spawnEventWorkers
(
events
)
...
...
@@ -92,14 +92,14 @@ func main() {
logger
.
Printf
(
"Stopping, got signal %s"
,
caughtSig
)
}
func
spawnEventRetrievers
(
sites
<-
chan
S
ite
,
queue
chan
<-
E
vent
)
{
func
spawnEventRetrievers
(
sites
<-
chan
s
ite
,
queue
chan
<-
e
vent
)
{
for
w
:=
1
;
w
<=
workersGetEvents
;
w
++
{
go
queueSiteEvents
(
w
,
sites
,
queue
)
}
}
func
spawnEventWorkers
(
queue
<-
chan
E
vent
)
{
workerEvents
:=
make
(
chan
E
vent
)
func
spawnEventWorkers
(
queue
<-
chan
e
vent
)
{
workerEvents
:=
make
(
chan
e
vent
)
for
w
:=
1
;
w
<=
workersRunEvents
;
w
++
{
go
runEvents
(
w
,
workerEvents
)
...
...
@@ -112,7 +112,7 @@ func spawnEventWorkers(queue <-chan Event) {
close
(
workerEvents
)
}
func
retrieveSitesPeriodically
(
sites
chan
<-
S
ite
)
{
func
retrieveSitesPeriodically
(
sites
chan
<-
s
ite
)
{
for
true
{
siteList
,
err
:=
getSites
()
if
err
!=
nil
{
...
...
@@ -147,58 +147,58 @@ func heartbeat() {
}
}
func
getSites
()
([]
S
ite
,
error
)
{
func
getSites
()
([]
s
ite
,
error
)
{
siteInfo
,
err
:=
getInstanceInfo
()
if
err
!=
nil
{
return
make
([]
S
ite
,
0
),
err
return
make
([]
s
ite
,
0
),
err
}
if
run
:=
shouldGetSites
(
siteInfo
.
Disabled
);
false
==
run
{
return
make
([]
S
ite
,
0
),
err
return
make
([]
s
ite
,
0
),
err
}
if
siteInfo
.
Multisite
==
1
{
sites
,
err
:=
getMultisiteSites
()
if
err
!=
nil
{
sites
=
make
([]
S
ite
,
0
)
sites
=
make
([]
s
ite
,
0
)
}
return
sites
,
err
}
else
{
// Mock for single site
sites
:=
make
([]
Site
,
0
)
sites
=
append
(
sites
,
Site
{
Url
:
siteInfo
.
Siteurl
})
return
sites
,
nil
}
// Mock for single site
sites
:=
make
([]
site
,
0
)
sites
=
append
(
sites
,
site
{
URL
:
siteInfo
.
Siteurl
})
return
sites
,
nil
}
func
getInstanceInfo
()
(
S
iteInfo
,
error
)
{
func
getInstanceInfo
()
(
s
iteInfo
,
error
)
{
raw
,
err
:=
runWpCliCmd
([]
string
{
"cron-control"
,
"orchestrate"
,
"runner-only"
,
"get-info"
,
"--format=json"
})
if
err
!=
nil
{
return
S
iteInfo
{},
err
return
s
iteInfo
{},
err
}
jsonRes
:=
make
([]
S
iteInfo
,
0
)
jsonRes
:=
make
([]
s
iteInfo
,
0
)
if
err
=
json
.
Unmarshal
([]
byte
(
raw
),
&
jsonRes
);
err
!=
nil
{
logger
.
Println
(
fmt
.
Sprintf
(
"%+v
\n
"
,
err
))
return
S
iteInfo
{},
err
return
s
iteInfo
{},
err
}
return
jsonRes
[
0
],
nil
}
func
shouldGetSites
(
disabled
int
)
(
bool
)
{
func
shouldGetSites
(
disabled
int
)
bool
{
if
disabled
==
0
{
atomic
.
SwapUint64
(
&
disabledLoopCount
,
0
)
return
true
;
return
true
}
disabledCount
,
now
:=
atomic
.
LoadUint64
(
&
disabledLoopCount
),
time
.
Now
()
disabledSleep
:=
time
.
Minute
*
3
*
time
.
Duration
(
disabledCount
)
disabledCount
,
now
:=
atomic
.
LoadUint64
(
&
disabledLoopCount
),
time
.
Now
()
disabledSleep
:=
time
.
Minute
*
3
*
time
.
Duration
(
disabledCount
)
disabledSleepSeconds
:=
int64
(
disabledSleep
)
/
1000
/
1000
/
1000
if
disabled
>
1
&&
(
now
.
Unix
()
+
disabledSleepSeconds
)
>
int64
(
disabled
)
{
if
disabled
>
1
&&
(
now
.
Unix
()
+
disabledSleepSeconds
)
>
int64
(
disabled
)
{
atomic
.
SwapUint64
(
&
disabledLoopCount
,
0
)
}
else
if
disabledSleep
>
time
.
Hour
{
atomic
.
SwapUint64
(
&
disabledLoopCount
,
0
)
...
...
@@ -208,7 +208,7 @@ func shouldGetSites(disabled int) (bool) {
if
disabledSleep
>
0
{
if
debug
{
logger
.
Printf
(
"Automatic execution disabled, sleeping for an additional %d minutes"
,
disabledSleepSeconds
/
60
)
logger
.
Printf
(
"Automatic execution disabled, sleeping for an additional %d minutes"
,
disabledSleepSeconds
/
60
)
}
time
.
Sleep
(
disabledSleep
)
...
...
@@ -216,19 +216,19 @@ func shouldGetSites(disabled int) (bool) {
logger
.
Println
(
"Automatic execution disabled"
)
}
return
false
;
return
false
}
func
getMultisiteSites
()
([]
S
ite
,
error
)
{
func
getMultisiteSites
()
([]
s
ite
,
error
)
{
raw
,
err
:=
runWpCliCmd
([]
string
{
"site"
,
"list"
,
"--fields=url"
,
"--archived=false"
,
"--deleted=false"
,
"--spam=false"
,
"--format=json"
})
if
err
!=
nil
{
return
make
([]
S
ite
,
0
),
err
return
make
([]
s
ite
,
0
),
err
}
jsonRes
:=
make
([]
S
ite
,
0
)
jsonRes
:=
make
([]
s
ite
,
0
)
if
err
=
json
.
Unmarshal
([]
byte
(
raw
),
&
jsonRes
);
err
!=
nil
{
logger
.
Println
(
fmt
.
Sprintf
(
"%+v
\n
"
,
err
))
return
make
([]
S
ite
,
0
),
err
return
make
([]
s
ite
,
0
),
err
}
// Shuffle site order so that none are favored
...
...
@@ -240,20 +240,20 @@ func getMultisiteSites() ([]Site, error) {
return
jsonRes
,
nil
}
func
queueSiteEvents
(
workerI
d
int
,
sites
<-
chan
S
ite
,
queue
chan
<-
E
vent
)
{
func
queueSiteEvents
(
workerI
D
int
,
sites
<-
chan
s
ite
,
queue
chan
<-
e
vent
)
{
for
site
:=
range
sites
{
if
debug
{
logger
.
Printf
(
"getEvents-%d processing %s"
,
workerI
d
,
site
.
U
rl
)
logger
.
Printf
(
"getEvents-%d processing %s"
,
workerI
D
,
site
.
U
RL
)
}
siteE
vents
,
err
:=
getSiteEvents
(
site
.
U
rl
)
e
vents
,
err
:=
getSiteEvents
(
site
.
U
RL
)
if
err
!=
nil
{
time
.
Sleep
(
getEventsBreak
)
continue
}
for
_
,
event
:=
range
siteE
vents
{
event
.
U
rl
=
site
.
U
rl
for
_
,
event
:=
range
e
vents
{
event
.
U
RL
=
site
.
U
RL
queue
<-
event
}
...
...
@@ -261,32 +261,32 @@ func queueSiteEvents(workerId int, sites <-chan Site, queue chan<- Event) {
}
}
func
getSiteEvents
(
site
string
)
([]
E
vent
,
error
)
{
func
getSiteEvents
(
site
string
)
([]
e
vent
,
error
)
{
raw
,
err
:=
runWpCliCmd
([]
string
{
"cron-control"
,
"orchestrate"
,
"runner-only"
,
"list-due-batch"
,
fmt
.
Sprintf
(
"--url=%s"
,
site
),
"--format=json"
})
if
err
!=
nil
{
return
make
([]
E
vent
,
0
),
err
return
make
([]
e
vent
,
0
),
err
}
siteEvents
:=
make
([]
E
vent
,
0
)
siteEvents
:=
make
([]
e
vent
,
0
)
if
err
=
json
.
Unmarshal
([]
byte
(
raw
),
&
siteEvents
);
err
!=
nil
{
logger
.
Println
(
fmt
.
Sprintf
(
"%+v
\n
"
,
err
))
return
make
([]
E
vent
,
0
),
err
return
make
([]
e
vent
,
0
),
err
}
return
siteEvents
,
nil
}
func
runEvents
(
workerI
d
int
,
events
<-
chan
E
vent
)
{
func
runEvents
(
workerI
D
int
,
events
<-
chan
e
vent
)
{
for
event
:=
range
events
{
if
now
:=
time
.
Now
();
event
.
Timestamp
>
int
(
now
.
Unix
())
{
if
debug
{
logger
.
Printf
(
"runEvents-%d skipping premature job %d|%s|%s for %s"
,
workerI
d
,
event
.
Timestamp
,
event
.
Action
,
event
.
Instance
,
event
.
U
rl
)
logger
.
Printf
(
"runEvents-%d skipping premature job %d|%s|%s for %s"
,
workerI
D
,
event
.
Timestamp
,
event
.
Action
,
event
.
Instance
,
event
.
U
RL
)
}
continue
}
subcommand
:=
[]
string
{
"cron-control"
,
"orchestrate"
,
"runner-only"
,
"run"
,
fmt
.
Sprintf
(
"--timestamp=%d"
,
event
.
Timestamp
),
fmt
.
Sprintf
(
"--action=%s"
,
event
.
Action
),
fmt
.
Sprintf
(
"--instance=%s"
,
event
.
Instance
),
fmt
.
Sprintf
(
"--url=%s"
,
event
.
U
rl
)}
subcommand
:=
[]
string
{
"cron-control"
,
"orchestrate"
,
"runner-only"
,
"run"
,
fmt
.
Sprintf
(
"--timestamp=%d"
,
event
.
Timestamp
),
fmt
.
Sprintf
(
"--action=%s"
,
event
.
Action
),
fmt
.
Sprintf
(
"--instance=%s"
,
event
.
Instance
),
fmt
.
Sprintf
(
"--url=%s"
,
event
.
U
RL
)}
_
,
err
:=
runWpCliCmd
(
subcommand
)
...
...
@@ -296,7 +296,7 @@ func runEvents(workerId int, events <-chan Event) {
}
if
debug
{
logger
.
Printf
(
"runEvents-%d finished job %d|%s|%s for %s"
,
workerI
d
,
event
.
Timestamp
,
event
.
Action
,
event
.
Instance
,
event
.
U
rl
)
logger
.
Printf
(
"runEvents-%d finished job %d|%s|%s for %s"
,
workerI
D
,
event
.
Timestamp
,
event
.
Action
,
event
.
Instance
,
event
.
U
RL
)
}
}
else
if
heartbeatInt
>
0
{
atomic
.
AddUint64
(
&
eventRunErrCount
,
1
)
...
...
@@ -335,7 +335,7 @@ func setUpLogger() {
logger
.
Fatal
(
err
)
}
logFile
,
err
:=
os
.
OpenFile
(
path
,
os
.
O_WRONLY
|
os
.
O_CREATE
|
os
.
O_APPEND
,
0644
)
logFile
,
err
:=
os
.
OpenFile
(
path
,
os
.
O_WRONLY
|
os
.
O_CREATE
|
os
.
O_APPEND
,
0644
)
if
err
!=
nil
{
log
.
Fatal
(
err
)
}
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment