Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions tests/test_cobol_dag_architect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import pytest
import sys
from pathlib import Path

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'Path' is not used.
from unittest.mock import patch

# IMPORTANT: Adjust this path to match exactly where your file is located
import gitgalaxy.tools.cobol_to_cobol.cobol_dag_architect as dag_module

# ==============================================================================
# TEST 1: The Ghost Deflector & Intent Extraction
# ==============================================================================
def test_ghost_deflector_lineage(tmp_path):
    """
    Verify lineage extraction end to end: SELECT/ASSIGN pairs are mapped to
    DD names with the UT-S- prefix stripped, dynamic CALL targets are
    surfaced as unresolved, and OPEN statements inside paragraphs flagged
    as dead are excluded from the lineage when dead_paras is supplied.
    """
    program = tmp_path / "PGM1.cbl"
    program.write_text(
        "".join((
            "       PROGRAM-ID. PGM1.\n",
            "       SELECT FILE-IN ASSIGN TO UT-S-INPUT01.\n",
            "       SELECT FILE-OUT ASSIGN TO OUTPUT01.\n",
            "       PROCEDURE DIVISION.\n",
            "       MAIN-ENTRY.\n",
            "           OPEN INPUT FILE-IN.\n",
            # Static call below must be resolved; the dynamic one must not.
            "           CALL 'STATIC-PGM'.\n",
            "           CALL WS-DYN-PGM.\n",
            "       DEAD-PARA.\n",
            # Dead code: this OPEN OUTPUT lives in an unreachable paragraph.
            "           OPEN OUTPUT FILE-OUT.\n",
        )),
        encoding="utf-8",
    )

    # Baseline without dead-code context: the dead OPEN is (wrongly) counted.
    raw_lineage = dag_module.extract_lineage(program)
    assert "INPUT01" in raw_lineage["inputs"]
    assert "OUTPUT01" in raw_lineage["outputs"]  # Without Ghost Deflector, it hallucinates this output

    # With the Ghost Deflector: dead paragraphs contribute nothing.
    safe_lineage = dag_module.extract_lineage(program, dead_paras={"DEAD-PARA"})
    assert "INPUT01" in safe_lineage["inputs"]
    assert "OUTPUT01" not in safe_lineage["outputs"], "Ghost Deflector failed! It hallucinated dead code dependencies."

    # Honesty Sensor: dynamic targets reported, static targets resolved.
    assert "WS-DYN-PGM" in safe_lineage["unresolved_calls"], "Failed to catch the dynamic jump!"
    assert "STATIC-PGM" not in safe_lineage["unresolved_calls"]

# ==============================================================================
# TEST 2: Mathematical Topological Sort (Happy Path)
# ==============================================================================
def test_dag_architect_topological_sort(tmp_path, capsys):
    """
    Verify that producer -> consumer file dependencies are resolved with
    Kahn's algorithm into the unique valid execution order, regardless of
    the order in which source files are discovered on disk.
    """
    repo = tmp_path / "dag_repo"
    repo.mkdir()

    # Deliberately written with the LAST pipeline step first to prove the
    # order comes from data flow, not from directory listing order.
    fixtures = {
        # PGMC: reads FILE2, writes FILE3 -> must run third.
        "PGMC.cbl": (
            "       PROGRAM-ID. PGMC.\n"
            "       SELECT F2 ASSIGN TO FILE2.\n"
            "       SELECT F3 ASSIGN TO FILE3.\n"
            "       PROCEDURE DIVISION.\n"
            "           OPEN INPUT F2.\n"
            "           OPEN OUTPUT F3.\n"
        ),
        # PGMA: writes FILE1 from nothing -> must run first.
        "PGMA.cbl": (
            "       PROGRAM-ID. PGMA.\n"
            "       SELECT F1 ASSIGN TO FILE1.\n"
            "       PROCEDURE DIVISION.\n"
            "           OPEN OUTPUT F1.\n"
        ),
        # PGMB: reads FILE1, writes FILE2 -> must run second.
        "PGMB.cbl": (
            "       PROGRAM-ID. PGMB.\n"
            "       SELECT F1 ASSIGN TO FILE1.\n"
            "       SELECT F2 ASSIGN TO FILE2.\n"
            "       PROCEDURE DIVISION.\n"
            "           OPEN INPUT F1.\n"
            "           OPEN OUTPUT F2.\n"
        ),
    }
    for filename, source in fixtures.items():
        (repo / filename).write_text(source, encoding="utf-8")

    with patch.object(sys, "argv", ["cobol_dag_architect.py", str(repo)]):
        dag_module.main()

    out = capsys.readouterr().out

    # Execution order must be exactly A -> B -> C.
    assert "STEP 01: Run [PGMA]" in out
    assert "STEP 02: Run [PGMB]" in out
    assert "STEP 03: Run [PGMC]" in out

