diff --git a/experiments/napkin_math/.claude/skills/extract-parameters-from-digest/system-prompt.txt b/experiments/napkin_math/.claude/skills/extract-parameters-from-digest/system-prompt.txt index d3730fb7..cc95b78b 100644 --- a/experiments/napkin_math/.claude/skills/extract-parameters-from-digest/system-prompt.txt +++ b/experiments/napkin_math/.claude/skills/extract-parameters-from-digest/system-prompt.txt @@ -903,6 +903,38 @@ When to leave it empty (omit the field or use `[]`): Do not use unmodelled_gates as a dumping ground for risks. Only include gates whose failure would end the plan independently of the financial or operational thresholds the model tests. +Dropped-signal explanations (optional `dropped_signals` field): + +A separate top-level array `dropped_signals` documents prior-iteration or source-stated signals that the current artifact deliberately leaves out. The intent is to let a deterministic source-preservation audit distinguish a defensible cap-pressure drop from a silent regression. Each entry must name a structural reason and, when that reason points at a surviving signal (`replaced_by`, `redundant_with`, `moved_to_unmodelled_gate`), reference the current signal it was replaced by, made redundant with, or moved to. + +Use cases (corpus-agnostic): +- A prior iteration declared a primitive that has now been computed from named constituents; record the old id with `reason: "replaced_by"` and `replacement_id` pointing at the current calculation's `output_name` or entry id. +- A prior iteration's signal is mechanically equivalent to a current signal under a different name; record with `reason: "redundant_with"` and `redundant_with_id`. +- The 8 key_values / 5 missing_values / 5 calc / 5 derived / 5 unmodelled_gate caps forced you to drop a candidate; record with `reason: "cap_pressure"` and `cap_kind` naming the capped array. +- A prior signal has been re-categorised as an unmodelled gate; record with `reason: "moved_to_unmodelled_gate"` and `replacement_id` pointing at the new unmodelled_gates entry.
+- A prior signal is genuinely out of scope for the current modelling frame; record with `reason: "out_of_scope"` and a one-sentence structural justification. + +For each dropped signal, return: +- id: the prior signal id OR a source_claim_id (12-hex shape `claim_<...>`) — the identifier of what was dropped +- origin: one of `source_digest` or `prior_baseline` +- source_anchor: the source section when known (executive_summary, project_plan, selected_scenario, assumptions, review_plan, premortem, expert_criticism, data_collection) or `prior_baseline` +- expected_section: which section the signal would normally land in (key_values, missing_values_to_estimate, derived_questions, recommended_first_calculations, unmodelled_gates) +- dropped_from: prior section name when `origin == "prior_baseline"`, otherwise null +- reason: one of `replaced_by`, `cap_pressure`, `out_of_scope`, `moved_to_unmodelled_gate`, `redundant_with` +- replacement_id: required for `replaced_by` and `moved_to_unmodelled_gate`; must reference an existing current id or output_name +- redundant_with_id: required for `redundant_with`; must reference an existing current id or output_name +- cap_kind: required for `cap_pressure`; must name a capped array (`key_values`, `missing_values_to_estimate`, `derived_questions`, `recommended_first_calculations`, `unmodelled_gates`) +- rationale: one structural sentence (≤25 words). Plan-neutral wording. Do not use rationale to excuse weak extraction. + +Hard limit: at most 8 `dropped_signals`. If more than 8 would need to be recorded, this is a signal that the extraction itself is too lossy and should be redone rather than confessed in a long list. 
+ +Do not use `dropped_signals` for: +- noise items that the source itself frames as decorative or narrative +- duplicate phrasings of a signal that IS preserved under another id (those are not drops) +- items you never considered emitting in the first place (the audit only checks signals the prior baseline or the source actually emitted) + +Omit the field entirely (or use `[]`) when no prior signal was dropped. Empty is the expected default for first-iteration extractions and for cleanly-preserved iterations. + Return this exact JSON shape: { @@ -969,5 +1001,19 @@ Return this exact JSON shape: "source_anchor": "", "consequence_if_false": "" } + ], + "dropped_signals": [ + { + "id": "", + "origin": "", + "source_anchor": "", + "expected_section": "", + "dropped_from": null, + "reason": "", + "replacement_id": null, + "redundant_with_id": null, + "cap_kind": null, + "rationale": "" + } ] } \ No newline at end of file diff --git a/experiments/napkin_math/.claude/skills/extract-parameters-from-full/system-prompt.txt b/experiments/napkin_math/.claude/skills/extract-parameters-from-full/system-prompt.txt index a24248f8..fcc8dc3d 100644 --- a/experiments/napkin_math/.claude/skills/extract-parameters-from-full/system-prompt.txt +++ b/experiments/napkin_math/.claude/skills/extract-parameters-from-full/system-prompt.txt @@ -862,6 +862,38 @@ When to leave it empty (omit the field or use `[]`): Do not use unmodelled_gates as a dumping ground for risks. Only include gates whose failure would end the plan independently of the financial or operational thresholds the model tests. +Dropped-signal explanations (optional `dropped_signals` field): + +A separate top-level array `dropped_signals` documents prior-iteration or source-stated signals that the current artifact deliberately leaves out. The intent is to let a deterministic source-preservation audit distinguish a defensible cap-pressure drop from a silent regression. 
Each entry must name a structural reason and, when that reason points at a surviving signal (`replaced_by`, `redundant_with`, `moved_to_unmodelled_gate`), reference the current signal it was replaced by, made redundant with, or moved to. + +Use cases (corpus-agnostic): +- A prior iteration declared a primitive that has now been computed from named constituents; record the old id with `reason: "replaced_by"` and `replacement_id` pointing at the current calculation's `output_name` or entry id. +- A prior iteration's signal is mechanically equivalent to a current signal under a different name; record with `reason: "redundant_with"` and `redundant_with_id`. +- The 8 key_values / 5 missing_values / 5 calc / 5 derived / 5 unmodelled_gate caps forced you to drop a candidate; record with `reason: "cap_pressure"` and `cap_kind` naming the capped array. +- A prior signal has been re-categorised as an unmodelled gate; record with `reason: "moved_to_unmodelled_gate"` and `replacement_id` pointing at the new unmodelled_gates entry. +- A prior signal is genuinely out of scope for the current modelling frame; record with `reason: "out_of_scope"` and a one-sentence structural justification.
+ +For each dropped signal, return: +- id: the prior signal id OR a source_claim_id (12-hex shape `claim_<...>`) — the identifier of what was dropped +- origin: one of `source_digest` or `prior_baseline` +- source_anchor: the source section when known (executive_summary, project_plan, selected_scenario, assumptions, review_plan, premortem, expert_criticism, data_collection) or `prior_baseline` +- expected_section: which section the signal would normally land in (key_values, missing_values_to_estimate, derived_questions, recommended_first_calculations, unmodelled_gates) +- dropped_from: prior section name when `origin == "prior_baseline"`, otherwise null +- reason: one of `replaced_by`, `cap_pressure`, `out_of_scope`, `moved_to_unmodelled_gate`, `redundant_with` +- replacement_id: required for `replaced_by` and `moved_to_unmodelled_gate`; must reference an existing current id or output_name +- redundant_with_id: required for `redundant_with`; must reference an existing current id or output_name +- cap_kind: required for `cap_pressure`; must name a capped array (`key_values`, `missing_values_to_estimate`, `derived_questions`, `recommended_first_calculations`, `unmodelled_gates`) +- rationale: one structural sentence (≤25 words). Plan-neutral wording. Do not use rationale to excuse weak extraction. + +Hard limit: at most 8 `dropped_signals`. If more than 8 would need to be recorded, this is a signal that the extraction itself is too lossy and should be redone rather than confessed in a long list. + +Do not use `dropped_signals` for: +- noise items that the source itself frames as decorative or narrative +- duplicate phrasings of a signal that IS preserved under another id (those are not drops) +- items you never considered emitting in the first place (the audit only checks signals the prior baseline or the source actually emitted) + +Omit the field entirely (or use `[]`) when no prior signal was dropped. 
Empty is the expected default for first-iteration extractions and for cleanly-preserved iterations. + Return this exact JSON shape: { @@ -928,5 +960,19 @@ Return this exact JSON shape: "source_anchor": "", "consequence_if_false": "" } + ], + "dropped_signals": [ + { + "id": "", + "origin": "", + "source_anchor": "", + "expected_section": "", + "dropped_from": null, + "reason": "", + "replacement_id": null, + "redundant_with_id": null, + "cap_kind": null, + "rationale": "" + } ] } \ No newline at end of file diff --git a/experiments/napkin_math/audit_source_preservation.py b/experiments/napkin_math/audit_source_preservation.py index 614a2f79..0237c05c 100644 --- a/experiments/napkin_math/audit_source_preservation.py +++ b/experiments/napkin_math/audit_source_preservation.py @@ -4,26 +4,42 @@ Implements Fork B of proposal 141 (source-preservation audit). Reads a prior ``parameters.json`` and a current ``parameters.json``, builds the -prior signal set, and classifies each prior id as one of: - - preserved_by_id — same id is in current - preserved_by_output_name — prior id appears as an output_name - preserved_as_formula_dependency — prior id is on a current formula RHS - or in a current depends_on list - likely_renamed — prior id has high snake_case token - overlap with one or more current ids +prior signal set, and classifies each prior signal as one of: + + preserved_by_id — same name is in current ids + preserved_by_output_name — prior name appears as a current + output_name + preserved_as_formula_dependency — prior name is on a current formula + RHS or in a current depends_on list + explained_drop — current parameters.json's + dropped_signals records the prior + signal with a SEMANTICALLY VALID + structural reason (origin = + prior_baseline, references resolve, + cap_pressure claim justified) + likely_renamed — prior name has high snake_case token + Jaccard overlap with one or more + candidates from current ids ∪ + output_names absent_unexplained — no preservation 
evidence found Advisory only. Exit 0 unless the input is malformed (exit 2). No strict -mode, no CI gating, no schema changes to extract prompts — those land in -later proposal 141 PRs after this audit's findings (false-positive rate, -useful catches) are measured on the corpus. +mode, no CI gating. -Out of scope for this PR (deferred to later proposal 141 PRs): +The audit mirrors the semantic checks of ``validate_parameters.py``'s +``check_dropped_signals_schema``: an entry can be consumed as evidence +of an ``explained_drop`` only when it would also pass validation. A +malformed entry (unknown reason, unresolved replacement_id, unjustified +cap_pressure claim, etc.) is silently ignored by the audit and the prior +signal falls through to ``likely_renamed`` or ``absent_unexplained``. +This prevents an invalid explanation from hiding a real regression. + +Out of scope for this advisory PR (deferred to later proposal 141 PRs): - Fork A (source-digest regex scan against the current artifact). - - The optional ``dropped_signals`` schema in extract prompts. - - LLM rationale parsing of ``dropped_signals`` entries. + - Orchestrator wiring that lets the extract skill see prior baselines + (without it the LLM cannot emit prior_baseline-origin drops). - Strict-mode exit-non-zero policy. + - ``source_claim_ids`` per-entry grounding. """ from __future__ import annotations @@ -72,6 +88,102 @@ # is advisory — the reviewer wants the top few, not an exhaustive list. MAX_RENAME_CANDIDATES: int = 3 +# Closed enum of structural reasons accepted in a dropped_signals entry. +# Duplicated from validate_parameters.DROPPED_SIGNAL_REASONS (not imported, +# so the audit runs standalone); keep the two definitions in sync. +DROPPED_SIGNAL_REASONS: frozenset[str] = frozenset({ + "replaced_by", "cap_pressure", "out_of_scope", + "moved_to_unmodelled_gate", "redundant_with", +}) + +# Reasons whose semantics require a populated replacement_id.
+DROPPED_SIGNAL_REASONS_NEEDING_REPLACEMENT: frozenset[str] = frozenset({ + "replaced_by", "moved_to_unmodelled_gate", +}) + +# Section-name → cap. Matches validate_parameters.CAPS; duplicated here +# so the audit can verify a cap_pressure claim without importing the +# validator module (the audit can be invoked standalone via CLI). +DROPPED_SIGNAL_CAPS: dict[str, int] = { + "key_values": 8, + "derived_questions": 5, + "missing_values_to_estimate": 5, + "recommended_first_calculations": 5, + "unmodelled_gates": 5, +} + + +def _collect_unmodelled_gate_ids(params: dict) -> set[str]: + """The set of declared ``unmodelled_gates`` entry ids in a parameters + artifact. Used to validate ``moved_to_unmodelled_gate`` references.""" + out: set[str] = set() + for entry in params.get("unmodelled_gates", []) or []: + if isinstance(entry, dict) and isinstance(entry.get("id"), str): + out.add(entry["id"]) + return out + + +def is_audit_consumable_drop( + entry: dict, current_params: dict, current_index: dict[str, Any] +) -> bool: + """Decide whether a ``dropped_signals`` entry is semantically valid + enough that the audit should consume it as evidence of an + ``explained_drop``. Mirrors the checks ``validate_parameters.py``'s + ``check_dropped_signals_schema`` applies, so an entry that the + validator would reject does not get to hide a real regression in + the audit. 
+ + Required for any consumption: + - ``entry`` is a dict with non-empty string ``id`` + - ``origin == "prior_baseline"`` (Fork B; source_digest drops are + Fork A territory and not consumed here) + - ``reason`` is in the closed enum + + Reason-specific reference resolution: + - ``replaced_by`` — ``replacement_id`` must be a current id or + output_name + - ``redundant_with`` — ``redundant_with_id`` must be a current id + or output_name + - ``moved_to_unmodelled_gate`` — ``replacement_id`` must match an + ``unmodelled_gates`` entry id + - ``cap_pressure`` — ``cap_kind`` must name a capped array AND + that array must actually be at its cap in the current artifact + - ``out_of_scope`` — no extra reference required + """ + if not isinstance(entry, dict): + return False + eid = entry.get("id") + if not isinstance(eid, str) or not eid: + return False + if entry.get("origin") != "prior_baseline": + return False + reason = entry.get("reason") + if reason not in DROPPED_SIGNAL_REASONS: + return False + current_refs = current_index["ids"] | current_index["output_names"] + if reason in DROPPED_SIGNAL_REASONS_NEEDING_REPLACEMENT: + rid = entry.get("replacement_id") + if not isinstance(rid, str) or not rid: + return False + if reason == "replaced_by" and rid not in current_refs: + return False + if reason == "moved_to_unmodelled_gate": + if rid not in _collect_unmodelled_gate_ids(current_params): + return False + if reason == "redundant_with": + rid = entry.get("redundant_with_id") + if not isinstance(rid, str) or not rid: + return False + if rid not in current_refs: + return False + if reason == "cap_pressure": + cap_kind = entry.get("cap_kind") + if cap_kind not in DROPPED_SIGNAL_CAPS: + return False + if len(current_params.get(cap_kind, []) or []) < DROPPED_SIGNAL_CAPS[cap_kind]: + return False + return True + def parse_rhs_tokens(formula: str) -> set[str]: """Extract snake_case identifier tokens from the RHS of a @@ -177,11 +289,51 @@ def find_rename_candidates( return 
scored[:MAX_RENAME_CANDIDATES] +def build_dropped_signal_index( + params: dict, current_index: dict[str, Any] +) -> dict[str, dict[str, Any]]: + """Index a current artifact's ``dropped_signals`` entries by id, so + the audit can reclassify a prior signal whose disappearance the LLM + has explained. + + Only **semantically valid** entries are indexed — see + ``is_audit_consumable_drop`` for the rules. Malformed entries are + silently skipped (``validate_parameters.py`` is the right place to + surface them as ERRORs). The strict filter prevents an invalid + explanation from hiding a real regression: an entry whose + ``replacement_id`` does not resolve, or whose ``cap_pressure`` claim + is not justified, falls through to ``likely_renamed`` or + ``absent_unexplained`` in the audit's classification. + """ + out: dict[str, dict[str, Any]] = {} + for entry in params.get("dropped_signals", []) or []: + if not is_audit_consumable_drop(entry, params, current_index): + continue + eid = entry["id"] + if eid not in out: + out[eid] = entry + return out + + def classify_prior_signal( - prior_name: str, current_index: dict[str, Any] + prior_name: str, + current_index: dict[str, Any], + dropped_index: dict[str, dict[str, Any]] | None = None, ) -> dict[str, Any]: """Classify one prior signal name by what evidence the current - artifact provides for it. + artifact provides for it. ``dropped_index`` (optional) is a map + from id → dropped_signals entry; when a prior name matches an entry + AND the prior is not otherwise preserved, the audit reclassifies + the disappearance as ``explained_drop`` with the structured reason. + + Preservation precedence: + preserved_by_id > preserved_by_output_name > + preserved_as_formula_dependency > explained_drop > + likely_renamed > absent_unexplained + + ``explained_drop`` ranks above ``likely_renamed`` because the LLM + has named a specific structural reason and reference; the rename- + candidate suggestions become noise once the drop is explained. 
""" if prior_name in current_index["ids"]: return {"status": "preserved_by_id"} @@ -189,6 +341,15 @@ def classify_prior_signal( return {"status": "preserved_by_output_name"} if prior_name in current_index["formula_tokens"]: return {"status": "preserved_as_formula_dependency"} + if dropped_index and prior_name in dropped_index: + entry = dropped_index[prior_name] + return { + "status": "explained_drop", + "reason": entry.get("reason"), + "replacement_id": entry.get("replacement_id"), + "redundant_with_id": entry.get("redundant_with_id"), + "cap_kind": entry.get("cap_kind"), + } candidate_pool = current_index["ids"] | current_index["output_names"] candidates = find_rename_candidates(prior_name, candidate_pool) if candidates: @@ -228,18 +389,20 @@ def audit(prior_params: dict, current_params: dict) -> dict[str, Any]: """ prior_index = build_signal_index(prior_params) current_index = build_signal_index(current_params) + dropped_index = build_dropped_signal_index(current_params, current_index) summary = { "prior_total": len(prior_index["signals"]), "preserved_by_id": 0, "preserved_by_output_name": 0, "preserved_as_formula_dependency": 0, + "explained_drop": 0, "likely_renamed": 0, "absent_unexplained": 0, } details: list[dict[str, Any]] = [] for prior_name in sorted(prior_index["signals"]): meta = prior_index["signals"][prior_name] - cls = classify_prior_signal(prior_name, current_index) + cls = classify_prior_signal(prior_name, current_index, dropped_index) summary[cls["status"]] = summary[cls["status"]] + 1 details.append({ "prior_name": prior_name, @@ -272,10 +435,25 @@ def render_text_report(report: dict[str, Any]) -> str: f" preserved_by_id : {s['preserved_by_id']}", f" preserved_by_output_name : {s['preserved_by_output_name']}", f" preserved_as_formula_dependency : {s['preserved_as_formula_dependency']}", + f" explained_drop : {s['explained_drop']}", f" likely_renamed : {s['likely_renamed']}", f" absent_unexplained : {s['absent_unexplained']}", "", ] + explained = 
[d for d in report["details"] if d["status"] == "explained_drop"] + if explained: + lines.append("EXPLAINED DROPS (dropped_signals entries):") + for d in explained: + reason = d.get("reason", "?") + ref_bits: list[str] = [reason] + if d.get("replacement_id"): + ref_bits.append(f"→ {d['replacement_id']}") + if d.get("redundant_with_id"): + ref_bits.append(f"≡ {d['redundant_with_id']}") + if d.get("cap_kind"): + ref_bits.append(f"cap={d['cap_kind']}") + lines.append(f" {_format_signal_label(d)} :: {' '.join(ref_bits)}") + lines.append("") renamed = [d for d in report["details"] if d["status"] == "likely_renamed"] if renamed: lines.append("LIKELY RENAMED:") diff --git a/experiments/napkin_math/tests/run_smoke.py b/experiments/napkin_math/tests/run_smoke.py index da4ff520..8278f6a3 100644 --- a/experiments/napkin_math/tests/run_smoke.py +++ b/experiments/napkin_math/tests/run_smoke.py @@ -223,7 +223,7 @@ def check_summarize_assessment_end_to_end(tmpdir: Path) -> None: def check_validate_parameters_end_to_end(tmpdir: Path) -> None: """Run validate_parameters.py against the smoke fixture and verify it - produces a clean validation.json (exit 0, valid: true, 18 checks listed). + produces a clean validation.json (exit 0, valid: true, 19 checks listed). 
""" out = tmpdir / "validation.json" validator = NAPKIN_DIR / "validate_parameters.py" @@ -241,8 +241,8 @@ def check_validate_parameters_end_to_end(tmpdir: Path) -> None: body = json.loads(out.read_text()) _check("validation.json valid: true", body.get("valid") is True) _check("validation.json error_count == 0", body.get("error_count") == 0) - _check("validation.json lists 18 checks_performed", - len(body.get("summary", {}).get("checks_performed", [])) == 18) + _check("validation.json lists 19 checks_performed", + len(body.get("summary", {}).get("checks_performed", [])) == 19) def check_prepare_extract_input_imports() -> None: diff --git a/experiments/napkin_math/tests/test_audit_source_preservation.py b/experiments/napkin_math/tests/test_audit_source_preservation.py index 5cccc2db..f90646b8 100644 --- a/experiments/napkin_math/tests/test_audit_source_preservation.py +++ b/experiments/napkin_math/tests/test_audit_source_preservation.py @@ -355,3 +355,252 @@ def test_parse_rhs_tokens_handles_expression_without_assignment() -> None: def test_parse_rhs_tokens_returns_empty_for_non_string() -> None: assert mod.parse_rhs_tokens(None) == set() assert mod.parse_rhs_tokens("") == set() + + +# ─── explained_drop (dropped_signals consumption) ───────────────────────── + +def _dropped(eid: str, **overrides) -> dict: + base = { + "id": eid, + "origin": "prior_baseline", + "source_anchor": "prior_baseline", + "expected_section": "key_values", + "dropped_from": "key_values", + "reason": "replaced_by", + "replacement_id": "alpha", + "redundant_with_id": None, + "cap_kind": None, + "rationale": "replaced by an equivalent computed quantity", + } + base.update(overrides) + return base + + +def test_explained_drop_reclassifies_absent_signal_with_replaced_by() -> None: + """When the current artifact's dropped_signals names a prior id and + points at an existing replacement, the audit reclassifies the prior + signal from absent_unexplained to explained_drop.""" + prior = 
_build(key_values=[_kv("orphan_token_alpha")]) + current = _build( + key_values=[_kv("alpha")], + dropped_signals=[_dropped("orphan_token_alpha", replacement_id="alpha")], + ) + report = mod.audit(prior, current) + assert report["summary"]["explained_drop"] == 1 + assert report["summary"]["absent_unexplained"] == 0 + detail = next(d for d in report["details"] + if d["status"] == "explained_drop") + assert detail["reason"] == "replaced_by" + assert detail["replacement_id"] == "alpha" + + +def test_explained_drop_outranks_likely_renamed() -> None: + """When a prior id has BOTH a high-token-overlap rename candidate + AND a dropped_signals entry, the explained_drop classification wins + because the LLM named a specific structural reason.""" + prior = _build(key_values=[_kv("alpha_beta_gamma")]) + current = _build( + key_values=[_kv("alpha_beta_gamma_delta")], + dropped_signals=[ + _dropped("alpha_beta_gamma", replacement_id="alpha_beta_gamma_delta"), + ], + ) + report = mod.audit(prior, current) + assert report["summary"]["explained_drop"] == 1 + assert report["summary"]["likely_renamed"] == 0 + + +def test_explained_drop_ignored_when_id_is_actually_preserved() -> None: + """If the LLM over-records (dropped_signals names a prior id that IS + in current), the audit prefers the preservation evidence — the + explained_drop entry is silently ignored. Avoids double-counting.""" + prior = _build(key_values=[_kv("alpha")]) + current = _build( + key_values=[_kv("alpha")], + dropped_signals=[_dropped("alpha", replacement_id="alpha")], + ) + report = mod.audit(prior, current) + assert report["summary"]["preserved_by_id"] == 1 + assert report["summary"]["explained_drop"] == 0 + + +def test_explained_drop_silently_skips_malformed_entry() -> None: + """Malformed dropped_signals entries (unknown reason, missing id, + etc.) are silently skipped by the audit — validate_parameters is + the right place to surface them.
The audit should not crash or + promote a malformed entry into a false explained_drop.""" + prior = _build(key_values=[_kv("orphan_token_alpha")]) + current = _build( + key_values=[_kv("alpha")], + dropped_signals=[{ + "id": "orphan_token_alpha", + "origin": "prior_baseline", + "reason": "garbage_reason", # not in closed enum + "rationale": "garbage", + }], + ) + report = mod.audit(prior, current) + # Malformed entry ignored → prior signal falls through to + # absent_unexplained (no rename candidate available). + assert report["summary"]["explained_drop"] == 0 + assert report["summary"]["absent_unexplained"] == 1 + + +def test_explained_drop_handles_cap_pressure_reason() -> None: + """A cap_pressure drop is a legitimate explained_drop — but the + audit consumes the claim only when the named capped array is + actually at its cap (matches validator). Here key_values is filled + to its cap of 8 so the cap_pressure claim is justified.""" + prior = _build(key_values=[_kv("orphan_cap_pressured_token")]) + current = _build( + key_values=[_kv(f"kv_{i}") for i in range(8)], + dropped_signals=[ + _dropped( + "orphan_cap_pressured_token", + reason="cap_pressure", + cap_kind="key_values", + replacement_id=None, + rationale="dropped under key_values cap pressure", + ), + ], + ) + report = mod.audit(prior, current) + detail = next(d for d in report["details"] + if d["status"] == "explained_drop") + assert detail["reason"] == "cap_pressure" + assert detail["cap_kind"] == "key_values" + + +def test_explained_drop_handles_moved_to_unmodelled_gate() -> None: + prior = _build(key_values=[_kv("orphan_token_promoted_to_gate")]) + current = _build( + unmodelled_gates=[{ + "id": "orphan_gate_id", + "label": "Promoted gate", + "why_it_matters": "test", + "source_anchor": "assumptions", + "consequence_if_false": "test", + }], + dropped_signals=[ + _dropped( + "orphan_token_promoted_to_gate", + reason="moved_to_unmodelled_gate", + replacement_id="orphan_gate_id", + rationale="re-categorised 
as binary regulatory gate", + ), + ], + ) + report = mod.audit(prior, current) + detail = next(d for d in report["details"] + if d["status"] == "explained_drop") + assert detail["reason"] == "moved_to_unmodelled_gate" + assert detail["replacement_id"] == "orphan_gate_id" + + +# ─── strict consumption: invalid dropped_signals must not become explained_drop ── + +def test_explained_drop_rejects_unresolved_replacement_id() -> None: + """Review feedback on PR #752: the audit must not accept a + replaced_by entry whose replacement_id does not resolve to a + current id or output_name. Otherwise an invalid explanation can + hide a real regression.""" + prior = _build(key_values=[_kv("orphan_token_alpha")]) + current = _build( + # NB: the current id is a token-overlap rename candidate; we + # use it to confirm the audit falls through to likely_renamed, + # not explained_drop, when the replacement_id is bogus. + key_values=[_kv("orphan_token_alpha_renamed")], + dropped_signals=[ + _dropped( + "orphan_token_alpha", + replacement_id="never_declared_anywhere", + ), + ], + ) + report = mod.audit(prior, current) + statuses = {d["prior_name"]: d["status"] for d in report["details"]} + # The malformed dropped_signals entry is ignored; the prior signal + # falls through to likely_renamed (high token overlap with the + # current id) rather than being hidden by the invalid explanation. + assert statuses["orphan_token_alpha"] == "likely_renamed" + assert report["summary"]["explained_drop"] == 0 + + +def test_explained_drop_rejects_source_digest_origin() -> None: + """The Fork B audit consumes only prior_baseline-origin entries.
+ source_digest-origin drops are Fork A territory and not consumed + here; the prior signal must fall through to its weaker + classification rather than being silently reclassified.""" + prior = _build(key_values=[_kv("orphan_token_alpha")]) + current = _build( + key_values=[_kv("alpha")], + dropped_signals=[ + _dropped( + "orphan_token_alpha", + origin="source_digest", + replacement_id="alpha", + ), + ], + ) + report = mod.audit(prior, current) + assert report["summary"]["explained_drop"] == 0 + + +def test_explained_drop_rejects_unresolved_redundant_with_id() -> None: + prior = _build(key_values=[_kv("orphan_redundant_token")]) + current = _build( + key_values=[_kv("alpha")], + dropped_signals=[ + _dropped( + "orphan_redundant_token", + reason="redundant_with", + replacement_id=None, + redundant_with_id="never_declared", + rationale="redundant with the new thing", + ), + ], + ) + report = mod.audit(prior, current) + assert report["summary"]["explained_drop"] == 0 + + +def test_explained_drop_rejects_unjustified_cap_pressure() -> None: + """cap_pressure must be justified — the named array must actually + be at its cap. An empty array (cap-room available) means the LLM + could have kept the signal.""" + prior = _build(key_values=[_kv("orphan_cap_token")]) + current = _build( + # key_values is at 1 entry; cap is 8 → cap_pressure unjustified. 
# NOTE(review): `_build`, `_kv`, `_dropped` and `mod` used by the next test
# are defined outside this excerpt.
def test_explained_drop_rejects_moved_to_unmodelled_gate_pointing_at_kv() -> None:
    """moved_to_unmodelled_gate.replacement_id must resolve to an actual
    unmodelled_gates entry — not to a regular key_value."""
    prior = _build(key_values=[_kv("orphan_gate_candidate")])
    current = _build(
        key_values=[_kv("alpha")],
        unmodelled_gates=[],
        dropped_signals=[
            _dropped(
                "orphan_gate_candidate",
                reason="moved_to_unmodelled_gate",
                replacement_id="alpha",  # key_value, not an unmodelled_gates id
                rationale="moved to unmodelled gates layer",
            ),
        ],
    )
    report = mod.audit(prior, current)
    assert report["summary"]["explained_drop"] == 0


# ─── checks_performed enumeration ─────────────────────────────────────────

def test_validate_lists_all_19_checks() -> None:
    """The validator must report all 19 checks_performed, including the
    two PR #746 structural rules and the PR #2 ``dropped_signals_schema``
    rule. Downstream consumers (summarize_assessment) use this list to
    render the validation card."""
    report = validate({
        "plan_summary": PLAN_SUMMARY,
        "key_values": [],
        "derived_questions": [],
        "missing_values_to_estimate": [],
        "recommended_first_calculations": [],
    })
    checks = report["summary"]["checks_performed"]
    assert len(checks) == 19
    assert "aggregate_not_bounded" in checks
    assert "requirement_has_margin" in checks
    assert "dropped_signals_schema" in checks


# ─── dropped_signals_schema ──────────────────────────────────────────────

def _make_dropped(eid: str, **overrides) -> dict:
    """Build a well-formed ``dropped_signals`` entry with id *eid*.

    The defaults describe a ``replaced_by`` drop pointing at ``alpha``;
    keyword *overrides* replace individual fields so each test only spells
    out the field it is exercising.
    """
    base = {
        "id": eid,
        "origin": "prior_baseline",
        "source_anchor": "prior_baseline",
        "expected_section": "key_values",
        "dropped_from": "key_values",
        "reason": "replaced_by",
        "replacement_id": "alpha",
        "redundant_with_id": None,
        "cap_kind": None,
        "rationale": "replaced by an equivalent computed quantity",
    }
    base.update(overrides)
    return base


def _alpha_key_value() -> dict:
    """Return a fresh, fully-populated ``alpha`` key_value fixture."""
    return {
        "id": "alpha", "label": "alpha", "category": "test",
        "value_type": "explicit", "unit": "test", "value": 1.0,
        "comment": "", "formula_hint": None, "output_name": None,
        "output_unit": None, "depends_on": [],
        "modelling_priority": "low", "uncertainty": "low",
        "source_text": "",
    }


def _dropped_signal_params(*, with_alpha: bool = False, **extra) -> dict:
    """Build a minimal parameters payload for the dropped_signals tests.

    Args:
        with_alpha: when true, ``key_values`` holds the single ``alpha``
            fixture; otherwise it is empty.
        **extra: additional top-level keys (``dropped_signals``,
            ``unmodelled_gates``) merged into the payload as given.
    """
    params = {
        "plan_summary": PLAN_SUMMARY,
        "key_values": [_alpha_key_value()] if with_alpha else [],
        "derived_questions": [],
        "missing_values_to_estimate": [],
        "recommended_first_calculations": [],
    }
    params.update(extra)
    return params


def test_dropped_signals_absent_is_clean() -> None:
    """The field is optional. Artifacts without dropped_signals validate
    clean (matches every existing v51 parameters.json)."""
    report = validate(_dropped_signal_params())
    assert _violations_for(report, "dropped_signals_schema") == []


def test_dropped_signals_replaced_by_resolves_clean() -> None:
    """A replaced_by entry whose replacement_id names a current key_value
    produces no dropped_signals_schema violations."""
    params = _dropped_signal_params(
        with_alpha=True,
        dropped_signals=[
            _make_dropped("old_alpha", replacement_id="alpha"),
        ],
    )
    report = validate(params)
    assert _violations_for(report, "dropped_signals_schema") == []


def test_dropped_signals_unknown_reason_fires() -> None:
    """A reason outside the closed enum is reported."""
    params = _dropped_signal_params(
        dropped_signals=[_make_dropped("orphan", reason="some_garbage")],
    )
    report = validate(params)
    fired = _violations_for(report, "dropped_signals_schema")
    assert any("reason" in v["message"] for v in fired)


def test_dropped_signals_unresolved_replacement_id_fires() -> None:
    """replacement_id must reference an existing current id or output_name."""
    params = _dropped_signal_params(
        dropped_signals=[
            _make_dropped("old_alpha", replacement_id="never_declared"),
        ],
    )
    report = validate(params)
    fired = _violations_for(report, "dropped_signals_schema")
    assert any("replacement_id" in v["message"] for v in fired)


def test_dropped_signals_cap_pressure_requires_array_at_cap() -> None:
    """A cap_pressure claim is only justified when the capped array is
    actually at its cap. If the array has room, the dropped signal could
    have been kept — the claim is rejected."""
    params = _dropped_signal_params(
        # key_values is at 1 entry; cap is 8. cap_pressure is not justified.
        with_alpha=True,
        dropped_signals=[
            _make_dropped(
                "old_beta",
                reason="cap_pressure",
                cap_kind="key_values",
                replacement_id=None,
                rationale="dropped under cap pressure",
            ),
        ],
    )
    report = validate(params)
    fired = _violations_for(report, "dropped_signals_schema")
    assert any("cap_pressure" in v["message"] for v in fired)


def test_dropped_signals_redundant_with_unresolved_fires() -> None:
    """redundant_with_id must reference an existing current id or
    output_name."""
    params = _dropped_signal_params(
        dropped_signals=[
            _make_dropped(
                "old_redundant",
                reason="redundant_with",
                replacement_id=None,
                redundant_with_id="never_declared",
                rationale="this is redundant with the new thing",
            ),
        ],
    )
    report = validate(params)
    fired = _violations_for(report, "dropped_signals_schema")
    assert any("redundant_with_id" in v["message"] for v in fired)


def test_dropped_signals_moved_to_unmodelled_gate_requires_gate_id() -> None:
    """moved_to_unmodelled_gate replacement_id must point at an actual
    unmodelled_gates entry — not at a regular key_value/output_name."""
    params = _dropped_signal_params(
        with_alpha=True,
        unmodelled_gates=[],
        dropped_signals=[
            _make_dropped(
                "old_gate_candidate",
                reason="moved_to_unmodelled_gate",
                # alpha is a key_value, not an unmodelled_gates id.
                replacement_id="alpha",
                rationale="moved to unmodelled gates layer",
            ),
        ],
    )
    report = validate(params)
    fired = _violations_for(report, "dropped_signals_schema")
    assert any("unmodelled_gates" in v["message"] for v in fired)


def test_dropped_signals_rationale_word_cap() -> None:
    """A 40-word rationale exceeds the word cap and is reported."""
    long_rationale = " ".join(["word"] * 40)
    params = _dropped_signal_params(
        with_alpha=True,
        dropped_signals=[
            _make_dropped("old_alpha", replacement_id="alpha", rationale=long_rationale),
        ],
    )
    report = validate(params)
    fired = _violations_for(report, "dropped_signals_schema")
    assert any("rationale" in v["message"] for v in fired)


def test_dropped_signals_over_cap_fires() -> None:
    """At most 8 dropped_signals entries; 9 should fire."""
    params = _dropped_signal_params(
        with_alpha=True,
        dropped_signals=[
            _make_dropped(f"old_{i}", replacement_id="alpha") for i in range(9)
        ],
    )
    report = validate(params)
    fired = _violations_for(report, "dropped_signals_schema")
    assert any("9 entries" in v["message"] for v in fired)
# Top-level keys that may be omitted. unmodelled_gates is optional because
# older parameters.json files won't have it. dropped_signals is optional
# and only present when the LLM records prior-iteration or source-stated
# absences (proposal 141 PR 2).
OPTIONAL_TOP_LEVEL_KEYS = {"unmodelled_gates", "dropped_signals"}

# Closed enum of structural reasons a dropped_signals entry may name.
DROPPED_SIGNAL_REASONS: frozenset[str] = frozenset({
    "replaced_by", "cap_pressure", "out_of_scope",
    "moved_to_unmodelled_gate", "redundant_with",
})

# Hard limit on dropped_signals entries. Above this the extraction
# itself is too lossy and should be redone rather than confessed.
MAX_DROPPED_SIGNALS: int = 8

# Max words in a dropped_signal.rationale.
DROPPED_SIGNAL_RATIONALE_WORD_CAP: int = 25

# Origin values for a dropped_signal entry.
DROPPED_SIGNAL_ORIGINS: frozenset[str] = frozenset({
    "source_digest", "prior_baseline",
})

# Reasons whose semantics require a populated replacement_id.
DROPPED_SIGNAL_REASONS_NEEDING_REPLACEMENT: frozenset[str] = frozenset({
    "replaced_by", "moved_to_unmodelled_gate",
})


def _is_enum_member(value: object, allowed) -> bool:
    """Return True when *value* is a string contained in *allowed*.

    The ``isinstance`` guard is deliberate: JSON arrays and objects decode
    to unhashable ``list``/``dict`` values, and a bare ``value in allowed``
    against a set or dict raises ``TypeError`` for those instead of
    evaluating to False.
    """
    return isinstance(value, str) and value in allowed


def _check_dropped_signal_entry(
    entry: object,
    path: str,
    params: dict,
    current_refs,
    unmodelled_ids,
    violations: list,
) -> None:
    """Validate one ``dropped_signals`` entry, appending to *violations*.

    *current_refs* and *unmodelled_ids* are only membership-tested with
    string keys; *path* is the JSONPath-style prefix used in violations.
    """
    if not isinstance(entry, dict):
        violations.append(violation(
            "dropped_signals_schema", "ERROR", path,
            "dropped_signals entry is not an object",
            "use an object with the documented fields",
        ))
        return

    raw_reason = entry.get("reason")
    if _is_enum_member(raw_reason, DROPPED_SIGNAL_REASONS):
        reason = raw_reason
    else:
        violations.append(violation(
            "dropped_signals_schema", "ERROR", f"{path}.reason",
            f"reason `{raw_reason}` is not in the closed enum",
            f"use one of {sorted(DROPPED_SIGNAL_REASONS)}",
        ))
        # An unrecognised reason selects none of the reason-specific
        # checks below; None keeps the later comparisons hash-safe.
        reason = None

    origin = entry.get("origin")
    if not _is_enum_member(origin, DROPPED_SIGNAL_ORIGINS):
        violations.append(violation(
            "dropped_signals_schema", "ERROR", f"{path}.origin",
            f"origin `{origin}` is not in the closed enum",
            f"use one of {sorted(DROPPED_SIGNAL_ORIGINS)}",
        ))

    eid = entry.get("id")
    if not isinstance(eid, str) or not eid:
        violations.append(violation(
            "dropped_signals_schema", "ERROR", f"{path}.id",
            "id must be a non-empty string (prior signal id or source_claim_id)",
            "supply the prior signal's id",
        ))

    rationale = entry.get("rationale")
    if not isinstance(rationale, str) or not rationale.strip():
        violations.append(violation(
            "dropped_signals_schema", "ERROR", f"{path}.rationale",
            "rationale must be a non-empty structural sentence",
            "name the structural reason in one sentence",
        ))
    elif len(rationale.split()) > DROPPED_SIGNAL_RATIONALE_WORD_CAP:
        violations.append(violation(
            "dropped_signals_schema", "ERROR", f"{path}.rationale",
            f"rationale is {len(rationale.split())} words; cap is "
            f"{DROPPED_SIGNAL_RATIONALE_WORD_CAP}",
            f"shorten to {DROPPED_SIGNAL_RATIONALE_WORD_CAP} words",
        ))

    if reason in DROPPED_SIGNAL_REASONS_NEEDING_REPLACEMENT:
        rid = entry.get("replacement_id")
        if not isinstance(rid, str) or not rid:
            violations.append(violation(
                "dropped_signals_schema", "ERROR", f"{path}.replacement_id",
                f"reason `{reason}` requires a non-empty replacement_id",
                "set replacement_id to the current id or output_name that replaces this signal",
            ))
        elif reason == "replaced_by" and rid not in current_refs:
            violations.append(violation(
                "dropped_signals_schema", "ERROR", f"{path}.replacement_id",
                f"replacement_id `{rid}` does not match any current id or output_name",
                "rename to an existing current id/output_name, or drop the entry",
            ))
        elif reason == "moved_to_unmodelled_gate" and rid not in unmodelled_ids:
            violations.append(violation(
                "dropped_signals_schema", "ERROR", f"{path}.replacement_id",
                f"replacement_id `{rid}` does not match any unmodelled_gates id",
                "set replacement_id to an existing unmodelled_gates entry id",
            ))

    if reason == "redundant_with":
        rid = entry.get("redundant_with_id")
        if not isinstance(rid, str) or not rid:
            violations.append(violation(
                "dropped_signals_schema", "ERROR", f"{path}.redundant_with_id",
                "reason `redundant_with` requires a non-empty redundant_with_id",
                "set redundant_with_id to the current id or output_name that subsumes this signal",
            ))
        elif rid not in current_refs:
            violations.append(violation(
                "dropped_signals_schema", "ERROR", f"{path}.redundant_with_id",
                f"redundant_with_id `{rid}` does not match any current id or output_name",
                "rename to an existing current id/output_name, or drop the entry",
            ))

    if reason == "cap_pressure":
        cap_kind = entry.get("cap_kind")
        if not _is_enum_member(cap_kind, CAPS):
            violations.append(violation(
                "dropped_signals_schema", "ERROR", f"{path}.cap_kind",
                f"cap_kind `{cap_kind}` is not a capped array name",
                f"use one of {sorted(CAPS)}",
            ))
        else:
            cap_size = CAPS[cap_kind]
            capped = params.get(cap_kind)
            # A missing or malformed (non-list) array holds no entries, so
            # it cannot justify a cap-pressure claim.
            actual_size = len(capped) if isinstance(capped, list) else 0
            if actual_size < cap_size:
                violations.append(violation(
                    "dropped_signals_schema", "ERROR", f"{path}.cap_kind",
                    f"cap_pressure claim is not justified: `{cap_kind}` has "
                    f"{actual_size} entries, below cap {cap_size}",
                    f"drop this dropped_signals entry, or fill the `{cap_kind}` "
                    f"array to its cap with the dropped signal first",
                ))


def check_dropped_signals_schema(params: dict, violations: list) -> None:
    """Validate the optional ``dropped_signals`` array's shape and
    cross-references.

    The field may be absent (or ``None``), in which case nothing is
    checked. When present it must be a list of at most
    ``MAX_DROPPED_SIGNALS`` objects. Each entry must carry a ``reason``
    and ``origin`` from their closed enums, a non-empty string ``id``, and
    a non-empty ``rationale`` within the word cap. Its replacement,
    redundancy and cap-pressure references are resolved against the
    current ids/output_names, the ``unmodelled_gates`` ids, and ``CAPS``.

    Values of the wrong JSON type (for example a list used as ``reason``)
    are reported as violations rather than raising.

    Args:
        params: the parsed parameters document.
        violations: list that receives one ERROR-level violation per
            finding; mutated in place.
    """
    obj = params.get("dropped_signals")
    if obj is None:
        return
    if not isinstance(obj, list):
        violations.append(violation(
            "dropped_signals_schema", "ERROR", "$.dropped_signals",
            "dropped_signals must be a list (or absent)",
            "use an array of objects, or omit the field entirely",
        ))
        return
    if len(obj) > MAX_DROPPED_SIGNALS:
        # Over-cap arrays are still validated entry by entry below.
        violations.append(violation(
            "dropped_signals_schema", "ERROR", "$.dropped_signals",
            f"dropped_signals has {len(obj)} entries; cap is {MAX_DROPPED_SIGNALS}",
            f"reduce to at most {MAX_DROPPED_SIGNALS} entries — if more drops would need "
            f"recording, the extraction itself is too lossy and should be redone",
        ))

    current_refs = collect_all_ids(params) | collect_output_names(params)
    gates = params.get("unmodelled_gates")
    unmodelled_ids: set[str] = {
        gate["id"]
        for gate in (gates if isinstance(gates, list) else [])
        if isinstance(gate, dict) and isinstance(gate.get("id"), str)
    }

    for i, entry in enumerate(obj):
        _check_dropped_signal_entry(
            entry, f"$.dropped_signals[{i}]", params,
            current_refs, unmodelled_ids, violations,
        )