diff --git a/apps/api/src/cora/api/_run_supervisor.py b/apps/api/src/cora/api/_run_supervisor.py index 9a6b0dc25e2..a1a96e05041 100644 --- a/apps/api/src/cora/api/_run_supervisor.py +++ b/apps/api/src/cora/api/_run_supervisor.py @@ -102,6 +102,7 @@ RunBeamAvailabilityUnknownError, RunCannotHoldError, RunCannotResumeError, + RunCannotTruncateError, RunClearanceCoverageMismatchError, RunEnclosureCoverageMismatchError, RunNotFoundError, @@ -116,6 +117,7 @@ from cora.run.features.hold_run import HoldRun from cora.run.features.list_runs import ListRuns from cora.run.features.resume_run import ResumeRun +from cora.run.features.truncate_run import TruncateRun from cora.run.ports import InMemoryRunChannelLookup from cora.shared.identity import ActorId @@ -134,6 +136,7 @@ from cora.run.features.list_runs.handler import Handler as ListRunsHandler from cora.run.features.list_runs.query import RunStatusFilter from cora.run.features.resume_run.handler import Handler as ResumeRunHandler + from cora.run.features.truncate_run.handler import Handler as TruncateRunHandler from cora.run.ports import RunChannelLookup _log = get_logger(__name__) @@ -365,6 +368,12 @@ def _reasoning_for(choice: str) -> str: "every needed supply is Available, and the shutters are open); resumed " "the Run the supervisor itself held." ) + if choice == "Truncate": + return ( + "The Run stayed Running implausibly long past the operator run-age " + "ceiling, without progressing, for the full settle window; truncated it " + "as de-facto hung to free the beamline. Partial data may be salvageable." + ) return ( "Beam still unavailable but the operator resumed the Run; deferring to " "the operator (no re-hold for this outage)." @@ -493,6 +502,68 @@ async def _record_supervision_advice( _log.info("run_supervisor.decision_already_written", choice=choice) +async def _record_truncate_decision( + deps: Kernel, + *, + decision_id: UUID, + run_id: UUID, + running_seconds: int, + ceiling_seconds: float, + settle_count: int, +) -> None: + """Append one DecisionRegistered for an autonomous liveness truncate (the act + rung): context=RunSupervision, choice=Truncate. + + Beam-free (the liveness rule runs before the beam read; the evidence is the + Run age vs the operator ceiling, not beam state). Accepts the decision_id so + the issued TruncateRun links back via decided_by_decision_id (mirroring the + beam-Hold path). A ConcurrencyError on a re-derived id is treated as success + (same posture as `_record_decision`).""" + now = deps.clock.now() + domain_event = DecisionRegistered( + decision_id=decision_id, + decided_by=ActorId(RUN_SUPERVISOR_AGENT_ID), + context=DecisionContext(DECISION_CONTEXT_RUN_SUPERVISION).value, + choice=DecisionChoice("Truncate").value, + parent_id=None, + override_kind=None, + rule=DecisionRule(_RULE).value, + reasoning=validate_reasoning(_reasoning_for("Truncate")), + confidence=validate_confidence(None), + confidence_source=DecisionConfidenceSource.SELF_REPORTED, + alternatives=(), + inputs=validate_inputs( + { + "run_id": str(run_id), + "running_seconds": str(running_seconds), + "ceiling_seconds": str(ceiling_seconds), + "settle_ticks": str(settle_count), + } + ), + reasoning_signature=None, + occurred_at=now, + ) + new_event = to_new_event( + event_type=event_type_name(domain_event), + payload=to_payload(domain_event), + occurred_at=now, + event_id=uuid5(decision_id, "event:0"), + command_name=_COMMAND_NAME, + correlation_id=deps.id_generator.new_id(), + causation_id=None, + principal_id=RUN_SUPERVISOR_AGENT_ID, + ) + try: + await deps.event_store.append( + stream_type=_STREAM_TYPE, + stream_id=decision_id, + expected_version=0, + events=[new_event], + ) + except ConcurrencyError: + _log.info("run_supervisor.decision_already_written", choice="Truncate") + + async def _issue_hold( deps: Kernel, hold_run: HoldRunHandler, @@ -543,6 +614,45 @@ async def _issue_resume( _log.warning("run_supervisor.resume_unauthorized", run_id=str(run_id)) +async def _issue_truncate( + deps: Kernel, + truncate_run: TruncateRunHandler, + *, + run_id: UUID, + running_seconds: int, + ceiling_seconds: float, + decision_id: UUID, +) -> None: + """Issue TruncateRun through the authorized handler; benign no-op on state race. + + The terminal act rung of the run-liveness rule. `interrupted_at` is None: the + supervisor knows the Run is hung but not the exact interruption instant.""" + try: + await truncate_run( + TruncateRun( + run_id=run_id, + reason=( + f"Autonomously truncated by the RunSupervisor: the Run stayed Running " + f"{running_seconds}s, past the operator run-age ceiling of " + f"{int(ceiling_seconds)}s, without progressing (de-facto hung)." + ), + interrupted_at=None, + decided_by_decision_id=decision_id, + ), + principal_id=RUN_SUPERVISOR_AGENT_ID, + correlation_id=deps.id_generator.new_id(), + surface_id=NIL_SENTINEL_ID, + ) + except (RunCannotTruncateError, RunNotFoundError) as exc: + # The Run changed under us between read and issue (someone else acted, or + # it already terminated): a benign no-op, not an error. + _log.info("run_supervisor.truncate_skipped", run_id=str(run_id), reason=type(exc).__name__) + except UnauthorizedError: + # Configuration fault: the supervisor principal is not granted TruncateRun. + # Log loudly; take no autonomous action (the Run keeps Running). + _log.warning("run_supervisor.truncate_unauthorized", run_id=str(run_id)) + + async def _assemble_and_check_envelope( deps: Kernel, item: RunSummaryItem, @@ -796,12 +906,14 @@ async def _supervise_tick( list_runs: ListRunsHandler, hold_run: HoldRunHandler, resume_run: ResumeRunHandler, + truncate_run: TruncateRunHandler, beam_lookup: BeamAvailabilityLookup, channel_lookup: RunChannelLookup, rules_config: ObservationRuleConfig, memory: dict[UUID, str], settle: dict[UUID, int], liveness: set[UUID], + truncate_settle: dict[UUID, int], quality: set[UUID], stall: set[UUID], stall_streak: dict[UUID, int], @@ -809,6 +921,8 @@ async def _supervise_tick( resume_enabled: bool, resume_settle_ticks: int, liveness_ceiling_seconds: float | None, + truncate_enabled: bool, + truncate_settle_ticks: int, advise_enabled: bool, ) -> None: """One supervision pass over all in-flight Runs (hold + gated resume + @@ -840,6 +954,9 @@ async def _supervise_tick( for run_id in list(liveness): if run_id not in inflight_ids: liveness.discard(run_id) + for run_id in list(truncate_settle): + if run_id not in inflight_ids: + del truncate_settle[run_id] for run_id in list(stall_streak): if run_id not in inflight_ids: del stall_streak[run_id] @@ -853,46 +970,79 @@ async def _supervise_tick( if run_id not in inflight_ids: feed_dead_warned.discard(run_id) - # Shadow run-liveness pass (the run-liveness rule, v1): OBSERVE-ONLY. It logs - # which Running Runs it WOULD flag as implausibly long (now - running_since - # past the operator ceiling) and records nothing, issues no command. Run - # before the beam read so it is independent of beam I/O. Off unless the - # operator set a ceiling. Edge-triggered via `liveness` (a set walled off - # from the beam-Hold `memory`): log once per stall episode; clearing on - # not-stale lets it re-log if a resumed Run goes stale again. + # Run-liveness pass (the run-liveness rule): flags a Running Run that has + # been Running implausibly long (now - running_since past the operator + # ceiling). Runs before the beam read so it is independent of beam I/O. Off + # unless the operator set a ceiling. Three rungs, each a further opt-in: + # - SHADOW (always, when a ceiling is set): log `run_liveness.would_flag` + # once per stall episode, edge-triggered via `liveness`. + # - ADVISE (advise_enabled): record one Decision(choice=SupervisionQuieted) + # on the same edge, still no command. + # - ACT (truncate_enabled): count CONSECUTIVE stale ticks in + # `truncate_settle`; once the settle window elapses, record one + # Decision(choice=Truncate) and issue TruncateRun (terminal). The settle + # window is the fail-safe: a transiently-stale or recovering Run, which + # clears the counter on its first non-stale tick, is never truncated. if liveness_ceiling_seconds is not None: now = deps.clock.now() for item in running: running_since = item.running_since - if running_since is not None and is_run_stale( + if running_since is None or not is_run_stale( running_since, now, liveness_ceiling_seconds ): - if item.run_id not in liveness: - liveness.add(item.run_id) - running_seconds = int((now - running_since).total_seconds()) - _log.info( - "run_liveness.would_flag", - run_id=str(item.run_id), + liveness.discard(item.run_id) + truncate_settle.pop(item.run_id, None) + continue + running_seconds = int((now - running_since).total_seconds()) + if item.run_id not in liveness: + liveness.add(item.run_id) + _log.info( + "run_liveness.would_flag", + run_id=str(item.run_id), + running_seconds=running_seconds, + ceiling_seconds=liveness_ceiling_seconds, + ) + if advise_enabled: + await _record_supervision_advice( + deps, + run_id=item.run_id, + choice="SupervisionQuieted", + inputs={ + "running_seconds": str(running_seconds), + "ceiling_seconds": str(liveness_ceiling_seconds), + }, + reasoning=( + "The Run has been Running far past the operator run-age " + "ceiling without progressing; flagged for a human to check " + "whether it is hung. No command issued (advise rung)." + ), + ) + if truncate_enabled: + tick_count = truncate_settle.get(item.run_id, 0) + 1 + truncate_settle[item.run_id] = tick_count + if tick_count >= truncate_settle_ticks: + decision_id = deps.id_generator.new_id() + await _record_truncate_decision( + deps, + decision_id=decision_id, + run_id=item.run_id, running_seconds=running_seconds, ceiling_seconds=liveness_ceiling_seconds, + settle_count=tick_count, ) - if advise_enabled: - await _record_supervision_advice( - deps, - run_id=item.run_id, - choice="SupervisionQuieted", - inputs={ - "running_seconds": str(running_seconds), - "ceiling_seconds": str(liveness_ceiling_seconds), - }, - reasoning=( - "The Run has been Running far past the operator run-age " - "ceiling without progressing; flagged for a human to check " - "whether it is hung. No command issued (advise rung)." - ), - ) - else: - liveness.discard(item.run_id) + await _issue_truncate( + deps, + truncate_run, + run_id=item.run_id, + running_seconds=running_seconds, + ceiling_seconds=liveness_ceiling_seconds, + decision_id=decision_id, + ) + # Leave the counter: the Run should leave `running` next tick + # as Truncated; the GC drops the entry once it is no longer + # in-flight. If the truncate was a benign no-op (state race), + # a still-Running Run re-counts from here next tick. + truncate_settle.pop(item.run_id, None) # Own-holds-only: only a Held Run the supervisor itself holds is a resume # candidate. Empty unless the wind-up is explicitly enabled. @@ -979,6 +1129,7 @@ async def _supervise_loop( list_runs: ListRunsHandler, hold_run: HoldRunHandler, resume_run: ResumeRunHandler, + truncate_run: TruncateRunHandler, beam_lookup: BeamAvailabilityLookup, channel_lookup: RunChannelLookup, rules_config: ObservationRuleConfig, @@ -986,12 +1137,15 @@ async def _supervise_loop( resume_enabled: bool, resume_settle_ticks: int, liveness_ceiling_seconds: float | None, + truncate_enabled: bool, + truncate_settle_ticks: int, advise_enabled: bool, ) -> None: """Periodic supervision loop. A failed tick is logged; the next tick retries.""" memory: dict[UUID, str] = {} settle: dict[UUID, int] = {} liveness: set[UUID] = set() + truncate_settle: dict[UUID, int] = {} quality: set[UUID] = set() stall: set[UUID] = set() stall_streak: dict[UUID, int] = {} @@ -1004,12 +1158,14 @@ async def _supervise_loop( list_runs=list_runs, hold_run=hold_run, resume_run=resume_run, + truncate_run=truncate_run, beam_lookup=beam_lookup, channel_lookup=channel_lookup, rules_config=rules_config, memory=memory, settle=settle, liveness=liveness, + truncate_settle=truncate_settle, quality=quality, stall=stall, stall_streak=stall_streak, @@ -1017,6 +1173,8 @@ async def _supervise_loop( resume_enabled=resume_enabled, resume_settle_ticks=resume_settle_ticks, liveness_ceiling_seconds=liveness_ceiling_seconds, + truncate_enabled=truncate_enabled, + truncate_settle_ticks=truncate_settle_ticks, advise_enabled=advise_enabled, ) if read_denied: @@ -1057,6 +1215,7 @@ async def run_supervisor_lifespan( list_runs: ListRunsHandler, hold_run: HoldRunHandler, resume_run: ResumeRunHandler, + truncate_run: TruncateRunHandler, beam_lookup: BeamAvailabilityLookup | None = None, channel_lookup: RunChannelLookup | None = None, interval_seconds: float | None = None, @@ -1066,8 +1225,10 @@ async def run_supervisor_lifespan( No-op unless `settings.run_supervisor_enabled` is True (default off, so a deployment opts in explicitly). The gated wind-up is a separate opt-in (`run_supervisor_resume_enabled`, also default off) so a deployment may - run auto-hold without auto-resume. The shadow observation rules are a - further opt-in (their channel-name settings, default None). + run auto-hold without auto-resume. The run-liveness act rung (autonomous + TruncateRun) is a further opt-in (`run_supervisor_truncate_enabled`, default + off, inert unless a `run_liveness_ceiling_seconds` is set). The shadow + observation rules are a further opt-in (their channel-name settings, None). """ if not deps.settings.run_supervisor_enabled: _log.info("run_supervisor.skipped", reason="disabled") @@ -1094,6 +1255,8 @@ async def run_supervisor_lifespan( resume_enabled = deps.settings.run_supervisor_resume_enabled resume_settle_ticks = deps.settings.run_supervisor_resume_settle_ticks liveness_ceiling_seconds = deps.settings.run_liveness_ceiling_seconds + truncate_enabled = deps.settings.run_supervisor_truncate_enabled + truncate_settle_ticks = deps.settings.run_supervisor_truncate_settle_ticks advise_enabled = deps.settings.run_supervisor_advise_enabled rules_config = ObservationRuleConfig( quality_channel_name=deps.settings.run_quality_channel_name, @@ -1107,6 +1270,7 @@ async def run_supervisor_lifespan( interval_seconds=interval, resume_enabled=resume_enabled, liveness_ceiling_seconds=liveness_ceiling_seconds, + truncate_enabled=truncate_enabled, quality_channel=rules_config.quality_channel_name, stall_channel=rules_config.stall_channel_name, advise_enabled=advise_enabled, @@ -1117,6 +1281,7 @@ async def run_supervisor_lifespan( list_runs, hold_run, resume_run, + truncate_run, lookup, channels, rules_config, @@ -1124,6 +1289,8 @@ async def run_supervisor_lifespan( resume_enabled, resume_settle_ticks, liveness_ceiling_seconds, + truncate_enabled, + truncate_settle_ticks, advise_enabled, ), name="run-supervisor", diff --git a/apps/api/src/cora/api/main.py b/apps/api/src/cora/api/main.py index f26d74b07ec..681a206ab55 100644 --- a/apps/api/src/cora/api/main.py +++ b/apps/api/src/cora/api/main.py @@ -880,6 +880,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: list_runs=app.state.run.list_runs, hold_run=app.state.run.hold_run, resume_run=app.state.run.resume_run, + truncate_run=app.state.run.truncate_run, ), run_initiator_lifespan( deps, diff --git a/apps/api/src/cora/decision/aggregates/decision/state.py b/apps/api/src/cora/decision/aggregates/decision/state.py index 8590b9446bc..7ea133c51c4 100644 --- a/apps/api/src/cora/decision/aggregates/decision/state.py +++ b/apps/api/src/cora/decision/aggregates/decision/state.py @@ -303,10 +303,12 @@ # Closed `choice` value set for `context = "RunSupervision"` Decisions. # Projection-validated, not domain-enforced (the open-string -# `DecisionContext` + `DecisionChoice` shape is preserved). Ten values: +# `DecisionContext` + `DecisionChoice` shape is preserved). Eleven values: # five beam-Hold/Resume + two audit-fallback + three advise-rung -# observe->advise dispositions (Quieted / Stalled / Breached), the last -# three Decision-only (one per breach edge, never a command). +# observe->advise dispositions (Quieted / Stalled / Breached, Decision-only, +# one per breach edge) + one liveness act rung (Truncate) that escalates the +# run-liveness advise (Quieted) to a terminal command once a Run stays +# implausibly long for the operator settle window. # # - `Continue` -- no wind-down trigger met; no command # issued (the NoAction-bias default). @@ -338,6 +340,13 @@ # (Rule Q). Decision-only. (Named by naming-r3: # the limit was breached, an objective edge, # not the supervisor's epistemic state.) +# - `Truncate` -- liveness act rung: issues TruncateRun (the +# terminal partial-data exit) for a Run that has +# stayed implausibly long past the operator +# ceiling for the settle window; the escalation of +# the SupervisionQuieted advise. A bare verb +# mirroring the TruncateRun command, as Hold / +# Resume mirror HoldRun / ResumeRun. RunSupervisionChoice = Literal[ "Continue", "Hold", @@ -349,6 +358,7 @@ "SupervisionQuieted", "SupervisionStalled", "SupervisionBreached", + "Truncate", ] RUN_SUPERVISION_CHOICES: Final = frozenset( { @@ -362,6 +372,7 @@ "SupervisionQuieted", "SupervisionStalled", "SupervisionBreached", + "Truncate", } ) diff --git a/apps/api/src/cora/infrastructure/config.py b/apps/api/src/cora/infrastructure/config.py index 99ad14b4a81..07251393ce2 100644 --- a/apps/api/src/cora/infrastructure/config.py +++ b/apps/api/src/cora/infrastructure/config.py @@ -228,6 +228,19 @@ class Settings(BaseSettings): # Decision and issues no command. run_liveness_ceiling_seconds: float | None = None + # `run_supervisor_truncate_enabled` is the ACT rung of the run-liveness rule: + # a SEPARATE opt-in (default off) above `run_liveness_ceiling_seconds` that + # lets the supervisor autonomously issue TruncateRun (the terminal, + # partial-data exit) for a Run that has stayed past the operator ceiling. It + # is inert unless the ceiling is set (the rule's own gate) AND + # run_supervisor_enabled is on. `run_supervisor_truncate_settle_ticks` is the + # anti-flap settle window: the Run must read liveness-stale for this many + # CONSECUTIVE ticks before the terminal truncate fires (>= 1), so a + # transiently-stale or recovering Run is never killed. Truncate is terminal, + # so the default settle is higher than the resume wind-up's. + run_supervisor_truncate_enabled: bool = False + run_supervisor_truncate_settle_ticks: int = 3 + # Observation-signal closed-loop rules (SHADOW, inside the RunSupervisor # loop; [[project_observation_signal_port_design]]). Both default OFF and # are a second off-gate above run_supervisor_enabled. @@ -602,6 +615,18 @@ def _validate_run_initiator_max_in_flight(cls, value: int) -> int: raise ValueError(msg) return value + @field_validator("run_supervisor_truncate_settle_ticks") + @classmethod + def _validate_run_supervisor_truncate_settle_ticks(cls, value: int) -> int: + """Floor of 1: a terminal truncate needs at least one stale read first.""" + if value < 1: + msg = ( + f"run_supervisor_truncate_settle_ticks must be >= 1, got {value}; " + "an autonomous truncate requires at least one liveness-stale read" + ) + raise ValueError(msg) + return value + @field_validator("run_liveness_ceiling_seconds") @classmethod def _validate_run_liveness_ceiling_seconds(cls, value: float | None) -> float | None: diff --git a/apps/api/src/cora/run/aggregates/run/events.py b/apps/api/src/cora/run/aggregates/run/events.py index c421c50e5c4..e0af32db1fc 100644 --- a/apps/api/src/cora/run/aggregates/run/events.py +++ b/apps/api/src/cora/run/aggregates/run/events.py @@ -572,16 +572,24 @@ class RunTruncated: Stopped vs Truncated (lifecycle-layer distinction): Stopped is a controlled exit while the system is responsive; Truncated - is a cleanup mechanism for known-dead Runs. The system itself - does not detect de-facto-dead Runs (separate liveness concern, - out of scope for 6f-4); operators must invoke truncate - explicitly. + is a cleanup mechanism for known-dead Runs. Truncate is operator- + driven, OR autonomous: the RunSupervisor's run-liveness act rung + truncates a Run that has stayed implausibly long past the operator + ceiling (the de-facto-dead detection the original truncate left to + a human), attributed via the event principal and `decided_by_decision_id`. + + `decided_by_decision_id` (mirrors RunHeld / RunAdjusted): optional + Decision-causation link, set when an agent (or a Decision-justified + operator action) truncated the Run; None for a bare operator truncate. + Forward-compat via `payload.get("decided_by_decision_id")` for legacy + streams. """ run_id: UUID reason: str interrupted_at: datetime | None occurred_at: datetime + decided_by_decision_id: UUID | None = None # Discriminated union of every event the Run aggregate emits. @@ -737,6 +745,7 @@ def to_payload(event: RunEvent) -> dict[str, Any]: reason=reason, interrupted_at=interrupted_at, occurred_at=occurred_at, + decided_by_decision_id=decided_by_decision_id, ): interrupted_at_iso = interrupted_at.isoformat() if interrupted_at is not None else None return { @@ -744,6 +753,9 @@ def to_payload(event: RunEvent) -> dict[str, Any]: "reason": reason, "interrupted_at": interrupted_at_iso, "occurred_at": occurred_at.isoformat(), + "decided_by_decision_id": ( + str(decided_by_decision_id) if decided_by_decision_id is not None else None + ), } case RunAdjusted( run_id=run_id, @@ -962,6 +974,7 @@ def _build_run_stopped() -> RunStopped: def _build_run_truncated() -> RunTruncated: raw_interrupted_at = payload["interrupted_at"] + raw_decided_by_truncated = payload.get("decided_by_decision_id") return RunTruncated( run_id=UUID(payload["run_id"]), reason=payload["reason"], @@ -971,6 +984,11 @@ def _build_run_truncated() -> RunTruncated: else None ), occurred_at=datetime.fromisoformat(payload["occurred_at"]), + decided_by_decision_id=( + UUID(raw_decided_by_truncated) + if raw_decided_by_truncated is not None + else None + ), ) return deserialize_or_raise("RunTruncated", _build_run_truncated) diff --git a/apps/api/src/cora/run/features/truncate_run/command.py b/apps/api/src/cora/run/features/truncate_run/command.py index 848a891cdfc..bc28842b5cf 100644 --- a/apps/api/src/cora/run/features/truncate_run/command.py +++ b/apps/api/src/cora/run/features/truncate_run/command.py @@ -26,3 +26,10 @@ class TruncateRun: run_id: UUID reason: str interrupted_at: datetime | None + # Optional Decision-causation link (mirrors HoldRun / AbortRun / + # AdjustRun / StartRun). Lets an autonomous truncate (the RunSupervisor + # run-liveness act rung) point back to the Decision that justified it; an + # operator-driven truncate leaves it None. No cross-BC existence check at + # the decider, per the eventual-consistency stance. Additive default so + # existing callers (REST / MCP, which do not expose it) are unaffected. + decided_by_decision_id: UUID | None = None diff --git a/apps/api/src/cora/run/features/truncate_run/decider.py b/apps/api/src/cora/run/features/truncate_run/decider.py index ef4ee489ce4..53f06e1b208 100644 --- a/apps/api/src/cora/run/features/truncate_run/decider.py +++ b/apps/api/src/cora/run/features/truncate_run/decider.py @@ -70,5 +70,6 @@ def decide( reason=reason.value, interrupted_at=command.interrupted_at, occurred_at=now, + decided_by_decision_id=command.decided_by_decision_id, ) ] diff --git a/apps/api/tests/integration/scenarios/test_2bm_run_supervisor_auto_resume.py b/apps/api/tests/integration/scenarios/test_2bm_run_supervisor_auto_resume.py index ad744fb3a84..b0ca06045cf 100644 --- a/apps/api/tests/integration/scenarios/test_2bm_run_supervisor_auto_resume.py +++ b/apps/api/tests/integration/scenarios/test_2bm_run_supervisor_auto_resume.py @@ -50,6 +50,7 @@ from cora.run.features.resume_run import bind as bind_resume_run from cora.run.features.start_run import StartRun from cora.run.features.start_run import bind as bind_start_run +from cora.run.features.truncate_run import bind as bind_truncate_run from cora.run.ports import InMemoryRunChannelLookup from cora.safety._projections import register_safety_projections from cora.safety.adapters import PostgresClearanceLookup @@ -369,6 +370,7 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool) list_runs = bind_list_runs(deps) hold_run = bind_hold_run(deps) resume_run = bind_resume_run(deps) + truncate_run = bind_truncate_run(deps) memory: dict[UUID, str] = {} settle: dict[UUID, int] = {} @@ -388,6 +390,10 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool) stall=set(), stall_streak={}, feed_dead_warned=set(), + truncate_run=truncate_run, + truncate_settle={}, + truncate_enabled=False, + truncate_settle_ticks=3, liveness_ceiling_seconds=None, advise_enabled=False, resume_enabled=True, @@ -413,6 +419,10 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool) stall=set(), stall_streak={}, feed_dead_warned=set(), + truncate_run=truncate_run, + truncate_settle={}, + truncate_enabled=False, + truncate_settle_ticks=3, liveness_ceiling_seconds=None, advise_enabled=False, resume_enabled=True, @@ -447,6 +457,7 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo list_runs = bind_list_runs(deps) hold_run = bind_hold_run(deps) resume_run = bind_resume_run(deps) + truncate_run = bind_truncate_run(deps) memory: dict[UUID, str] = {} settle: dict[UUID, int] = {} @@ -466,6 +477,10 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo stall=set(), stall_streak={}, feed_dead_warned=set(), + truncate_run=truncate_run, + truncate_settle={}, + truncate_enabled=False, + truncate_settle_ticks=3, liveness_ceiling_seconds=None, advise_enabled=False, resume_enabled=True, @@ -498,6 +513,10 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo stall=set(), stall_streak={}, feed_dead_warned=set(), + truncate_run=truncate_run, + truncate_settle={}, + truncate_enabled=False, + truncate_settle_ticks=3, liveness_ceiling_seconds=None, advise_enabled=False, resume_enabled=True, diff --git a/apps/api/tests/integration/test_truncate_run_handler_postgres.py b/apps/api/tests/integration/test_truncate_run_handler_postgres.py index f9dbcef8847..5666f6cd918 100644 --- a/apps/api/tests/integration/test_truncate_run_handler_postgres.py +++ b/apps/api/tests/integration/test_truncate_run_handler_postgres.py @@ -223,6 +223,7 @@ async def test_truncate_run_persists_with_interrupted_at_and_round_trips_to_trun "reason": "weekend power loss; abandoned at projection 487 of 1500", "interrupted_at": _INTERRUPTED_AT.isoformat(), "occurred_at": _NOW.isoformat(), + "decided_by_decision_id": None, } # The two timestamps stay distinct through jsonb (operator- # supplied interrupted_at on Saturday, system-recorded diff --git a/apps/api/tests/unit/api/test_run_supervisor.py b/apps/api/tests/unit/api/test_run_supervisor.py index b51b752e2c4..7525ca14d8c 100644 --- a/apps/api/tests/unit/api/test_run_supervisor.py +++ b/apps/api/tests/unit/api/test_run_supervisor.py @@ -51,7 +51,12 @@ BeamAvailabilityLookupResult, ) from cora.infrastructure.routing import NIL_SENTINEL_ID -from cora.run.aggregates.run import RunCannotResumeError, RunNotFoundError, RunStatus +from cora.run.aggregates.run import ( + RunCannotResumeError, + RunCannotTruncateError, + RunNotFoundError, + RunStatus, +) from cora.run.errors import UnauthorizedError from cora.run.features.hold_run import HoldRun from cora.run.features.hold_run.handler import Handler as HoldRunHandler @@ -59,6 +64,8 @@ from cora.run.features.list_runs.handler import Handler as ListRunsHandler from cora.run.features.resume_run import ResumeRun from cora.run.features.resume_run.handler import Handler as ResumeRunHandler +from cora.run.features.truncate_run import TruncateRun +from cora.run.features.truncate_run.handler import Handler as TruncateRunHandler from cora.run.ports import InMemoryRunChannelLookup, RunChannelLookup from cora.shared.identity import ActorId @@ -301,6 +308,22 @@ async def resume_run( return resume_run, calls +def _make_recording_truncate() -> tuple[TruncateRunHandler, list[TruncateRun]]: + calls: list[TruncateRun] = [] + + async def truncate_run( + command: TruncateRun, + *, + principal_id: UUID, + correlation_id: UUID, + causation_id: UUID | None = None, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> None: + calls.append(command) + + return truncate_run, calls + + def _rules_off() -> ObservationRuleConfig: """Observation rules disabled (channel names None): the default for tests that exercise only the beam-Hold / resume / liveness behavior.""" @@ -333,21 +356,30 @@ async def _tick( stall_streak: dict[UUID, int] | None = None, feed_dead_warned: set[UUID] | None = None, advise_enabled: bool = False, + truncate_run: TruncateRunHandler | None = None, + truncate_settle: dict[UUID, int] | None = None, + truncate_enabled: bool = False, + truncate_settle_ticks: int = 3, ) -> None: - """Call _supervise_tick, defaulting the resume wiring (off) for hold-only tests.""" + """Call _supervise_tick, defaulting the resume + truncate wiring (off) for + hold-only tests.""" if resume_run is None: resume_run, _ = _make_recording_resume() + if truncate_run is None: + truncate_run, _ = _make_recording_truncate() await _supervise_tick( deps=kernel, list_runs=list_runs, hold_run=hold_run, resume_run=resume_run, + truncate_run=truncate_run, beam_lookup=beam_lookup, channel_lookup=channel_lookup if channel_lookup is not None else InMemoryRunChannelLookup(), rules_config=rules_config if rules_config is not None else _rules_off(), memory=memory, settle=settle if settle is not None else {}, liveness=liveness if liveness is not None else set(), + truncate_settle=truncate_settle if truncate_settle is not None else {}, quality=quality if quality is not None else set(), stall=stall if stall is not None else set(), stall_streak=stall_streak if stall_streak is not None else {}, @@ -355,6 +387,8 @@ async def _tick( resume_enabled=resume_enabled, resume_settle_ticks=resume_settle_ticks, liveness_ceiling_seconds=liveness_ceiling_seconds, + truncate_enabled=truncate_enabled, + truncate_settle_ticks=truncate_settle_ticks, advise_enabled=advise_enabled, ) @@ -417,7 +451,11 @@ async def test_lifespan_is_noop_when_disabled() -> None: resume_run, _ = _make_recording_resume() async with run_supervisor_lifespan( - kernel, list_runs=list_runs, hold_run=hold_run, resume_run=resume_run + kernel, + list_runs=list_runs, + hold_run=hold_run, + resume_run=resume_run, + truncate_run=_make_recording_truncate()[0], ): pass @@ -545,6 +583,7 @@ async def test_lifespan_enabled_runs_the_loop_and_holds() -> None: list_runs=list_runs, hold_run=hold_run, resume_run=resume_run, + truncate_run=_make_recording_truncate()[0], beam_lookup=_BeamDown(), interval_seconds=0.01, ): @@ -595,6 +634,7 @@ async def test_loop_survives_a_failing_tick() -> None: list_runs=_make_failing_list_runs(), hold_run=hold_run, resume_run=resume_run, + truncate_run=_make_recording_truncate()[0], beam_lookup=_BeamDown(), interval_seconds=0.01, ): @@ -1952,6 +1992,7 @@ async def flaky_list_runs( list_runs=flaky_list_runs, hold_run=hold_run, resume_run=resume_run, + truncate_run=_make_recording_truncate()[0], beam_lookup=_BeamDown(), interval_seconds=0.01, ): @@ -1963,3 +2004,243 @@ async def flaky_list_runs( assert "run_supervisor.tick_failed" not in events assert "run_supervisor.read_authorized_recovered" in events assert hold_calls == [] + + +# ---------- Run-liveness act rung (autonomous truncate) ---------- + + +_CEILING = 3600.0 +_STALE_SINCE = _NOW - timedelta(hours=2) # 7200s past the 3600s ceiling -> stale + + +def _make_raising_truncate(exc: Exception) -> TruncateRunHandler: + async def truncate_run( + command: TruncateRun, + *, + principal_id: UUID, + correlation_id: UUID, + causation_id: UUID | None = None, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> None: + raise exc + + return truncate_run + + +@pytest.mark.unit +def test_run_supervisor_truncate_settle_ticks_rejects_zero() -> None: + with pytest.raises(ValueError, match="run_supervisor_truncate_settle_ticks"): + Settings(run_supervisor_truncate_settle_ticks=0) # type: ignore[call-arg] + + +@pytest.mark.unit +def test_run_supervisor_truncate_settle_ticks_accepts_valid() -> None: + settings = Settings(run_supervisor_truncate_settle_ticks=5) # type: ignore[call-arg] + assert settings.run_supervisor_truncate_settle_ticks == 5 + + +@pytest.mark.unit +async def test_truncate_disabled_never_truncates_a_stale_run() -> None: + """A stale Run is flagged (shadow), but with the act rung off no command issues.""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, running_since=_STALE_SINCE)]) + truncate_run, truncate_calls = _make_recording_truncate() + + await _tick( + kernel, + list_runs=list_runs, + hold_run=_make_recording_hold()[0], + beam_lookup=_BeamOpen(), + memory={}, + truncate_run=truncate_run, + truncate_enabled=False, + truncate_settle_ticks=1, + liveness_ceiling_seconds=_CEILING, + ) + + assert truncate_calls == [] + + +@pytest.mark.unit +async def test_truncate_does_not_fire_without_a_ceiling() -> None: + """truncate_enabled but no liveness ceiling: the liveness rule is off (even + with the act rung enabled), so no truncate is issued.""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, running_since=_STALE_SINCE)]) + truncate_run, truncate_calls = _make_recording_truncate() + + await _tick( + kernel, + list_runs=list_runs, + hold_run=_make_recording_hold()[0], + beam_lookup=_BeamOpen(), + memory={}, + truncate_run=truncate_run, + truncate_enabled=True, + truncate_settle_ticks=1, + liveness_ceiling_seconds=None, + ) + + assert truncate_calls == [] + + +@pytest.mark.unit +async def test_truncate_fires_only_after_settle_window_and_links_decision() -> None: + """A stale Run is truncated only once it stays stale for the settle window; the + issued TruncateRun links the recorded Truncate Decision.""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, running_since=_STALE_SINCE)]) + truncate_run, truncate_calls = _make_recording_truncate() + liveness: set[UUID] = set() + truncate_settle: dict[UUID, int] = {} + + async def _do_tick() -> None: + await _tick( + kernel, + list_runs=list_runs, + hold_run=_make_recording_hold()[0], + beam_lookup=_BeamOpen(), + memory={}, + liveness=liveness, + truncate_run=truncate_run, + truncate_settle=truncate_settle, + truncate_enabled=True, + truncate_settle_ticks=2, + liveness_ceiling_seconds=_CEILING, + ) + + # Tick 1: stale, but the settle window (2) is not met -> no truncate yet. + await _do_tick() + assert truncate_calls == [] + assert truncate_settle[run_id] == 1 + + # Tick 2: still stale, settle window met -> truncate fires. + await _do_tick() + assert len(truncate_calls) == 1 + assert truncate_calls[0].run_id == run_id + assert truncate_calls[0].interrupted_at is None + decision_id = truncate_calls[0].decided_by_decision_id + assert decision_id is not None + + decision = await load_decision(kernel.event_store, decision_id) + assert decision is not None + assert decision.context.value == "RunSupervision" + assert decision.choice.value == "Truncate" + assert decision.decided_by == ActorId(RUN_SUPERVISOR_AGENT_ID) + + +@pytest.mark.unit +async def test_truncate_settle_clears_when_run_recovers() -> None: + """A Run that reads stale then recovers (not stale) clears its settle counter, + so a flapping Run is never truncated (fail-safe).""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + truncate_run, truncate_calls = _make_recording_truncate() + liveness: set[UUID] = set() + truncate_settle: dict[UUID, int] = {} + + # Tick 1: stale -> settle counter 1. + await _tick( + kernel, + list_runs=_make_list_runs([_running_item(run_id, running_since=_STALE_SINCE)]), + hold_run=_make_recording_hold()[0], + beam_lookup=_BeamOpen(), + memory={}, + liveness=liveness, + truncate_run=truncate_run, + truncate_settle=truncate_settle, + truncate_enabled=True, + truncate_settle_ticks=2, + liveness_ceiling_seconds=_CEILING, + ) + assert truncate_settle.get(run_id) == 1 + + # Tick 2: the Run is no longer stale (fresh running_since) -> counter reset. + await _tick( + kernel, + list_runs=_make_list_runs([_running_item(run_id, running_since=_NOW)]), + hold_run=_make_recording_hold()[0], + beam_lookup=_BeamOpen(), + memory={}, + liveness=liveness, + truncate_run=truncate_run, + truncate_settle=truncate_settle, + truncate_enabled=True, + truncate_settle_ticks=2, + liveness_ceiling_seconds=_CEILING, + ) + assert run_id not in truncate_settle + assert truncate_calls == [] + + +@pytest.mark.unit +async def test_truncate_swallows_state_race() -> None: + """A Run that changed under us (RunCannotTruncateError) is a benign no-op.""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + truncate_run = _make_raising_truncate( + RunCannotTruncateError(run_id, current_status=RunStatus.COMPLETED) + ) + + await _tick( + kernel, + list_runs=_make_list_runs([_running_item(run_id, running_since=_STALE_SINCE)]), + hold_run=_make_recording_hold()[0], + beam_lookup=_BeamOpen(), + memory={}, + truncate_run=truncate_run, + truncate_enabled=True, + truncate_settle_ticks=1, + liveness_ceiling_seconds=_CEILING, + ) # no raise == benign no-op + + +@pytest.mark.unit +async def test_truncate_swallows_missing_run() -> None: + """A Run that terminated under us (RunNotFoundError) is a benign no-op.""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + truncate_run = _make_raising_truncate(RunNotFoundError(run_id)) + + await _tick( + kernel, + list_runs=_make_list_runs([_running_item(run_id, running_since=_STALE_SINCE)]), + hold_run=_make_recording_hold()[0], + beam_lookup=_BeamOpen(), + memory={}, + truncate_run=truncate_run, + truncate_enabled=True, + truncate_settle_ticks=1, + liveness_ceiling_seconds=_CEILING, + ) # no raise == benign no-op + + +@pytest.mark.unit +async def test_truncate_swallows_unauthorized() -> None: + """An Authorize Deny (the supervisor principal lacks TruncateRun) is logged, + not raised; no autonomous crash.""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + truncate_run = _make_raising_truncate(UnauthorizedError("supervisor not granted TruncateRun")) + + await _tick( + kernel, + list_runs=_make_list_runs([_running_item(run_id, running_since=_STALE_SINCE)]), + hold_run=_make_recording_hold()[0], + beam_lookup=_BeamOpen(), + memory={}, + truncate_run=truncate_run, + truncate_enabled=True, + truncate_settle_ticks=1, + liveness_ceiling_seconds=_CEILING, + ) # no raise diff --git a/apps/api/tests/unit/decision/test_run_supervision_vocab.py b/apps/api/tests/unit/decision/test_run_supervision_vocab.py index 0f3bda17d6e..b5d3eb9cb5d 100644 --- a/apps/api/tests/unit/decision/test_run_supervision_vocab.py +++ b/apps/api/tests/unit/decision/test_run_supervision_vocab.py @@ -39,6 +39,7 @@ def test_run_supervision_choices_closed_set() -> None: "SupervisionQuieted", "SupervisionStalled", "SupervisionBreached", + "Truncate", } ) == RUN_SUPERVISION_CHOICES