package runners

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"sync"

	"codeberg.org/gruf/go-errors/v2"
)

// WorkerFunc is a unit of work that a WorkerPool worker can execute. Note
// that implementations absolutely MUST watch <-ctx.Done() on the context
// they are handed, otherwise stopping the pool may block indefinitely.
type WorkerFunc func(context.Context)

// WorkerPool provides a means of enqueuing asynchronous work.
type WorkerPool struct {
	fns chan WorkerFunc
	svc Service
}

// Start launches the main WorkerPool management loop in a new goroutine, together
// with the requested number of child worker goroutines. A workers value <= 0 falls
// back to $GOMAXPROCS, and a negative queue value falls back to ten slots per worker.
// Returns false if already running.
func (pool *WorkerPool) Start(workers int, queue int) bool {
	// Attempt to start the underlying service.
	done, started := pool.svc.doStart()
	if !started {
		return false
	}

	// Default worker count is $GOMAXPROCS.
	if workers < 1 {
		workers = runtime.GOMAXPROCS(0)
	}

	// Default to a reasonable queue size.
	if queue < 0 {
		queue = 10 * workers
	}

	// Allocate the pool queue with the chosen capacity.
	//
	// The channel MUST be stored on the pool here, BEFORE
	// returning, and NOT from inside the goroutine launched
	// below. Otherwise the pool could appear closed for a
	// short time until the management goroutine is entered.
	queued := make(chan WorkerFunc, queue)
	pool.fns = queued

	go func() {
		defer func() {
			// unlock single wait
			pool.svc.wait.Unlock()

			// ensure stopped
			pool.svc.Stop()
		}()

		// Tracks every worker goroutine spawned below.
		var running sync.WaitGroup
		running.Add(workers)

		for n := workers; n > 0; n-- {
			go func() {
				defer running.Done()

				// worker_run reports false after recovering
				// from a panic; restart it until it reports
				// a clean exit.
				for {
					if worker_run(CancelCtx(done), queued) {
						return
					}
				}
			}()
		}

		// Block here until the service is stopped.
		<-done

		// Drain the function queue.
		//
		// Every queued function MUST be run, so each one is
		// handed a closed context. This mainly lets us block
		// until the queue is empty; the workers keep draining
		// in the background too, with their (now) closed ctx.
		for !drain_queue(queued) {
			// retry on panic
		}

		// With the queue empty it is safe to close the
		// channel, which signals every worker to return.
		close(queued)
		running.Wait()
	}()

	return true
}

// Stop will stop the WorkerPool management loop, blocking until stopped.
func (pool *WorkerPool) Stop() bool {
	return pool.svc.Stop()
}

// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
func (pool *WorkerPool) Running() bool {
	return pool.svc.Running()
}

// Done returns a channel that's closed when WorkerPool.Stop() is called. It is
// the same channel provided to the currently running worker functions.
func (pool *WorkerPool) Done() <-chan struct{} {
	return pool.svc.Done()
}

// Enqueue adds the provided WorkerFunc to the queue, to be performed once a worker is free.
// It blocks until the function has been queued or the pool is stopped. In all cases the
// WorkerFunc WILL be executed, with the state of the pool indicated by <-ctx.Done() of the
// context it receives. WorkerFuncs MUST respect the passed context.
func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
	if fn == nil {
		// Nothing to run.
		return
	}

	stopped := pool.svc.Done()

	select {
	case <-stopped:
		// Pool ctx cancelled: run the function
		// right here with an already-closed ctx.
		fn(closedctx)

	case pool.fns <- fn:
		// Placed fn in queue.
	}
}

// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
// case that the caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
// It also returns false, without running the WorkerFunc, if the pool is stopped or fn is nil.
func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
	if fn == nil {
		// Nothing to run.
		return false
	}

	callerDone := ctx.Done()
	stopped := pool.svc.Done()

	select {
	case <-callerDone:
		// Caller ctx cancelled.
		return false

	case <-stopped:
		// Pool ctx cancelled.
		return false

	case pool.fns <- fn:
		// Placed fn in queue.
		return true
	}
}

// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue().
// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed. func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) { // Check valid fn if fn == nil { return false } select { case <-ctx.Done(): // We failed to add this entry to the worker queue before the // incoming context was cancelled. So to ensure processing // we simply queue it asynchronously and return early to caller. go pool.Enqueue(fn) return false case <-pool.svc.Done(): // Pool ctx cancelled fn(closedctx) return false case pool.fns <- fn: // Placed fn in queue return true } } // EnqueueNow attempts Enqueue but returns false if not executed. func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool { // Check valid fn if fn == nil { return false } select { // Pool ctx cancelled case <-pool.svc.Done(): return false // Placed fn in queue case pool.fns <- fn: return true // Queue is full default: return false } } // Queue returns the number of currently queued WorkerFuncs. func (pool *WorkerPool) Queue() int { var l int pool.svc.While(func() { l = len(pool.fns) }) return l } // worker_run is the main worker routine, accepting functions from 'fns' until it is closed. func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool { defer func() { // Recover and drop any panic if r := recover(); r != nil { // Gather calling func frames. pcs := make([]uintptr, 10) n := runtime.Callers(3, pcs) i := runtime.CallersFrames(pcs[:n]) c := gatherFrames(i, n) const msg = "worker_run: recovered panic: %v\n\n%s\n" fmt.Fprintf(os.Stderr, msg, r, c.String()) } }() for { // Wait on next func fn, ok := <-fns if !ok { return true } // Run with ctx fn(ctx) } } // drain_queue will drain and run all functions in worker queue, passing in a closed context. func drain_queue(fns <-chan WorkerFunc) bool { defer func() { // Recover and drop any panic if r := recover(); r != nil { // Gather calling func frames. 
pcs := make([]uintptr, 10) n := runtime.Callers(3, pcs) i := runtime.CallersFrames(pcs[:n]) c := gatherFrames(i, n) const msg = "worker_run: recovered panic: %v\n\n%s\n" fmt.Fprintf(os.Stderr, msg, r, c.String()) } }() for { select { // Run with closed ctx case fn := <-fns: fn(closedctx) // Queue is empty default: return true } } } // gatherFrames collates runtime frames from a frame iterator. func gatherFrames(iter *runtime.Frames, n int) errors.Callers { if iter == nil { return nil } frames := make([]runtime.Frame, 0, n) for { f, ok := iter.Next() if !ok { break } frames = append(frames, f) } return frames }