diff --git a/components/runners/ambient-runner/ambient_runner/platform/git_guardrails.py b/components/runners/ambient-runner/ambient_runner/platform/git_guardrails.py new file mode 100644 index 000000000..239f55371 --- /dev/null +++ b/components/runners/ambient-runner/ambient_runner/platform/git_guardrails.py @@ -0,0 +1,307 @@ +""" +Git guardrails for detecting and classifying destructive git operations. + +Provides command validation to identify dangerous git and GitHub CLI/API +operations that could cause irreversible damage (branch deletion, force +pushes, history rewriting, etc.). + +These checks are used by the system prompt builder to inject safety +instructions and can be used by future hook-based enforcement layers. +""" + +import logging +import re + +logger = logging.getLogger(__name__) + + +class GitGuardrailViolation: + """Describes a guardrail violation found in a command.""" + + def __init__(self, rule: str, severity: str, command: str, explanation: str) -> None: + self.rule = rule + self.severity = severity # "block" or "warn" + self.command = command + self.explanation = explanation + + def __repr__(self) -> str: + return f"GitGuardrailViolation(rule={self.rule!r}, severity={self.severity!r})" + + +# --------------------------------------------------------------------------- +# Destructive command patterns +# --------------------------------------------------------------------------- + +# Patterns that should be blocked outright (irreversible / high-blast-radius) +_BLOCKED_PATTERNS: list[tuple[str, str, re.Pattern[str]]] = [ + ( + "delete_remote_ref", + "Deleting a remote branch/ref can permanently close associated PRs", + re.compile( + r""" + (?:gh\s+api|curl) # GitHub API call via gh or curl + .* # any intervening flags/args + -X\s*DELETE # HTTP DELETE method + .* # any intervening text + /git/refs/ # targeting a git ref + """, + re.VERBOSE | re.IGNORECASE, + ), + ), + ( + "api_force_update_ref", + "Force-updating a remote ref via the GitHub API bypasses git safety mechanisms", + re.compile( + r""" + (?:gh\s+api|curl) # GitHub API call + .* # any intervening flags/args + (?:PATCH|PUT) # HTTP update method + .* # any intervening text + /git/refs/ # targeting a git ref + .* # any intervening text + ["\']?force["\']?\s* # force parameter + :\s*true # set to true + """, + re.VERBOSE | re.IGNORECASE, + ), + ), + ( + "api_create_commit_on_ref", + "Creating commits directly via the GitHub API bypasses local git safeguards", + re.compile( + r""" + (?:gh\s+api|curl) # GitHub API call + .* # any intervening flags/args + (?:POST|PATCH|PUT) # HTTP write method + .* # any intervening text + /git/(?:commits|trees|blobs) # low-level git data API + """, + re.VERBOSE | re.IGNORECASE, + ), + ), + ( + "force_push", + "Force pushing overwrites remote history and can destroy others' work", + re.compile( + r""" + git\s+push\s+ # git push command + .* # any flags/args + --force(?!\-with\-lease) # --force but NOT --force-with-lease + """, + re.VERBOSE, + ), + ), + ( + "force_push_short", + "Force pushing (-f) overwrites remote history and can destroy others' work", + re.compile( + r""" + git\s+push\s+ # git push command + .* # any flags/args + \s-[a-zA-Z]*f # short flag containing -f + """, + re.VERBOSE, + ), + ), + ( + "push_to_main", + "Pushing directly to main/master can corrupt the default branch", + re.compile( + r""" + git\s+push\s+ # git push command + .* # remote name and flags + \s(?:main|master)\b # targeting main or master branch + """, + re.VERBOSE, + ), + ), + ( + "reset_hard", + "git reset --hard discards all uncommitted changes irreversibly", + re.compile( + r""" + git\s+reset\s+ # git reset command + .* # any flags + --hard # hard reset flag + """, + re.VERBOSE, + ), + ), + ( + "clean_force", + "git clean -fd permanently deletes untracked files and directories", + re.compile( + r""" + git\s+clean\s+ # git clean command + .* # any flags + -[a-zA-Z]*f # force flag (required for clean to run) + """, + re.VERBOSE, + ), + ), + ( + "checkout_discard", + "git checkout -- . discards all unstaged changes irreversibly", + re.compile( + r""" + git\s+checkout\s+ # git checkout command + --\s+\. # discard all changes + """, + re.VERBOSE, + ), + ), + ( + "branch_delete_remote", + "Deleting a remote branch can permanently close associated PRs", + re.compile( + r""" + git\s+push\s+ # git push command + \S+\s+ # remote name + --delete\s+ # delete flag + """, + re.VERBOSE, + ), + ), + ( + "branch_delete_remote_colon", + "Deleting a remote branch via :branch syntax can permanently close associated PRs", + re.compile( + r""" + git\s+push\s+ # git push command + \S+\s+ # remote name + :\S+ # :branch (delete syntax) + """, + re.VERBOSE, + ), + ), +] + +# Patterns that should generate warnings (risky but sometimes necessary) +_WARN_PATTERNS: list[tuple[str, str, re.Pattern[str]]] = [ + ( + "rebase", + "Rebasing rewrites commit history; create a backup branch first", + re.compile( + r""" + git\s+rebase\s+ # git rebase command + """, + re.VERBOSE, + ), + ), + ( + "force_with_lease", + "Force push with lease is safer but still overwrites remote history", + re.compile( + r""" + git\s+push\s+ # git push command + .* # any flags/args + --force-with-lease # safer force push + """, + re.VERBOSE, + ), + ), + ( + "amend_commit", + "Amending commits rewrites history; avoid if already pushed", + re.compile( + r""" + git\s+commit\s+ # git commit command + .* # any flags + --amend # amend flag + """, + re.VERBOSE, + ), + ), +] + + +def check_command(command: str) -> list[GitGuardrailViolation]: + """Check a shell command for git guardrail violations. + + Args: + command: The shell command string to validate. + + Returns: + List of violations found (empty if command is safe). + """ + if not command or not command.strip(): + return [] + + violations: list[GitGuardrailViolation] = [] + + for rule, explanation, pattern in _BLOCKED_PATTERNS: + if pattern.search(command): + violations.append( + GitGuardrailViolation( + rule=rule, + severity="block", + command=command, + explanation=explanation, + ) + ) + + for rule, explanation, pattern in _WARN_PATTERNS: + if pattern.search(command): + violations.append( + GitGuardrailViolation( + rule=rule, + severity="warn", + command=command, + explanation=explanation, + ) + ) + + return violations + + +def has_blocking_violation(command: str) -> bool: + """Return True if the command contains any blocking git guardrail violation.""" + violations = check_command(command) + return any(v.severity == "block" for v in violations) + + +def format_violations(violations: list[GitGuardrailViolation]) -> str: + """Format violations into a human-readable message.""" + if not violations: + return "" + + lines = ["Git guardrail violations detected:"] + for v in violations: + marker = "BLOCKED" if v.severity == "block" else "WARNING" + lines.append(f" [{marker}] {v.rule}: {v.explanation}") + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Token redaction helpers +# --------------------------------------------------------------------------- + +# Patterns that match common token/secret formats in commands +_TOKEN_PATTERNS: list[re.Pattern[str]] = [ + # GitHub PATs (classic and fine-grained) + re.compile(r"ghp_[A-Za-z0-9]{36,}"), + re.compile(r"github_pat_[A-Za-z0-9_]{36,}"), + # GitLab tokens + re.compile(r"glpat-[A-Za-z0-9\-_]{20,}"), + # Generic Bearer/token in URLs + re.compile(r"(?<=://)([^:]+):([^@]+)@", re.IGNORECASE), +] + + +def redact_tokens_in_command(command: str) -> str: + """Redact known token patterns in a command string. + + Args: + command: The command string that may contain tokens. + + Returns: + Command with tokens replaced by [REDACTED]. + """ + result = command + for pattern in _TOKEN_PATTERNS: + if pattern.groups: + # For patterns with groups (like URL credentials), replace the whole match + result = pattern.sub("[REDACTED]@", result) + else: + result = pattern.sub("[REDACTED]", result) + return result diff --git a/components/runners/ambient-runner/ambient_runner/platform/prompts.py b/components/runners/ambient-runner/ambient_runner/platform/prompts.py old mode 100644 new mode 100755 index d30d01031..2958d8348 --- a/components/runners/ambient-runner/ambient_runner/platform/prompts.py +++ b/components/runners/ambient-runner/ambient_runner/platform/prompts.py @@ -68,6 +68,48 @@ "the feature branch (`{branch}`). If push fails, do NOT fall back to main.\n\n" ) +GIT_SAFETY_INSTRUCTIONS = ( + "## Git Safety Guardrails\n\n" + "You MUST follow these rules when performing git operations. Violations can " + "cause **irreversible data loss** including destroyed PRs, lost review history, " + "and corrupted branches.\n\n" + "### Hard Rules (NEVER violate)\n\n" + "1. **NEVER delete remote branches or refs** — deleting a remote branch " + "permanently closes any associated PR and makes it unrestorable. Do NOT use " + "`git push --delete`, `git push origin :branch`, or " + "`gh api -X DELETE .../git/refs/...`.\n\n" + "2. **NEVER manipulate git refs via the GitHub/GitLab REST API** — if " + "`git push` fails, report the failure to the user and stop. Do NOT " + "circumvent push failures by using `gh api` or `curl` to PATCH/POST/DELETE " + "refs, or to create commits/trees/blobs directly via the Git Data API.\n\n" + "3. **NEVER force push** — do not use `git push --force` or `git push -f`. " + "If you must update a remote branch after a rebase, use " + "`git push --force-with-lease` and ONLY after getting explicit user approval.\n\n" + "4. **NEVER modify the user's default/main branch** — treat `main` and " + "`master` as read-only. Never push commits to them, never reset them, " + "never rebase onto them with a force push.\n\n" + "5. **NEVER run destructive local operations without a backup** — before " + "running `git reset --hard`, `git clean -fd`, `git checkout -- .`, or " + "any rebase, ALWAYS create a backup branch first:\n" + " ```\n" + " git branch backup-$(date +%s)\n" + " ```\n\n" + "6. **NEVER embed tokens or credentials in commands** — do not include " + "PATs, API keys, or passwords in git remote URLs, curl commands, or any " + "shell command. Use environment variables (e.g. `$GITHUB_TOKEN`) instead.\n\n" + "### Escalation Protocol\n\n" + "When a git operation fails, you MUST follow this protocol:\n" + "1. **Stop** — do not retry with a more aggressive variant.\n" + "2. **Diagnose** — read the error message and identify the root cause " + "(auth scope, permissions, branch protection, etc.).\n" + "3. **Report** — tell the user what failed and why.\n" + "4. **Wait** — let the user decide the next step. Do NOT autonomously " + "escalate to force pushes, API workarounds, or destructive operations.\n\n" + "Violating these rules can permanently destroy user work, close PRs, " + "and lose review history. When in doubt, ask the user.\n\n" +) + + RUBRIC_EVALUATION_HEADER = "## Rubric Evaluation\n\n" RUBRIC_EVALUATION_INTRO = ( @@ -215,6 +257,10 @@ def build_workspace_context_prompt( prompt += f"- **repos/{repo_name}/**\n" prompt += GIT_PUSH_STEPS.format(branch=push_branch) + # Git safety guardrails (always included when repos are present) + if repos_cfg: + prompt += GIT_SAFETY_INSTRUCTIONS + # Human-in-the-loop instructions prompt += HUMAN_INPUT_INSTRUCTIONS diff --git a/components/runners/ambient-runner/tests/test_auto_push.py b/components/runners/ambient-runner/tests/test_auto_push.py old mode 100644 new mode 100755 index 7ce0225e3..56bf3cadc --- a/components/runners/ambient-runner/tests/test_auto_push.py +++ b/components/runners/ambient-runner/tests/test_auto_push.py @@ -317,6 +317,46 @@ def test_prompt_includes_multiple_autopush_repos(self): # repo3 should not be in git instructions since autoPush=false # (but it will be in the general repos list) + def test_prompt_includes_git_safety_guardrails_with_repos(self): + """Test that git safety guardrails are included when repos are present.""" + repos_cfg = [ + { + "name": "my-repo", + "url": "https://github.com/owner/my-repo.git", + "branch": "main", + "autoPush": False, + } + ] + + prompt = build_workspace_context_prompt( + repos_cfg=repos_cfg, + workflow_name=None, + artifacts_path="artifacts", + ambient_config={}, + workspace_path="/workspace", + ) + + # Verify git safety guardrails are present + assert "Git Safety Guardrails" in prompt + assert "NEVER delete remote branches" in prompt + assert "NEVER force push" in prompt + assert "NEVER modify the user's default/main branch" in prompt + assert "NEVER run destructive local operations without a backup" in prompt + assert "NEVER embed tokens or credentials" in prompt + assert "Escalation Protocol" in prompt + + def test_prompt_excludes_git_safety_guardrails_without_repos(self): + """Test that git safety guardrails are excluded when no repos are present.""" + prompt = build_workspace_context_prompt( + repos_cfg=[], + workflow_name=None, + artifacts_path="artifacts", + ambient_config={}, + workspace_path="/workspace", + ) + + assert "Git Safety Guardrails" not in prompt + def test_prompt_without_repos(self): """Test prompt generation when no repos are configured.""" prompt = build_workspace_context_prompt( diff --git a/components/runners/ambient-runner/tests/test_git_guardrails.py b/components/runners/ambient-runner/tests/test_git_guardrails.py new file mode 100644 index 000000000..27ba23c29 --- /dev/null +++ b/components/runners/ambient-runner/tests/test_git_guardrails.py @@ -0,0 +1,344 @@ +"""Unit tests for git_guardrails module.""" + +import pytest + +from ambient_runner.platform.git_guardrails import ( + GitGuardrailViolation, + check_command, + format_violations, + has_blocking_violation, + redact_tokens_in_command, +) + + +class TestCheckCommand: + """Tests for check_command function.""" + + # -- Blocked patterns -------------------------------------------------- + + def test_delete_remote_ref_via_gh_api(self): + """Deleting a remote ref via gh api should be blocked.""" + cmd = 'gh api -X DELETE "repos/user/repo/git/refs/heads/my-branch"' + violations = check_command(cmd) + assert any(v.rule == "delete_remote_ref" and v.severity == "block" for v in violations) + + def test_delete_remote_ref_via_curl(self): + """Deleting a remote ref via curl should be blocked.""" + cmd = 'curl -X DELETE https://api.github.com/repos/user/repo/git/refs/heads/branch' + violations = check_command(cmd) + assert any(v.rule == "delete_remote_ref" and v.severity == "block" for v in violations) + + def test_api_force_update_ref(self): + """Force-updating a ref via API should be blocked.""" + cmd = ( + 'gh api -X PATCH repos/user/repo/git/refs/heads/branch ' + '-f sha=abc123 -f "force":true' + ) + violations = check_command(cmd) + assert any( + v.rule == "api_force_update_ref" and v.severity == "block" for v in violations + ) + + def test_api_create_commit(self): + """Creating commits directly via API should be blocked.""" + cmd = 'gh api -X POST repos/user/repo/git/commits -f message="direct commit"' + violations = check_command(cmd) + assert any( + v.rule == "api_create_commit_on_ref" and v.severity == "block" + for v in violations + ) + + def test_api_create_tree(self): + """Creating trees directly via API should be blocked.""" + cmd = 'curl -X POST https://api.github.com/repos/user/repo/git/trees' + violations = check_command(cmd) + assert any( + v.rule == "api_create_commit_on_ref" and v.severity == "block" + for v in violations + ) + + def test_api_create_blob(self): + """Creating blobs directly via API should be blocked.""" + cmd = 'gh api -X POST repos/user/repo/git/blobs -f content="data"' + violations = check_command(cmd) + assert any( + v.rule == "api_create_commit_on_ref" and v.severity == "block" + for v in violations + ) + + def test_force_push(self): + """git push --force should be blocked.""" + cmd = "git push origin my-branch --force" + violations = check_command(cmd) + assert any(v.rule == "force_push" and v.severity == "block" for v in violations) + + def test_force_push_short_flag(self): + """git push -f should be blocked.""" + cmd = "git push origin my-branch -f" + violations = check_command(cmd) + assert any( + v.rule == "force_push_short" and v.severity == "block" for v in violations + ) + + def test_force_with_lease_not_blocked_as_force(self): + """git push --force-with-lease should NOT trigger the force_push rule.""" + cmd = "git push origin my-branch --force-with-lease" + violations = check_command(cmd) + assert not any(v.rule == "force_push" for v in violations) + + def test_push_to_main(self): + """Pushing to main should be blocked.""" + cmd = "git push origin main" + violations = check_command(cmd) + assert any(v.rule == "push_to_main" and v.severity == "block" for v in violations) + + def test_push_to_master(self): + """Pushing to master should be blocked.""" + cmd = "git push origin master" + violations = check_command(cmd) + assert any(v.rule == "push_to_main" and v.severity == "block" for v in violations) + + def test_push_to_feature_branch_allowed(self): + """Pushing to a feature branch should not trigger push_to_main.""" + cmd = "git push origin feature/my-branch" + violations = check_command(cmd) + assert not any(v.rule == "push_to_main" for v in violations) + + def test_reset_hard(self): + """git reset --hard should be blocked.""" + cmd = "git reset --hard origin/main" + violations = check_command(cmd) + assert any(v.rule == "reset_hard" and v.severity == "block" for v in violations) + + def test_clean_force(self): + """git clean -fd should be blocked.""" + cmd = "git clean -fd" + violations = check_command(cmd) + assert any(v.rule == "clean_force" and v.severity == "block" for v in violations) + + def test_clean_force_extended(self): + """git clean -fdx should be blocked.""" + cmd = "git clean -fdx" + violations = check_command(cmd) + assert any(v.rule == "clean_force" and v.severity == "block" for v in violations) + + def test_checkout_discard_all(self): + """git checkout -- . should be blocked.""" + cmd = "git checkout -- ." + violations = check_command(cmd) + assert any( + v.rule == "checkout_discard" and v.severity == "block" for v in violations + ) + + def test_branch_delete_remote(self): + """git push --delete should be blocked.""" + cmd = "git push origin --delete my-branch" + violations = check_command(cmd) + assert any( + v.rule == "branch_delete_remote" and v.severity == "block" for v in violations + ) + + def test_branch_delete_colon_syntax(self): + """git push origin :branch should be blocked.""" + cmd = "git push origin :my-branch" + violations = check_command(cmd) + assert any( + v.rule == "branch_delete_remote_colon" and v.severity == "block" + for v in violations + ) + + # -- Warning patterns -------------------------------------------------- + + def test_rebase_warns(self): + """git rebase should generate a warning.""" + cmd = "git rebase main" + violations = check_command(cmd) + assert any(v.rule == "rebase" and v.severity == "warn" for v in violations) + + def test_force_with_lease_warns(self): + """git push --force-with-lease should generate a warning.""" + cmd = "git push origin my-branch --force-with-lease" + violations = check_command(cmd) + assert any( + v.rule == "force_with_lease" and v.severity == "warn" for v in violations + ) + + def test_amend_commit_warns(self): + """git commit --amend should generate a warning.""" + cmd = 'git commit --amend -m "updated message"' + violations = check_command(cmd) + assert any(v.rule == "amend_commit" and v.severity == "warn" for v in violations) + + # -- Safe commands (no violations) ------------------------------------- + + def test_safe_push(self): + """Normal push to a feature branch should have no blocking violations.""" + cmd = "git push -u origin feature/my-work" + assert not has_blocking_violation(cmd) + + def test_safe_commit(self): + """Normal commit should have no violations.""" + cmd = 'git commit -m "add feature"' + violations = check_command(cmd) + assert len(violations) == 0 + + def test_safe_add(self): + """git add should have no violations.""" + cmd = "git add ." + violations = check_command(cmd) + assert len(violations) == 0 + + def test_safe_status(self): + """git status should have no violations.""" + cmd = "git status" + violations = check_command(cmd) + assert len(violations) == 0 + + def test_safe_diff(self): + """git diff should have no violations.""" + cmd = "git diff HEAD~1" + violations = check_command(cmd) + assert len(violations) == 0 + + def test_safe_log(self): + """git log should have no violations.""" + cmd = "git log --oneline -10" + violations = check_command(cmd) + assert len(violations) == 0 + + def test_safe_gh_pr_create(self): + """gh pr create should have no violations.""" + cmd = 'gh pr create --title "my PR" --body "description"' + violations = check_command(cmd) + assert len(violations) == 0 + + def test_non_git_command(self): + """Non-git commands should have no violations.""" + cmd = "ls -la /workspace/repos" + violations = check_command(cmd) + assert len(violations) == 0 + + def test_empty_command(self): + """Empty command should return no violations.""" + violations = check_command("") + assert len(violations) == 0 + + def test_none_command(self): + """None command should return no violations.""" + violations = check_command(None) + assert len(violations) == 0 + + def test_whitespace_command(self): + """Whitespace-only command should return no violations.""" + violations = check_command(" ") + assert len(violations) == 0 + + +class TestHasBlockingViolation: + """Tests for has_blocking_violation function.""" + + def test_blocking_command_returns_true(self): + assert has_blocking_violation("git push --force origin main") is True + + def test_warning_only_returns_false(self): + assert has_blocking_violation("git rebase main") is False + + def test_safe_command_returns_false(self): + assert has_blocking_violation("git status") is False + + +class TestFormatViolations: + """Tests for format_violations function.""" + + def test_empty_list(self): + assert format_violations([]) == "" + + def test_single_violation(self): + violations = [ + GitGuardrailViolation( + rule="force_push", + severity="block", + command="git push --force", + explanation="Force pushing is dangerous", + ) + ] + result = format_violations(violations) + assert "BLOCKED" in result + assert "force_push" in result + assert "Force pushing is dangerous" in result + + def test_mixed_severities(self): + violations = [ + GitGuardrailViolation( + rule="force_push", + severity="block", + command="test", + explanation="blocked reason", + ), + GitGuardrailViolation( + rule="rebase", + severity="warn", + command="test", + explanation="warning reason", + ), + ] + result = format_violations(violations) + assert "BLOCKED" in result + assert "WARNING" in result + + +class TestRedactTokensInCommand: + """Tests for redact_tokens_in_command function.""" + + def test_redact_github_pat(self): + cmd = "curl -H 'Authorization: token ghp_Qo5uXxYzAbCdEfGhIjKlMnOpQrStUvWxYz12' https://api.github.com" + result = redact_tokens_in_command(cmd) + assert "ghp_" not in result + assert "[REDACTED]" in result + + def test_redact_github_fine_grained_pat(self): + cmd = "git remote set-url origin https://github_pat_abc123def456_abcdefghijklmnopqrstuvwxyz1234@github.com/user/repo" + result = redact_tokens_in_command(cmd) + assert "github_pat_" not in result + assert "[REDACTED]" in result + + def test_redact_gitlab_token(self): + cmd = "git clone https://oauth2:glpat-xxxxxxxxxxxxxxxxxxxx@gitlab.com/user/repo" + result = redact_tokens_in_command(cmd) + assert "glpat-" not in result + assert "[REDACTED]" in result + + def test_redact_url_credentials(self): + cmd = "git clone https://user:secret_token@github.com/user/repo" + result = redact_tokens_in_command(cmd) + assert "secret_token" not in result + + def test_no_tokens_unchanged(self): + cmd = "git push -u origin my-branch" + result = redact_tokens_in_command(cmd) + assert result == cmd + + def test_empty_string(self): + result = redact_tokens_in_command("") + assert result == "" + + +class TestMultipleViolations: + """Test commands that trigger multiple violations.""" + + def test_force_push_to_main(self): + """Force push to main should trigger both force_push and push_to_main.""" + cmd = "git push --force origin main" + violations = check_command(cmd) + rules = {v.rule for v in violations} + assert "force_push" in rules + assert "push_to_main" in rules + + def test_destructive_local_sequence(self): + """Commands chained with && should each be checked.""" + # check_command operates on the full string, so patterns match + cmd = "git reset --hard HEAD && git clean -fd" + violations = check_command(cmd) + rules = {v.rule for v in violations} + assert "reset_hard" in rules + assert "clean_force" in rules