diff --git a/pgqueue/manager/run.go b/pgqueue/manager/run.go index 6b2e15e..9d070c7 100644 --- a/pgqueue/manager/run.go +++ b/pgqueue/manager/run.go @@ -83,6 +83,7 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error { // Process as many tasks as we have capacity for, until there are // no more tasks or an error occurs. + processed := false for i := 0; i < runtime.GOMAXPROCS(0); i++ { // Get next task for the queue var task *schema.Task @@ -94,13 +95,16 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error { if err != nil { return false, err } else if task == nil { - // No more tasks - return false, nil + // No more tasks can be retained right now. If we processed at + // least one task in this pass, ask the scheduler to check again + // soon to keep draining the queue. + return processed, nil } // Run the task callback - results are logged in the callback, // so we don't need to do anything with them here. manager.queues.RunQueueTask(child, task, results) + processed = true } // Return success