From fb16d4d081f0783b0e21f4216166cc235deba68e Mon Sep 17 00:00:00 2001 From: Csaba Toth Date: Wed, 10 Jun 2026 20:25:57 +0200 Subject: [PATCH] feat(renderer+rules): recursive section chunking + adoption gate for rerun-discovered rules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recursive chunking: a section chunk still over 8 KB with >=2 subsections splits one level deeper (topic -> section -> entry), the section file becoming a sub-index. OpenMeter validation: data-models/models.md went from 85 KB to a 9.8 KB routing sub-index + 84 per-model files; largest non-index chunk is now ~6 KB. Stale-cleanup prunes nested empty dirs. Adoption gate: on a deep-scan rerun (rules.json already populated), extract_output cmd_rules routes brand-new rule ids to proposed_rules.json instead of activating them — the user adopts/rejects in the viewer's Rules card (existing flow) before hooks enforce. Updates to active ids still apply directly; ignored/pending ids are not re-proposed. First scan keeps auto-adopting the baseline. 
Co-Authored-By: Claude Fable 5 --- .../deep-scan/steps/step-6-rule-synthesis.md | 7 ++ archie/standalone/extract_output.py | 69 +++++++++++--- archie/standalone/renderer.py | 94 +++++++++++++++---- npm-package/assets/extract_output.py | 69 +++++++++++--- npm-package/assets/renderer.py | 94 +++++++++++++++---- .../deep-scan/steps/step-6-rule-synthesis.md | 7 ++ tests/test_renderer_chunking.py | 19 +++- tests/test_rule_shape.py | 67 ++++++++++++- 8 files changed, 355 insertions(+), 71 deletions(-) diff --git a/archie/assets/workflow/deep-scan/steps/step-6-rule-synthesis.md b/archie/assets/workflow/deep-scan/steps/step-6-rule-synthesis.md index fa1db9f0..65ae51d7 100644 --- a/archie/assets/workflow/deep-scan/steps/step-6-rule-synthesis.md +++ b/archie/assets/workflow/deep-scan/steps/step-6-rule-synthesis.md @@ -356,6 +356,13 @@ python3 .archie/extract_output.py rules .archie/tmp/archie_rules_$PROJECT_NAME.j **IMPORTANT: Do NOT try to extract or parse JSON yourself. Do NOT copy the agent's transcript. Always use the pre-installed scripts on the file the agent already wrote.** +On a rerun (rules.json already had rules), the extractor routes brand-new rule +ids to `.archie/proposed_rules.json` instead of activating them — the user +adopts or rejects them in the viewer's Rules card before hooks enforce them. +If the extractor printed a `NEW rule(s) -> proposed_rules.json` line, tell the +user in your final summary how many rules await review and that they can adopt +them in the Archie viewer's Rules card. 
+ Build the Phase 2 trigger index so the pre-validate hook can narrow candidates fast on every edit: ```bash diff --git a/archie/standalone/extract_output.py b/archie/standalone/extract_output.py index 66563f36..d0db0906 100644 --- a/archie/standalone/extract_output.py +++ b/archie/standalone/extract_output.py @@ -30,6 +30,15 @@ # rules — extract rules JSON from agent output # --------------------------------------------------------------------------- +def _read_rule_ids(path: Path) -> set: + """Rule ids in a {"rules": [...]} file; empty set on missing/malformed.""" + try: + data = json.loads(path.read_text()) + except (OSError, json.JSONDecodeError): + return set() + return {r.get("id") for r in data.get("rules", []) if isinstance(r, dict) and r.get("id")} + + def cmd_rules(input_file: str, output_path: str): """Extract rules JSON from raw agent output, merge with existing rules, save. @@ -37,6 +46,13 @@ def cmd_rules(input_file: str, output_path: str): so downstream tooling and humans can trace lineage even if the model omits the field. Existing `source` values (e.g., `adopted`, `scan`, `scan-amended`) are never overwritten. + + Adoption gate: on a RERUN (output rules.json already has rules), rules with + an id not seen before go to proposed_rules.json — the user adopts or rejects + them in the viewer's Rules card before hooks enforce them. Updates to + already-active ids still apply directly. Ids sitting in proposed_rules.json + or ignored_rules.json are not re-proposed. The first scan (empty baseline) + keeps auto-adopting, otherwise a fresh install would enforce nothing. 
""" text = Path(input_file).read_text() data = extract_json_from_text(text) @@ -59,25 +75,52 @@ def cmd_rules(input_file: str, output_path: str): # Merge with existing rules — preserve user-adopted rules from prior runs out = Path(output_path) + existing_by_id = {} if out.exists(): try: existing = json.loads(out.read_text()) existing_rules = existing.get("rules", []) - # Index existing rules by id existing_by_id = {r.get("id", ""): r for r in existing_rules if isinstance(r, dict)} - # Index new rules by id - new_by_id = {r.get("id", ""): r for r in new_rules if isinstance(r, dict)} - # Keep existing rules that aren't replaced by new ones (user-adopted rules) - # Also keep existing rules that have source="adopted" — these came from prior incremental runs - preserved = 0 - for rid, rule in existing_by_id.items(): - if rid not in new_by_id: - new_rules.append(rule) - preserved += 1 - if preserved: - print(f" Preserved {preserved} existing rules not in new set", file=sys.stderr) except (json.JSONDecodeError, OSError): - pass + existing_by_id = {} + + if existing_by_id: + # RERUN — route brand-new rules through the proposal queue. 
+ proposed_path = out.parent / "proposed_rules.json" + ignored_ids = _read_rule_ids(out.parent / "ignored_rules.json") + already_proposed = _read_rule_ids(proposed_path) + + active, to_propose = [], [] + for r in new_rules: + rid = r.get("id") if isinstance(r, dict) else None + if rid in existing_by_id: + active.append(r) # update of an already-active rule + elif rid in ignored_ids or rid in already_proposed: + continue # user already rejected it, or it's awaiting review + else: + to_propose.append(r) + + new_by_id = {r.get("id", ""): r for r in active if isinstance(r, dict)} + preserved = 0 + for rid, rule in existing_by_id.items(): + if rid not in new_by_id: + active.append(rule) + preserved += 1 + if preserved: + print(f" Preserved {preserved} existing rules not in new set", file=sys.stderr) + + if to_propose: + try: + proposed = json.loads(proposed_path.read_text()) + except (OSError, json.JSONDecodeError): + proposed = {} + proposed.setdefault("rules", []).extend(to_propose) + proposed_path.write_text(json.dumps(proposed, indent=2)) + print(f" {len(to_propose)} NEW rule(s) -> {proposed_path.name} — " + f"awaiting adoption (review in /archie-viewer Rules card); " + f"hooks will not enforce them until adopted", file=sys.stderr) + + new_rules = active data["rules"] = new_rules out.write_text(json.dumps(data, indent=2)) diff --git a/archie/standalone/renderer.py b/archie/standalone/renderer.py index d308388f..36cdd791 100644 --- a/archie/standalone/renderer.py +++ b/archie/standalone/renderer.py @@ -1309,18 +1309,39 @@ def _est_tokens(text: str) -> int: return max(1, len(text) // _CHARS_PER_TOKEN) -def _chunk_topic_file(rule: dict, level: int = 2) -> dict: - """Return {relative_path: content} for one oversized topic rule: - `.md` index + `/.md` chunks.""" - topic = rule["topic"] - preamble, sections = _split_h2_sections(rule["body"], level) - # A split below H2 leaves the wrapping heading dangling at the end of - # the preamble — drop trailing heading-only lines. 
- pre_lines = preamble.splitlines() - while pre_lines and (not pre_lines[-1].strip() or pre_lines[-1].startswith("#")): - pre_lines.pop() - preamble = "\n".join(pre_lines).strip() +# An oversized section chunk recurses one heading level deeper (topic → +# section → entry), so e.g. an 85 KB Models section becomes per-model files +# behind a sub-index. Depth is capped: entries below H4 don't split further. +_MAX_CHUNK_DEPTH = 2 + + +def _strip_dangling_headings(preamble: str) -> str: + """A split below the top level leaves the wrapping heading dangling at + the end of the preamble — drop trailing heading-only/blank lines.""" + lines = preamble.splitlines() + while lines and (not lines[-1].strip() or lines[-1].startswith("#")): + lines.pop() + return "\n".join(lines).strip() + +def _chunk_level(rule: dict, title: str, index_title: str, body: str, + rel_dir: str, intro: str, level: int, depth: int) -> dict: + """Chunk `body` at `level` headings into files under `rel_dir`/ and + return {rel_path: content} including `rel_dir`.md as the routing index. + + Recurses one level deeper for sections that are still oversized and have + enough subsections, turning the section file into a sub-index. 
+ """ + preamble = "" + sections: list[tuple[str, str]] = [] + for lv in (level, level + 1): + preamble, sections = _split_h2_sections(body, lv) + if len(sections) >= 2: + level = lv + break + preamble = _strip_dangling_headings(preamble) + + dirname = rel_dir.rsplit("/", 1)[-1] out: dict[str, str] = {} rows: list[str] = [] seen: dict[str, int] = {} @@ -1331,20 +1352,32 @@ def _chunk_topic_file(rule: dict, level: int = 2) -> dict: slug = f"{slug}-{seen[slug]}" else: seen[slug] = 1 - chunk_body = f"# {topic.replace('-', ' ').title()}: {heading}\n\n{text}\n" - out[f"{topic}/{slug}.md"] = _render_claude({**rule, "body": chunk_body}) + chunk_title = f"{title}: {heading}" + chunk_body = f"# {chunk_title}\n\n{text}\n" + rel_path = f"{rel_dir}/{slug}.md" + rendered = _render_claude({**rule, "body": chunk_body}) + _, subsections = _split_h2_sections(text, level + 1) + if (depth < _MAX_CHUNK_DEPTH + and len(rendered.encode("utf-8")) > _CHUNK_THRESHOLD_BYTES + and len(subsections) >= 2): + out.update(_chunk_level( + rule, chunk_title, chunk_title, text, f"{rel_dir}/{slug}", + f"This section is chunked. Load only the entry file(s) under " + f"`{slug}/` relevant to your task — this index is the routing table.", + level + 1, depth + 1, + )) + else: + out[rel_path] = rendered summary = _section_summary(text) rows.append( - f"| {_escape_table_cell(heading)} | [`{topic}/{slug}.md`]({topic}/{slug}.md) " + f"| {_escape_table_cell(heading)} | [`{dirname}/{slug}.md`]({dirname}/{slug}.md) " f"| ~{_est_tokens(chunk_body)} | {_escape_table_cell(summary)} |" ) index_lines = [ - f"# {rule.get('description') or topic}", + f"# {index_title}", "", - f"This topic is chunked. 
Load only the section file(s) under " - f"`.claude/rules/{topic}/` relevant to your task — this index is the " - f"routing table.", + intro, "", "| Section | File | ~Tokens | Contains |", "|---------|------|---------|----------|", @@ -1352,12 +1385,31 @@ def _chunk_topic_file(rule: dict, level: int = 2) -> dict: ] if preamble: index_lines += ["", preamble] - out[f"{topic}.md"] = _render_claude( + out[f"{rel_dir}.md"] = _render_claude( {**rule, "body": "\n".join(index_lines).rstrip() + "\n"} ) return out +def _chunk_topic_file(rule: dict, level: int = 2) -> dict: + """Return {relative_path: content} for one oversized topic rule: + `<topic>.md` index + `<topic>/<slug>.md` chunks (recursing into 
+ `<topic>/<section>/<entry>.md` when a section is itself oversized).""" + topic = rule["topic"] + return _chunk_level( + rule, + topic.replace("-", " ").title(), + rule.get("description") or topic, + rule["body"], + topic, + f"This topic is chunked. Load only the section file(s) under " + f"`.claude/rules/{topic}/` relevant to your task — this index is the " + f"routing table.", + level, + 1, + ) + + def _render_topic_files(rule: dict) -> dict: """Render one topic rule into its output file(s), chunking when the rendered body crosses the size threshold and has enough H2 sections.""" @@ -2127,6 +2179,10 @@ def _rm(p: Path): rel = str(md.relative_to(project_root)) if rel not in files: _rm(md) + # Prune empty dirs bottom-up (nested entry dirs first, then the topic dir). + for sub in sorted((d for d in chunk_dir.rglob("*") if d.is_dir()), reverse=True): + if not any(sub.iterdir()): + sub.rmdir() if not any(chunk_dir.iterdir()): chunk_dir.rmdir() # Stale enforcement by-topic files (topic disappeared from rules.json). diff --git a/npm-package/assets/extract_output.py b/npm-package/assets/extract_output.py index 66563f36..d0db0906 100644 --- a/npm-package/assets/extract_output.py +++ b/npm-package/assets/extract_output.py @@ -30,6 +30,15 @@ # rules — extract rules JSON from agent output # --------------------------------------------------------------------------- +def _read_rule_ids(path: Path) -> set: + """Rule ids in a {"rules": [...]} file; empty set on missing/malformed.""" + try: + data = json.loads(path.read_text()) + except (OSError, json.JSONDecodeError): + return set() + return {r.get("id") for r in data.get("rules", []) if isinstance(r, dict) and r.get("id")} + + def cmd_rules(input_file: str, output_path: str): """Extract rules JSON from raw agent output, merge with existing rules, save. @@ -37,6 +46,13 @@ def cmd_rules(input_file: str, output_path: str): so downstream tooling and humans can trace lineage even if the model omits the field. 
Existing `source` values (e.g., `adopted`, `scan`, `scan-amended`) are never overwritten. + + Adoption gate: on a RERUN (output rules.json already has rules), rules with + an id not seen before go to proposed_rules.json — the user adopts or rejects + them in the viewer's Rules card before hooks enforce them. Updates to + already-active ids still apply directly. Ids sitting in proposed_rules.json + or ignored_rules.json are not re-proposed. The first scan (empty baseline) + keeps auto-adopting, otherwise a fresh install would enforce nothing. """ text = Path(input_file).read_text() data = extract_json_from_text(text) @@ -59,25 +75,52 @@ def cmd_rules(input_file: str, output_path: str): # Merge with existing rules — preserve user-adopted rules from prior runs out = Path(output_path) + existing_by_id = {} if out.exists(): try: existing = json.loads(out.read_text()) existing_rules = existing.get("rules", []) - # Index existing rules by id existing_by_id = {r.get("id", ""): r for r in existing_rules if isinstance(r, dict)} - # Index new rules by id - new_by_id = {r.get("id", ""): r for r in new_rules if isinstance(r, dict)} - # Keep existing rules that aren't replaced by new ones (user-adopted rules) - # Also keep existing rules that have source="adopted" — these came from prior incremental runs - preserved = 0 - for rid, rule in existing_by_id.items(): - if rid not in new_by_id: - new_rules.append(rule) - preserved += 1 - if preserved: - print(f" Preserved {preserved} existing rules not in new set", file=sys.stderr) except (json.JSONDecodeError, OSError): - pass + existing_by_id = {} + + if existing_by_id: + # RERUN — route brand-new rules through the proposal queue. 
+ proposed_path = out.parent / "proposed_rules.json" + ignored_ids = _read_rule_ids(out.parent / "ignored_rules.json") + already_proposed = _read_rule_ids(proposed_path) + + active, to_propose = [], [] + for r in new_rules: + rid = r.get("id") if isinstance(r, dict) else None + if rid in existing_by_id: + active.append(r) # update of an already-active rule + elif rid in ignored_ids or rid in already_proposed: + continue # user already rejected it, or it's awaiting review + else: + to_propose.append(r) + + new_by_id = {r.get("id", ""): r for r in active if isinstance(r, dict)} + preserved = 0 + for rid, rule in existing_by_id.items(): + if rid not in new_by_id: + active.append(rule) + preserved += 1 + if preserved: + print(f" Preserved {preserved} existing rules not in new set", file=sys.stderr) + + if to_propose: + try: + proposed = json.loads(proposed_path.read_text()) + except (OSError, json.JSONDecodeError): + proposed = {} + proposed.setdefault("rules", []).extend(to_propose) + proposed_path.write_text(json.dumps(proposed, indent=2)) + print(f" {len(to_propose)} NEW rule(s) -> {proposed_path.name} — " + f"awaiting adoption (review in /archie-viewer Rules card); " + f"hooks will not enforce them until adopted", file=sys.stderr) + + new_rules = active data["rules"] = new_rules out.write_text(json.dumps(data, indent=2)) diff --git a/npm-package/assets/renderer.py b/npm-package/assets/renderer.py index d308388f..36cdd791 100644 --- a/npm-package/assets/renderer.py +++ b/npm-package/assets/renderer.py @@ -1309,18 +1309,39 @@ def _est_tokens(text: str) -> int: return max(1, len(text) // _CHARS_PER_TOKEN) -def _chunk_topic_file(rule: dict, level: int = 2) -> dict: - """Return {relative_path: content} for one oversized topic rule: - `.md` index + `/.md` chunks.""" - topic = rule["topic"] - preamble, sections = _split_h2_sections(rule["body"], level) - # A split below H2 leaves the wrapping heading dangling at the end of - # the preamble — drop trailing heading-only 
lines. - pre_lines = preamble.splitlines() - while pre_lines and (not pre_lines[-1].strip() or pre_lines[-1].startswith("#")): - pre_lines.pop() - preamble = "\n".join(pre_lines).strip() +# An oversized section chunk recurses one heading level deeper (topic → +# section → entry), so e.g. an 85 KB Models section becomes per-model files +# behind a sub-index. Depth is capped: entries below H4 don't split further. +_MAX_CHUNK_DEPTH = 2 + + +def _strip_dangling_headings(preamble: str) -> str: + """A split below the top level leaves the wrapping heading dangling at + the end of the preamble — drop trailing heading-only/blank lines.""" + lines = preamble.splitlines() + while lines and (not lines[-1].strip() or lines[-1].startswith("#")): + lines.pop() + return "\n".join(lines).strip() + +def _chunk_level(rule: dict, title: str, index_title: str, body: str, + rel_dir: str, intro: str, level: int, depth: int) -> dict: + """Chunk `body` at `level` headings into files under `rel_dir`/ and + return {rel_path: content} including `rel_dir`.md as the routing index. + + Recurses one level deeper for sections that are still oversized and have + enough subsections, turning the section file into a sub-index. 
+ """ + preamble = "" + sections: list[tuple[str, str]] = [] + for lv in (level, level + 1): + preamble, sections = _split_h2_sections(body, lv) + if len(sections) >= 2: + level = lv + break + preamble = _strip_dangling_headings(preamble) + + dirname = rel_dir.rsplit("/", 1)[-1] out: dict[str, str] = {} rows: list[str] = [] seen: dict[str, int] = {} @@ -1331,20 +1352,32 @@ def _chunk_topic_file(rule: dict, level: int = 2) -> dict: slug = f"{slug}-{seen[slug]}" else: seen[slug] = 1 - chunk_body = f"# {topic.replace('-', ' ').title()}: {heading}\n\n{text}\n" - out[f"{topic}/{slug}.md"] = _render_claude({**rule, "body": chunk_body}) + chunk_title = f"{title}: {heading}" + chunk_body = f"# {chunk_title}\n\n{text}\n" + rel_path = f"{rel_dir}/{slug}.md" + rendered = _render_claude({**rule, "body": chunk_body}) + _, subsections = _split_h2_sections(text, level + 1) + if (depth < _MAX_CHUNK_DEPTH + and len(rendered.encode("utf-8")) > _CHUNK_THRESHOLD_BYTES + and len(subsections) >= 2): + out.update(_chunk_level( + rule, chunk_title, chunk_title, text, f"{rel_dir}/{slug}", + f"This section is chunked. Load only the entry file(s) under " + f"`{slug}/` relevant to your task — this index is the routing table.", + level + 1, depth + 1, + )) + else: + out[rel_path] = rendered summary = _section_summary(text) rows.append( - f"| {_escape_table_cell(heading)} | [`{topic}/{slug}.md`]({topic}/{slug}.md) " + f"| {_escape_table_cell(heading)} | [`{dirname}/{slug}.md`]({dirname}/{slug}.md) " f"| ~{_est_tokens(chunk_body)} | {_escape_table_cell(summary)} |" ) index_lines = [ - f"# {rule.get('description') or topic}", + f"# {index_title}", "", - f"This topic is chunked. 
Load only the section file(s) under " - f"`.claude/rules/{topic}/` relevant to your task — this index is the " - f"routing table.", + intro, "", "| Section | File | ~Tokens | Contains |", "|---------|------|---------|----------|", @@ -1352,12 +1385,31 @@ def _chunk_topic_file(rule: dict, level: int = 2) -> dict: ] if preamble: index_lines += ["", preamble] - out[f"{topic}.md"] = _render_claude( + out[f"{rel_dir}.md"] = _render_claude( {**rule, "body": "\n".join(index_lines).rstrip() + "\n"} ) return out +def _chunk_topic_file(rule: dict, level: int = 2) -> dict: + """Return {relative_path: content} for one oversized topic rule: + `<topic>.md` index + `<topic>/<slug>.md` chunks (recursing into 
+ `<topic>/<section>/<entry>.md` when a section is itself oversized).""" + topic = rule["topic"] + return _chunk_level( + rule, + topic.replace("-", " ").title(), + rule.get("description") or topic, + rule["body"], + topic, + f"This topic is chunked. Load only the section file(s) under " + f"`.claude/rules/{topic}/` relevant to your task — this index is the " + f"routing table.", + level, + 1, + ) + + def _render_topic_files(rule: dict) -> dict: """Render one topic rule into its output file(s), chunking when the rendered body crosses the size threshold and has enough H2 sections.""" @@ -2127,6 +2179,10 @@ def _rm(p: Path): rel = str(md.relative_to(project_root)) if rel not in files: _rm(md) + # Prune empty dirs bottom-up (nested entry dirs first, then the topic dir). + for sub in sorted((d for d in chunk_dir.rglob("*") if d.is_dir()), reverse=True): + if not any(sub.iterdir()): + sub.rmdir() if not any(chunk_dir.iterdir()): chunk_dir.rmdir() # Stale enforcement by-topic files (topic disappeared from rules.json). diff --git a/npm-package/assets/workflow/deep-scan/steps/step-6-rule-synthesis.md b/npm-package/assets/workflow/deep-scan/steps/step-6-rule-synthesis.md index fa1db9f0..65ae51d7 100644 --- a/npm-package/assets/workflow/deep-scan/steps/step-6-rule-synthesis.md +++ b/npm-package/assets/workflow/deep-scan/steps/step-6-rule-synthesis.md @@ -356,6 +356,13 @@ python3 .archie/extract_output.py rules .archie/tmp/archie_rules_$PROJECT_NAME.j **IMPORTANT: Do NOT try to extract or parse JSON yourself. Do NOT copy the agent's transcript. Always use the pre-installed scripts on the file the agent already wrote.** +On a rerun (rules.json already had rules), the extractor routes brand-new rule +ids to `.archie/proposed_rules.json` instead of activating them — the user +adopts or rejects them in the viewer's Rules card before hooks enforce them. 
+If the extractor printed a `NEW rule(s) -> proposed_rules.json` line, tell the +user in your final summary how many rules await review and that they can adopt +them in the Archie viewer's Rules card. + Build the Phase 2 trigger index so the pre-validate hook can narrow candidates fast on every edit: ```bash diff --git a/tests/test_renderer_chunking.py b/tests/test_renderer_chunking.py index 29d354ab..8e207d3d 100644 --- a/tests/test_renderer_chunking.py +++ b/tests/test_renderer_chunking.py @@ -53,13 +53,28 @@ def test_large_topic_chunks_into_index_plus_sections() -> None: chunks = [p for p in files if p.startswith(".claude/rules/patterns/")] assert ".claude/rules/patterns/communication-patterns.md" in chunks - # Index must reference every chunk it emitted. + # Every chunk must be reachable from its parent index (top index for + # section files, the section sub-index for recursed entry files). for p in chunks: - assert p.removeprefix(".claude/rules/") in index + parent_index = files[str(Path(p).parent) + ".md"] + assert f"/{Path(p).name})" in parent_index # Index is small relative to the would-be monolith. assert len(index.encode()) < renderer._CHUNK_THRESHOLD_BYTES +def test_oversized_section_recurses_into_entry_chunks() -> None: + files = renderer.generate_all(_patterns_blueprint(30)) + sub_index = files[".claude/rules/patterns/communication-patterns.md"] + assert "This section is chunked" in sub_index + entries = [p for p in files + if p.startswith(".claude/rules/patterns/communication-patterns/")] + assert len(entries) == 30 + entry = files[".claude/rules/patterns/communication-patterns/pattern-0.md"] + assert entry.startswith("# Patterns: Communication Patterns: Pattern 0") + # Recursion is depth-capped: entry files never spawn their own dirs. 
+ assert not any(p.count("/") > 4 for p in entries) + + def test_chunk_carries_topic_and_section_heading() -> None: files = renderer.generate_all(_patterns_blueprint(30)) chunk = files[".claude/rules/patterns/communication-patterns.md"] diff --git a/tests/test_rule_shape.py b/tests/test_rule_shape.py index 94ddc1da..b5bf0827 100644 --- a/tests/test_rule_shape.py +++ b/tests/test_rule_shape.py @@ -283,10 +283,9 @@ def test_cmd_rules_stamps_missing_source(tmp_path: Path) -> None: assert by_id["explicit"]["source"] == "scan", "existing source got overwritten" -def test_cmd_rules_preserves_adopted_rules(tmp_path: Path) -> None: - """Existing rules with id not in new set should be preserved (today's - behavior). New behavior: still preserved AND any of those without - source remain untouched (cmd_rules only stamps NEW rules).""" +def test_cmd_rules_preserves_adopted_rules_and_proposes_new(tmp_path: Path) -> None: + """On a rerun, existing rules are preserved; brand-new ids are NOT + auto-adopted — they land in proposed_rules.json awaiting user review.""" out_path = tmp_path / "rules.json" out_path.write_text(json.dumps({"rules": [ {"id": "old-1", "description": "kept", "source": "adopted"}, @@ -301,7 +300,65 @@ def test_cmd_rules_preserves_adopted_rules(tmp_path: Path) -> None: saved = {r["id"]: r for r in json.loads(out_path.read_text())["rules"]} assert "old-1" in saved assert saved["old-1"]["source"] == "adopted" - assert saved["new-1"]["source"] == "deep_scan" + assert "new-1" not in saved, "rerun must not auto-adopt a brand-new rule" + + proposed = {r["id"]: r for r in + json.loads((tmp_path / "proposed_rules.json").read_text())["rules"]} + assert proposed["new-1"]["source"] == "deep_scan" + + +def test_cmd_rules_first_scan_auto_adopts(tmp_path: Path) -> None: + """First scan (no/empty rules.json) keeps auto-adopting the baseline.""" + raw_input = tmp_path / "raw.txt" + raw_input.write_text(json.dumps({"rules": [{"id": "r1", "description": "x"}]})) + out_path = 
tmp_path / "rules.json" + + _extract_output.cmd_rules(str(raw_input), str(out_path)) + + saved = {r["id"] for r in json.loads(out_path.read_text())["rules"]} + assert saved == {"r1"} + assert not (tmp_path / "proposed_rules.json").exists() + + +def test_cmd_rules_rerun_updates_active_rule_in_place(tmp_path: Path) -> None: + """A re-emitted rule whose id is already active updates rules.json + directly (refinement, not a new proposal).""" + out_path = tmp_path / "rules.json" + out_path.write_text(json.dumps({"rules": [ + {"id": "r1", "description": "old wording", "source": "deep_scan"}, + ]})) + raw_input = tmp_path / "raw.txt" + raw_input.write_text(json.dumps({"rules": [ + {"id": "r1", "description": "refined wording", "source": "deep_scan"}, + ]})) + + _extract_output.cmd_rules(str(raw_input), str(out_path)) + + saved = {r["id"]: r for r in json.loads(out_path.read_text())["rules"]} + assert saved["r1"]["description"] == "refined wording" + assert not (tmp_path / "proposed_rules.json").exists() + + +def test_cmd_rules_rerun_skips_ignored_and_already_proposed(tmp_path: Path) -> None: + """Rejected rules are never re-proposed; pending proposals don't duplicate.""" + out_path = tmp_path / "rules.json" + out_path.write_text(json.dumps({"rules": [{"id": "active-1", "source": "deep_scan"}]})) + (tmp_path / "ignored_rules.json").write_text(json.dumps({"rules": [{"id": "rejected-1"}]})) + (tmp_path / "proposed_rules.json").write_text(json.dumps({"rules": [{"id": "pending-1"}]})) + raw_input = tmp_path / "raw.txt" + raw_input.write_text(json.dumps({"rules": [ + {"id": "rejected-1", "description": "came back"}, + {"id": "pending-1", "description": "still pending"}, + {"id": "fresh-1", "description": "genuinely new"}, + ]})) + + _extract_output.cmd_rules(str(raw_input), str(out_path)) + + proposed_ids = [r["id"] for r in + json.loads((tmp_path / "proposed_rules.json").read_text())["rules"]] + assert proposed_ids == ["pending-1", "fresh-1"], "no dup, no resurrected reject" + 
saved_ids = {r["id"] for r in json.loads(out_path.read_text())["rules"]} + assert saved_ids == {"active-1"} # ---------------------------------------------------------------------------