package gocron
import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/robfig/cron/v3"
"go.uber.org/atomic"
)
// limitMode is the mode value that SetMaxConcurrentJobs stores on the executor
// together with the maximum number of concurrently running jobs.
type limitMode int8
// Scheduler holds the registered jobs and the state needed to schedule and
// run them.
type Scheduler struct {
// jobsMutex guards jobs.
jobsMutex sync .RWMutex
// jobs maps a job id to the registered job.
jobs map [uuid .UUID ]*Job
// locationMutex guards location.
locationMutex sync .RWMutex
// location is the time zone used by now() and by the date calculations.
location *time .Location
// running reports whether the scheduler has been started.
running *atomic .Bool
// time is the clock abstraction; it can be replaced with CustomTime.
time TimeWrapper
// timer creates the timers that trigger jobs; it can be replaced with CustomTimer.
timer func (d time .Duration , f func ()) *time .Timer
// executor receives the job functions that are due to run.
executor *executor
// tags records the tags in use; it is only maintained when tagsUnique is set.
tags sync .Map
// tagsUnique makes Tag reject a tag that is already recorded in tags.
tagsUnique bool
// updateJob is set by Job and consumed by Update.
updateJob bool
// waitForInterval is passed (negated) to newJob as the "starts immediately" flag.
waitForInterval bool
// singletonMode is passed to newJob for every job created by this scheduler.
singletonMode bool
// startBlockingStopChanMutex guards startBlockingStopChan.
startBlockingStopChanMutex sync .Mutex
// startBlockingStopChan is the channel StartBlocking waits on; StopBlockingChan closes it.
startBlockingStopChan chan struct {}
// inScheduleChain is the id of the job being configured by chained calls;
// nil means the next chained call creates a new job.
inScheduleChain *uuid .UUID
}
// allWeekDays is the number of days in a week, used by the weekday calculations.
const allWeekDays = 7
// NewScheduler returns a stopped Scheduler that uses loc as its location, the
// real clock, the default timer function and a fresh executor. The job map is
// initialised empty.
func NewScheduler(loc *time.Location) *Scheduler {
	exec := newExecutor()
	return &Scheduler{
		jobs:       make(map[uuid.UUID]*Job),
		location:   loc,
		running:    atomic.NewBool(false),
		time:       &trueTime{},
		executor:   &exec,
		tagsUnique: false,
		timer:      afterFunc,
	}
}
// SetMaxConcurrentJobs stores the maximum number of concurrently running jobs
// (n) and the limit mode on the executor.
func (s *Scheduler ) SetMaxConcurrentJobs (n int , mode limitMode ) {
s .executor .limitModeMaxRunningJobs = n
s .executor .limitMode = mode
}
// StartBlocking starts the scheduler (if it is not already running) and then
// blocks the calling goroutine until the stop channel is closed, which
// StopBlockingChan does.
func (s *Scheduler ) StartBlocking () {
s .StartAsync ()
// The channel is created after the scheduler has been started.
s .startBlockingStopChanMutex .Lock ()
s .startBlockingStopChan = make (chan struct {}, 1 )
s .startBlockingStopChanMutex .Unlock ()
// Block until the channel is closed.
<-s .startBlockingStopChan
// Reset the field so a later StopBlockingChan call sees no channel to close.
s .startBlockingStopChanMutex .Lock ()
s .startBlockingStopChan = nil
s .startBlockingStopChanMutex .Unlock ()
}
// StartAsync starts the scheduler without blocking the caller. It does nothing
// when the scheduler is already running.
func (s *Scheduler) StartAsync() {
	if s.IsRunning() {
		return
	}
	s.start()
}
// start starts the executor, marks the scheduler as running and schedules
// every registered job.
func (s *Scheduler ) start () {
s .executor .start ()
s .setRunning (true )
s .runJobs ()
}
// runJobs gives every registered job a fresh cancellable context and starts
// its scheduling loop.
//
// The jobs are iterated from a snapshot (s.Jobs()) rather than while holding
// jobsMutex: runContinuous calls scheduleNextRun, which takes the read lock
// again in jobPresent and the write lock in RemoveByID, so holding the read
// lock here could deadlock the calling goroutine.
func (s *Scheduler) runJobs() {
	for _, job := range s.Jobs() {
		ctx, cancel := context.WithCancel(context.Background())
		job.mu.Lock()
		job.ctx = ctx
		job.cancel = cancel
		job.mu.Unlock()
		s.runContinuous(job)
	}
}
// setRunning atomically stores the running flag.
func (s *Scheduler ) setRunning (b bool ) {
s .running .Store (b )
}
// IsRunning reports whether the scheduler's running flag is set.
func (s *Scheduler ) IsRunning () bool {
return s .running .Load ()
}
// Jobs returns a new slice containing every registered job. The order follows
// map iteration and is therefore unspecified.
func (s *Scheduler) Jobs() []*Job {
	s.jobsMutex.RLock()
	defer s.jobsMutex.RUnlock()

	snapshot := make([]*Job, 0, len(s.jobs))
	for _, registered := range s.jobs {
		snapshot = append(snapshot, registered)
	}
	return snapshot
}
// JobsMap returns a copy of the id-to-job map. The map is new, but the *Job
// values are the scheduler's own.
func (s *Scheduler) JobsMap() map[uuid.UUID]*Job {
	s.jobsMutex.RLock()
	defer s.jobsMutex.RUnlock()

	copied := make(map[uuid.UUID]*Job, len(s.jobs))
	for jobID := range s.jobs {
		copied[jobID] = s.jobs[jobID]
	}
	return copied
}
// Name sets the name of the job currently being configured and returns the
// scheduler for chaining.
func (s *Scheduler) Name(name string) *Scheduler {
	s.getCurrentJob().jobName = name
	return s
}
// Len returns the number of registered jobs.
func (s *Scheduler ) Len () int {
s .jobsMutex .RLock ()
defer s .jobsMutex .RUnlock ()
return len (s .jobs )
}
// ChangeLocation replaces the scheduler's location under locationMutex.
func (s *Scheduler ) ChangeLocation (newLocation *time .Location ) {
s .locationMutex .Lock ()
defer s .locationMutex .Unlock ()
s .location = newLocation
}
// Location returns the scheduler's current location, read under locationMutex.
func (s *Scheduler ) Location () *time .Location {
s .locationMutex .RLock ()
defer s .locationMutex .RUnlock ()
return s .location
}
// nextRun describes an upcoming run as both a wait duration and a point in time.
type nextRun struct {
// duration is how long to wait, measured from the reference time of the calculation.
duration time .Duration
// dateTime is the moment of the run; it may be left zero by calculations
// that only produce a duration.
dateTime time .Time
}
// scheduleNextRun computes the next run of job and records the job's last-run
// and next-run times. It returns false (and a zero nextRun) when the job is
// not registered any more, or when job.shouldRun() is false, in which case the
// job is also removed from the scheduler.
func (s *Scheduler ) scheduleNextRun (job *Job ) (bool , nextRun ) {
now := s .now ()
if !s .jobPresent (job ) {
return false , nextRun {}
}
lastRun := now
if job .neverRan () {
// A start time that already lies in the past is moved forward: first by one
// schedule period, and if that is still in the past, by as many whole
// periods as are needed (rounded up) to reach or pass now.
if !job .startAtTime .IsZero () && job .startAtTime .Before (now ) {
dur := s .durationToNextRun (job .startAtTime , job ).duration
job .setStartAtTime (job .startAtTime .Add (dur ))
if job .startAtTime .Before (now ) {
diff := now .Sub (job .startAtTime )
dur := s .durationToNextRun (job .startAtTime , job ).duration
var count time .Duration
// Guard against division by zero when the period is zero.
if dur != 0 {
count = diff / dur
if diff %dur != 0 {
count ++
}
}
job .setStartAtTime (job .startAtTime .Add (dur * count ))
}
}
} else {
// For a job that has run before, the previously scheduled time is the reference.
lastRun = job .NextRun ()
}
if !job .shouldRun () {
// The error from RemoveByID is intentionally ignored.
_ = s .RemoveByID (job )
return false , nextRun {}
}
next := s .durationToNextRun (lastRun , job )
// The recorded last run is the previously scheduled time, unless that time is
// still in the future, in which case now is recorded instead.
jobNextRun := job .NextRun ()
if jobNextRun .After (now ) {
job .setLastRun (now )
} else {
job .setLastRun (jobNextRun )
}
// Calculations that only yield a duration get their dateTime derived from lastRun.
if next .dateTime .IsZero () {
next .dateTime = lastRun .Add (next .duration )
job .setNextRun (next .dateTime )
} else {
job .setNextRun (next .dateTime )
}
return true , next
}
// durationToNextRun returns the next run of job relative to lastRun,
// dispatching on the job's scheduling unit.
func (s *Scheduler ) durationToNextRun (lastRun time .Time , job *Job ) nextRun {
// No at-time configured and a start time after lastRun: the next run is the
// start time itself. For day/week/month units the start time's clock time is
// additionally registered as an at-time on the job.
if job .getFirstAtTime () == 0 && job .getStartAtTime ().After (lastRun ) {
sa := job .getStartAtTime ()
if job .unit == days || job .unit == weeks || job .unit == months {
job .addAtTime (
time .Duration (sa .Hour ())*time .Hour +
time .Duration (sa .Minute ())*time .Minute +
time .Duration (sa .Second ())*time .Second ,
)
}
return nextRun {duration : sa .Sub (s .now ()), dateTime : sa }
}
var next nextRun
switch job .getUnit () {
case milliseconds , seconds , minutes , hours :
// Fixed-length units only produce a duration; dateTime stays zero.
next .duration = s .calculateDuration (job )
case days :
next = s .calculateDays (job , lastRun )
case weeks :
if len (job .scheduledWeekdays ) != 0 {
next = s .calculateWeekday (job , lastRun )
} else {
next = s .calculateWeeks (job , lastRun )
}
// A result earlier than the configured start time is recomputed using the
// start time as the reference.
if next .dateTime .Before (job .getStartAtTime ()) {
return s .durationToNextRun (job .getStartAtTime (), job )
}
case months :
next = s .calculateMonths (job , lastRun )
case duration :
next .duration = job .getDuration ()
case crontab :
next .dateTime = job .cronSchedule .Next (lastRun )
next .duration = next .dateTime .Sub (lastRun )
}
return next
}
// calculateMonths returns the next run for a month-based job.
//
// A single negative day-of-month is handled as "counted from the end of the
// month". Otherwise, when days of the month are configured, the earliest of
// the per-day candidates is returned. With no days configured the run is the
// first at-time on lastRun's date, moved forward by the job interval in months.
func (s *Scheduler) calculateMonths(job *Job, lastRun time.Time) nextRun {
	days := job.daysOfTheMonth
	switch {
	case len(days) == 1 && days[0] < 0:
		return calculateNextRunForLastDayOfMonth(s, job, lastRun, days[0])
	case len(days) != 0:
		var earliest nextRun
		for _, day := range days {
			candidate := calculateNextRunForMonth(s, job, lastRun, day)
			// Keep the candidate when nothing is selected yet or when it is
			// earlier (at millisecond granularity) than the current pick.
			if earliest.dateTime.IsZero() || earliest.dateTime.Sub(candidate.dateTime).Milliseconds() > 0 {
				earliest = candidate
			}
		}
		return earliest
	}

	target := s.roundToMidnightAndAddDSTAware(lastRun, job.getFirstAtTime()).AddDate(0, job.getInterval(), 0)
	return nextRun{duration: until(lastRun, target), dateTime: target}
}
// calculateNextRunForLastDayOfMonth returns the next run for a job scheduled
// relative to the end of the month. dayBeforeLastOfMonth is negative when
// called from calculateMonths (-1 is passed by MonthLastDay with no argument).
func calculateNextRunForLastDayOfMonth(s *Scheduler , job *Job , lastRun time .Time , dayBeforeLastOfMonth int ) nextRun {
addMonth := job .getInterval ()
atTime := job .getAtTime (lastRun )
// testDate is lastRun moved forward by |dayBeforeLastOfMonth| days. If that
// crosses into another month and today's at-time is not after lastRun, one
// more month is added and the first at-time is used.
if testDate := lastRun .AddDate (0 , 0 , -dayBeforeLastOfMonth ); testDate .Month () != lastRun .Month () &&
!s .roundToMidnightAndAddDSTAware (lastRun , atTime ).After (lastRun ) {
addMonth ++
atTime = job .getFirstAtTime ()
}
// Start from the first day of lastRun's month at the at-time, move forward by
// addMonth months, then apply the (negative) day offset.
next := time .Date (lastRun .Year (), lastRun .Month (), 1 , 0 , 0 , 0 , 0 , s .Location ()).
Add (atTime ).
AddDate (0 , addMonth , 0 ).
AddDate (0 , 0 , dayBeforeLastOfMonth )
return nextRun {duration : until (lastRun , next ), dateTime : next }
}
// calculateNextRunForMonth returns the next run for a job scheduled on
// dayOfMonth, relative to lastRun.
func calculateNextRunForMonth(s *Scheduler , job *Job , lastRun time .Time , dayOfMonth int ) nextRun {
atTime := job .getAtTime (lastRun )
// natTime is the at-time the result should finally carry; it only differs
// from atTime when the run moves to a later month.
natTime := atTime
hours , minutes , seconds := s .deconstructDuration (atTime )
// jobDay is dayOfMonth at the at-time within lastRun's month.
jobDay := time .Date (lastRun .Year (), lastRun .Month (), dayOfMonth , hours , minutes , seconds , 0 , s .Location ())
difference := absDuration (lastRun .Sub (jobDay ))
next := lastRun
if jobDay .Before (lastRun ) {
// The day has already passed this month: jump ahead by the interval and
// step back by the distance between lastRun and jobDay.
next = next .AddDate (0 , job .getInterval (), -0 )
next = next .Add (-difference )
natTime = job .getFirstAtTime ()
} else {
if job .getInterval () == 1 && !jobDay .Equal (lastRun ) {
// Monthly job whose day is still ahead in the current month: stay in this month.
next = next .AddDate (0 , job .getInterval ()-1 , 0 )
} else {
next = next .AddDate (0 , job .getInterval (), 0 )
natTime = job .getFirstAtTime ()
}
next = next .Add (difference )
}
// Swap the at-time component when the final at-time differs from the one used above.
if atTime != natTime {
next = next .Add (-atTime ).Add (natTime )
}
return nextRun {duration : until (lastRun , next ), dateTime : next }
}
// calculateWeekday returns the next run for a job scheduled on specific
// weekdays. When the run falls on a later day the job's first at-time is
// used; otherwise the at-time returned for lastRun is used.
func (s *Scheduler) calculateWeekday(job *Job, lastRun time.Time) nextRun {
	daysAhead := s.calculateTotalDaysDifference(lastRun, s.remainingDaysToWeekday(lastRun, job), job)

	at := job.getAtTime(lastRun)
	if daysAhead > 0 {
		at = job.getFirstAtTime()
	}

	sameDay := s.roundToMidnightAndAddDSTAware(lastRun, at)
	target := sameDay.AddDate(0, 0, daysAhead)
	return nextRun{duration: until(lastRun, target), dateTime: target}
}
// calculateWeeks returns the next run for a week-based job without specific
// weekdays. The first configured at-time on lastRun's date that is still in
// the future wins; if there is none, the run is the first at-time on
// lastRun's date moved forward by interval*7 days.
func (s *Scheduler) calculateWeeks(job *Job, lastRun time.Time) nextRun {
	daysAhead := int(job.getInterval()) * 7

	var target time.Time
	for _, at := range job.atTimes {
		if candidate := s.roundToMidnightAndAddDSTAware(lastRun, at); candidate.After(s.now()) {
			target = candidate
			break
		}
	}

	if target.IsZero() {
		firstAt := s.roundToMidnightAndAddDSTAware(lastRun, job.getFirstAtTime())
		target = firstAt.AddDate(0, 0, daysAhead)
	}
	return nextRun{duration: until(lastRun, target), dateTime: target}
}
// calculateTotalDaysDifference converts the number of days to the next
// scheduled weekday (daysToWeekday) into the number of days to add to lastRun,
// taking the job interval (in weeks) into account.
func (s *Scheduler ) calculateTotalDaysDifference (lastRun time .Time , daysToWeekday int , job *Job ) int {
if job .getInterval () > 1 {
weekDays := job .Weekdays ()
// NOTE(review): this reads the job.lastRun field directly rather than the
// lastRun parameter — confirm that is intended.
// If the job's last run was not on the last listed weekday, simply move to
// the next weekday.
if job .lastRun .Weekday () != weekDays [len (weekDays )-1 ] {
return daysToWeekday
}
// Otherwise skip ahead by the interval in weeks, adjusted by the weekday offset.
if daysToWeekday > 0 {
return int (job .getInterval ())*7 - (allWeekDays - daysToWeekday )
}
return int (job .getInterval ()) * 7
}
if daysToWeekday == 0 {
// Same weekday: run today if today's at-time is still ahead of lastRun,
// otherwise in a week.
lastRunAtTime := time .Date (lastRun .Year (), lastRun .Month (), lastRun .Day (), 0 , 0 , 0 , 0 , s .Location ()).Add (job .getAtTime (lastRun ))
if lastRun .Before (lastRunAtTime ) {
return 0
}
return 7
}
return daysToWeekday
}
// calculateDays returns the next run for a day-based job. The candidate is the
// job's at-time on lastRun's date; if that moment is now or already past, it
// is moved forward by the job interval in days.
func (s *Scheduler) calculateDays(job *Job, lastRun time.Time) nextRun {
	nextRunAtTime := s.roundToMidnightAndAddDSTAware(lastRun, job.getAtTime(lastRun)).In(s.Location())
	// !Before is "after or equal". Comparing time.Time values with == also
	// compares the location pointer and monotonic clock reading, so the
	// equality case would practically never match.
	if !s.now().Before(nextRunAtTime) {
		nextRunAtTime = nextRunAtTime.AddDate(0, 0, job.getInterval())
	}
	return nextRun{duration: until(lastRun, nextRunAtTime), dateTime: nextRunAtTime}
}
// until returns the duration from `from` to `to`; it is negative when `to`
// lies before `from`.
func until(from time.Time, to time.Time) time.Duration {
	return to.Sub(from)
}
// in reports whether weekday is contained in scheduleWeekdays.
func in(scheduleWeekdays []time.Weekday, weekday time.Weekday) bool {
	for i := range scheduleWeekdays {
		if scheduleWeekdays[i] == weekday {
			return true
		}
	}
	return false
}
// calculateDuration converts the job interval into a time.Duration according
// to the job's unit. Any unit other than milliseconds, seconds or minutes is
// treated as hours.
func (s *Scheduler) calculateDuration(job *Job) time.Duration {
	count := time.Duration(job.getInterval())
	base := time.Hour
	switch job.getUnit() {
	case milliseconds:
		base = time.Millisecond
	case seconds:
		base = time.Second
	case minutes:
		base = time.Minute
	}
	return count * base
}
// remainingDaysToWeekday returns how many days lie between lastRun's weekday
// and the next scheduled weekday of job. It returns 0 when lastRun falls on a
// scheduled weekday whose at-time is still ahead of lastRun.
func (s *Scheduler ) remainingDaysToWeekday (lastRun time .Time , job *Job ) int {
weekDays := job .Weekdays ()
// Sort ascending so a binary search can be used below.
// NOTE(review): whether this reorders the job's own slice depends on whether
// Weekdays() returns a copy — not visible here.
sort .Slice (weekDays , func (i , j int ) bool {
return weekDays [i ] < weekDays [j ]
})
equals := false
lastRunWeekday := lastRun .Weekday ()
// index is the first scheduled weekday that is >= lastRun's weekday; equals
// records whether the probed element was exactly lastRun's weekday.
index := sort .Search (len (weekDays ), func (i int ) bool {
b := weekDays [i ] >= lastRunWeekday
if b {
equals = weekDays [i ] == lastRunWeekday
}
return b
})
if equals {
// Today is a scheduled weekday: run today if the at-time is still ahead,
// otherwise continue with the following scheduled weekday.
if s .roundToMidnightAndAddDSTAware (lastRun , job .getAtTime (lastRun )).After (lastRun ) {
return 0
}
index ++
}
if index < len (weekDays ) {
return int (weekDays [index ] - lastRunWeekday )
}
// No scheduled weekday is left this week: wrap around to the first one next week.
return int (weekDays [0 ]) + allWeekDays - int (lastRunWeekday )
}
// absDuration returns the absolute value of a.
func absDuration(a time.Duration) time.Duration {
	if a < 0 {
		return -a
	}
	return a
}
// deconstructDuration splits d into whole hours, minutes and seconds.
// Fractions of a second are discarded.
func (s *Scheduler) deconstructDuration(d time.Duration) (hours int, minutes int, seconds int) {
	const (
		secondsPerMinute = int(time.Minute / time.Second)
		secondsPerHour   = int(time.Hour / time.Second)
	)
	total := int(d.Seconds())
	hours = total / secondsPerHour
	minutes = (total % secondsPerHour) / secondsPerMinute
	seconds = total % secondsPerMinute
	return hours, minutes, seconds
}
// roundToMidnightAndAddDSTAware returns the date of t with the clock time set
// to the hours, minutes and seconds contained in d, in the scheduler's
// location. It builds the value with time.Date instead of adding d to midnight.
func (s *Scheduler ) roundToMidnightAndAddDSTAware (t time .Time , d time .Duration ) time .Time {
hours , minutes , seconds := s .deconstructDuration (d )
return time .Date (t .Year (), t .Month (), t .Day (), hours , minutes , seconds , 0 , s .Location ())
}
// NextRun returns the job whose next run is the nearest one still in the
// future, together with that time. It returns (nil, zero time) when no jobs
// are registered or when no job has a next run after now.
func (s *Scheduler) NextRun() (*Job, time.Time) {
	s.jobsMutex.RLock()
	defer s.jobsMutex.RUnlock()

	if len(s.jobs) == 0 {
		return nil, time.Time{}
	}

	var (
		nearestID uuid.UUID
		nearest   time.Time
	)
	for _, job := range s.jobs {
		candidate := job.NextRun()
		// Skip candidates that do not improve on the current pick.
		if !nearest.IsZero() && !candidate.Before(nearest) {
			continue
		}
		// Skip candidates that are not in the future.
		if !s.now().Before(candidate) {
			continue
		}
		nearest, nearestID = candidate, job.id
	}
	return s.jobs[nearestID], nearest
}
// EveryRandom passes the lower and upper bounds to the current job's
// setRandomInterval and returns the scheduler for chaining.
func (s *Scheduler ) EveryRandom (lower , upper int ) *Scheduler {
job := s .getCurrentJob ()
job .setRandomInterval (lower , upper )
return s
}
// Every sets the interval of the job being configured. interval may be:
//   - an int: the count for a unit chosen later in the chain;
//   - a time.Duration: a fixed duration between runs;
//   - a string: parsed with time.ParseDuration.
//
// Non-positive values record ErrInvalidInterval on the job, a parse failure
// records the parse error, and any other type records ErrInvalidIntervalType.
func (s *Scheduler) Every(interval interface{}) *Scheduler {
	job := s.getCurrentJob()

	switch v := interval.(type) {
	case int:
		job.interval = v
		if v <= 0 {
			job.error = wrapOrError(job.error, ErrInvalidInterval)
		}

	case time.Duration:
		if v <= 0 {
			job.error = wrapOrError(job.error, ErrInvalidInterval)
		}
		job.setInterval(0)
		job.setDuration(v)
		job.setUnit(duration)

	case string:
		parsed, parseErr := time.ParseDuration(v)
		if parseErr != nil {
			job.error = wrapOrError(job.error, parseErr)
		}
		// A failed parse yields 0, so it is also reported as an invalid interval.
		if parsed <= 0 {
			job.error = wrapOrError(job.error, ErrInvalidInterval)
		}
		job.setDuration(parsed)
		job.setUnit(duration)

	default:
		job.error = wrapOrError(job.error, ErrInvalidIntervalType)
	}

	return s
}
// run hands a copy of the job's function to the executor. It does nothing when
// the scheduler is not running.
func (s *Scheduler ) run (job *Job ) {
if !s .IsRunning () {
return
}
job .mu .Lock ()
if job .function == nil {
// The job lock is released before calling Remove.
// NOTE(review): Remove is given the *Job itself and resolves it through
// getFunctionName — confirm this removes the intended job.
job .mu .Unlock ()
s .Remove (job )
return
}
defer job .mu .Unlock ()
if job .runWithDetails {
// The job copy is passed as the extra, last parameter: appended on the first
// run, overwritten in place on later runs.
switch len (job .parameters ) {
case job .parametersLen :
job .parameters = append (job .parameters , job .copy ())
case job .parametersLen + 1 :
job .parameters [job .parametersLen ] = job .copy ()
default :
job .error = wrapOrError (job .error , ErrInvalidFunctionParameters )
return
}
}
// The send happens while job.mu is still held (released by the deferred Unlock).
s .executor .jobFunctions <- job .jobFunction .copy ()
}
// runContinuous drives a job's scheduling loop: it computes the next run, runs
// the job now when it is due to start, and arms a timer whose callback calls
// runContinuous again.
func (s *Scheduler) runContinuous(job *Job) {
	shouldRun, next := s.scheduleNextRun(job)
	if !shouldRun {
		return
	}

	// A job that does not start immediately skips this first run only; the
	// flag is flipped so later iterations run it.
	if !job.getStartsImmediately() {
		job.setStartsImmediately(true)
	} else {
		s.run(job)
	}

	nr := next.dateTime.Sub(s.now())
	if nr < 0 {
		// The computed run is already in the past: reschedule from now.
		job.setLastRun(s.now())
		// Plain assignment (not :=) so the timer callback below observes the
		// recomputed schedule instead of a shadowed, stale copy.
		shouldRun, next = s.scheduleNextRun(job)
		if !shouldRun {
			return
		}
		nr = next.dateTime.Sub(s.now())
	}

	job.setTimer(s.timer(nr, func() {
		if !next.dateTime.IsZero() {
			// Guard against the timer firing early: wait out whatever is left
			// until the scheduled wall-clock time.
			for {
				remaining := next.dateTime.UnixNano() - s.now().UnixNano()
				if remaining <= 0 {
					break
				}
				select {
				// NOTE(review): executor cancellation only interrupts the wait;
				// the loop is still left by the time check above.
				case <-s.executor.ctx.Done():
				// Wait the positive remaining time. A negative duration would
				// make time.After fire immediately and turn this loop into a
				// busy spin.
				case <-time.After(time.Duration(remaining)):
				}
			}
		}
		s.runContinuous(job)
	}))
}
// RunAll runs every registered job once, with no delay between jobs.
func (s *Scheduler ) RunAll () {
s .RunAllWithDelay (0 )
}
// RunAllWithDelay runs every registered job once, sleeping for d after each.
//
// The jobs are taken from a snapshot (s.Jobs()) instead of iterating under
// jobsMutex: run may call Remove, which needs the write lock, and holding the
// read lock across the sleeps would block every writer for the whole pass.
func (s *Scheduler) RunAllWithDelay(d time.Duration) {
	for _, job := range s.Jobs() {
		s.run(job)
		s.time.Sleep(d)
	}
}
// RunByTag runs the jobs that carry the given tag, with no delay between them.
// It returns the error from the tag lookup, if any.
func (s *Scheduler ) RunByTag (tag string ) error {
return s .RunByTagWithDelay (tag , 0 )
}
// RunByTagWithDelay runs the jobs that carry the given tag, sleeping for d
// after each one. It returns the error from FindJobsByTag when no job matches.
func (s *Scheduler ) RunByTagWithDelay (tag string , d time .Duration ) error {
jobs , err := s .FindJobsByTag (tag )
if err != nil {
return err
}
for _ , job := range jobs {
s .run (job )
s .time .Sleep (d )
}
return nil
}
// Remove removes every job whose function name equals the name resolved from
// job by getFunctionName.
func (s *Scheduler ) Remove (job interface {}) {
fName := getFunctionName (job )
// Unique tags are released only for the first job found with that name.
j := s .findJobByTaskName (fName )
s .removeJobsUniqueTags (j )
s .removeByCondition (func (someJob *Job ) bool {
return someJob .funcName == fName
})
}
// RemoveByReference removes the given job by its id; the result of RemoveByID
// is discarded.
func (s *Scheduler ) RemoveByReference (job *Job ) {
_ = s .RemoveByID (job )
}
// findJobByTaskName returns a registered job whose funcName equals name, or
// nil when there is none. With several matches, which one is returned depends
// on map iteration order.
func (s *Scheduler ) findJobByTaskName (name string ) *Job {
s .jobsMutex .RLock ()
defer s .jobsMutex .RUnlock ()
for _ , job := range s .jobs {
if job .funcName == name {
return job
}
}
return nil
}
// removeJobsUniqueTags releases the job's tags from the scheduler's tag
// registry. It is a no-op for a nil job or when unique tags are not enabled.
func (s *Scheduler) removeJobsUniqueTags(job *Job) {
	if job == nil || !s.tagsUnique {
		return
	}
	for _, tag := range job.tags {
		s.tags.Delete(tag)
	}
}
// removeByCondition stops and unregisters every job for which shouldRemove
// returns true. The callback runs while the jobs write lock is held.
func (s *Scheduler) removeByCondition(shouldRemove func(*Job) bool) {
	s.jobsMutex.Lock()
	defer s.jobsMutex.Unlock()

	for _, candidate := range s.jobs {
		if !shouldRemove(candidate) {
			continue
		}
		s.stopJob(candidate)
		delete(s.jobs, candidate.id)
	}
}
// stopJob stops the job. For a job in singleton mode, its wait-group entry is
// first removed from the executor.
func (s *Scheduler ) stopJob (job *Job ) {
job .mu .Lock ()
if job .runConfig .mode == singletonMode {
s .executor .singletonWgs .Delete (job .singletonWg )
}
job .mu .Unlock ()
// job.stop is called after the job lock has been released.
job .stop ()
}
// RemoveByTag removes the jobs that carry the given tag; see RemoveByTags.
func (s *Scheduler ) RemoveByTag (tag string ) error {
return s .RemoveByTags (tag )
}
// RemoveByTags removes the jobs returned by FindJobsByTag for the given tags.
// It returns the lookup error when no job matches; errors from the individual
// removals are discarded.
func (s *Scheduler ) RemoveByTags (tags ...string ) error {
jobs , err := s .FindJobsByTag (tags ...)
if err != nil {
return err
}
for _ , job := range jobs {
_ = s .RemoveByID (job )
}
return nil
}
// RemoveByTagsAny removes every job that carries at least one of the given
// tags. Each tag is looked up on its own; a lookup that fails is recorded in
// the returned error (annotated with the tag) and the remaining tags are still
// processed.
func (s *Scheduler) RemoveByTagsAny(tags ...string) error {
	var errs error
	// Collect into a set so a job matching several tags is removed only once.
	mJob := make(map[*Job]struct{})
	for _, tag := range tags {
		jobs, err := s.FindJobsByTag(tag)
		if err != nil {
			// %w keeps the lookup error in the chain for errors.Is/As; the
			// message text is the same as with %s.
			errs = wrapOrError(errs, fmt.Errorf("%w: %s", err, tag))
		}
		for _, job := range jobs {
			mJob[job] = struct{}{}
		}
	}

	for job := range mJob {
		// The result is ignored, matching RemoveByTags.
		_ = s.RemoveByID(job)
	}

	return errs
}
// RemoveByID unregisters the job with job's id: its unique tags are released,
// the job is stopped and it is deleted from the job map. It returns
// ErrJobNotFound when no job with that id is registered.
func (s *Scheduler) RemoveByID(job *Job) error {
	s.jobsMutex.Lock()
	defer s.jobsMutex.Unlock()

	if _, registered := s.jobs[job.id]; !registered {
		return ErrJobNotFound
	}

	s.removeJobsUniqueTags(job)
	s.stopJob(job)
	delete(s.jobs, job.id)
	return nil
}
// FindJobsByTag returns the registered jobs for which job.hasTags(tags...) is
// true. It returns ErrJobNotFoundWithTag (and a nil slice) when none match.
func (s *Scheduler) FindJobsByTag(tags ...string) ([]*Job, error) {
	s.jobsMutex.RLock()
	defer s.jobsMutex.RUnlock()

	var matched []*Job
	for _, candidate := range s.jobs {
		if !candidate.hasTags(tags...) {
			continue
		}
		matched = append(matched, candidate)
	}

	if len(matched) == 0 {
		return nil, ErrJobNotFoundWithTag
	}
	return matched, nil
}
// MonthFirstWeekday configures the current job with a cron expression built
// from today's UTC day-of-month, a month and the given weekday. The current
// month is used while the day-of-month is below 7; afterwards the following
// month is used.
//
// NOTE(review): the day-of-month field is taken from today's date — confirm
// that this yields the intended "first weekday of the month" schedule.
func (s *Scheduler) MonthFirstWeekday(weekday time.Weekday) *Scheduler {
	_, month, day := s.time.Now(time.UTC).Date()

	if day >= 7 {
		// Advance to the next month, wrapping December (12) to January (1);
		// a plain month+1 would produce the invalid cron month 13.
		month = month%12 + 1
	}

	return s.Cron(fmt.Sprintf("0 0 %d %d %d", day, month, weekday))
}
// LimitRunsTo forwards i to the current job's LimitRunsTo and returns the
// scheduler for chaining.
func (s *Scheduler ) LimitRunsTo (i int ) *Scheduler {
job := s .getCurrentJob ()
job .LimitRunsTo (i )
return s
}
// SingletonMode enables singleton mode on the current job and returns the
// scheduler for chaining.
func (s *Scheduler ) SingletonMode () *Scheduler {
job := s .getCurrentJob ()
job .SingletonMode ()
return s
}
// SingletonModeAll sets the scheduler-wide singleton flag, which is passed to
// every job created afterwards.
func (s *Scheduler ) SingletonModeAll () {
s .singletonMode = true
}
// TaskPresent reports whether a registered job has the same function name as
// j (resolved with getFunctionName).
func (s *Scheduler) TaskPresent(j interface{}) bool {
	// Resolve the name once, before taking the lock, as Remove does, instead
	// of resolving it again for every registered job.
	name := getFunctionName(j)

	s.jobsMutex.RLock()
	defer s.jobsMutex.RUnlock()
	for _, job := range s.jobs {
		if job.funcName == name {
			return true
		}
	}
	return false
}
// jobPresent reports whether a job with j's id is registered.
func (s *Scheduler) jobPresent(j *Job) bool {
	s.jobsMutex.RLock()
	defer s.jobsMutex.RUnlock()

	_, registered := s.jobs[j.id]
	return registered
}
// Clear stops all jobs and replaces the job map with an empty one. When unique
// tags are enabled, every recorded tag is deleted as well.
func (s *Scheduler ) Clear () {
// Jobs are stopped before the write lock is taken (stopJobs takes the read lock itself).
s .stopJobs ()
s .jobsMutex .Lock ()
defer s .jobsMutex .Unlock ()
s .jobs = make (map [uuid .UUID ]*Job )
if s .tagsUnique {
s .tags .Range (func (key interface {}, value interface {}) bool {
s .tags .Delete (key )
return true
})
}
}
// Stop stops the scheduler. It does nothing when the scheduler is not running.
func (s *Scheduler) Stop() {
	if !s.IsRunning() {
		return
	}
	s.stop()
}
// stop stops all jobs and the executor, releases a blocked StartBlocking call,
// and finally clears the running flag.
func (s *Scheduler ) stop () {
s .stopJobs ()
s .executor .stop ()
// Must run before setRunning(false): StopBlockingChan only closes the channel
// while the scheduler still reports running.
s .StopBlockingChan ()
s .setRunning (false )
}
// stopJobs calls stop on every registered job while holding the read lock.
func (s *Scheduler ) stopJobs () {
s .jobsMutex .RLock ()
defer s .jobsMutex .RUnlock ()
for _ , job := range s .jobs {
job .stop ()
}
}
// doCommon finishes the configuration chain of the current job: it validates
// the accumulated settings, binds jobFun and params to the job, and starts the
// job's scheduling loop when the scheduler is already running.
//
// jobFun must be a function or a pointer (of any depth) to a function. On any
// validation failure the job is removed from the scheduler and (nil, error)
// is returned.
func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, error) {
	job := s.getCurrentJob()
	// The chain ends here; the next chained call creates a new job.
	s.inScheduleChain = nil

	jobUnit := job.getUnit()
	jobLastRun := job.LastRun()
	// An at-time is rejected for units at or below hours and at or above duration.
	if job.getAtTime(jobLastRun) != 0 && (jobUnit <= hours || jobUnit >= duration) {
		job.error = wrapOrError(job.error, ErrAtTimeNotSupported)
	}

	if len(job.scheduledWeekdays) != 0 && jobUnit != weeks {
		job.error = wrapOrError(job.error, ErrWeekdayNotSupported)
	}

	// A zero interval is only acceptable for crontab and duration jobs.
	if job.unit != crontab && job.getInterval() == 0 {
		if job.unit != duration {
			job.error = wrapOrError(job.error, ErrInvalidInterval)
		}
	}

	if job.error != nil {
		// Remove the half-configured job; a "not found" result is irrelevant here.
		_ = s.RemoveByID(job)
		return nil, job.error
	}

	val := reflect.ValueOf(jobFun)
	// Remember whether jobFun was passed through a pointer. Comparing
	// reflect.Value with == is flagged by go vet, so an explicit flag is used.
	viaPointer := val.Kind() == reflect.Ptr
	for val.Kind() == reflect.Ptr {
		val = val.Elem()
	}

	if val.Kind() != reflect.Func {
		_ = s.RemoveByID(job)
		return nil, ErrNotAFunction
	}

	var fname string
	if viaPointer {
		fname = getFunctionNameOfPointer(jobFun)
	} else {
		fname = getFunctionName(jobFun)
	}

	// Only rebind the function and parameters when the function changed.
	if job.funcName != fname {
		job.function = jobFun
		if viaPointer {
			job.function = val.Interface()
		}

		job.parameters = params
		job.funcName = fname
	}

	expectedParamLength := val.Type().NumIn()
	if job.runWithDetails {
		// The trailing job-details parameter is supplied by the scheduler.
		expectedParamLength--
	}

	if len(params) != expectedParamLength {
		_ = s.RemoveByID(job)
		job.error = wrapOrError(job.error, ErrWrongParams)
		return nil, job.error
	}

	// With job details, the parameter after the user-supplied ones must be a
	// struct (the Job copy). reflect.Struct is what reflecting on a Job value
	// yields, without copying the whole Job.
	if job.runWithDetails && val.Type().In(len(params)).Kind() != reflect.Struct {
		_ = s.RemoveByID(job)
		job.error = wrapOrError(job.error, ErrDoWithJobDetails)
		return nil, job.error
	}

	// Jobs added to a running scheduler are scheduled right away.
	if s.IsRunning() {
		s.runContinuous(job)
	}

	return job, nil
}
// Do binds jobFun and params to the current job and ends the configuration
// chain; see doCommon for the validation performed.
func (s *Scheduler ) Do (jobFun interface {}, params ...interface {}) (*Job , error ) {
return s .doCommon (jobFun , params ...)
}
// DoWithJobDetails is like Do, but marks the job so that a copy of the job is
// passed to jobFun as an additional last parameter on each run. The number of
// user-supplied params is recorded on the job.
func (s *Scheduler ) DoWithJobDetails (jobFun interface {}, params ...interface {}) (*Job , error ) {
job := s .getCurrentJob ()
job .runWithDetails = true
job .parametersLen = len (params )
return s .doCommon (jobFun , params ...)
}
// At adds one or more times of day to the current job. i may be a string of
// times separated by ";" (each parsed with parseTime) or a time.Time whose
// clock time is used. Any other type records ErrUnsupportedTimeFormat. On
// success the job no longer starts immediately; a parse failure records the
// error and returns early.
func (s *Scheduler) At(i interface{}) *Scheduler {
	job := s.getCurrentJob()

	switch at := i.(type) {
	case string:
		for _, part := range strings.Split(at, ";") {
			h, m, sec, err := parseTime(part)
			if err != nil {
				job.error = wrapOrError(job.error, err)
				return s
			}
			sinceMidnight := time.Duration(h)*time.Hour +
				time.Duration(m)*time.Minute +
				time.Duration(sec)*time.Second
			job.addAtTime(sinceMidnight)
		}
	case time.Time:
		sinceMidnight := time.Duration(at.Hour())*time.Hour +
			time.Duration(at.Minute())*time.Minute +
			time.Duration(at.Second())*time.Second +
			time.Duration(at.Nanosecond())*time.Nanosecond
		job.addAtTime(sinceMidnight)
	default:
		job.error = wrapOrError(job.error, ErrUnsupportedTimeFormat)
	}

	job.startsImmediately = false
	return s
}
// Tag appends the given tags to the current job. When unique tags are enabled,
// each tag is first checked against the scheduler's registry: a tag already in
// use records ErrTagsUnique on the job and aborts without appending anything,
// although tags registered earlier in the same call stay registered.
func (s *Scheduler) Tag(t ...string) *Scheduler {
	job := s.getCurrentJob()

	if s.tagsUnique {
		for _, candidate := range t {
			if _, taken := s.tags.Load(candidate); taken {
				job.error = wrapOrError(job.error, ErrTagsUnique(candidate))
				return s
			}
			s.tags.Store(candidate, struct{}{})
		}
	}

	job.tags = append(job.tags, t...)
	return s
}
// GetAllTags returns the tags of all registered jobs concatenated into one
// slice. The result is nil when there are no tags.
func (s *Scheduler ) GetAllTags () []string {
var tags []string
s .jobsMutex .RLock ()
defer s .jobsMutex .RUnlock ()
for _ , job := range s .jobs {
tags = append (tags , job .Tags ()...)
}
return tags
}
// StartAt sets the start time of the current job to t and makes the job not
// start immediately.
func (s *Scheduler ) StartAt (t time .Time ) *Scheduler {
job := s .getCurrentJob ()
job .setStartAtTime (t )
job .startsImmediately = false
return s
}
// setUnit sets the scheduling unit of the current job. A job whose unit is
// already duration or crontab cannot take another unit; in that case
// ErrInvalidIntervalUnitsSelection is recorded and the unit is left unchanged.
func (s *Scheduler) setUnit(unit schedulingUnit) {
	job := s.getCurrentJob()
	switch job.getUnit() {
	case duration, crontab:
		job.error = wrapOrError(job.error, ErrInvalidIntervalUnitsSelection)
	default:
		job.setUnit(unit)
	}
}
// Millisecond is an alias of Milliseconds.
func (s *Scheduler ) Millisecond () *Scheduler {
return s .Milliseconds ()
}
// Milliseconds sets the unit of the current job to milliseconds.
func (s *Scheduler ) Milliseconds () *Scheduler {
s .setUnit (milliseconds )
return s
}
// Second is an alias of Seconds.
func (s *Scheduler ) Second () *Scheduler {
return s .Seconds ()
}
// Seconds sets the unit of the current job to seconds.
func (s *Scheduler ) Seconds () *Scheduler {
s .setUnit (seconds )
return s
}
// Minute is an alias of Minutes.
func (s *Scheduler ) Minute () *Scheduler {
return s .Minutes ()
}
// Minutes sets the unit of the current job to minutes.
func (s *Scheduler ) Minutes () *Scheduler {
s .setUnit (minutes )
return s
}
// Hour is an alias of Hours.
func (s *Scheduler ) Hour () *Scheduler {
return s .Hours ()
}
// Hours sets the unit of the current job to hours.
func (s *Scheduler ) Hours () *Scheduler {
s .setUnit (hours )
return s
}
// Day is an alias of Days.
func (s *Scheduler ) Day () *Scheduler {
return s .Days ()
}
// Days sets the unit of the current job to days.
func (s *Scheduler ) Days () *Scheduler {
s .setUnit (days )
return s
}
// Week is an alias of Weeks: it sets the unit of the current job to weeks.
func (s *Scheduler) Week() *Scheduler {
	return s.Weeks()
}
// Weeks sets the unit of the current job to weeks.
func (s *Scheduler ) Weeks () *Scheduler {
s .setUnit (weeks )
return s
}
// Month is an alias of Months.
func (s *Scheduler ) Month (daysOfMonth ...int ) *Scheduler {
return s .Months (daysOfMonth ...)
}
// MonthLastDay schedules the current job relative to the last day of the
// month. Without an argument it is equivalent to Months(-1). With exactly one
// negative argument n it is equivalent to Months(n - 1). A non-negative
// argument, or more than one argument, records ErrInvalidMonthLastDayEntry.
func (s *Scheduler) MonthLastDay(dayCountBeforeLastDayOfMonth ...int) *Scheduler {
	job := s.getCurrentJob()

	if len(dayCountBeforeLastDayOfMonth) == 0 {
		return s.Months(-1)
	}

	if len(dayCountBeforeLastDayOfMonth) == 1 {
		if offset := dayCountBeforeLastDayOfMonth[0]; offset < 0 {
			return s.Months(offset - 1)
		}
	}

	job.error = wrapOrError(job.error, ErrInvalidMonthLastDayEntry)
	return s
}
// Months sets the unit of the current job to months and adds the given days
// of the month. Validation errors are recorded on the job; the days are
// appended and the unit is set even when validation fails.
func (s *Scheduler ) Months (daysOfTheMonth ...int ) *Scheduler {
job := s .getCurrentJob ()
if len (daysOfTheMonth ) == 0 {
job .error = wrapOrError (job .error , ErrInvalidDayOfMonthEntry )
} else if len (daysOfTheMonth ) == 1 {
// A single day may be negative (counted from the end of the month); the
// accepted range is -28..-1 and 1..28.
dayOfMonth := daysOfTheMonth [0 ]
if dayOfMonth < -28 || dayOfMonth == 0 || dayOfMonth > 28 {
job .error = wrapOrError (job .error , ErrInvalidDayOfMonthEntry )
}
} else {
// Several days: each must be within 1..28 and must not repeat, neither
// within this call nor against days already stored on the job.
repeatMap := make (map [int ]int )
for _ , dayOfMonth := range daysOfTheMonth {
if dayOfMonth < 1 || dayOfMonth > 28 {
job .error = wrapOrError (job .error , ErrInvalidDayOfMonthEntry )
break
}
for _ , dayOfMonthInJob := range job .daysOfTheMonth {
if dayOfMonthInJob == dayOfMonth {
job .error = wrapOrError (job .error , ErrInvalidDaysOfMonthDuplicateValue )
// This break only leaves the inner loop; validation of the remaining days continues.
break
}
}
if _ , ok := repeatMap [dayOfMonth ]; ok {
job .error = wrapOrError (job .error , ErrInvalidDaysOfMonthDuplicateValue )
break
}
repeatMap [dayOfMonth ]++
}
}
if job .daysOfTheMonth == nil {
job .daysOfTheMonth = make ([]int , 0 )
}
job .daysOfTheMonth = append (job .daysOfTheMonth , daysOfTheMonth ...)
job .startsImmediately = false
s .setUnit (months )
return s
}
// Weekday adds weekDay to the current job's scheduled weekdays (unless it is
// already present), makes the job not start immediately and sets its unit to
// weeks.
func (s *Scheduler) Weekday(weekDay time.Weekday) *Scheduler {
	job := s.getCurrentJob()

	if !in(job.scheduledWeekdays, weekDay) {
		job.scheduledWeekdays = append(job.scheduledWeekdays, weekDay)
	}

	job.startsImmediately = false
	s.setUnit(weeks)
	return s
}
// Midday is shorthand for At("12:00").
func (s *Scheduler ) Midday () *Scheduler {
return s .At ("12:00" )
}
// Monday is shorthand for Weekday(time.Monday).
func (s *Scheduler ) Monday () *Scheduler {
return s .Weekday (time .Monday )
}
// Tuesday is shorthand for Weekday(time.Tuesday).
func (s *Scheduler ) Tuesday () *Scheduler {
return s .Weekday (time .Tuesday )
}
// Wednesday is shorthand for Weekday(time.Wednesday).
func (s *Scheduler ) Wednesday () *Scheduler {
return s .Weekday (time .Wednesday )
}
// Thursday is shorthand for Weekday(time.Thursday).
func (s *Scheduler ) Thursday () *Scheduler {
return s .Weekday (time .Thursday )
}
// Friday is shorthand for Weekday(time.Friday).
func (s *Scheduler ) Friday () *Scheduler {
return s .Weekday (time .Friday )
}
// Saturday is shorthand for Weekday(time.Saturday).
func (s *Scheduler ) Saturday () *Scheduler {
return s .Weekday (time .Saturday )
}
// Sunday is shorthand for Weekday(time.Sunday).
func (s *Scheduler ) Sunday () *Scheduler {
return s .Weekday (time .Sunday )
}
// getCurrentJob returns the job that the current configuration chain refers
// to. When no chain is in progress it creates a new job, registers it and
// makes it the target of the chain.
func (s *Scheduler) getCurrentJob() *Job {
	if s.inScheduleChain != nil {
		s.jobsMutex.RLock()
		defer s.jobsMutex.RUnlock()
		return s.jobs[*s.inScheduleChain]
	}

	s.jobsMutex.Lock()
	created := s.newJob(0)
	s.jobs[created.id] = created
	s.jobsMutex.Unlock()

	s.inScheduleChain = &created.id
	return created
}
// now returns the current time from the scheduler's clock in the scheduler's location.
func (s *Scheduler ) now () time .Time {
return s .time .Now (s .Location ())
}
// TagsUnique enables the uniqueness check that Tag performs on job tags.
func (s *Scheduler ) TagsUnique () {
s .tagsUnique = true
}
// Job makes j the target of the following chained calls and flags the chain
// as an update (see Update). It is a no-op when j is not registered with this
// scheduler or when a different job is registered under j's id.
func (s *Scheduler) Job(j *Job) *Scheduler {
	registered, ok := s.JobsMap()[j.id]
	if !ok || registered != j {
		return s
	}

	s.inScheduleChain = &j.id
	s.updateJob = true
	return s
}
// Update stops the current job and re-submits it with its existing function
// and parameters. It must follow a call to Job; otherwise the job is returned
// together with ErrUpdateCalledWithoutJob.
func (s *Scheduler ) Update () (*Job , error ) {
job := s .getCurrentJob ()
if !s .updateJob {
return job , wrapOrError (job .error , ErrUpdateCalledWithoutJob )
}
s .updateJob = false
job .stop ()
job .setStartsImmediately (false )
if job .runWithDetails {
// Drop the trailing parameter when there is one (run stores the job copy
// there) before re-submitting.
params := job .parameters
if len (params ) > 0 {
params = job .parameters [:len (job .parameters )-1 ]
}
return s .DoWithJobDetails (job .function , params ...)
}
// Singleton mode is re-applied only on this path, not on the job-details path above.
if job .runConfig .mode == singletonMode {
job .SingletonMode ()
}
return s .Do (job .function , job .parameters ...)
}
// Cron configures the current job from a standard cron expression (without a
// seconds field).
func (s *Scheduler ) Cron (cronExpression string ) *Scheduler {
return s .cron (cronExpression , false )
}
// CronWithSeconds configures the current job from a cron expression that has
// a leading seconds field.
func (s *Scheduler ) CronWithSeconds (cronExpression string ) *Scheduler {
return s .cron (cronExpression , true )
}
// cron parses cronExpression and configures the current job as a crontab job.
// Unless the expression already starts with "TZ=" or "CRON_TZ=", it is
// prefixed with the scheduler's location. withSeconds selects a parser with a
// leading seconds field; otherwise the standard five-field parser is used.
// A parse failure is recorded on the job together with ErrCronParseFailure.
func (s *Scheduler) cron(cronExpression string, withSeconds bool) *Scheduler {
	job := s.getCurrentJob()

	withLocation := cronExpression
	if !strings.HasPrefix(cronExpression, "TZ=") && !strings.HasPrefix(cronExpression, "CRON_TZ=") {
		// Read the location through the accessor so the read is guarded by
		// locationMutex, like every other use of the location.
		withLocation = fmt.Sprintf("CRON_TZ=%s %s", s.Location().String(), cronExpression)
	}

	var (
		cronSchedule cron.Schedule
		err          error
	)

	if withSeconds {
		p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
		cronSchedule, err = p.Parse(withLocation)
	} else {
		cronSchedule, err = cron.ParseStandard(withLocation)
	}

	if err != nil {
		job.error = wrapOrError(err, ErrCronParseFailure)
	}

	job.cronSchedule = cronSchedule
	job.setUnit(crontab)
	job.startsImmediately = false

	return s
}
// newJob creates a job with the given interval, passing on the scheduler-wide
// "wait for interval" (negated) and singleton settings.
func (s *Scheduler ) newJob (interval int ) *Job {
return newJob (interval , !s .waitForInterval , s .singletonMode )
}
// WaitForScheduleAll sets the scheduler-wide flag that newJob passes (negated)
// as the "starts immediately" argument for jobs created afterwards.
func (s *Scheduler ) WaitForScheduleAll () {
s .waitForInterval = true
}
// WaitForSchedule makes the current job not start immediately.
func (s *Scheduler ) WaitForSchedule () *Scheduler {
job := s .getCurrentJob ()
job .startsImmediately = false
return s
}
// StartImmediately makes the current job start immediately.
func (s *Scheduler ) StartImmediately () *Scheduler {
job := s .getCurrentJob ()
job .startsImmediately = true
return s
}
// CustomTime replaces the scheduler's clock with customTimeWrapper.
func (s *Scheduler ) CustomTime (customTimeWrapper TimeWrapper ) {
s .time = customTimeWrapper
}
// CustomTimer replaces the function the scheduler uses to create job timers.
func (s *Scheduler ) CustomTimer (customTimer func (d time .Duration , f func ()) *time .Timer ) {
s .timer = customTimer
}
// StopBlockingChan releases a goroutine blocked in StartBlocking by closing
// the stop channel. It does nothing when the scheduler is not running or when
// no stop channel is set.
func (s *Scheduler ) StopBlockingChan () {
s .startBlockingStopChanMutex .Lock ()
// NOTE(review): the field is not reset here, so a second call before
// StartBlocking has cleared it would close an already-closed channel — confirm
// callers cannot do that.
if s .IsRunning () && s .startBlockingStopChan != nil {
close (s .startBlockingStopChan )
}
s .startBlockingStopChanMutex .Unlock ()
}
// WithDistributedLocker sets the distributed locker on the executor.
func (s *Scheduler ) WithDistributedLocker (l Locker ) {
s .executor .distributedLocker = l
}
// WithDistributedElector sets the distributed elector on the executor.
func (s *Scheduler ) WithDistributedElector (e Elector ) {
s .executor .distributedElector = e
}
// RegisterEventListeners registers the given event listeners on every job
// that is registered at the time of the call.
func (s *Scheduler ) RegisterEventListeners (eventListeners ...EventListener ) {
s .jobsMutex .RLock ()
defer s .jobsMutex .RUnlock ()
for _ , job := range s .jobs {
job .RegisterEventListeners (eventListeners ...)
}
}
// PauseJobExecution stores shouldPause in the executor's skipExecution flag.
func (s *Scheduler ) PauseJobExecution (shouldPause bool ) {
s .executor .skipExecution .Store (shouldPause )
}
// The pages are generated with Golds v0.6.7. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.