Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 83 additions & 46 deletions src/agent_scan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import logging
import sys

import aiohttp
import psutil
import rich
from rich.logging import RichHandler
Expand All @@ -23,7 +22,6 @@
from agent_scan.printer import print_scan_result
from agent_scan.upload import get_hostname
from agent_scan.utils import ensure_unicode_console, parse_headers, suppress_stdout
from agent_scan.verify_api import setup_aiohttp_debug_logging, setup_tcp_connector
from agent_scan.version import version_info

# Configure logging to suppress all output by default
Expand Down Expand Up @@ -388,6 +386,70 @@ def main():
# use the same parser as scan
setup_scan_parser(evo_parser)

# GUARD command
guard_parser = subparsers.add_parser(
"guard",
help="Install, uninstall, or check status of Agent Guard hooks",
description="Manage Agent Guard hooks for Claude Code and Cursor.",
)
guard_subparsers = guard_parser.add_subparsers(
dest="guard_command",
title="Guard commands",
description="Available guard commands (default: show status)",
metavar="GUARD_COMMAND",
)

guard_install_parser = guard_subparsers.add_parser(
"install",
help="Install Agent Guard hooks for a client",
)
guard_install_parser.add_argument(
"client",
choices=["claude", "cursor"],
help="Client to install hooks for",
)
guard_install_parser.add_argument(
"--url",
type=str,
default="https://api.snyk.io",
help="Remote hooks base URL (default: https://api.snyk.io)",
)
guard_install_parser.add_argument(
"--tenant-id",
type=str,
default=None,
dest="tenant_id",
help="Snyk tenant ID (required when minting a push key; not needed if PUSH_KEY is set)",
)
guard_install_parser.add_argument(
"--test",
action="store_true",
default=False,
help="Send a test event to verify connectivity before installing hooks",
)
guard_install_parser.add_argument(
"--file",
type=str,
default=None,
help="Override the config file path (default: client-specific well-known path)",
)

guard_uninstall_parser = guard_subparsers.add_parser(
"uninstall",
help="Remove Agent Guard hooks for a client",
)
guard_uninstall_parser.add_argument(
"client",
choices=["claude", "cursor"],
help="Client to uninstall hooks from",
)
guard_uninstall_parser.add_argument(
"--file",
type=str,
default=None,
help="Override the config file path (default: client-specific well-known path)",
)

# Parse arguments (default to 'scan' if no command provided)
if (len(sys.argv) == 1 or sys.argv[1] not in subparsers.choices) and (
not (len(sys.argv) == 2 and sys.argv[1] == "--help")
Expand Down Expand Up @@ -431,6 +493,10 @@ def main():
elif args.command == "evo":
asyncio.run(evo(args))
sys.exit(0)
elif args.command == "guard":
from agent_scan.guard import run_guard

sys.exit(run_guard(args))

else:
# This shouldn't happen due to argparse's handling
Expand All @@ -447,44 +513,28 @@ async def evo(args):
2. Pushes scan results to the Evo API
3. Revokes the client_id
"""
from agent_scan.pushkeys import mint_push_key, revoke_push_key

rich.print(
"Go to https://app.snyk.io and select the tenant on the left nav bar. Copy the Tenant ID from the URL and paste it here: "
"Go to https://app.snyk.io and select the tenant on the left nav bar. "
"Copy the Tenant ID from the URL and paste it here: "
)
tenant_id = input().strip()
rich.print("Paste the Authorization token from https://app.snyk.io/account (API Token -> KEY -> click to show): ")
token = input().strip()

push_key_url = f"https://api.snyk.io/hidden/tenants/{tenant_id}/mcp-scan/push-key?version=2025-08-28"
push_scan_url = "https://api.snyk.io/hidden/mcp-scan/push?version=2025-08-28"
base_url = "https://api.snyk.io"
push_scan_url = f"{base_url}/hidden/mcp-scan/push?version=2025-08-28"

# create a client_id (shared secret)
client_id = None
skip_ssl_verify = getattr(args, "skip_ssl_verify", False)
trace_configs = setup_aiohttp_debug_logging(verbose=False)
# Mint a push key
try:
async with aiohttp.ClientSession(
trace_configs=trace_configs,
connector=setup_tcp_connector(skip_ssl_verify=skip_ssl_verify),
trust_env=True,
) as session:
async with session.post(
push_key_url, data="", headers={"Content-Type": "application/json", "Authorization": f"token {token}"}
) as resp:
if resp.status not in (200, 201):
text = await resp.text()
rich.print(f"[bold red]Request failed[/bold red]: HTTP {resp.status} - {text}")
return
data = await resp.json()
client_id = data.get("client_id")
if not client_id:
rich.print(f"[bold red]Unexpected response[/bold red]: {data}")
return
rich.print("Client ID created")
except Exception as e:
client_id = mint_push_key(base_url, tenant_id, token)
rich.print("Client ID created")
except RuntimeError as e:
rich.print(f"[bold red]Error calling Snyk API[/bold red]: {e}")
return

# Update the default scan args
# Run scan with the push key
args.control_servers = [
ControlServer(
url=push_scan_url,
Expand All @@ -494,24 +544,11 @@ async def evo(args):
]
await run_scan(args, mode="scan")

# revoke the created client_id
del_headers = {
"Content-Type": "application/json",
"Authorization": f"token {token}",
"x-client-id": client_id,
}
# Revoke the push key
try:
async with aiohttp.ClientSession(
trace_configs=trace_configs,
connector=setup_tcp_connector(skip_ssl_verify=skip_ssl_verify),
trust_env=True,
) as session:
async with session.delete(push_key_url, headers=del_headers) as del_resp:
if del_resp.status not in (200, 204):
text = await del_resp.text()
rich.print(f"[bold red]Failed to revoke client_id[/bold red]: HTTP {del_resp.status} - {text}")
rich.print("Client ID revoked")
except Exception as e:
revoke_push_key(base_url, tenant_id, token, client_id)
rich.print("Client ID revoked")
except RuntimeError as e:
rich.print(f"[bold red]Error revoking client_id[/bold red]: {e}")


Expand Down
Loading
Loading