Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions tests/test_ai_appsec_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# IMPORTANT: Adjust this path to match exactly where your file is located
from gitgalaxy.tools.ai_guardrails.ai_appsec_sensor import AIAppSecSensor

# ==============================================================================
# TEST 1: The RCE Funnel (Weaponized Prompt Injection)
# ==============================================================================
def test_rce_funnel_detection():
    """
    Verifies the sensor flags the RCE Funnel: an LLM wired directly to OS
    execution (eval/subprocess) while exposed through a public API surface.
    """
    detector = AIAppSecSensor()

    # One file whose telemetry combines AI presence, public exposure,
    # and dangerous execution primitives.
    funnel_file = {
        "telemetry": {
            "llm_api": 1,          # AI is present
            "arch_api": 1,         # exposed to the public internet
            "sec_danger": 1,       # contains eval() or subprocess execution
            "safety_density": 0.9,
        },
    }

    report = detector.hunt_threats([funnel_file])[0]["telemetry"]["ai_appsec"]

    assert report["is_rce_funnel"] is True, "Failed to detect the RCE Funnel!"
    assert any("RCE Funnel" in warning for warning in report["critical_warnings"])

# ==============================================================================
# TEST 2: The God-Mode Agent (Autonomous Data Corruption)
# ==============================================================================
def test_god_mode_agent_detection():
    """
    An agent holding autonomous tools, heavy database write access, and low
    defensive-programming density must trigger the God-Mode alert.
    """
    detector = AIAppSecSensor()

    overpowered_file = {
        "max_db_complexity": 3,  # heavy database write access
        "telemetry": {
            "ai_tools": 1,         # agentic tool calling enabled
            "safety_density": 0.2,  # dangerously low defensive programming
        },
    }

    report = detector.hunt_threats([overpowered_file])[0]["telemetry"]["ai_appsec"]

    assert report["over_permissioned_agent"] is True, "Failed to detect the God-Mode Agent!"
    assert any("God-Mode Agent" in warning for warning in report["critical_warnings"])

# ==============================================================================
# TEST 3: The Exfiltration Vector (Unsandboxed Sockets)
# ==============================================================================
def test_exfiltration_vector_detection():
    """
    An LLM that can reach raw network sockets while also holding hardcoded
    environment secrets must trigger the Exfiltration Vector alert.
    """
    detector = AIAppSecSensor()

    leaky_file = {
        "telemetry": {
            "llm_api": 1,      # AI is present
            "arch_io": 1,      # can make outbound network requests
            "sec_secrets": 1,  # holds AWS keys/passwords
        },
    }

    report = detector.hunt_threats([leaky_file])[0]["telemetry"]["ai_appsec"]

    assert report["agentic_exfiltration_risk"] is True, "Failed to detect the Exfiltration Vector!"
    assert any("Exfiltration Vector" in warning for warning in report["critical_warnings"])

# ==============================================================================
# TEST 4: The Clean Baseline (False-Positive Guard)
# ==============================================================================
def test_safe_baseline():
    """
    Confirms a sandboxed AI integration — no public exposure, no dynamic
    execution, no secrets, high defensive density — raises zero flags.
    """
    detector = AIAppSecSensor()

    clean_file = {
        "max_db_complexity": 0,
        "telemetry": {
            "llm_api": 1,            # AI is present
            "arch_api": 0,           # not exposed to the public
            "sec_danger": 0,         # no eval/subprocess
            "sec_secrets": 0,        # no secrets exposed
            "safety_density": 0.95,  # high defensive try/catch density
        },
    }

    report = detector.hunt_threats([clean_file])[0]["telemetry"]["ai_appsec"]

    # Every threat flag must stay down on a clean file.
    assert report["is_rce_funnel"] is False
    assert report["over_permissioned_agent"] is False
    assert report["agentic_exfiltration_risk"] is False
    assert len(report["critical_warnings"]) == 0, "False positive triggered on a safe file!"
137 changes: 137 additions & 0 deletions tests/test_dev_agent_firewall.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# IMPORTANT: Adjust this path to match exactly where your file is located
from gitgalaxy.tools.ai_guardrails.dev_agent_firewall import DevAgentFirewall

# ==============================================================================
# TEST 1: The Context Window Shredder (Black Hole Detection)
# ==============================================================================
def test_black_hole_detection():
    """
    A file over the 8k-token budget with O(N^3)-or-worse complexity must be
    flagged as a Context Window Shredder (Agentic Black Hole).
    """
    firewall = DevAgentFirewall()

    oversized_file = {
        "token_mass": 8500,  # above the 8k token ceiling
        "max_big_o": 3,      # severe algorithmic complexity
        "telemetry": {},
        "risk_vector": [],
    }

    guardrails = firewall.evaluate_ecosystem([oversized_file])[0]["telemetry"]["ai_guardrails"]

    assert guardrails["is_agentic_black_hole"] is True, "Failed to detect the Agentic Black Hole!"
    assert any("Black Hole detected" in warning for warning in guardrails["warnings"])

