Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pgqueue/manager/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,43 @@ func (exec *exec) RunQueueTask(ctx context.Context, task *schema.Task, result ch
}

func run(ctx context.Context, fn schema.TaskFunc, payload json.RawMessage) (resp *Result) {
return runWithGrace(ctx, fn, payload, time.Minute)
}

func runWithGrace(ctx context.Context, fn schema.TaskFunc, payload json.RawMessage, grace time.Duration) (resp *Result) {
result := make(chan *Result, 1)
go func() {
result <- runCallback(ctx, fn, payload)
}()

select {
case resp := <-result:
return resp
case <-ctx.Done():
cancelErr := ctx.Err()
if cancelErr == nil {
cancelErr = context.Canceled
}

// TTL is cooperative via context cancellation. Give callbacks an
// additional grace window to exit before force-failing the task.
if grace <= 0 {
return types.Ptr(Result{Error: cancelErr})
}

timer := time.NewTimer(grace)
defer timer.Stop()

select {
case resp := <-result:
return resp
case <-timer.C:
return types.Ptr(Result{Error: fmt.Errorf("task did not stop after %s grace period following context cancellation: %w", grace, cancelErr)})
}
}
}

func runCallback(ctx context.Context, fn schema.TaskFunc, payload json.RawMessage) (resp *Result) {
defer func() {
if recovered := recover(); recovered != nil {
var err error
Expand Down
59 changes: 59 additions & 0 deletions pgqueue/manager/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,62 @@ func TestRunQueueTaskRequiresTaskDeadline(t *testing.T) {

exec.Close()
}

func TestRunReturnsOnContextDeadlineWhenCallbackBlocks(t *testing.T) {
unblock := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

start := time.Now()
result := runWithGrace(ctx, func(context.Context, json.RawMessage) (any, error) {
<-unblock
return map[string]bool{"ok": true}, nil
}, nil, 20*time.Millisecond)
elapsed := time.Since(start)
close(unblock)

require.NotNil(t, result)
require.Error(t, result.Error)
assert.True(t, errors.Is(result.Error, context.DeadlineExceeded))
assert.Less(t, elapsed, 500*time.Millisecond)
}

func TestRunReturnsCallbackErrorIfItExitsDuringGrace(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

start := time.Now()
result := runWithGrace(ctx, func(ctx context.Context, _ json.RawMessage) (any, error) {
<-ctx.Done()
return nil, ctx.Err()
}, nil, 100*time.Millisecond)
elapsed := time.Since(start)

require.NotNil(t, result)
require.Error(t, result.Error)
assert.True(t, errors.Is(result.Error, context.DeadlineExceeded))
assert.Less(t, elapsed, 500*time.Millisecond)
}

func TestRunWithGracePreservesCanceledCause(t *testing.T) {
unblock := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())

start := time.Now()
go func() {
time.Sleep(25 * time.Millisecond)
cancel()
}()

result := runWithGrace(ctx, func(context.Context, json.RawMessage) (any, error) {
<-unblock
return map[string]bool{"ok": true}, nil
}, nil, 20*time.Millisecond)
elapsed := time.Since(start)
close(unblock)

require.NotNil(t, result)
require.Error(t, result.Error)
assert.True(t, errors.Is(result.Error, context.Canceled))
assert.Less(t, elapsed, 500*time.Millisecond)
}
6 changes: 6 additions & 0 deletions pgqueue/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func New(ctx context.Context, pool pg.PoolConn, opts ...Opt) (*Manager, error) {
self.PoolConn = pool
}

// Ensure at least one task partition exists before any task inserts happen.
if _, err := self.CreateNextPartition(bootstrapCtx); err != nil {
endBootstrapSpan(err)
return nil, err
}

// Register a maintenance ticker
if _, err := self.RegisterTicker(bootstrapCtx, schema.DefaultMaintenanceTickerName, schema.TickerMeta{
Interval: types.Ptr(schema.DefaultMaintenancePeriod),
Expand Down
89 changes: 89 additions & 0 deletions pgqueue/manager/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -324,6 +325,94 @@ func TestNextTaskWithZeroConcurrencyKeepsUnlimitedBehavior(t *testing.T) {
releaseOnce.Do(func() { close(release) })
}

func TestNextTaskReclaimsExpiredRunningTask(t *testing.T) {
mgr, ctx := test.Begin(t)
defer test.End(t)

beforeSeq, err := mgr.GetPartitionSeq(ctx)
require.NoError(t, err)

nextID := beforeSeq + 1
partitions, err := mgr.ListPartitions(ctx)
require.NoError(t, err)

createdPartition := ""
if !partitionContains(partitions, nextID) {
meta := schema.PartitionMeta{
Partition: "task_partition_queue_reclaim_expired",
Start: 1,
End: 1000000,
}
require.NoError(t, mgr.DeletePartition(ctx, meta.Partition))
require.NoError(t, mgr.CreatePartition(ctx, meta))
createdPartition = meta.Partition
}

ttl := 100 * time.Millisecond
concurrency := uint64(1)
release := make(chan struct{})
var releaseOnce sync.Once
t.Cleanup(func() {
releaseOnce.Do(func() { close(release) })
})
started := make(chan struct{}, 2)
var calls atomic.Uint32

queue, err := mgr.RegisterQueue(ctx, "queue_reclaim_expired", schema.QueueMeta{
TTL: &ttl,
Concurrency: &concurrency,
}, func(context.Context, json.RawMessage) (any, error) {
started <- struct{}{}
if calls.Add(1) == 1 {
<-release
}
return nil, nil
})
require.NoError(t, err)
defer func() {
releaseOnce.Do(func() { close(release) })
_, _ = mgr.DeleteQueue(ctx, queue.Queue)
if createdPartition != "" {
_ = mgr.DeletePartition(ctx, createdPartition)
}
}()

firstTask, err := mgr.CreateTask(ctx, queue.Queue, schema.TaskMeta{Payload: json.RawMessage(`{"task":1}`)})
require.NoError(t, err)
_, err = mgr.CreateTask(ctx, queue.Queue, schema.TaskMeta{Payload: json.RawMessage(`{"task":2}`)})
require.NoError(t, err)

select {
case <-started:
case <-time.After(time.Second):
t.Fatal("timed out waiting for first queue task to start")
}

require.Eventually(t, func() bool {
list, err := mgr.ListTasks(ctx, schema.TaskListRequest{Status: "expired"})
if err != nil {
return false
}
for _, item := range list.Body {
if item.Id == firstTask.Id {
return true
}
}
return false
}, 3*time.Second, 25*time.Millisecond, "timed out waiting for first task to expire")

_, err = mgr.CreateTask(ctx, queue.Queue, schema.TaskMeta{Payload: json.RawMessage(`{"task":3}`)})
require.NoError(t, err)

// With the fix, the expired running task no longer blocks queue progress,
// so another task attempt can start after TTL expiry.
select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for queue progress after expired running task")
}
}

func TestRunQueueTaskResultIsReleasedDone(t *testing.T) {
mgr, ctx := test.Begin(t)
defer test.End(t)
Expand Down
11 changes: 9 additions & 2 deletions pgqueue/manager/run.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package manager

import (
"bytes"
"context"
"encoding/json"
"errors"
"log/slog"
"runtime"
"strings"
Expand Down Expand Up @@ -169,7 +171,12 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error {
}
status := ""
if _, err := manager.ReleaseTask(resultCtx, result.Task.Id, success, releaseResult, &status); err != nil {
log.ErrorContext(resultCtx, "ReleaseTask failed", "queue", result.Queue, "task", result.Task.Id, "error", err.Error())
if errors.Is(err, pg.ErrNotFound) {
log.WarnContext(resultCtx, "Late queue task result ignored", "queue", result.Queue, "task", result.Task.Id)
queueTimer.Reset(0)
} else {
log.ErrorContext(resultCtx, "ReleaseTask failed", "queue", result.Queue, "task", result.Task.Id, "error", err.Error())
}
continue
}
if result.Error != nil {
Expand All @@ -188,7 +195,7 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error {
// Completed ticker task
if result.Error != nil {
log.ErrorContext(resultCtx, "RunTickerTask result failed", "ticker", result.Ticker, "error", result.Error.Error())
} else {
} else if len(result.Result) > 0 && !bytes.Equal(result.Result, []byte("null")) {
log.InfoContext(resultCtx, "RunTickerTask result", "ticker", result.Ticker, "result", result)
}
}
Expand Down
8 changes: 7 additions & 1 deletion pgqueue/schema/objects.sql
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ WITH selected AS (
"started_at" IS NOT NULL
AND
"finished_at" IS NULL
AND
("dies_at" IS NULL OR "dies_at" >= NOW())
GROUP BY
"queue"
) retained
Expand All @@ -178,7 +180,11 @@ WITH selected AS (
WHERE
(CARDINALITY(q) = 0 OR t."queue" = ANY(q))
AND
(t."started_at" IS NULL AND t."finished_at" IS NULL)
(
(t."started_at" IS NULL AND t."finished_at" IS NULL)
OR
(t."started_at" IS NOT NULL AND t."finished_at" IS NULL AND t."dies_at" IS NOT NULL AND t."dies_at" < NOW())
)
AND
(t."delayed_at" IS NULL OR t."delayed_at" <= NOW())
AND
Expand Down
Loading