package gocron
import (
"context"
"sync"
"time"
"go.uber.org/atomic"
)
// Behaviors available when the executor has reached its configured
// maximum number of concurrently running jobs.
const (
	// RescheduleMode causes an execution to be skipped when no run slot
	// is free; the job waits for its next scheduled time.
	RescheduleMode limitMode = iota
	// WaitMode queues the execution until a limit-mode worker frees up.
	WaitMode
)
// executor receives jobFunctions and runs them, honoring the configured
// limit mode and optional distributed coordination (locker/elector).
type executor struct {
	jobFunctions  chan jobFunction   // jobs submitted for execution; consumed by run()
	ctx           context.Context    // cancellation for run() and workers; created in start()
	cancel        context.CancelFunc // cancels ctx; called by stop()
	wg            *sync.WaitGroup    // tracks the run() goroutine itself
	jobsWg        *sync.WaitGroup    // tracks in-flight per-job goroutines
	singletonWgs  *sync.Map          // *sync.WaitGroup -> *sync.Mutex pairs, stored by runJob in singletonMode
	skipExecution *atomic.Bool       // when true, run() drops incoming jobs

	limitMode               limitMode        // RescheduleMode or WaitMode
	limitModeMaxRunningJobs int              // > 0 enables the limit-mode machinery
	limitModeFuncsRunning   *atomic.Int64    // count of live limitModeRunner goroutines
	limitModeFuncWg         *sync.WaitGroup  // waited on by stop() for limitModeRunner exits
	limitModeQueue          chan jobFunction // jobs waiting for a limit-mode worker
	limitModeQueueMu        *sync.Mutex      // guards (re)creation/teardown of limitModeQueue
	limitModeRunningJobs    *atomic.Int64    // jobs currently queued/running in RescheduleMode

	stopped            *atomic.Bool // set by stop(); workers drop queued work once set
	distributedLocker  Locker       // optional: cross-instance per-job lock
	distributedElector Elector      // optional: only the elected leader runs jobs
}
// newExecutor returns an executor with the synchronization primitives
// that must exist for its whole lifetime already initialized; the
// per-run fields (ctx, wait groups, flags, queue) are set up in start().
func newExecutor() executor {
	var e executor
	e.jobFunctions = make(chan jobFunction, 1)
	e.singletonWgs = &sync.Map{}
	e.limitModeFuncsRunning = atomic.NewInt64(0)
	e.limitModeRunningJobs = atomic.NewInt64(0)
	e.limitModeFuncWg = &sync.WaitGroup{}
	e.limitModeQueueMu = &sync.Mutex{}
	return e
}
// runJob performs one execution of jf: it fires the registered event
// listeners before and after the call, records start/finish counters and
// the running flag, and routes any panic to the global panicHandler.
func runJob(jf jobFunction) {
	panicHandlerMutex.RLock()
	defer panicHandlerMutex.RUnlock()

	if panicHandler != nil {
		defer func() {
			if recovered := recover(); recovered != nil {
				panicHandler(jf.funcName, recovered)
			}
		}()
	}

	jf.runStartCount.Add(1)
	jf.isRunning.Store(true)

	// Pre-run listeners.
	callJobFunc(jf.eventListeners.onBeforeJobExecution)
	_ = callJobFuncWithParams(jf.eventListeners.beforeJobRuns, []interface{}{jf.getName()})

	// The job itself, then the success/failure listener.
	if err := callJobFuncWithParams(jf.function, jf.parameters); err != nil {
		_ = callJobFuncWithParams(jf.eventListeners.onError, []interface{}{jf.getName(), err})
	} else {
		_ = callJobFuncWithParams(jf.eventListeners.noError, []interface{}{jf.getName()})
	}

	// Post-run listeners.
	_ = callJobFuncWithParams(jf.eventListeners.afterJobRuns, []interface{}{jf.getName()})
	callJobFunc(jf.eventListeners.onAfterJobExecution)

	jf.isRunning.Store(false)
	jf.runFinishCount.Add(1)
}
// singletonRunner serializes executions of a singleton-mode job: it
// drains the job's signal queue, running at most one invocation at a
// time, and exits when the job's context is canceled.
func (jf *jobFunction) singletonRunner() {
	jf.singletonRunnerOn.Store(true)
	// Register with the job's wait group under its mutex so stop() can
	// safely Wait() for an in-progress run (see executor.stop).
	jf.singletonWgMu.Lock()
	jf.singletonWg.Add(1)
	jf.singletonWgMu.Unlock()
	for {
		select {
		case <-jf.ctx.Done():
			jf.singletonWg.Done()
			jf.singletonRunnerOn.Store(false)
			// Replace the queue so a later restart begins with no stale
			// run requests queued from before shutdown.
			jf.singletonQueueMu.Lock()
			jf.singletonQueue = make(chan struct{}, 1000)
			jf.singletonQueueMu.Unlock()
			// Reset the stopped flag for a future scheduler start.
			jf.stopped.Store(false)
			return
		case <-jf.singletonQueue:
			if !jf.stopped.Load() {
				runJob(*jf)
			}
		}
	}
}
// limitModeRunner is a limit-mode worker goroutine: it takes jobs from
// limitModeQueue and executes them until the executor's context is
// canceled.
func (e *executor) limitModeRunner() {
	for {
		select {
		case <-e.ctx.Done():
			// This worker is exiting, so it no longer counts toward the
			// pool of live limit-mode workers. The counter is incremented
			// once per spawned runner in run(); incrementing it again here
			// (as the previous code did) inflated the count, so after a
			// stop()/start() cycle run() believed the pool was full and
			// never spawned replacement workers, starving the queue.
			e.limitModeFuncsRunning.Dec()
			e.limitModeFuncWg.Done()
			return
		case jf := <-e.limitModeQueue:
			if !e.stopped.Load() {
				select {
				case <-jf.ctx.Done():
					// The job was canceled while queued; drop it.
				default:
					e.runJob(jf)
				}
			}
		}
	}
}
// start (re)initializes the executor's per-run state — context, wait
// groups, flags, and the limit-mode queue — and launches the run() loop.
func (e *executor) start() {
	ctx, cancel := context.WithCancel(context.Background())
	e.ctx = ctx
	e.cancel = cancel

	e.wg = &sync.WaitGroup{}
	e.wg.Add(1)
	e.jobsWg = &sync.WaitGroup{}

	e.stopped = atomic.NewBool(false)
	e.skipExecution = atomic.NewBool(false)

	// Recreate the queue under its mutex; stop() nils it out.
	e.limitModeQueueMu.Lock()
	e.limitModeQueue = make(chan jobFunction, 1000)
	e.limitModeQueueMu.Unlock()

	go e.run()
}
// runJob executes f according to its run mode: in defaultMode it first
// applies distributed coordination (elector or locker) if configured; in
// singletonMode it forwards the request to the job's singleton runner.
func (e *executor) runJob(f jobFunction) {
	defer func() {
		// In RescheduleMode, run() reserved a slot (limitModeRunningJobs)
		// before queueing this job; release it when the job finishes.
		if e.limitMode == RescheduleMode && e.limitModeMaxRunningJobs > 0 {
			e.limitModeRunningJobs.Add(-1)
		}
	}()
	switch f.runConfig.mode {
	case defaultMode:
		// Prefer the user-assigned job name as the distributed lock key,
		// falling back to the function name.
		lockKey := f.jobName
		if lockKey == "" {
			lockKey = f.funcName
		}
		if e.distributedElector != nil {
			// Only the elected leader runs the job; non-leaders skip
			// this execution silently.
			err := e.distributedElector.IsLeader(e.ctx)
			if err != nil {
				return
			}
			runJob(f)
			return
		}
		if e.distributedLocker != nil {
			l, err := e.distributedLocker.Lock(f.ctx, lockKey)
			if err != nil || l == nil {
				// Lock unavailable (likely held by another instance);
				// skip this execution.
				return
			}
			defer func() {
				// Hold the lock for ~90% of the time until the next run,
				// capped at 5s, so other instances cannot acquire it and
				// duplicate this run slot.
				durationToNextRun := time.Until(f.jobFuncNextRun)
				if durationToNextRun > time.Second*5 {
					durationToNextRun = time.Second * 5
				}
				delay := time.Duration(float64(durationToNextRun) * 0.9)
				if e.limitModeMaxRunningJobs > 0 {
					// Don't tie up a limit-mode worker while waiting;
					// unlock asynchronously after the delay.
					time.AfterFunc(delay, func() {
						_ = l.Unlock(f.ctx)
					})
					return
				}
				if durationToNextRun > time.Millisecond*100 {
					timer := time.NewTimer(delay)
					defer timer.Stop()
					// Wait out the delay unless the executor shuts down.
					select {
					case <-e.ctx.Done():
					case <-timer.C:
					}
				}
				_ = l.Unlock(f.ctx)
			}()
			runJob(f)
			return
		}
		runJob(f)
	case singletonMode:
		// Record the job's wait group + mutex so stop() can wait for an
		// in-progress singleton run.
		e.singletonWgs.Store(f.singletonWg, f.singletonWgMu)
		if !f.singletonRunnerOn.Load() {
			// Lazily start the dedicated runner for this job.
			go f.singletonRunner()
		}
		// Signal one run request; the runner serializes executions.
		f.singletonQueueMu.Lock()
		f.singletonQueue <- struct{}{}
		f.singletonQueueMu.Unlock()
	}
}
// run is the executor's dispatch loop: it receives job functions and
// either runs them directly or hands them to the limit-mode worker pool,
// until the executor's context is canceled.
func (e *executor) run() {
	for {
		select {
		case f := <-e.jobFunctions:
			// Drop work while stopped or while execution is being skipped.
			if e.stopped.Load() || e.skipExecution.Load() {
				continue
			}
			if e.limitModeMaxRunningJobs > 0 {
				// Top up the limitModeRunner pool to the configured max.
				countRunning := e.limitModeFuncsRunning.Load()
				if countRunning < int64(e.limitModeMaxRunningJobs) {
					diff := int64(e.limitModeMaxRunningJobs) - countRunning
					for i := int64(0); i < diff; i++ {
						e.limitModeFuncWg.Add(1)
						go e.limitModeRunner()
						e.limitModeFuncsRunning.Inc()
					}
				}
			}
			e.jobsWg.Add(1)
			go func() {
				defer e.jobsWg.Done()
				if e.limitModeMaxRunningJobs > 0 {
					switch e.limitMode {
					case RescheduleMode:
						// Only queue if a run slot is free; otherwise the
						// execution is skipped until the next schedule.
						if e.limitModeRunningJobs.Load() < int64(e.limitModeMaxRunningJobs) {
							select {
							case e.limitModeQueue <- f:
								e.limitModeRunningJobs.Inc()
							case <-e.ctx.Done():
							}
						}
					case WaitMode:
						// Block until a worker can accept the job or the
						// executor shuts down.
						select {
						case e.limitModeQueue <- f:
						case <-e.ctx.Done():
						}
					}
					return
				}
				e.runJob(f)
			}()
		case <-e.ctx.Done():
			// Shutdown: wait for in-flight jobs, then release stop().
			e.jobsWg.Wait()
			e.wg.Done()
			return
		}
	}
}
// stop shuts the executor down: it marks it stopped, cancels the context,
// waits for the run loop (and thus all in-flight jobs), waits for any
// singleton runners, then waits for limit-mode workers and drops the
// limit-mode queue.
func (e *executor) stop() {
	e.stopped.Store(true)
	e.cancel()
	// run() calls jobsWg.Wait() before wg.Done(), so this also waits for
	// all in-flight job goroutines.
	e.wg.Wait()
	if e.singletonWgs != nil {
		e.singletonWgs.Range(func(key, value interface{}) bool {
			// Entries are *sync.WaitGroup -> *sync.Mutex pairs stored by
			// runJob in singletonMode.
			wg, wgOk := key.(*sync.WaitGroup)
			mu, muOk := value.(*sync.Mutex)
			if wgOk && muOk {
				mu.Lock()
				wg.Wait()
				mu.Unlock()
			}
			return true
		})
	}
	if e.limitModeMaxRunningJobs > 0 {
		e.limitModeFuncWg.Wait()
		// Drop the queue; start() recreates it under the same mutex.
		e.limitModeQueueMu.Lock()
		e.limitModeQueue = nil
		e.limitModeQueueMu.Unlock()
	}
}
The pages are generated with Golds v0.6.7 (GOOS=linux, GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.