# ==============================================================================
# TEST 2: The HITL Mandate (Blast Radius + Risk Debt)
# ==============================================================================
def test_hitl_mandate_detection():
    """
    High blast radius (PageRank) paired with heavy accumulated risk debt
    must force the Human-in-the-Loop constraint.
    """
    firewall = DevAgentFirewall()

    risky_file = {
        "token_mass": 1000,
        "max_big_o": 1,
        "risk_vector": [100, 50, 60],  # sums to 210, above the 200 threshold
        "telemetry": {
            "network_metrics": {
                "normalized_blast_radius": 1.5,  # above the 1.0 threshold
            },
        },
    }

    guardrails = firewall.evaluate_ecosystem([risky_file])[0]["telemetry"]["ai_guardrails"]

    assert guardrails["requires_hitl"] is True, "Failed to enforce the HITL Mandate!"
    assert any("Human-in-the-Loop required" in warning for warning in guardrails["warnings"])

# ==============================================================================
# TEST 3: The Hallucination Zone (Metaprogramming + Low Docs)
# ==============================================================================
def test_hallucination_zone_detection():
    """
    Heavy metaprogramming (dynamic-execution heat triggers) combined with
    sparse documentation must raise the Hallucination Zone warning.
    """
    firewall = DevAgentFirewall()

    murky_file = {
        "telemetry": {
            "heat_triggers": 3,   # more than 2 dynamic execution triggers
            "doc_density": 0.15,  # below the 0.20 documentation floor
        },
    }

    guardrails = firewall.evaluate_ecosystem([murky_file])[0]["telemetry"]["ai_guardrails"]

    assert guardrails["hallucination_zone"] is True, "Failed to detect the Hallucination Zone!"
    assert any("Hallucination Zone" in warning for warning in guardrails["warnings"])

# ==============================================================================
# TEST 4: The Silent Mutation Risk (Flux + Blast + No Tests)
# ==============================================================================
def test_silent_mutation_risk_detection():
    """
    High state flux plus many inbound dependents plus zero test coverage
    must be flagged as a Silent Mutation Risk.
    """
    firewall = DevAgentFirewall()

    untested_file = {
        "telemetry": {
            "state_flux": 55,    # flux above the 50 threshold
            "has_tests": False,  # zero test coverage
            "network_metrics": {
                "in_degree": 6,  # more than 5 dependents rely on this file
            },
        },
    }

    guardrails = firewall.evaluate_ecosystem([untested_file])[0]["telemetry"]["ai_guardrails"]

    assert guardrails.get("silent_mutation_risk") is True, "Failed to detect Silent Mutation Risk!"
    assert any("Silent Mutation Risk" in warning for warning in guardrails["warnings"])

# ==============================================================================
# TEST 5: The Clean Baseline (False-Positive Guard)
# ==============================================================================
def test_safe_agentic_baseline():
    """
    A small, well-documented, well-tested file with low coupling must pass
    the firewall without tripping any agentic guardrail.
    """
    firewall = DevAgentFirewall()

    healthy_file = {
        "token_mass": 2000,       # comfortably under the token budget
        "max_big_o": 1,           # simple O(N) logic
        "risk_vector": [10, 5],   # low risk debt (15)
        "telemetry": {
            "heat_triggers": 0,   # no dynamic execution
            "doc_density": 0.85,  # richly documented
            "state_flux": 10,     # low flux
            "has_tests": True,    # covered by tests
            "network_metrics": {
                "normalized_blast_radius": 0.5,  # low blast radius
                "in_degree": 1,                  # few dependents
            },
        },
    }

    guardrails = firewall.evaluate_ecosystem([healthy_file])[0]["telemetry"]["ai_guardrails"]

    # No flag may be raised for a healthy file.
    assert guardrails["is_agentic_black_hole"] is False
    assert guardrails["requires_hitl"] is False
    assert guardrails["hallucination_zone"] is False
    assert guardrails.get("silent_mutation_risk", False) is False
    assert len(guardrails["warnings"]) == 0, "False positive triggered on a safe file!"
127 changes: 127 additions & 0 deletions tests/test_terabyte_log_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import pytest
import sys
import json
from unittest.mock import patch

# IMPORTANT: Adjust this path to match exactly where your file is located
import gitgalaxy.tools.terabyte_log_scanning.terabyte_log_scanner as scanner_module