# ==============================================================================
# TEST 3: Cycle Detection (Deadlock Trap)
# ==============================================================================
def test_dag_architect_cycle_detection(tmp_path, capsys):
    """
    Verify that a circular data dependency (P1 needs FILE-B from P2, which
    needs FILE-A from P1) is detected, reported on stdout, and aborts the
    run via SystemExit with status 1.
    """
    repo = tmp_path / "cyclic_repo"
    repo.mkdir()

    # P1 consumes FILE-B and produces FILE-A ...
    (repo / "P1.cbl").write_text(
        "       PROGRAM-ID. P1.\n"
        "       SELECT FB ASSIGN TO FILE-B.\n"
        "       SELECT FA ASSIGN TO FILE-A.\n"
        "       PROCEDURE DIVISION.\n"
        "           OPEN INPUT FB.\n"
        "           OPEN OUTPUT FA.\n",
        encoding="utf-8",
    )
    # ... while P2 consumes FILE-A and produces FILE-B: a deadlock cycle.
    (repo / "P2.cbl").write_text(
        "       PROGRAM-ID. P2.\n"
        "       SELECT FA ASSIGN TO FILE-A.\n"
        "       SELECT FB ASSIGN TO FILE-B.\n"
        "       PROCEDURE DIVISION.\n"
        "           OPEN INPUT FA.\n"
        "           OPEN OUTPUT FB.\n",
        encoding="utf-8",
    )

    with patch.object(sys, "argv", ["cobol_dag_architect.py", str(repo)]):
        with pytest.raises(SystemExit) as exc:
            dag_module.main()

    # The cycle must crash the build with a non-success status.
    assert exc.value.code == 1, "Failed to trap the cycle and crash the build!"

    out = capsys.readouterr().out
    assert "WARNING: Cyclic Dependency Detected" in out
    assert "Deadlocked Programs:" in out
117 changes: 117 additions & 0 deletions tests/test_cobol_graveyard_finder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import pytest

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'pytest' is not used.
import sys
from pathlib import Path

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'Path' is not used.
from unittest.mock import patch

# IMPORTANT: Adjust this path to match exactly where your file is located
import gitgalaxy.tools.cobol_to_cobol.cobol_graveyard_finder as graveyard_module

# ==============================================================================
# TEST 1: The Copybook Shapeshifter (Inline Variable Swapping)
# ==============================================================================
def test_copybook_shapeshifter(tmp_path):
    """
    Verify copybook resolution: a sibling .cpy file is located and its
    contents injected, and REPLACING ==OLD== BY ==NEW== rewrites whole
    words only — identifiers that merely start with the old name survive.
    """
    repo = tmp_path / "copy_repo"
    repo.mkdir()

    # Program pulling in the copybook with a variable swap.
    program = repo / "MAIN.cbl"
    program.write_text("       COPY MYDATA REPLACING ==OLD-VAR== BY ==NEW-VAR==.", encoding="utf-8")

    # The copybook declares both the exact name and a longer lookalike.
    (repo / "MYDATA.cpy").write_text("       01 OLD-VAR PIC X(10).\n       01 OLD-VAR-X PIC X(5).", encoding="utf-8")

    # Run the resolver over the raw program text.
    resolved = graveyard_module.resolve_copybooks(program.read_text(encoding="utf-8"), program)

    # Injection marker proves the copybook body was spliced in.
    assert "START COPY MYDATA" in resolved
    # Exact-boundary replacement: OLD-VAR became NEW-VAR.
    assert "01 NEW-VAR PIC" in resolved
    # Zero-trust guard: partial matches must remain untouched.
    assert "01 OLD-VAR-X PIC" in resolved, "The Shapeshifter destroyed a partial word match!"

