package gocron

import (
	"context"
	"sync"
	"time"

	"go.uber.org/atomic"
)

const (
	// RescheduleMode - the default is that if a limit on maximum
	// concurrent jobs is set and the limit is reached, a job will
	// skip its run and try again on the next occurrence in the schedule
	// (an illustrative usage sketch follows this const block)
	RescheduleMode limitMode = iota

	// WaitMode - if a limit on maximum concurrent jobs is set
	// and the limit is reached, a job will wait to try and run
	// until a spot in the limit is freed up.
	//
	// Note: this mode can produce unpredictable results as
	// job execution order isn't guaranteed. For example, a job that
	// executes frequently may pile up in the wait queue and be executed
	// many times back to back when the queue opens.
	//
	// Warning: do not use this mode if your jobs will continue to stack
	// up beyond the ability of the limit workers to keep up. An example of
	// what NOT to do:
	//
	//     s.Every("1s").Do(func() {
	//         // this will result in an ever-growing number of goroutines
	//         // blocked trying to send to the buffered channel
	//         time.Sleep(10 * time.Minute)
	//     })
	WaitMode
)
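
// Limit modes are selected on the scheduler via SetMaxConcurrentJobs. A
// minimal, illustrative sketch (the interval, limit, and sleep duration are
// arbitrary example values, not a recommended configuration):
//
//     s := NewScheduler(time.UTC)
//     s.SetMaxConcurrentJobs(2, RescheduleMode) // at most 2 jobs run at once; overflow runs are skipped
//     s.Every("1s").Do(func() {
//         time.Sleep(3 * time.Second) // long enough that the limit is reached
//     })
//     s.StartAsync()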

type executor struct {
	jobFunctions  chan jobFunction   // the chan upon which the jobFunctions are passed in from the scheduler
	ctx           context.Context    // used to tell the executor to stop
	cancel        context.CancelFunc // used to tell the executor to stop
	wg            *sync.WaitGroup    // used by the scheduler to wait for the executor to stop
	jobsWg        *sync.WaitGroup    // used by the executor to wait for all jobs to finish
	singletonWgs  *sync.Map          // used by the executor to wait for the singleton runners to complete
	skipExecution *atomic.Bool       // used to pause the execution of jobs

	limitMode               limitMode        // when SetMaxConcurrentJobs() is set upon the scheduler
	limitModeMaxRunningJobs int              // stores the maximum number of concurrently running jobs
	limitModeFuncsRunning   *atomic.Int64    // tracks the count of limited mode funcs running
	limitModeFuncWg         *sync.WaitGroup  // allow the executor to wait for limit mode functions to wrap up
	limitModeQueue          chan jobFunction // pass job functions to the limit mode workers
	limitModeQueueMu        *sync.Mutex      // for protecting the limitModeQueue
	limitModeRunningJobs    *atomic.Int64    // tracks the count of running jobs to check against the max
	stopped                 *atomic.Bool     // allow workers to drain the buffered limitModeQueue

	distributedLocker  Locker  // support running jobs across multiple instances
	distributedElector Elector // support running jobs across multiple instances
}

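// newExecutor returns an executor with its job channel, singleton wait group
// map, and limit mode counters initialized; the remaining fields are set up
// when start is called.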
func newExecutor() executor {
	e := executor{
		jobFunctions:          make(chan jobFunction, 1),
		singletonWgs:          &sync.Map{},
		limitModeFuncsRunning: atomic.NewInt64(0),
		limitModeFuncWg:       &sync.WaitGroup{},
		limitModeRunningJobs:  atomic.NewInt64(0),
		limitModeQueueMu:      &sync.Mutex{},
	}
	return e
}

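// runJob invokes a job's function with its parameters, wrapping the call with
// the registered event listeners, the run counters, and the optional
// package-level panic handler.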
func runJob(f jobFunction) {
	panicHandlerMutex.RLock()
	defer panicHandlerMutex.RUnlock()

	if panicHandler != nil {
		defer func() {
			if r := recover(); r != nil {
				panicHandler(f.funcName, r)
			}
		}()
	}
	f.runStartCount.Add(1)
	f.isRunning.Store(true)
	callJobFunc(f.eventListeners.onBeforeJobExecution)
	_ = callJobFuncWithParams(f.eventListeners.beforeJobRuns, []interface{}{f.getName()})
	err := callJobFuncWithParams(f.function, f.parameters)
	if err != nil {
		_ = callJobFuncWithParams(f.eventListeners.onError, []interface{}{f.getName(), err})
	} else {
		_ = callJobFuncWithParams(f.eventListeners.noError, []interface{}{f.getName()})
	}
	_ = callJobFuncWithParams(f.eventListeners.afterJobRuns, []interface{}{f.getName()})
	callJobFunc(f.eventListeners.onAfterJobExecution)
	f.isRunning.Store(false)
	f.runFinishCount.Add(1)
}

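// singletonRunner drains the job's singleton queue so that at most one
// instance of the job runs at a time; when the job's context is canceled it
// resets the queue and exits.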
func (jf *jobFunction) singletonRunner() {
	jf.singletonRunnerOn.Store(true)
	jf.singletonWgMu.Lock()
	jf.singletonWg.Add(1)
	jf.singletonWgMu.Unlock()
	for {
		select {
		case <-jf.ctx.Done():
			jf.singletonWg.Done()
			jf.singletonRunnerOn.Store(false)
			jf.singletonQueueMu.Lock()
			jf.singletonQueue = make(chan struct{}, 1000)
			jf.singletonQueueMu.Unlock()
			jf.stopped.Store(false)
			return
		case <-jf.singletonQueue:
			if !jf.stopped.Load() {
				runJob(*jf)
			}
		}
	}
}

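// limitModeRunner is one of the worker goroutines that pull job functions off
// the limit mode queue, skipping any job whose context has already been
// canceled, until the executor's context is canceled.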
func (e *executor) limitModeRunner() {
	for {
		select {
		case <-e.ctx.Done():
			// this worker is exiting, so it no longer counts toward the running funcs
			e.limitModeFuncsRunning.Dec()
			e.limitModeFuncWg.Done()
			return
		case jf := <-e.limitModeQueue:
			if !e.stopped.Load() {
				select {
				case <-jf.ctx.Done():
				default:
					e.runJob(jf)
				}
			}
		}
	}
}

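// start (re)initializes the executor's context, wait groups, flags, and limit
// mode queue, then launches the main run loop in its own goroutine.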
func (e *executor) start() {
	e.wg = &sync.WaitGroup{}
	e.wg.Add(1)

	stopCtx, cancel := context.WithCancel(context.Background())
	e.ctx = stopCtx
	e.cancel = cancel

	e.jobsWg = &sync.WaitGroup{}

	e.stopped = atomic.NewBool(false)
	e.skipExecution = atomic.NewBool(false)

	e.limitModeQueueMu.Lock()
	e.limitModeQueue = make(chan jobFunction, 1000)
	e.limitModeQueueMu.Unlock()
	go e.run()
}

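// runJob dispatches a single job according to its run mode: in defaultMode it
// optionally gates execution behind a distributed elector or locker before
// running the job, and in singletonMode it hands the run off to the job's
// singleton runner.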
func (e *executor) runJob(f jobFunction) {
	defer func() {
		if e.limitMode == RescheduleMode && e.limitModeMaxRunningJobs > 0 {
			e.limitModeRunningJobs.Add(-1)
		}
	}()
	switch f.runConfig.mode {
	case defaultMode:
		lockKey := f.jobName
		if lockKey == "" {
			lockKey = f.funcName
		}
		if e.distributedElector != nil {
			err := e.distributedElector.IsLeader(e.ctx)
			if err != nil {
				return
			}
			runJob(f)
			return
		}
		if e.distributedLocker != nil {
			l, err := e.distributedLocker.Lock(f.ctx, lockKey)
			if err != nil || l == nil {
				return
			}
			defer func() {
				// hold the lock for ~90% of the time until the next run
				// (capped at 5s) so other instances don't re-run the job early
				durationToNextRun := time.Until(f.jobFuncNextRun)
				if durationToNextRun > time.Second*5 {
					durationToNextRun = time.Second * 5
				}

				delay := time.Duration(float64(durationToNextRun) * 0.9)
				if e.limitModeMaxRunningJobs > 0 {
					time.AfterFunc(delay, func() {
						_ = l.Unlock(f.ctx)
					})
					return
				}

				if delay > time.Millisecond*100 {
					timer := time.NewTimer(delay)
					defer timer.Stop()

					select {
					case <-e.ctx.Done():
					case <-timer.C:
					}
				}
				_ = l.Unlock(f.ctx)
			}()
			runJob(f)
			return
		}
		runJob(f)
	case singletonMode:
		e.singletonWgs.Store(f.singletonWg, f.singletonWgMu)

		if !f.singletonRunnerOn.Load() {
			go f.singletonRunner()
		}
		f.singletonQueueMu.Lock()
		f.singletonQueue <- struct{}{}
		f.singletonQueueMu.Unlock()
	}
}

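// run is the executor's main loop: it receives job functions from the
// scheduler, lazily spins up limit mode workers when a concurrency limit is
// set, and otherwise runs each job in its own goroutine until the executor's
// context is canceled.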
func (e *executor) run() {
	for {
		select {
		case f := <-e.jobFunctions:
			if e.stopped.Load() || e.skipExecution.Load() {
				continue
			}

			if e.limitModeMaxRunningJobs > 0 {
				countRunning := e.limitModeFuncsRunning.Load()
				if countRunning < int64(e.limitModeMaxRunningJobs) {
					diff := int64(e.limitModeMaxRunningJobs) - countRunning
					for i := int64(0); i < diff; i++ {
						e.limitModeFuncWg.Add(1)
						go e.limitModeRunner()
						e.limitModeFuncsRunning.Inc()
					}
				}
			}

			e.jobsWg.Add(1)
			go func() {
				defer e.jobsWg.Done()

				if e.limitModeMaxRunningJobs > 0 {
					switch e.limitMode {
					case RescheduleMode:
						if e.limitModeRunningJobs.Load() < int64(e.limitModeMaxRunningJobs) {
							select {
							case e.limitModeQueue <- f:
								e.limitModeRunningJobs.Inc()
							case <-e.ctx.Done():
							}
						}
					case WaitMode:
						select {
						case e.limitModeQueue <- f:
						case <-e.ctx.Done():
						}
					}
					return
				}

				e.runJob(f)
			}()
		case <-e.ctx.Done():
			e.jobsWg.Wait()
			e.wg.Done()
			return
		}
	}
}

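// stop cancels the executor's context, waits for in-flight jobs, singleton
// runners, and limit mode workers to finish, and then releases the limit mode
// queue.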
func (e *executor) stop() {
	e.stopped.Store(true)
	e.cancel()
	e.wg.Wait()
	if e.singletonWgs != nil {
		e.singletonWgs.Range(func(key, value interface{}) bool {
			wg, wgOk := key.(*sync.WaitGroup)
			mu, muOk := value.(*sync.Mutex)
			if wgOk && muOk {
				mu.Lock()
				wg.Wait()
				mu.Unlock()
			}
			return true
		})
	}
	if e.limitModeMaxRunningJobs > 0 {
		e.limitModeFuncWg.Wait()
		e.limitModeQueueMu.Lock()
		e.limitModeQueue = nil
		e.limitModeQueueMu.Unlock()
	}
}