package gocron
import (
"context"
"fmt"
"math/rand"
"sort"
"sync"
"time"
"github.com/google/uuid"
"github.com/robfig/cron/v3"
"go.uber.org/atomic"
)
type Job struct {
mu *jobMutex
jobFunction
interval int
random
duration time .Duration
unit schedulingUnit
startsImmediately bool
atTimes []time .Duration
startAtTime time .Time
error error
scheduledWeekdays []time .Weekday
daysOfTheMonth []int
tags []string
timer *time .Timer
cronSchedule cron .Schedule
runWithDetails bool
}
type jobRunTimes struct {
jobRunTimesMu *sync .Mutex
previousRun time .Time
lastRun time .Time
nextRun time .Time
}
type random struct {
rand *rand .Rand
randomizeInterval bool
randomIntervalRange [2 ]int
}
type jobFunction struct {
id uuid .UUID
*jobRunTimes
eventListeners
function interface {}
parameters []interface {}
parametersLen int
jobName string
funcName string
runConfig runConfig
singletonQueueMu *sync .Mutex
singletonQueue chan struct {}
singletonRunnerOn *atomic .Bool
ctx context .Context
cancel context .CancelFunc
isRunning *atomic .Bool
runStartCount *atomic .Int64
runFinishCount *atomic .Int64
singletonWg *sync .WaitGroup
singletonWgMu *sync .Mutex
stopped *atomic .Bool
jobFuncNextRun time .Time
}
type eventListeners struct {
onAfterJobExecution interface {}
onBeforeJobExecution interface {}
beforeJobRuns func (jobName string )
afterJobRuns func (jobName string )
onError func (jobName string , err error )
noError func (jobName string )
}
type jobMutex struct {
sync .RWMutex
}
func (jf *jobFunction ) copy () jobFunction {
cp := jobFunction {
id : jf .id ,
jobRunTimes : jf .jobRunTimes ,
eventListeners : jf .eventListeners ,
function : jf .function ,
parameters : nil ,
parametersLen : jf .parametersLen ,
funcName : jf .funcName ,
jobName : jf .jobName ,
runConfig : jf .runConfig ,
singletonQueue : jf .singletonQueue ,
singletonQueueMu : jf .singletonQueueMu ,
ctx : jf .ctx ,
cancel : jf .cancel ,
isRunning : jf .isRunning ,
runStartCount : jf .runStartCount ,
runFinishCount : jf .runFinishCount ,
singletonWg : jf .singletonWg ,
singletonWgMu : jf .singletonWgMu ,
singletonRunnerOn : jf .singletonRunnerOn ,
stopped : jf .stopped ,
jobFuncNextRun : jf .jobFuncNextRun ,
}
cp .parameters = append (cp .parameters , jf .parameters ...)
return cp
}
func (jf *jobFunction ) getName () string {
if jf .jobName != "" {
return jf .jobName
}
return jf .funcName
}
type runConfig struct {
finiteRuns bool
maxRuns int
mode mode
}
type mode int8
const (
defaultMode mode = iota
singletonMode
)
func newJob(interval int , startImmediately bool , singletonMode bool ) *Job {
ctx , cancel := context .WithCancel (context .Background ())
job := &Job {
mu : &jobMutex {},
interval : interval ,
unit : seconds ,
jobFunction : jobFunction {
id : uuid .New (),
jobRunTimes : &jobRunTimes {
jobRunTimesMu : &sync .Mutex {},
lastRun : time .Time {},
nextRun : time .Time {},
},
ctx : ctx ,
cancel : cancel ,
isRunning : atomic .NewBool (false ),
runStartCount : atomic .NewInt64 (0 ),
runFinishCount : atomic .NewInt64 (0 ),
singletonRunnerOn : atomic .NewBool (false ),
stopped : atomic .NewBool (false ),
},
tags : []string {},
startsImmediately : startImmediately ,
}
if singletonMode {
job .SingletonMode ()
}
return job
}
func (j *Job ) Name (name string ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .jobName = name
}
func (j *Job ) GetName () string {
j .mu .Lock ()
defer j .mu .Unlock ()
return j .jobFunction .getName ()
}
func (j *Job ) setRandomInterval (a , b int ) {
j .random .rand = rand .New (rand .NewSource (time .Now ().UnixNano ()))
j .random .randomizeInterval = true
if a < b {
j .random .randomIntervalRange [0 ] = a
j .random .randomIntervalRange [1 ] = b + 1
} else {
j .random .randomIntervalRange [0 ] = b
j .random .randomIntervalRange [1 ] = a + 1
}
}
func (j *Job ) getRandomInterval () int {
randNum := j .rand .Intn (j .randomIntervalRange [1 ] - j .randomIntervalRange [0 ])
return j .randomIntervalRange [0 ] + randNum
}
func (j *Job ) getInterval () int {
if j .randomizeInterval {
return j .getRandomInterval ()
}
return j .interval
}
func (j *Job ) neverRan () bool {
jobLastRun := j .LastRun ()
return jobLastRun .IsZero ()
}
func (j *Job ) getStartsImmediately () bool {
return j .startsImmediately
}
func (j *Job ) setStartsImmediately (b bool ) {
j .startsImmediately = b
}
func (j *Job ) setTimer (t *time .Timer ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .timer = t
}
func (j *Job ) getFirstAtTime () time .Duration {
var t time .Duration
if len (j .atTimes ) > 0 {
t = j .atTimes [0 ]
}
return t
}
func (j *Job ) getAtTime (lastRun time .Time ) time .Duration {
if len (j .atTimes ) == 0 {
return 0
}
r := j .atTimes [0 ]
if len (j .atTimes ) == 1 || lastRun .IsZero () {
return r
}
for _ , d := range j .atTimes {
nt := time .Date (lastRun .Year (), lastRun .Month (), lastRun .Day (), 0 , 0 , 0 , 0 , lastRun .Location ()).Add (d )
if nt .After (lastRun ) {
r = d
break
}
}
return r
}
func (j *Job ) addAtTime (t time .Duration ) {
if len (j .atTimes ) == 0 {
j .atTimes = append (j .atTimes , t )
return
}
exist := false
index := sort .Search (len (j .atTimes ), func (i int ) bool {
atTime := j .atTimes [i ]
b := atTime >= t
if b {
exist = atTime == t
}
return b
})
if exist {
return
}
j .atTimes = append (j .atTimes , time .Duration (0 ))
copy (j .atTimes [index +1 :], j .atTimes [index :])
j .atTimes [index ] = t
}
func (j *Job ) getStartAtTime () time .Time {
j .mu .RLock ()
defer j .mu .RUnlock ()
return j .startAtTime
}
func (j *Job ) setStartAtTime (t time .Time ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .startAtTime = t
}
func (j *Job ) getUnit () schedulingUnit {
j .mu .RLock ()
defer j .mu .RUnlock ()
return j .unit
}
func (j *Job ) setUnit (t schedulingUnit ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .unit = t
}
func (j *Job ) getDuration () time .Duration {
j .mu .RLock ()
defer j .mu .RUnlock ()
return j .duration
}
func (j *Job ) setDuration (t time .Duration ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .duration = t
}
func (j *Job ) setInterval (i int ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .interval = i
}
func (j *Job ) hasTags (tags ...string ) bool {
jobTags := map [string ]int {}
for _ , tag := range j .tags {
jobTags [tag ] = 0
}
for _ , tag := range tags {
_ , ok := jobTags [tag ]
if !ok {
return false
}
}
return true
}
func (j *Job ) Error () error {
return j .error
}
func (j *Job ) Context () context .Context {
return j .ctx
}
func (j *Job ) Tag (tags ...string ) {
j .tags = append (j .tags , tags ...)
}
func (j *Job ) Untag (t string ) {
var newTags []string
for _ , tag := range j .tags {
if t != tag {
newTags = append (newTags , tag )
}
}
j .tags = newTags
}
func (j *Job ) Tags () []string {
return j .tags
}
type EventListener func (j *Job )
func BeforeJobRuns (eventListenerFunc func (jobName string )) EventListener {
return func (j *Job ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .eventListeners .beforeJobRuns = eventListenerFunc
}
}
func AfterJobRuns (eventListenerFunc func (jobName string )) EventListener {
return func (j *Job ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .eventListeners .afterJobRuns = eventListenerFunc
}
}
func WhenJobReturnsError (eventListenerFunc func (jobName string , err error )) EventListener {
return func (j *Job ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .eventListeners .onError = eventListenerFunc
}
}
func WhenJobReturnsNoError (eventListenerFunc func (jobName string )) EventListener {
return func (j *Job ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .eventListeners .noError = eventListenerFunc
}
}
func (j *Job ) RegisterEventListeners (eventListeners ...EventListener ) {
for _ , el := range eventListeners {
el (j )
}
}
func (j *Job ) SetEventListeners (onBeforeJobExecution interface {}, onAfterJobExecution interface {}) {
j .eventListeners = eventListeners {
onBeforeJobExecution : onBeforeJobExecution ,
onAfterJobExecution : onAfterJobExecution ,
}
}
func (j *Job ) ScheduledTime () time .Time {
j .mu .RLock ()
defer j .mu .RUnlock ()
return j .nextRun
}
func (j *Job ) ScheduledAtTime () string {
if len (j .atTimes ) == 0 {
return "00:00"
}
return fmt .Sprintf ("%02d:%02d" , j .getFirstAtTime ()/time .Hour , (j .getFirstAtTime ()%time .Hour )/time .Minute )
}
func (j *Job ) ScheduledAtTimes () []string {
r := make ([]string , len (j .atTimes ))
for i , t := range j .atTimes {
r [i ] = fmt .Sprintf ("%02d:%02d" , t /time .Hour , (t %time .Hour )/time .Minute )
}
return r
}
func (j *Job ) Weekday () (time .Weekday , error ) {
if len (j .scheduledWeekdays ) == 0 {
return time .Sunday , ErrNotScheduledWeekday
}
return j .scheduledWeekdays [0 ], nil
}
func (j *Job ) Weekdays () []time .Weekday {
if len (j .scheduledWeekdays ) == 0 {
return []time .Weekday {time .Sunday }
}
sort .Slice (j .scheduledWeekdays , func (i , k int ) bool {
return j .scheduledWeekdays [i ] < j .scheduledWeekdays [k ]
})
return j .scheduledWeekdays
}
func (j *Job ) LimitRunsTo (n int ) {
j .mu .Lock ()
defer j .mu .Unlock ()
j .runConfig .finiteRuns = true
j .runConfig .maxRuns = n
}
func (j *Job ) SingletonMode () {
j .mu .Lock ()
defer j .mu .Unlock ()
j .runConfig .mode = singletonMode
j .jobFunction .singletonWgMu = &sync .Mutex {}
j .jobFunction .singletonWgMu .Lock ()
j .jobFunction .singletonWg = &sync .WaitGroup {}
j .jobFunction .singletonWgMu .Unlock ()
j .jobFunction .singletonQueueMu = &sync .Mutex {}
j .jobFunction .singletonQueueMu .Lock ()
j .jobFunction .singletonQueue = make (chan struct {}, 100 )
j .jobFunction .singletonQueueMu .Unlock ()
}
func (j *Job ) shouldRun () bool {
j .mu .RLock ()
defer j .mu .RUnlock ()
return !j .runConfig .finiteRuns || j .runStartCount .Load () < int64 (j .runConfig .maxRuns )
}
func (j *Job ) LastRun () time .Time {
j .jobRunTimesMu .Lock ()
defer j .jobRunTimesMu .Unlock ()
return j .lastRun
}
func (j *Job ) setLastRun (t time .Time ) {
j .previousRun = j .lastRun
j .lastRun = t
}
func (j *Job ) NextRun () time .Time {
j .jobRunTimesMu .Lock ()
defer j .jobRunTimesMu .Unlock ()
return j .nextRun
}
func (j *Job ) setNextRun (t time .Time ) {
j .jobRunTimesMu .Lock ()
defer j .jobRunTimesMu .Unlock ()
j .nextRun = t
j .jobFunction .jobFuncNextRun = t
}
func (j *Job ) PreviousRun () time .Time {
j .jobRunTimesMu .Lock ()
defer j .jobRunTimesMu .Unlock ()
return j .previousRun
}
func (j *Job ) RunCount () int {
j .mu .Lock ()
defer j .mu .Unlock ()
return int (j .runStartCount .Load ())
}
func (j *Job ) FinishedRunCount () int {
j .mu .Lock ()
defer j .mu .Unlock ()
return int (j .runFinishCount .Load ())
}
func (j *Job ) stop () {
j .mu .Lock ()
defer j .mu .Unlock ()
if j .timer != nil {
j .timer .Stop ()
}
if j .cancel != nil {
j .cancel ()
j .ctx , j .cancel = context .WithCancel (context .Background ())
}
j .stopped .Store (true )
}
func (j *Job ) IsRunning () bool {
return j .isRunning .Load ()
}
func (j *Job ) copy () Job {
return Job {
mu : &jobMutex {},
jobFunction : j .jobFunction ,
interval : j .interval ,
duration : j .duration ,
unit : j .unit ,
startsImmediately : j .startsImmediately ,
atTimes : j .atTimes ,
startAtTime : j .startAtTime ,
error : j .error ,
scheduledWeekdays : j .scheduledWeekdays ,
daysOfTheMonth : j .daysOfTheMonth ,
tags : j .tags ,
timer : j .timer ,
cronSchedule : j .cronSchedule ,
runWithDetails : j .runWithDetails ,
}
}
The pages are generated with Golds v0.6.7 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds .