Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ python3 .archie/extract_output.py rules .archie/tmp/archie_rules_$PROJECT_NAME.j

**IMPORTANT: Do NOT try to extract or parse JSON yourself. Do NOT copy the agent's transcript. Always use the pre-installed scripts on the file the agent already wrote.**

On a rerun (rules.json already had rules), the extractor routes brand-new rule
ids to `.archie/proposed_rules.json` instead of activating them — the user
adopts or rejects them in the viewer's Rules card before hooks enforce them.
If the extractor printed a `NEW rule(s) -> proposed_rules.json` line, tell the
user in your final summary how many rules await review and that they can adopt
them in the Archie viewer's Rules card.

Build the Phase 2 trigger index so the pre-validate hook can narrow candidates fast on every edit:

```bash
Expand Down
69 changes: 56 additions & 13 deletions archie/standalone/extract_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,29 @@
# rules — extract rules JSON from agent output
# ---------------------------------------------------------------------------

def _read_rule_ids(path: Path) -> set:
    """Return the rule ids listed in a ``{"rules": [...]}`` JSON file.

    This is a best-effort reader: a missing, unreadable, undecodable or
    wrongly shaped file yields an empty set instead of raising.

    Args:
        path: Location of the JSON file to read.

    Returns:
        The truthy, hashable ``id`` values of the dict entries found in the
        top-level ``"rules"`` list; an empty set when nothing usable is found.
    """
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return set()

    # Valid JSON can still have the wrong shape (top-level list, "rules": null).
    if not isinstance(data, dict):
        return set()
    rules = data.get("rules")
    if not isinstance(rules, list):
        return set()

    ids = set()
    for rule in rules:
        if not isinstance(rule, dict):
            continue
        rid = rule.get("id")
        if not rid:
            continue
        try:
            ids.add(rid)
        except TypeError:
            # Unhashable id (list/dict) — skip the entry rather than crash.
            continue
    return ids


def cmd_rules(input_file: str, output_path: str):
"""Extract rules JSON from raw agent output, merge with existing rules, save.

Defensively stamps `source: "deep_scan"` on any new rule emitted without one,
so downstream tooling and humans can trace lineage even if the model omits
the field. Existing `source` values (e.g., `adopted`, `scan`, `scan-amended`)
are never overwritten.

Adoption gate: on a RERUN (output rules.json already has rules), rules with
an id not seen before go to proposed_rules.json — the user adopts or rejects
them in the viewer's Rules card before hooks enforce them. Updates to
already-active ids still apply directly. Ids sitting in proposed_rules.json
or ignored_rules.json are not re-proposed. The first scan (empty baseline)
keeps auto-adopting, otherwise a fresh install would enforce nothing.
"""
text = Path(input_file).read_text()
data = extract_json_from_text(text)
Expand All @@ -59,25 +75,52 @@ def cmd_rules(input_file: str, output_path: str):

# Merge with existing rules — preserve user-adopted rules from prior runs
out = Path(output_path)
existing_by_id = {}
if out.exists():
try:
existing = json.loads(out.read_text())
existing_rules = existing.get("rules", [])
# Index existing rules by id
existing_by_id = {r.get("id", ""): r for r in existing_rules if isinstance(r, dict)}
# Index new rules by id
new_by_id = {r.get("id", ""): r for r in new_rules if isinstance(r, dict)}
# Keep existing rules that aren't replaced by new ones (user-adopted rules)
# Also keep existing rules that have source="adopted" — these came from prior incremental runs
preserved = 0
for rid, rule in existing_by_id.items():
if rid not in new_by_id:
new_rules.append(rule)
preserved += 1
if preserved:
print(f" Preserved {preserved} existing rules not in new set", file=sys.stderr)
except (json.JSONDecodeError, OSError):
pass
existing_by_id = {}

if existing_by_id:
# RERUN — route brand-new rules through the proposal queue.
proposed_path = out.parent / "proposed_rules.json"
ignored_ids = _read_rule_ids(out.parent / "ignored_rules.json")
already_proposed = _read_rule_ids(proposed_path)

active, to_propose = [], []
for r in new_rules:
rid = r.get("id") if isinstance(r, dict) else None
if rid in existing_by_id:
active.append(r) # update of an already-active rule
elif rid in ignored_ids or rid in already_proposed:
continue # user already rejected it, or it's awaiting review
else:
to_propose.append(r)

new_by_id = {r.get("id", ""): r for r in active if isinstance(r, dict)}
preserved = 0
for rid, rule in existing_by_id.items():
if rid not in new_by_id:
active.append(rule)
preserved += 1
if preserved:
print(f" Preserved {preserved} existing rules not in new set", file=sys.stderr)

if to_propose:
try:
proposed = json.loads(proposed_path.read_text())
except (OSError, json.JSONDecodeError):
proposed = {}
proposed.setdefault("rules", []).extend(to_propose)
proposed_path.write_text(json.dumps(proposed, indent=2))
print(f" {len(to_propose)} NEW rule(s) -> {proposed_path.name} — "
f"awaiting adoption (review in /archie-viewer Rules card); "
f"hooks will not enforce them until adopted", file=sys.stderr)

new_rules = active

data["rules"] = new_rules
out.write_text(json.dumps(data, indent=2))
Expand Down
94 changes: 75 additions & 19 deletions archie/standalone/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1309,18 +1309,39 @@ def _est_tokens(text: str) -> int:
return max(1, len(text) // _CHARS_PER_TOKEN)


def _chunk_topic_file(rule: dict, level: int = 2) -> dict:
"""Return {relative_path: content} for one oversized topic rule:
`<topic>.md` index + `<topic>/<section-slug>.md` chunks."""
topic = rule["topic"]
preamble, sections = _split_h2_sections(rule["body"], level)
# A split below H2 leaves the wrapping heading dangling at the end of
# the preamble — drop trailing heading-only lines.
pre_lines = preamble.splitlines()
while pre_lines and (not pre_lines[-1].strip() or pre_lines[-1].startswith("#")):
pre_lines.pop()
preamble = "\n".join(pre_lines).strip()
# An oversized section chunk recurses one heading level deeper (topic →
# section → entry), so e.g. an 85 KB Models section becomes per-model files
# behind a sub-index. Depth is capped: entries below H4 don't split further.
_MAX_CHUNK_DEPTH = 2


def _strip_dangling_headings(preamble: str) -> str:
    """Trim trailing blank and heading-only lines from a preamble.

    Splitting below the top heading level leaves the wrapping heading
    stranded at the end of the preamble; this removes every trailing line
    that is blank or starts with ``#`` and returns the rest, stripped of
    surrounding whitespace.
    """
    rows = preamble.splitlines()
    end = len(rows)
    # Walk backwards until the first line that carries real content.
    while end > 0:
        tail = rows[end - 1]
        if tail.strip() and not tail.startswith("#"):
            break
        end -= 1
    return "\n".join(rows[:end]).strip()


def _chunk_level(rule: dict, title: str, index_title: str, body: str,
rel_dir: str, intro: str, level: int, depth: int) -> dict:
"""Chunk `body` at `level` headings into files under `rel_dir`/ and
return {rel_path: content} including `rel_dir`.md as the routing index.

Recurses one level deeper for sections that are still oversized and have
enough subsections, turning the section file into a sub-index.
"""
preamble = ""
sections: list[tuple[str, str]] = []
for lv in (level, level + 1):
preamble, sections = _split_h2_sections(body, lv)
if len(sections) >= 2:
level = lv
break
preamble = _strip_dangling_headings(preamble)

dirname = rel_dir.rsplit("/", 1)[-1]
out: dict[str, str] = {}
rows: list[str] = []
seen: dict[str, int] = {}
Expand All @@ -1331,33 +1352,64 @@ def _chunk_topic_file(rule: dict, level: int = 2) -> dict:
slug = f"{slug}-{seen[slug]}"
else:
seen[slug] = 1
chunk_body = f"# {topic.replace('-', ' ').title()}: {heading}\n\n{text}\n"
out[f"{topic}/{slug}.md"] = _render_claude({**rule, "body": chunk_body})
chunk_title = f"{title}: {heading}"
chunk_body = f"# {chunk_title}\n\n{text}\n"
rel_path = f"{rel_dir}/{slug}.md"
rendered = _render_claude({**rule, "body": chunk_body})
_, subsections = _split_h2_sections(text, level + 1)
if (depth < _MAX_CHUNK_DEPTH
and len(rendered.encode("utf-8")) > _CHUNK_THRESHOLD_BYTES
and len(subsections) >= 2):
out.update(_chunk_level(
rule, chunk_title, chunk_title, text, f"{rel_dir}/{slug}",
f"This section is chunked. Load only the entry file(s) under "
f"`{slug}/` relevant to your task — this index is the routing table.",
level + 1, depth + 1,
))
else:
out[rel_path] = rendered
summary = _section_summary(text)
rows.append(
f"| {_escape_table_cell(heading)} | [`{topic}/{slug}.md`]({topic}/{slug}.md) "
f"| {_escape_table_cell(heading)} | [`{dirname}/{slug}.md`]({dirname}/{slug}.md) "
f"| ~{_est_tokens(chunk_body)} | {_escape_table_cell(summary)} |"
)

index_lines = [
f"# {rule.get('description') or topic}",
f"# {index_title}",
"",
f"This topic is chunked. Load only the section file(s) under "
f"`.claude/rules/{topic}/` relevant to your task — this index is the "
f"routing table.",
intro,
"",
"| Section | File | ~Tokens | Contains |",
"|---------|------|---------|----------|",
*rows,
]
if preamble:
index_lines += ["", preamble]
out[f"{topic}.md"] = _render_claude(
out[f"{rel_dir}.md"] = _render_claude(
{**rule, "body": "\n".join(index_lines).rstrip() + "\n"}
)
return out


def _chunk_topic_file(rule: dict, level: int = 2) -> dict:
    """Split one oversized topic rule into an index file plus chunk files.

    Delegates to `_chunk_level`, starting at depth 1 with the topic name as
    the output directory. The result maps relative paths to rendered
    content: `<topic>.md` is the routing index and `<topic>/<section-slug>.md`
    holds each section (a section that is itself oversized is split further
    into `<topic>/<section>/<entry>.md`).

    Args:
        rule: Topic rule; its "topic" and "body" keys are read, and
            "description" is used as the index title when present and truthy.
        level: Heading level at which the body is first split.

    Returns:
        Mapping of relative file path to file content.
    """
    topic_slug = rule["topic"]
    # Chunk files are titled from the slug, e.g. "data-models" -> "Data Models".
    chunk_title = topic_slug.replace("-", " ").title()
    index_title = rule.get("description") or topic_slug
    routing_note = (
        f"This topic is chunked. Load only the section file(s) under "
        f"`.claude/rules/{topic_slug}/` relevant to your task — this index is the "
        f"routing table."
    )
    return _chunk_level(
        rule,
        chunk_title,
        index_title,
        rule["body"],
        topic_slug,
        routing_note,
        level,
        1,
    )


def _render_topic_files(rule: dict) -> dict:
"""Render one topic rule into its output file(s), chunking when the
rendered body crosses the size threshold and has enough H2 sections."""
Expand Down Expand Up @@ -2127,6 +2179,10 @@ def _rm(p: Path):
rel = str(md.relative_to(project_root))
if rel not in files:
_rm(md)
# Prune empty dirs bottom-up (nested entry dirs first, then the topic dir).
for sub in sorted((d for d in chunk_dir.rglob("*") if d.is_dir()), reverse=True):
if not any(sub.iterdir()):
sub.rmdir()
if not any(chunk_dir.iterdir()):
chunk_dir.rmdir()
# Stale enforcement by-topic files (topic disappeared from rules.json).
Expand Down
69 changes: 56 additions & 13 deletions npm-package/assets/extract_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,29 @@
# rules — extract rules JSON from agent output
# ---------------------------------------------------------------------------

def _read_rule_ids(path: Path) -> set:
    """Return the rule ids listed in a ``{"rules": [...]}`` JSON file.

    This is a best-effort reader: a missing, unreadable, undecodable or
    wrongly shaped file yields an empty set instead of raising.

    Args:
        path: Location of the JSON file to read.

    Returns:
        The truthy, hashable ``id`` values of the dict entries found in the
        top-level ``"rules"`` list; an empty set when nothing usable is found.
    """
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return set()

    # Valid JSON can still have the wrong shape (top-level list, "rules": null).
    if not isinstance(data, dict):
        return set()
    rules = data.get("rules")
    if not isinstance(rules, list):
        return set()

    ids = set()
    for rule in rules:
        if not isinstance(rule, dict):
            continue
        rid = rule.get("id")
        if not rid:
            continue
        try:
            ids.add(rid)
        except TypeError:
            # Unhashable id (list/dict) — skip the entry rather than crash.
            continue
    return ids


def cmd_rules(input_file: str, output_path: str):
"""Extract rules JSON from raw agent output, merge with existing rules, save.

Defensively stamps `source: "deep_scan"` on any new rule emitted without one,
so downstream tooling and humans can trace lineage even if the model omits
the field. Existing `source` values (e.g., `adopted`, `scan`, `scan-amended`)
are never overwritten.

Adoption gate: on a RERUN (output rules.json already has rules), rules with
an id not seen before go to proposed_rules.json — the user adopts or rejects
them in the viewer's Rules card before hooks enforce them. Updates to
already-active ids still apply directly. Ids sitting in proposed_rules.json
or ignored_rules.json are not re-proposed. The first scan (empty baseline)
keeps auto-adopting, otherwise a fresh install would enforce nothing.
"""
text = Path(input_file).read_text()
data = extract_json_from_text(text)
Expand All @@ -59,25 +75,52 @@ def cmd_rules(input_file: str, output_path: str):

# Merge with existing rules — preserve user-adopted rules from prior runs
out = Path(output_path)
existing_by_id = {}
if out.exists():
try:
existing = json.loads(out.read_text())
existing_rules = existing.get("rules", [])
# Index existing rules by id
existing_by_id = {r.get("id", ""): r for r in existing_rules if isinstance(r, dict)}
# Index new rules by id
new_by_id = {r.get("id", ""): r for r in new_rules if isinstance(r, dict)}
# Keep existing rules that aren't replaced by new ones (user-adopted rules)
# Also keep existing rules that have source="adopted" — these came from prior incremental runs
preserved = 0
for rid, rule in existing_by_id.items():
if rid not in new_by_id:
new_rules.append(rule)
preserved += 1
if preserved:
print(f" Preserved {preserved} existing rules not in new set", file=sys.stderr)
except (json.JSONDecodeError, OSError):
pass
existing_by_id = {}

if existing_by_id:
# RERUN — route brand-new rules through the proposal queue.
proposed_path = out.parent / "proposed_rules.json"
ignored_ids = _read_rule_ids(out.parent / "ignored_rules.json")
already_proposed = _read_rule_ids(proposed_path)

active, to_propose = [], []
for r in new_rules:
rid = r.get("id") if isinstance(r, dict) else None
if rid in existing_by_id:
active.append(r) # update of an already-active rule
elif rid in ignored_ids or rid in already_proposed:
continue # user already rejected it, or it's awaiting review
else:
to_propose.append(r)

new_by_id = {r.get("id", ""): r for r in active if isinstance(r, dict)}
preserved = 0
for rid, rule in existing_by_id.items():
if rid not in new_by_id:
active.append(rule)
preserved += 1
if preserved:
print(f" Preserved {preserved} existing rules not in new set", file=sys.stderr)

if to_propose:
try:
proposed = json.loads(proposed_path.read_text())
except (OSError, json.JSONDecodeError):
proposed = {}
proposed.setdefault("rules", []).extend(to_propose)
proposed_path.write_text(json.dumps(proposed, indent=2))
print(f" {len(to_propose)} NEW rule(s) -> {proposed_path.name} — "
f"awaiting adoption (review in /archie-viewer Rules card); "
f"hooks will not enforce them until adopted", file=sys.stderr)

new_rules = active

data["rules"] = new_rules
out.write_text(json.dumps(data, indent=2))
Expand Down
Loading