# ==============================================================================
# TEST 1: The IR State Handshake & Binary Extraction
# ==============================================================================
def test_scanner_json_handshake_and_extraction(tmp_path):
    """
    End-to-end check: the engine reads the IR state JSON, pulls out the
    target program names, filters a binary log stream, and writes only the
    matching lines plus a telemetry sidecar back to disk.
    """
    # 1. Build a physical mock workspace on disk.
    work_dir = tmp_path / "scanner_workspace"
    work_dir.mkdir()

    # A) IR state following the GitGalaxy standard schema.
    state_file = work_dir / "ir_state.json"
    state_file.write_text(
        json.dumps({"analysis": {"known_programs": ["PGM_ALPHA", "PGM_BETA"]}}),
        encoding="utf-8",
    )

    # B) A mock "terabyte" log: noise interleaved with target hits.
    log_lines = [
        "2026-05-11 09:15 [INFO] System boot sequence initialized\n",
        "2026-05-11 09:20 [EXEC] PGM_ALPHA executed successfully\n",
        "2026-05-11 09:25 [WARN] Unrelated process memory spike\n",
        "2026-05-11 10:00 [EXEC] PGM_BETA encountered warning 04\n",
        "2026-05-11 10:05 [EXEC] PGM_ALPHA restarted\n",
    ]
    target_log = work_dir / "mainframe_dump.log"
    target_log.write_text("".join(log_lines), encoding="utf-8")

    # 2. Drive the engine through its CLI entry point.
    cli = ["terabyte_log_scanner.py", str(target_log), "--input_state", str(state_file)]
    with patch.object(sys, 'argv', cli):
        # A successful run returns normally, so no SystemExit trap is needed.
        scanner_module.main()

    # 3. Invariant assertions.
    # A) The filtered results log must exist and contain only target hits.
    results_file = work_dir / "mainframe_dump_results.txt"
    assert results_file.exists(), "Scanner failed to create the results output file!"

    filtered = results_file.read_text(encoding="utf-8")
    assert "PGM_ALPHA executed successfully" in filtered
    assert "PGM_BETA encountered warning 04" in filtered
    assert "System boot sequence initialized" not in filtered, "Noise slipped through the binary filter!"

    # B) The telemetry sidecar must aggregate hit counts per program.
    sidecar_file = work_dir / "dynamic_telemetry.json"
    assert sidecar_file.exists(), "Scanner failed to generate the JSON sidecar!"

    telemetry = json.loads(sidecar_file.read_text(encoding="utf-8"))
    counts = telemetry.get("execution_counts", {})

    # PGM_ALPHA was hit twice, PGM_BETA once.
    assert counts.get("PGM_ALPHA") == 2, "Mathematical aggregation failed for PGM_ALPHA!"
    assert counts.get("PGM_BETA") == 1, "Mathematical aggregation failed for PGM_BETA!"

# ==============================================================================
# TEST 2: The Schema Guard (Invalid JSON Rejection)
# ==============================================================================
def test_scanner_invalid_json_schema(tmp_path):
    """
    A malformed input-state JSON (missing the required schema path) must
    make the scanner abort hard rather than fail silently downstream.
    """
    work_dir = tmp_path / "schema_repo"
    work_dir.mkdir()

    # Input state lacking the required 'analysis' -> 'known_programs' path.
    bad_state_file = work_dir / "bad_state.json"
    bad_state_file.write_text(json.dumps({"wrong_root": []}), encoding="utf-8")

    dummy_log = work_dir / "dummy.log"
    dummy_log.write_text("empty", encoding="utf-8")

    cli = ["terabyte_log_scanner.py", str(dummy_log), "--input_state", str(bad_state_file)]

    with patch.object(sys, 'argv', cli):
        with pytest.raises(SystemExit) as exc:
            scanner_module.main()

    # Schema mismatch must surface as a fatal exit (code 1).
    assert exc.value.code == 1, "Scanner failed to block an invalid JSON schema!"

# ==============================================================================
# TEST 3: The Manual CLI Override (-k Flag)
# ==============================================================================
def test_scanner_manual_keyword_override(tmp_path):
    """
    The -k flag must let the scanner skip the JSON input state entirely and
    filter on raw CLI keywords instead.
    """
    work_dir = tmp_path / "cli_repo"
    work_dir.mkdir()

    target_log = work_dir / "app.log"
    target_log.write_text(
        "Line 1: ERROR 500\n"
        "Line 2: SUCCESS 200\n"
        "Line 3: ERROR 404\n",
        encoding="utf-8"
    )

    # Hunt for the literal keyword "ERROR" via the manual override.
    cli = ["terabyte_log_scanner.py", str(target_log), "-k", "ERROR"]
    with patch.object(sys, 'argv', cli):
        scanner_module.main()

    content = (work_dir / "app_results.txt").read_text(encoding="utf-8")

    assert "ERROR 500" in content
    assert "ERROR 404" in content
    assert "SUCCESS 200" not in content, "Manual keyword override failed to filter!"
Loading