Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pgqueue/manager/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (exec *exec) RunQueueTask(ctx context.Context, task *schema.Task, result ch
exec.RLock()
defer exec.RUnlock()

if task.DiesAt.IsZero() {
if task.DiesAt == nil {
result <- &Result{Queue: task.Queue, Task: task, Error: httpresponse.ErrBadRequest.With("missing task deadline")}
return
}
Expand All @@ -165,7 +165,7 @@ func (exec *exec) RunQueueTask(ctx context.Context, task *schema.Task, result ch
}

// Run the task function with the provided payload and deadline
child, cancel := context.WithDeadline(ctx, task.DiesAt.UTC())
child, cancel := context.WithDeadline(ctx, (*task.DiesAt).UTC())
Comment on lines 167 to +168
exec.wg.Go(func() {
defer cancel()

Expand Down
2 changes: 1 addition & 1 deletion pgqueue/manager/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestRunQueueTaskUsesTaskTTLDeadline(t *testing.T) {
exec := NewExec(nil)
results := make(chan *Result, 1)
diesAt := time.Now().Add(2 * time.Second)
task := &schema.Task{Id: 42, Queue: "queue_deadline", DiesAt: diesAt}
task := &schema.Task{Id: 42, Queue: "queue_deadline", DiesAt: &diesAt}

require.NoError(t, exec.RegisterTask(task.Queue, func(ctx context.Context, _ json.RawMessage) (any, error) {
deadline, ok := ctx.Deadline()
Expand Down
7 changes: 5 additions & 2 deletions pgqueue/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error {
defer func() { endSpan(err) }()

// Process as many tasks as we have capacity for, until there are
// no more tasks or an error occurs.
// no more tasks or an error occurs. Cap at GOMAXPROCS per round so
// we don't starve the event loop when concurrency=0 (unlimited).
processed := false
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
// Get next task for the queue
Expand All @@ -107,7 +108,7 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error {
processed = true
}

// Return success
// Return success — more tasks may still be waiting.
return true, nil
}

Expand Down Expand Up @@ -176,6 +177,8 @@ func (manager *Manager) Run(ctx context.Context, log *slog.Logger) error {
} else {
log.InfoContext(resultCtx, "RunQueueTask result", "queue", result.Queue, "task", result.Task.Id, "status", status, "result", result)
}
// A slot just freed up — immediately try to pick up another task.
queueTimer.Reset(0)
continue
Comment on lines +180 to 182
case result != nil && result.Ticker != "":
resultCtx := ctx
Expand Down
2 changes: 1 addition & 1 deletion pgqueue/manager/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ func TestCreateTask(t *testing.T) {
assert.Equal(t, queue.Queue, task.Queue)
assert.JSONEq(t, string(payload), string(task.Payload))
assert.NotZero(t, task.Id)
assert.False(t, task.DiesAt.IsZero())
assert.Nil(t, task.DiesAt) // dies_at is only set when a worker locks the task
}
51 changes: 21 additions & 30 deletions pgqueue/schema/objects.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,13 @@ CREATE TABLE IF NOT EXISTS ${"schema"}."task" (
"delayed_at" TIMESTAMPTZ,
"started_at" TIMESTAMPTZ,
"finished_at" TIMESTAMPTZ,
"dies_at" TIMESTAMPTZ NOT NULL,
"dies_at" TIMESTAMPTZ,
"retries" INTEGER NOT NULL CHECK ("retries" >= 0),
"initial_retries" INTEGER NOT NULL CHECK ("initial_retries" >= 0),
PRIMARY KEY ("id"),
FOREIGN KEY ("queue") REFERENCES ${"schema"}."queue" ("queue") ON DELETE CASCADE
) PARTITION BY RANGE (id);

UPDATE ${"schema"}."task" t
SET "dies_at" = COALESCE(t."finished_at", t."delayed_at", t."created_at", NOW()) + q."ttl"
FROM ${"schema"}."queue" q
WHERE t."queue" = q."queue"
AND t."dies_at" IS NULL;

ALTER TABLE ${"schema"}."task"
ALTER COLUMN "dies_at" SET NOT NULL;

-- pgqueue.queue_status_index
-- Covers 'new' and 'retry' states
CREATE INDEX IF NOT EXISTS "task_queue_status_idx"
Expand Down Expand Up @@ -97,7 +88,7 @@ CREATE OR REPLACE FUNCTION ${"schema"}.queue_task_status(
) RETURNS ${"schema"}.task_status AS $$
SELECT CASE
WHEN started_at IS NOT NULL AND finished_at IS NOT NULL THEN 'done'::${"schema"}.task_status
WHEN dies_at < NOW() THEN 'expired'::${"schema"}.task_status
WHEN started_at IS NOT NULL AND finished_at IS NULL AND dies_at IS NOT NULL AND dies_at < NOW() THEN 'expired'::${"schema"}.task_status
WHEN started_at IS NULL AND finished_at IS NULL AND retries = 0 THEN 'failed'::${"schema"}.task_status
WHEN started_at IS NULL AND finished_at IS NULL AND retries = initial_retries
THEN 'new'::${"schema"}.task_status
Expand All @@ -111,30 +102,26 @@ $$ LANGUAGE SQL STABLE;
CREATE OR REPLACE FUNCTION ${"schema"}.queue_insert(q TEXT, p JSONB, delayed_at TIMESTAMPTZ) RETURNS BIGINT AS $$
DECLARE
v_retries INTEGER;
v_dies_at TIMESTAMPTZ;
v_id BIGINT;
BEGIN
-- Get queue defaults, raise if queue doesn't exist
SELECT
"retries", CASE
WHEN delayed_at IS NULL OR delayed_at < NOW() THEN NOW() + "ttl"
ELSE delayed_at + "ttl"
END
"retries"
INTO STRICT
v_retries, v_dies_at
v_retries
FROM
${"schema"}."queue"
WHERE
"queue" = q;

-- Insert the task
INSERT INTO ${"schema"}."task" ("queue", "payload", "delayed_at", "retries", "initial_retries", "dies_at")
INSERT INTO ${"schema"}."task" ("queue", "payload", "delayed_at", "retries", "initial_retries")
VALUES (
q, p, CASE
WHEN delayed_at IS NULL THEN NULL
WHEN delayed_at < NOW() THEN NOW()
ELSE delayed_at
END, v_retries, v_retries, v_dies_at
END, v_retries, v_retries
)
RETURNING "id" INTO v_id;

Expand Down Expand Up @@ -163,13 +150,10 @@ CREATE OR REPLACE TRIGGER queue_insert_trigger

-- pgqueue.queue_lock_func
CREATE OR REPLACE FUNCTION ${"schema"}.queue_lock(q TEXT[], w TEXT) RETURNS BIGINT AS $$
UPDATE ${"schema"}."task" SET
"started_at" = NOW(),
"worker" = w,
"result" = 'null'
WHERE "id" = (
WITH selected AS (
SELECT
t."id"
t."id",
queue."ttl"
FROM
${"schema"}."task" t
JOIN
Expand All @@ -195,8 +179,6 @@ WHERE "id" = (
(CARDINALITY(q) = 0 OR t."queue" = ANY(q))
AND
(t."started_at" IS NULL AND t."finished_at" IS NULL)
AND
t."dies_at" > NOW()
AND
(t."delayed_at" IS NULL OR t."delayed_at" <= NOW())
AND
Expand All @@ -207,14 +189,22 @@ WHERE "id" = (
t."created_at"
FOR UPDATE OF t SKIP LOCKED LIMIT 1
)
RETURNING "id";
UPDATE ${"schema"}."task" SET
"started_at" = NOW(),
"worker" = w,
"result" = 'null',
"dies_at" = NOW() + selected."ttl"
FROM selected
WHERE ${"schema"}."task"."id" = selected."id"
RETURNING ${"schema"}."task"."id";
$$ LANGUAGE SQL;

-- pgqueue.queue_unlock_func
CREATE OR REPLACE FUNCTION ${"schema"}.queue_unlock(tid BIGINT, r JSONB) RETURNS BIGINT AS $$
UPDATE ${"schema"}."task" SET
"finished_at" = NOW(),
"result" = r
"result" = r,
"dies_at" = NULL
WHERE
"id" = tid
AND
Expand Down Expand Up @@ -246,7 +236,8 @@ CREATE OR REPLACE FUNCTION ${"schema"}.queue_fail(tid BIGINT, r JSONB) RETURNS B
"result" = r,
"started_at" = NULL,
"finished_at" = NULL,
"delayed_at" = ${"schema"}.queue_backoff(tid)
"delayed_at" = ${"schema"}.queue_backoff(tid),
"dies_at" = NULL
WHERE
"id" = tid
AND
Expand Down
2 changes: 1 addition & 1 deletion pgqueue/schema/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Task struct {
CreatedAt *time.Time `json:"created_at,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
DiesAt time.Time `json:"dies_at,omitempty"`
DiesAt *time.Time `json:"dies_at,omitempty"`
Retries *uint64 `json:"retries,omitempty"`
Comment on lines 43 to 46
}

Expand Down
Loading