# ==============================================================================
# TEST 2: The AST Dead Code Math
# ==============================================================================
def test_ast_dead_code_math(tmp_path):
    """
    Verify the dead-code X-ray: variables never referenced in the PROCEDURE
    DIVISION are flagged (FILLER exempt), paragraphs unreachable from the
    entry point are flagged (*-EXIT names exempt), and the savings estimate
    counts 1 LOC per orphaned variable plus 10 per dead paragraph.
    """
    program = tmp_path / "DEADPGM.cbl"
    fixture = (
        "       DATA DIVISION.\n"
        "       01 USED-VAR PIC X.\n"
        "       01 ORPHAN-VAR PIC X.\n"      # declared, never referenced
        "       01 FILLER PIC X.\n"          # noise: must be ignored
        "       PROCEDURE DIVISION.\n"
        "       MAIN-PARA.\n"                # entry point: reachable
        "           PERFORM USED-PARA.\n"
        "       USED-PARA.\n"                # reachable via PERFORM
        "           DISPLAY USED-VAR.\n"
        "       DEAD-PARA.\n"                # phantom: nothing reaches it
        "           DISPLAY 'HELLO'.\n"
        "       DEAD-EXIT.\n"                # *-EXIT suffix: must be ignored
    )
    program.write_text(fixture, encoding="utf-8")

    metrics = graveyard_module.x_ray_dead_code(program)

    # Variable classification.
    assert "ORPHAN-VAR" in metrics["orphaned_vars"]
    assert "USED-VAR" not in metrics["orphaned_vars"]
    assert "FILLER" not in metrics["orphaned_vars"], "Engine failed to filter out FILLER noise!"

    # Paragraph classification.
    assert "DEAD-PARA" in metrics["dead_paras"]
    assert "MAIN-PARA" not in metrics["dead_paras"], "Engine flagged the entry point as dead!"
    assert "USED-PARA" not in metrics["dead_paras"]
    assert "DEAD-EXIT" not in metrics["dead_paras"], "Engine failed to filter out *-EXIT paragraphs!"

    # 1 orphaned var (1 LOC) + 1 dead paragraph (10 LOC) = 11 LOC saved.
    assert metrics["loc_saved"] == 11

# ==============================================================================
# TEST 3: The E2E CLI Aggregation
# ==============================================================================
def test_graveyard_cli_e2e(tmp_path, capsys):
    """
    Verify the CLI wrapper end to end: it walks the source directory,
    aggregates orphaned variables and phantom paragraphs across files, and
    prints a summary whose totals add up (1 para * 10 LOC + 2 vars = ~12).
    """
    repo = tmp_path / "legacy_src"
    repo.mkdir()

    # One dead paragraph (worth 10 LOC) in the first file ...
    (repo / "PGM1.cbl").write_text(
        "       DATA DIVISION.\n       PROCEDURE DIVISION.\n       MAIN.\n       DEAD-P.\n",
        encoding="utf-8",
    )
    # ... and two orphaned variables (worth 2 LOC) in the second.
    (repo / "PGM2.cbl").write_text(
        "       DATA DIVISION.\n       01 D1 PIC X.\n       01 D2 PIC X.\n       PROCEDURE DIVISION.\n       MAIN.\n",
        encoding="utf-8",
    )

    with patch.object(sys, "argv", ["cobol_graveyard_finder.py", str(repo)]):
        try:
            graveyard_module.main()
        except SystemExit as e:
            # A clean run may exit explicitly, but only with status 0.
            assert e.code == 0

    out = capsys.readouterr().out

    # Aggregated summary figures printed by the CLI.
    assert "Files Flagged for Cleanup : 2" in out
    assert "Unused Memory Addresses : 2 orphaned variables" in out
    assert "Unreachable Logic Blocks : 1 phantom paragraphs" in out
    assert "Estimated Bloat Removed : ~12 Lines of Code" in out
127 changes: 127 additions & 0 deletions tests/test_cobol_jcl_forge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import pytest

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'pytest' is not used.
import sys
import re

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 're' is not used.
from pathlib import Path

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'Path' is not used.
from unittest.mock import patch

# IMPORTANT: Adjust this path to match exactly where your file is located
import gitgalaxy.tools.cobol_to_cobol.cobol_jcl_forge as forge_module

