Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 200 additions & 33 deletions apps/api/src/cora/api/_run_supervisor.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions apps/api/src/cora/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
list_runs=app.state.run.list_runs,
hold_run=app.state.run.hold_run,
resume_run=app.state.run.resume_run,
truncate_run=app.state.run.truncate_run,
),
run_initiator_lifespan(
deps,
Expand Down
17 changes: 14 additions & 3 deletions apps/api/src/cora/decision/aggregates/decision/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,12 @@

# Closed `choice` value set for `context = "RunSupervision"` Decisions.
# Projection-validated, not domain-enforced (the open-string
# `DecisionContext` + `DecisionChoice` shape is preserved). Ten values:
# `DecisionContext` + `DecisionChoice` shape is preserved). Eleven values:
# five beam-Hold/Resume + two audit-fallback + three advise-rung
# observe->advise dispositions (Quieted / Stalled / Breached), the last
# three Decision-only (one per breach edge, never a command).
# observe->advise dispositions (Quieted / Stalled / Breached, Decision-only,
# one per breach edge) + one liveness act rung (Truncate) that escalates the
# run-liveness advise (Quieted) to a terminal command once a Run has stayed
# implausibly long past the operator ceiling for the settle window.
#
# - `Continue` -- no wind-down trigger met; no command
# issued (the NoAction-bias default).
Expand Down Expand Up @@ -338,6 +340,13 @@
# (Rule Q). Decision-only. (Named by naming-r3:
# the limit was breached, an objective edge,
# not the supervisor's epistemic state.)
# - `Truncate` -- liveness act rung: issues TruncateRun (the
# terminal partial-data exit) for a Run that has
# stayed implausibly long past the operator
# ceiling for the settle window; the escalation of
# the SupervisionQuieted advise. A bare verb
# mirroring the TruncateRun command, as Hold /
# Resume mirror HoldRun / ResumeRun.
RunSupervisionChoice = Literal[
"Continue",
"Hold",
Expand All @@ -349,6 +358,7 @@
"SupervisionQuieted",
"SupervisionStalled",
"SupervisionBreached",
"Truncate",
]
RUN_SUPERVISION_CHOICES: Final = frozenset(
{
Expand All @@ -362,6 +372,7 @@
"SupervisionQuieted",
"SupervisionStalled",
"SupervisionBreached",
"Truncate",
}
)

Expand Down
25 changes: 25 additions & 0 deletions apps/api/src/cora/infrastructure/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,19 @@ class Settings(BaseSettings):
# Decision and issues no command.
run_liveness_ceiling_seconds: float | None = None

# `run_supervisor_truncate_enabled` is the ACT rung of the run-liveness rule:
# a SEPARATE opt-in (default off) above `run_liveness_ceiling_seconds` that
# lets the supervisor autonomously issue TruncateRun (the terminal,
# partial-data exit) for a Run that has stayed past the operator ceiling. It
# is inert unless the ceiling is set (the rule's own gate) AND
# run_supervisor_enabled is on. `run_supervisor_truncate_settle_ticks` is the
# anti-flap settle window: the Run must read liveness-stale for this many
# CONSECUTIVE ticks (>= 1) before the terminal truncate fires, so a
# transiently-stale or recovering Run is never killed. Truncate is terminal,
# so the default settle is higher than the resume wind-up's.
run_supervisor_truncate_enabled: bool = False
run_supervisor_truncate_settle_ticks: int = 3

# Observation-signal closed-loop rules (SHADOW, inside the RunSupervisor
# loop; [[project_observation_signal_port_design]]). Both default OFF and
# are a second off-gate above run_supervisor_enabled.
Expand Down Expand Up @@ -602,6 +615,18 @@ def _validate_run_initiator_max_in_flight(cls, value: int) -> int:
raise ValueError(msg)
return value

@field_validator("run_supervisor_truncate_settle_ticks")
@classmethod
def _validate_run_supervisor_truncate_settle_ticks(cls, value: int) -> int:
    """Reject a truncate settle window shorter than one tick.

    Returns ``value`` unchanged when it is at least 1. Anything lower raises
    ``ValueError``: an autonomous truncate requires at least one
    liveness-stale read before it may fire.
    """
    # Happy path first: any settle window of one tick or more is accepted as-is.
    if value >= 1:
        return value
    detail = (
        f"run_supervisor_truncate_settle_ticks must be >= 1, got {value}; "
        "an autonomous truncate requires at least one liveness-stale read"
    )
    raise ValueError(detail)

@field_validator("run_liveness_ceiling_seconds")
@classmethod
def _validate_run_liveness_ceiling_seconds(cls, value: float | None) -> float | None:
Expand Down
26 changes: 22 additions & 4 deletions apps/api/src/cora/run/aggregates/run/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,16 +572,24 @@ class RunTruncated:

Stopped vs Truncated (lifecycle-layer distinction): Stopped
is a controlled exit while the system is responsive; Truncated
is a cleanup mechanism for known-dead Runs. The system itself
does not detect de-facto-dead Runs (separate liveness concern,
out of scope for 6f-4); operators must invoke truncate
explicitly.
is a cleanup mechanism for known-dead Runs. Truncate is operator-
driven, OR autonomous: the RunSupervisor's run-liveness act rung
truncates a Run that has stayed implausibly long past the operator
ceiling (the de-facto-dead detection the original truncate left to
a human), attributed via the event principal and `decided_by_decision_id`.

`decided_by_decision_id` (mirrors RunHeld / RunAdjusted): optional
Decision-causation link, set when an agent (or a Decision-justified
operator action) truncated the Run; None for a bare operator truncate.
Legacy streams written before this field existed omit the key;
deserialization reads it via `payload.get("decided_by_decision_id")`,
so those events load with None.
"""

run_id: UUID
reason: str
interrupted_at: datetime | None
occurred_at: datetime
decided_by_decision_id: UUID | None = None


# Discriminated union of every event the Run aggregate emits.
Expand Down Expand Up @@ -737,13 +745,17 @@ def to_payload(event: RunEvent) -> dict[str, Any]:
reason=reason,
interrupted_at=interrupted_at,
occurred_at=occurred_at,
decided_by_decision_id=decided_by_decision_id,
):
interrupted_at_iso = interrupted_at.isoformat() if interrupted_at is not None else None
return {
"run_id": str(run_id),
"reason": reason,
"interrupted_at": interrupted_at_iso,
"occurred_at": occurred_at.isoformat(),
"decided_by_decision_id": (
str(decided_by_decision_id) if decided_by_decision_id is not None else None
),
}
case RunAdjusted(
run_id=run_id,
Expand Down Expand Up @@ -962,6 +974,7 @@ def _build_run_stopped() -> RunStopped:

def _build_run_truncated() -> RunTruncated:
raw_interrupted_at = payload["interrupted_at"]
raw_decided_by_truncated = payload.get("decided_by_decision_id")
return RunTruncated(
run_id=UUID(payload["run_id"]),
reason=payload["reason"],
Expand All @@ -971,6 +984,11 @@ def _build_run_truncated() -> RunTruncated:
else None
),
occurred_at=datetime.fromisoformat(payload["occurred_at"]),
decided_by_decision_id=(
UUID(raw_decided_by_truncated)
if raw_decided_by_truncated is not None
else None
),
)

return deserialize_or_raise("RunTruncated", _build_run_truncated)
Expand Down
7 changes: 7 additions & 0 deletions apps/api/src/cora/run/features/truncate_run/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,10 @@ class TruncateRun:
run_id: UUID
reason: str
interrupted_at: datetime | None
# Optional Decision-causation link (mirrors HoldRun / AbortRun /
# AdjustRun / StartRun). Lets an autonomous truncate (the RunSupervisor
# run-liveness act rung) point back to the Decision that justified it; an
# operator-driven truncate leaves it None. No cross-BC existence check at
# the decider, per the eventual-consistency stance. Additive default so
# existing callers (REST / MCP, which do not expose it) are unaffected.
decided_by_decision_id: UUID | None = None
1 change: 1 addition & 0 deletions apps/api/src/cora/run/features/truncate_run/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ def decide(
reason=reason.value,
interrupted_at=command.interrupted_at,
occurred_at=now,
decided_by_decision_id=command.decided_by_decision_id,
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from cora.run.features.resume_run import bind as bind_resume_run
from cora.run.features.start_run import StartRun
from cora.run.features.start_run import bind as bind_start_run
from cora.run.features.truncate_run import bind as bind_truncate_run
from cora.run.ports import InMemoryRunChannelLookup
from cora.safety._projections import register_safety_projections
from cora.safety.adapters import PostgresClearanceLookup
Expand Down Expand Up @@ -369,6 +370,7 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool)
list_runs = bind_list_runs(deps)
hold_run = bind_hold_run(deps)
resume_run = bind_resume_run(deps)
truncate_run = bind_truncate_run(deps)
memory: dict[UUID, str] = {}
settle: dict[UUID, int] = {}

Expand All @@ -388,6 +390,10 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool)
stall=set(),
stall_streak={},
feed_dead_warned=set(),
truncate_run=truncate_run,
truncate_settle={},
truncate_enabled=False,
truncate_settle_ticks=3,
liveness_ceiling_seconds=None,
advise_enabled=False,
resume_enabled=True,
Expand All @@ -413,6 +419,10 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool)
stall=set(),
stall_streak={},
feed_dead_warned=set(),
truncate_run=truncate_run,
truncate_settle={},
truncate_enabled=False,
truncate_settle_ticks=3,
liveness_ceiling_seconds=None,
advise_enabled=False,
resume_enabled=True,
Expand Down Expand Up @@ -447,6 +457,7 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo
list_runs = bind_list_runs(deps)
hold_run = bind_hold_run(deps)
resume_run = bind_resume_run(deps)
truncate_run = bind_truncate_run(deps)
memory: dict[UUID, str] = {}
settle: dict[UUID, int] = {}

Expand All @@ -466,6 +477,10 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo
stall=set(),
stall_streak={},
feed_dead_warned=set(),
truncate_run=truncate_run,
truncate_settle={},
truncate_enabled=False,
truncate_settle_ticks=3,
liveness_ceiling_seconds=None,
advise_enabled=False,
resume_enabled=True,
Expand Down Expand Up @@ -498,6 +513,10 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo
stall=set(),
stall_streak={},
feed_dead_warned=set(),
truncate_run=truncate_run,
truncate_settle={},
truncate_enabled=False,
truncate_settle_ticks=3,
liveness_ceiling_seconds=None,
advise_enabled=False,
resume_enabled=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ async def test_truncate_run_persists_with_interrupted_at_and_round_trips_to_trun
"reason": "weekend power loss; abandoned at projection 487 of 1500",
"interrupted_at": _INTERRUPTED_AT.isoformat(),
"occurred_at": _NOW.isoformat(),
"decided_by_decision_id": None,
}
# The two timestamps stay distinct through jsonb (operator-
# supplied interrupted_at on Saturday, system-recorded
Expand Down
Loading
Loading