Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pgqueue/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error {

// Process as many tasks as we have capacity for, until there are
// no more tasks or an error occurs.
processed := false
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
// Get next task for the queue
Comment on lines +86 to 88
var task *schema.Task
Expand All @@ -94,13 +95,16 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error {
if err != nil {
return false, err
} else if task == nil {
// No more tasks
return false, nil
// No more tasks can be retained right now. If we processed at
// least one task in this pass, ask the scheduler to check again
// soon to keep draining the queue.
return processed, nil
}

// Run the task callback - results are logged in the callback,
// so we don't need to do anything with them here.
manager.queues.RunQueueTask(child, task, results)
processed = true
}

// Return success
Expand Down
Loading