# ==============================================================================
# TEST 1: The Flattener and Intent Extractor
# ==============================================================================
def test_cobol_intent_analysis(tmp_path):
    """
    Verify intent extraction from fixed-format COBOL: lines with '*' in
    column 7 are skipped as comments, PROGRAM-ID is read with its quotes
    stripped, DD names lose the UT-S- prefix, and EXEC CICS / EXEC SQL
    blocks set the transactional and database flags with call counts.
    """
    program = tmp_path / "MOCKPGM.cbl"

    # Strict 6-space margin; the asterisk sits in column 7 (comment marker).
    program.write_text(
        "       IDENTIFICATION DIVISION.\n"
        "       PROGRAM-ID. 'GLB001'.\n"
        "      *SELECT FAKE-FILE ASSIGN TO FAKEDD. (Should be ignored!)\n"
        "       INPUT-OUTPUT SECTION.\n"
        "       FILE-CONTROL.\n"
        "           SELECT IN-FILE ASSIGN TO UT-S-INPUTDD.\n"
        "           SELECT OUT-FILE ASSIGN TO OUTPUTDD.\n"
        "       PROCEDURE DIVISION.\n"
        "           EXEC CICS\n"
        "               RECEIVE MAP('MAP1')\n"
        "           END-EXEC.\n"
        "           EXEC SQL\n"
        "               SELECT * FROM TABLE\n"
        "           END-EXEC.\n",
        encoding="utf-8",
    )

    intent = forge_module.analyze_cobol_intent(program)

    # Program identity, surrounding quotes removed.
    assert intent["program_id"] == "GLB001", "Failed to extract PROGRAM-ID!"

    # File mapping with UT-S- stripped; the commented SELECT must be absent.
    dd_by_internal = {f["internal"]: f["dd_name"] for f in intent["files_requested"]}
    assert "IN-FILE" in dd_by_internal and dd_by_internal["IN-FILE"] == "INPUTDD"
    assert "OUT-FILE" in dd_by_internal and dd_by_internal["OUT-FILE"] == "OUTPUTDD"
    assert "FAKE-FILE" not in dd_by_internal, "Failed to ignore column-7 comment!"

    # Transactional / database detection.
    assert intent["is_cics"] is True
    assert intent["cics_calls"] == 1
    assert intent["is_db2"] is True
    assert intent["sql_calls"] == 1

# ==============================================================================
# TEST 2: The Zero-Trust JCL Generator
# ==============================================================================
def test_zero_trust_jcl_generation():
    """
    Verify that a parsed intent dictionary renders into runnable JCL: a job
    card and EXEC step, an architecture banner for CICS (and no mention of
    the absent DB2), and a NEW/CATLG disposition for any DD that the
    supplied lineage marks as an output of this job.
    """
    intent = {
        "program_id": "TESTPGM",
        "files_requested": [{"internal": "INFILE", "dd_name": "INPUT01"}],
        "is_cics": True,
        "is_db2": False,
    }
    # INPUT01 is produced by this job, so it must be allocated as NEW.
    lineage = {"outputs": {"INPUT01"}, "inputs": set()}

    jcl = forge_module.generate_zero_trust_jcl(
        intent=intent,
        job_name="MOCKJOB",
        account_code="9999",
        lineage=lineage,
    )

    # Job card and step definition.
    assert "//MOCKJOB JOB (9999)" in jcl
    assert "//STEP01 EXEC PGM=TESTPGM" in jcl

    # Architecture flags: CICS present, DB2 entirely absent.
    assert "ARCHITECTURE REQUIRES: CICS" in jcl
    assert "DB2" not in jcl

    # Disposition follows the lineage, not a blind default.
    assert "//INPUT01 DD DSN=HERC01.DATA.INPUT01" in jcl
    assert "DISP=(NEW,CATLG,DELETE)" in jcl

# ==============================================================================
# TEST 3: The Hygienic E2E CLI Routing
# ==============================================================================
def test_hygienic_cli_defaults(tmp_path):
    """
    Verify the CLI end to end: it discovers both .cbl and .cob sources,
    creates exactly one timestamped '<src>_forged_*' output directory
    beside the source tree, and writes one JCL file per PROGRAM-ID there.
    """
    src_dir = tmp_path / "legacy_src"
    src_dir.mkdir()

    # Two programs with different legacy extensions.
    (src_dir / "PROG1.cbl").write_text("       PROGRAM-ID. P1.\n", encoding="utf-8")
    (src_dir / "PROG2.cob").write_text("       PROGRAM-ID. P2.\n", encoding="utf-8")

    # A successful run returns normally, so no SystemExit trap is needed.
    with patch.object(sys, "argv", ["cobol_jcl_forge.py", str(src_dir)]):
        forge_module.main()

    # Exactly one hygienic directory: legacy_src_forged_YYYYMMDD_HHMMSS.
    forged_dirs = [d for d in tmp_path.iterdir() if d.is_dir() and "legacy_src_forged_" in d.name]
    assert len(forged_dirs) == 1, "The engine failed to create the isolated hygienic directory!"

    out_dir = forged_dirs[0]
    p1_jcl = out_dir / "P1.jcl"
    p2_jcl = out_dir / "P2.jcl"

    # One physical JCL payload per discovered PROGRAM-ID.
    assert p1_jcl.exists(), "P1 JCL was not written to the hygienic directory!"
    assert p2_jcl.exists(), "P2 JCL was not written to the hygienic directory!"
    assert "EXEC PGM=P1" in p1_jcl.read_text(encoding="utf-8")
Loading