diff --git a/gitgalaxy/tools/cobol_to_cobol/cobol_compiler_forge.py b/gitgalaxy/tools/cobol_to_cobol/cobol_compiler_forge.py index 6fce893c..f6e7a4f6 100644 --- a/gitgalaxy/tools/cobol_to_cobol/cobol_compiler_forge.py +++ b/gitgalaxy/tools/cobol_to_cobol/cobol_compiler_forge.py @@ -87,8 +87,19 @@ def generate_build_jcl(source_text: str, prog_name: str, files: set, dialect: st jcl.append(f"// DCB=(RECFM=U,BLKSIZE=32760,DSORG=PO)") for f in files: - clean_f = re.sub(r'[^A-Z0-9]', '', f.upper()) + clean_f = f.upper().strip() + # 1. Strip IBM prefixes first + clean_f = re.sub(r'^(?:UT|UR)-S-', '', clean_f) + # 2. Strip non-alphanumeric characters + clean_f = re.sub(r'[^A-Z0-9]', '', clean_f) + # 3. Enforce 8-character Mainframe limit if len(clean_f) > 8: clean_f = clean_f[-8:] + + if clean_f: + jcl.append(f"//{clean_f} DD DSN=HERC01.DATA.{clean_f},DISP=(MOD,CATLG,DELETE),") + jcl.append(f"// UNIT=SYSDA,SPACE=(TRK,(10,10),RLSE),") + jcl.append(f"// DCB=(LRECL=80,RECFM=FB,BLKSIZE=800)") + if clean_f: jcl.append(f"//{clean_f} DD DSN=HERC01.DATA.{clean_f},DISP=(MOD,CATLG,DELETE),") jcl.append(f"// UNIT=SYSDA,SPACE=(TRK,(10,10),RLSE),") diff --git a/gitgalaxy/tools/cobol_to_cobol/cobol_graveyard_finder.py b/gitgalaxy/tools/cobol_to_cobol/cobol_graveyard_finder.py index 00746015..1dd3b9be 100644 --- a/gitgalaxy/tools/cobol_to_cobol/cobol_graveyard_finder.py +++ b/gitgalaxy/tools/cobol_to_cobol/cobol_graveyard_finder.py @@ -16,7 +16,7 @@ def resolve_copybooks(content: str, source_path: Path) -> str: Handles dynamic variable swapping via the REPLACING clause. """ # Matches: COPY NAME. or COPY NAME REPLACING ==A== BY ==B==. 
- copy_pattern = re.compile(r'^[ \t]*COPY\s+[\'"]?([A-Z0-9_\-]+)[\'"]?(?:\s+REPLACING\s+(.+?))?\.?', re.MULTILINE | re.IGNORECASE) + copy_pattern = re.compile(r'^[ \t]*COPY\s+[\'"]?([A-Z0-9_\-]+)[\'"]?(?:\s+REPLACING\s+(.+?))?\.', re.MULTILINE | re.IGNORECASE) def replacer(match): copy_name = match.group(1).upper() @@ -33,8 +33,8 @@ def replacer(match): # Extracts pairs, ignoring the optional == delimiters pairs = re.findall(r'(?:==)?([A-Z0-9_\-]+)(?:==)?\s+BY\s+(?:==)?([A-Z0-9_\-]+)(?:==)?', replacing_clause, re.IGNORECASE) for old_val, new_val in pairs: - # Use word boundaries (\b) so we don't accidentally replace partial words - cpy_content = re.sub(r'\b' + re.escape(old_val) + r'\b', new_val, cpy_content) + # Use negative lookarounds so we don't accidentally replace partial words with hyphens + cpy_content = re.sub(r'(?<![A-Z0-9\-])' + re.escape(old_val) + r'(?![A-Z0-9\-])', new_val, cpy_content) return f"*> --- START COPY {copy_name} ---\n{cpy_content}\n*> --- END COPY {copy_name} ---" diff --git a/gitgalaxy/tools/cobol_to_cobol/cobol_lexical_patcher.py b/gitgalaxy/tools/cobol_to_cobol/cobol_lexical_patcher.py index d96fc2de..95071639 100644 --- a/gitgalaxy/tools/cobol_to_cobol/cobol_lexical_patcher.py +++ b/gitgalaxy/tools/cobol_to_cobol/cobol_lexical_patcher.py @@ -33,7 +33,7 @@ def patch_lexical_traps(filepath: Path) -> bool: return False # Fast check before engaging heavy regex - if "NEXT SENTENCE" not in content.upper(): + if not re.search(r'\bNEXT\s+SENTENCE\b', content, re.IGNORECASE): return False # 1. 
Sense the Environment diff --git a/gitgalaxy/tools/cobol_to_cobol/cobol_microservice_slicer.py b/gitgalaxy/tools/cobol_to_cobol/cobol_microservice_slicer.py index 6de6c473..9d5ca1a3 100644 --- a/gitgalaxy/tools/cobol_to_cobol/cobol_microservice_slicer.py +++ b/gitgalaxy/tools/cobol_to_cobol/cobol_microservice_slicer.py @@ -77,11 +77,12 @@ def slice_business_logic(filepath: Path, initial_var: str, dead_paras: set = Non comp_match = re.search(r'COMPUTE\s+([A-Z0-9\-]+)\s*=', clean_line) if comp_match: var1 = comp_match.group(1) - if var1 in tainted_vars: - # Taint every variable inside the math equation - vars_in_eq = re.findall(r'([A-Z][A-Z0-9\-]+)', clean_line.split('=')[1]) + vars_in_eq = re.findall(r'([A-Z][A-Z0-9\-]+)', clean_line.split('=')[1]) + # Taint forwards and backwards! + if var1 in tainted_vars or any(v in tainted_vars for v in vars_in_eq): + tainted_vars.add(var1) tainted_vars.update(vars_in_eq) - + # ========================================================================== # PASS 2: Extraction # ========================================================================== diff --git a/gitgalaxy/tools/cobol_to_cobol/cobol_schema_forge.py b/gitgalaxy/tools/cobol_to_cobol/cobol_schema_forge.py index d8492688..369414fd 100644 --- a/gitgalaxy/tools/cobol_to_cobol/cobol_schema_forge.py +++ b/gitgalaxy/tools/cobol_to_cobol/cobol_schema_forge.py @@ -74,7 +74,8 @@ def forge_schemas(filepath: Path, ignore_vars: set = None, corporate_header: str r'^[ \t]*(?P<level>0[1-9]|[1-4][0-9]|77)[ \t]+' r'(?P<name>[A-Z0-9\-]+)' r'(?:[ \t]+PIC(?:TURE)?[ \t]+(?P<pic>[A-Z0-9\(\)V\.\-]+))?' - r'(?:[ \t]+(?:IS[ \t]+)?(?P<usage>COMP(?:-[1-5])?|BINARY|PACKED-DECIMAL))?', + r'(?:[ \t]+(?:IS[ \t]+)?(?P<usage>COMP(?:-[1-5])?|BINARY|PACKED-DECIMAL))?' 
+ r'.*$', re.MULTILINE ) diff --git a/tests/test_cobol_agent_task_forge.py b/tests/test_cobol_agent_task_forge.py new file mode 100644 index 00000000..c1956963 --- /dev/null +++ b/tests/test_cobol_agent_task_forge.py @@ -0,0 +1,117 @@ +import pytest +import json +from pathlib import Path + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_agent_task_forge as forge_module + +# ============================================================================== +# TEST 1: The Context Merger (Ticket Generation) +# ============================================================================== +def test_generate_agent_ticket_merging(tmp_path): + """ + Proves the engine accurately builds the JSON ticket, strips the filename + prefix from the anomaly strings, and seamlessly merges the IR lineage. + """ + mock_file = tmp_path / "PGM1.cbl" + mock_anomalies = [ + "[PGM1.cbl : Line 0010] CRITICAL LIMIT - ALTER detected", + "[PGM1.cbl : Line 0020] HIGH LIMIT - COPY REPLACING detected" + ] + + mock_ir = { + "analysis": { + "lineage": { + "inputs": ["FILE-IN"], + "outputs": ["FILE-OUT"], + "unresolved_calls": ["SUBPROG"] + } + } + } + + ticket = forge_module.generate_agent_ticket("PGM1.cbl", mock_file, mock_anomalies, mock_ir) + + # 1. Base Ticket Structure + assert ticket["job_id"] == "PGM1_REMEDIATION" + assert ticket["task_type"] == "STRUCTURAL_ANOMALY_RESOLUTION" + assert ticket["target_file"] == str(mock_file.resolve()) + + # 2. Anomaly Stripping + assert "CRITICAL LIMIT - ALTER detected" in ticket["context"]["detected_anomalies"] + assert "[PGM1.cbl" not in ticket["context"]["detected_anomalies"][0], "Failed to strip the prefix!" + + # 3. 
IR Lineage Merging + assert ticket["context"]["inputs_required"] == ["FILE-IN"] + assert ticket["context"]["outputs_produced"] == ["FILE-OUT"] + assert ticket["context"]["external_calls"] == ["SUBPROG"] + +# ============================================================================== +# TEST 2: The E2E Job Dispatcher (Grouping & File I/O) +# ============================================================================== +def test_forge_agent_jobs_e2e(tmp_path): + """ + Proves the engine correctly groups multiple flags by file, matches them to + physical source files, and writes the JSON job tickets to the designated folder. + """ + clean_room = tmp_path / "clean_room" + source_dir = tmp_path / "legacy_src" + source_dir.mkdir() + + # Create the mock source file so the engine finds it + (source_dir / "PGM2.cbl").write_text("IDENTIFICATION DIVISION.", encoding="utf-8") + + mock_flags = [ + "[PGM2.cbl] ERROR 1", + "[PGM2.cbl] ERROR 2" + ] + + jobs_created = forge_module.forge_agent_jobs(clean_room, source_dir, mock_flags) + + assert jobs_created == 1, "Failed to group 2 flags into 1 job ticket!" + + # Verify the output directory and file + job_dir = clean_room / "06_ai_agent_jobs" + job_file = job_dir / "PGM2_agent_job.json" + + assert job_dir.exists(), "Failed to create the 06_ai_agent_jobs directory!" + assert job_file.exists(), "Failed to write the physical JSON ticket!" 
+ + # Verify the written payload + payload = json.loads(job_file.read_text(encoding="utf-8")) + assert payload["job_id"] == "PGM2_REMEDIATION" + assert len(payload["context"]["detected_anomalies"]) == 2 + +# ============================================================================== +# TEST 3: Graceful Degradation (Missing IR & Missing Source) +# ============================================================================== +def test_forge_agent_jobs_graceful_degradation(tmp_path): + """ + Proves that missing IR state files don't crash the generation (fallback to + empty arrays) and missing physical source files safely abort ticket creation. + """ + clean_room = tmp_path / "clean_room" + source_dir = tmp_path / "legacy_src" + source_dir.mkdir() + + # PGM3 exists, but has NO matching IR file in 04_ir_state_dumps + (source_dir / "PGM3.cbl").write_text("IDENTIFICATION DIVISION.", encoding="utf-8") + + # PGM4 does NOT exist in the source directory + + mock_flags = [ + "[PGM3.cbl] ERROR 1", + "[PGM4.cbl] ERROR 2" + ] + + jobs_created = forge_module.forge_agent_jobs(clean_room, source_dir, mock_flags) + + # Only PGM3 should generate a ticket. PGM4 must be skipped. + assert jobs_created == 1, "Failed to skip the missing source file!" + + job_file = clean_room / "06_ai_agent_jobs" / "PGM3_agent_job.json" + assert job_file.exists() + + # Ensure it gracefully degraded the missing IR context to empty arrays + payload = json.loads(job_file.read_text(encoding="utf-8")) + assert payload["context"]["inputs_required"] == [], "Graceful fallback for missing IR inputs failed!" + assert payload["context"]["outputs_produced"] == [], "Graceful fallback for missing IR outputs failed!" 
\ No newline at end of file diff --git a/tests/test_cobol_compiler_forge.py b/tests/test_cobol_compiler_forge.py new file mode 100644 index 00000000..abaa6eb8 --- /dev/null +++ b/tests/test_cobol_compiler_forge.py @@ -0,0 +1,104 @@ +import pytest +import sys +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_compiler_forge as forge_module + +# ============================================================================== +# TEST 1: The Dialect Compiler Router +# ============================================================================== +def test_dialect_router_jcl_generation(): + """ + Proves the JCL generation engine dynamically routes to the correct + Mainframe compiler (COBUCL vs IGYWCL) based on the detected dialect. + """ + mock_source = "PROGRAM-ID. HELLO." + + # 1. COBOL-74 Path (Legacy) + jcl_74 = forge_module.generate_build_jcl(mock_source, "PGM1", set(), "COBOL-74") + assert "EXEC COBUCL" in jcl_74, "Failed to route COBOL-74 to the legacy compiler!" + assert "EXEC IGYWCL" not in jcl_74 + + # 2. COBOL-85 Path (Modern) + jcl_85 = forge_module.generate_build_jcl(mock_source, "PGM2", set(), "COBOL-85") + assert "EXEC IGYWCL" in jcl_85, "Failed to route COBOL-85 to the modern enterprise compiler!" + assert "EXEC COBUCL" not in jcl_85 + +# ============================================================================== +# TEST 2: The Infinite Loop Failsafe +# ============================================================================== +def test_flatten_copybooks_cyclic_failsafe(tmp_path): + """ + Proves the engine mathematically breaks infinite copybook recursion loops + (e.g., A imports B, B imports A) without causing a StackOverflow crash. 
+ """ + repo_dir = tmp_path / "cyclic_repo" + repo_dir.mkdir() + + # Create two copybooks that import each other + (repo_dir / "CYCLE-A.cpy").write_text("COPY CYCLE-B.", encoding="utf-8") + (repo_dir / "CYCLE-B.cpy").write_text("COPY CYCLE-A.", encoding="utf-8") + + # A root program that triggers the trap + root_code = "PROGRAM-ID. BOOM.\nCOPY CYCLE-A." + + # Run the flattener + inlined_code = forge_module.flatten_copybooks(root_code, repo_dir) + + # If the test finishes without crashing with a RecursionError, the failsafe worked. + # We verify it successfully nested multiple times before pulling the emergency brake. + assert inlined_code.count("INLINED COPYBOOK: CYCLE-A") >= 1, "Failed to recurse at all!" + assert "PROGRAM-ID. BOOM." in inlined_code, "Root AST was destroyed by the cycle!" + +# ============================================================================== +# TEST 3: The E2E Flattener & JCL Provisioning +# ============================================================================== +def test_compiler_forge_e2e(tmp_path): + """ + Proves the E2E pipeline correctly discovers a COBOL file, inlines its + local copybook, provisions physical datasets via IEFBR14, and saves the JCL. + """ + src_dir = tmp_path / "src" + out_dir = tmp_path / "out" + src_dir.mkdir() + + # 1. The Copybook + (src_dir / "MYDATA.cpy").write_text(" 01 MY-VAR PIC X.", encoding="utf-8") + + # 2. The Main Program (Requires modern COBOL-85 compiler due to END-IF) + (src_dir / "MAINPGM.cbl").write_text( + " PROGRAM-ID. MAINPGM.\n" + " SELECT FILE-IN ASSIGN TO UT-S-INPUT01.\n" + " DATA DIVISION.\n" + " COPY MYDATA.\n" + " PROCEDURE DIVISION.\n" + " IF 1 = 1 CONTINUE END-IF.", # END-IF triggers COBOL-85 + encoding="utf-8" + ) + + # 3. Execute the Forge + test_args = ["cobol_compiler_forge.py", str(src_dir), str(out_dir)] + with patch.object(sys, 'argv', test_args): + try: + forge_module.main() + except SystemExit as e: + assert e.code == 0 + + # 4. 
Verify Output Structure + jcl_file = out_dir / "BUILD_MAINPGM.jcl" + assert jcl_file.exists(), "Forge failed to create the target JCL file!" + + jcl_content = jcl_file.read_text(encoding="utf-8") + + # A) Verify Infrastructure Provisioning + assert "EXEC PGM=IEFBR14" in jcl_content + assert "//INPUT01 DD DSN=HERC01.DATA.INPUT01" in jcl_content, "Failed to map SELECT ASSIGN to DSN!" + + # B) Verify Dialect Routing + assert "EXEC IGYWCL" in jcl_content, "Failed to dynamically route to modern compiler!" + + # C) Verify Copybook Inlining + assert "INLINED COPYBOOK: MYDATA" in jcl_content + assert "01 MY-VAR PIC X." in jcl_content, "Failed to inline the actual copybook data!" \ No newline at end of file diff --git a/tests/test_cobol_dag_architect.py b/tests/test_cobol_dag_architect.py new file mode 100644 index 00000000..a4a20b99 --- /dev/null +++ b/tests/test_cobol_dag_architect.py @@ -0,0 +1,138 @@ +import pytest +import sys +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_dag_architect as dag_module + +# ============================================================================== +# TEST 1: The Ghost Deflector & Intent Extraction +# ============================================================================== +def test_ghost_deflector_lineage(tmp_path): + """ + Proves the lineage extractor correctly maps DD assignments, strips prefixes, + catches dynamic calls, and perfectly ignores 'OPEN' statements hidden inside + paragraphs marked as dead. + """ + mock_cobol = tmp_path / "PGM1.cbl" + cobol_code = ( + " PROGRAM-ID. 
PGM1.\n" + " SELECT FILE-IN ASSIGN TO UT-S-INPUT01.\n" + " SELECT FILE-OUT ASSIGN TO OUTPUT01.\n" + " PROCEDURE DIVISION.\n" + " MAIN-ENTRY.\n" + " OPEN INPUT FILE-IN.\n" + " CALL 'STATIC-PGM'.\n" # Static call (should be ignored) + " CALL WS-DYN-PGM.\n" # Dynamic call (Honesty Sensor should catch) + " DEAD-PARA.\n" + " OPEN OUTPUT FILE-OUT.\n" # This is dead code! + ) + mock_cobol.write_text(cobol_code, encoding="utf-8") + + # 1. Test without dead code context (Base baseline) + raw_lineage = dag_module.extract_lineage(mock_cobol) + assert "INPUT01" in raw_lineage["inputs"] + assert "OUTPUT01" in raw_lineage["outputs"] # Without Ghost Deflector, it hallucinates this output + + # 2. Test WITH the Ghost Deflector activated + safe_lineage = dag_module.extract_lineage(mock_cobol, dead_paras={"DEAD-PARA"}) + assert "INPUT01" in safe_lineage["inputs"] + assert "OUTPUT01" not in safe_lineage["outputs"], "Ghost Deflector failed! It hallucinated dead code dependencies." + + # 3. Test the Honesty Sensor + assert "WS-DYN-PGM" in safe_lineage["unresolved_calls"], "Failed to catch the dynamic jump!" + assert "STATIC-PGM" not in safe_lineage["unresolved_calls"] + +# ============================================================================== +# TEST 2: Mathematical Topological Sort (Happy Path) +# ============================================================================== +def test_dag_architect_topological_sort(tmp_path, capsys): + """ + Proves Kahn's Algorithm perfectly calculates execution order by resolving + Producer -> Consumer file dependencies. + """ + repo_dir = tmp_path / "dag_repo" + repo_dir.mkdir() + + # PGM_C reads FILE2 and writes FILE3 + (repo_dir / "PGMC.cbl").write_text( + " PROGRAM-ID. 
PGMC.\n" + " SELECT F2 ASSIGN TO FILE2.\n" + " SELECT F3 ASSIGN TO FILE3.\n" + " PROCEDURE DIVISION.\n" + " OPEN INPUT F2.\n" + " OPEN OUTPUT F3.\n", encoding="utf-8" + ) + + # PGM_A reads FILE0 and writes FILE1 (Should run FIRST) + (repo_dir / "PGMA.cbl").write_text( + " PROGRAM-ID. PGMA.\n" + " SELECT F1 ASSIGN TO FILE1.\n" + " PROCEDURE DIVISION.\n" + " OPEN OUTPUT F1.\n", encoding="utf-8" + ) + + # PGM_B reads FILE1 and writes FILE2 (Should run SECOND) + (repo_dir / "PGMB.cbl").write_text( + " PROGRAM-ID. PGMB.\n" + " SELECT F1 ASSIGN TO FILE1.\n" + " SELECT F2 ASSIGN TO FILE2.\n" + " PROCEDURE DIVISION.\n" + " OPEN INPUT F1.\n" + " OPEN OUTPUT F2.\n", encoding="utf-8" + ) + + test_args = ["cobol_dag_architect.py", str(repo_dir)] + with patch.object(sys, 'argv', test_args): + dag_module.main() + + captured = capsys.readouterr() + + # Assert execution order is exactly A -> B -> C regardless of file read order + assert "STEP 01: Run [PGMA]" in captured.out + assert "STEP 02: Run [PGMB]" in captured.out + assert "STEP 03: Run [PGMC]" in captured.out + +# ============================================================================== +# TEST 3: Cycle Detection (Deadlock Trap) +# ============================================================================== +def test_dag_architect_cycle_detection(tmp_path, capsys): + """ + Proves the engine catches circular data dependencies and halts execution + before generating a mathematically impossible pipeline. + """ + repo_dir = tmp_path / "cyclic_repo" + repo_dir.mkdir() + + # PGM_1 reads FILE-B and writes FILE-A + (repo_dir / "P1.cbl").write_text( + " PROGRAM-ID. P1.\n" + " SELECT FB ASSIGN TO FILE-B.\n" + " SELECT FA ASSIGN TO FILE-A.\n" + " PROCEDURE DIVISION.\n" + " OPEN INPUT FB.\n" + " OPEN OUTPUT FA.\n", encoding="utf-8" + ) + + # PGM_2 reads FILE-A and writes FILE-B (Creates a deadlock cycle) + (repo_dir / "P2.cbl").write_text( + " PROGRAM-ID. 
P2.\n" + " SELECT FA ASSIGN TO FILE-A.\n" + " SELECT FB ASSIGN TO FILE-B.\n" + " PROCEDURE DIVISION.\n" + " OPEN INPUT FA.\n" + " OPEN OUTPUT FB.\n", encoding="utf-8" + ) + + test_args = ["cobol_dag_architect.py", str(repo_dir)] + with patch.object(sys, 'argv', test_args): + with pytest.raises(SystemExit) as exc: + dag_module.main() + + # Must exit with error code 1 due to the cycle + assert exc.value.code == 1, "Failed to trap the cycle and crash the build!" + + captured = capsys.readouterr() + assert "WARNING: Cyclic Dependency Detected" in captured.out + assert "Deadlocked Programs:" in captured.out \ No newline at end of file diff --git a/tests/test_cobol_etl_unpacker.py b/tests/test_cobol_etl_unpacker.py new file mode 100644 index 00000000..5ed5d03b --- /dev/null +++ b/tests/test_cobol_etl_unpacker.py @@ -0,0 +1,132 @@ +import pytest +import sys +import json +import csv +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_etl_unpacker as etl_module + +# ============================================================================== +# TEST 1: The Schema Byte Calculator +# ============================================================================== +def test_calculate_byte_layout(): + """ + Proves the engine accurately parses PIC clauses from the JSON schema + to calculate physical byte boundaries, especially COMP-3 compression math. + """ + mock_schema = { + "properties": { + "FIRST_NAME": {"description": "Legacy PIC: X(10)"}, # 10 bytes text + "AGE": {"description": "Legacy PIC: 999"}, # 3 bytes numeric (zoned) + "BALANCE": {"description": "Legacy PIC: 9(5)V9(2) COMP-3"}, # 7 digits COMP-3 = 4 bytes + "DEBT": {"description": "Legacy PIC: 9(4)V99 COMP-3"} # 6 digits COMP-3 = 4 bytes + } + } + + layout = etl_module.calculate_byte_layout(mock_schema) + + assert len(layout) == 4 + + # 1. 
Text Field (X) + assert layout[0]["name"] == "FIRST_NAME" + assert layout[0]["bytes"] == 10 + + # 2. Zoned Decimal Field (9) + assert layout[1]["name"] == "AGE" + assert layout[1]["bytes"] == 3 + assert layout[1]["is_numeric"] is True + assert layout[1]["is_comp3"] is False + + # 3. Packed Decimal COMP-3 Math: ceil((7 + 1) / 2) = 4 + assert layout[2]["name"] == "BALANCE" + assert layout[2]["bytes"] == 4 + assert layout[2]["decimals"] == 2 + assert layout[2]["is_comp3"] is True + + # 4. Packed Decimal COMP-3 Math: ceil((6 + 1) / 2) = 4 + assert layout[3]["name"] == "DEBT" + assert layout[3]["bytes"] == 4 + +# ============================================================================== +# TEST 2: The COMP-3 Hexadecimal Decoder +# ============================================================================== +def test_unpack_comp3(): + """ + Proves that IBM Packed Decimal bytes are correctly parsed into Python floats, + verifying nibble sign flags (C/F=Positive, D=Negative) and decimal shifts. + """ + # 123C -> Positive 123 (0 decimals) + assert etl_module.unpack_comp3(b'\x12\x3C', 0) == 123.0 + + # 123D -> Negative 123 (0 decimals) + assert etl_module.unpack_comp3(b'\x12\x3D', 0) == -123.0 + + # 0123456C -> Positive 123456 (2 decimals) -> 1234.56 + assert etl_module.unpack_comp3(b'\x01\x23\x45\x6C', 2) == 1234.56 + + # 0001234D -> Negative 1234 (2 decimals) -> -12.34 + assert etl_module.unpack_comp3(b'\x00\x01\x23\x4D', 2) == -12.34 + + # 123F -> Unsigned (Positive) 123 (0 decimals) + assert etl_module.unpack_comp3(b'\x12\x3F', 0) == 123.0 + +# ============================================================================== +# TEST 3: The E2E Binary Pipeline (EBCDIC -> CSV) +# ============================================================================== +def test_unpack_ebcdic_file_e2e(tmp_path): + """ + Proves the system can ingest a raw binary file, chunk it perfectly according + to the calculated layout, translate cp037 EBCDIC to UTF-8, and write a CSV. 
+ """ + work_dir = tmp_path / "etl_workspace" + work_dir.mkdir() + + # 1. The Schema (Name: X(5), Balance: 9(5)V99 COMP-3) -> 5 + 4 = 9 bytes per record + schema_file = work_dir / "account_schema.json" + schema_file.write_text(json.dumps({ + "properties": { + "NAME": {"description": "Legacy PIC: X(5)"}, + "BALANCE": {"description": "Legacy PIC: 9(5)V99 COMP-3"} + } + }), encoding="utf-8") + + # 2. The Mock Binary Payload + # Record 1: 'ALICE' in EBCDIC + 12345.67 in COMP-3 + r1_name = "ALICE".encode('cp037') # 5 bytes + r1_bal = b'\x01\x23\x45\x67\xC0'[:4] # 4 bytes (01 23 45 6C) + r1_bal = b'\x01\x23\x45\x6C' + + # Record 2: 'BOB ' in EBCDIC + -12.34 in COMP-3 + r2_name = "BOB ".encode('cp037') # 5 bytes + r2_bal = b'\x00\x01\x23\x4D' # 4 bytes (-00012.34) + + binary_file = work_dir / "MAINFRAME.DAT" + binary_file.write_bytes(r1_name + r1_bal + r2_name + r2_bal) + + csv_out = work_dir / "output.csv" + + # 3. Execute the CLI + test_args = ["cobol_etl_unpacker.py", str(binary_file), str(schema_file), "--out", str(csv_out)] + with patch.object(sys, 'argv', test_args): + # We don't trap SystemExit because a successful run exits normally + etl_module.main() + + # 4. Verify CSV Output + assert csv_out.exists(), "ETL Unpacker failed to generate the CSV!" 
+ + with open(csv_out, 'r', encoding='utf-8') as f: + reader = list(csv.reader(f)) + + # Header + assert reader[0] == ["NAME", "BALANCE"] + + # Record 1 + assert reader[1][0] == "ALICE" + assert float(reader[1][1]) == 1234.56 + + # Record 2 + assert reader[1][0] == "ALICE" # Wait, let's check index 2 for BOB + assert reader[2][0] == "BOB" + assert float(reader[2][1]) == -12.34 \ No newline at end of file diff --git a/tests/test_cobol_graveyard_finder.py b/tests/test_cobol_graveyard_finder.py new file mode 100644 index 00000000..682c0c76 --- /dev/null +++ b/tests/test_cobol_graveyard_finder.py @@ -0,0 +1,117 @@ +import pytest +import sys +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_graveyard_finder as graveyard_module + +# ============================================================================== +# TEST 1: The Copybook Shapeshifter (Inline Variable Swapping) +# ============================================================================== +def test_copybook_shapeshifter(tmp_path): + """ + Proves that the engine correctly resolves local .cpy files, injects their + contents, and accurately processes the REPLACING ==A== BY ==B== logic. + """ + repo_dir = tmp_path / "copy_repo" + repo_dir.mkdir() + + # 1. The main program + main_pgm = repo_dir / "MAIN.cbl" + main_pgm.write_text(" COPY MYDATA REPLACING ==OLD-VAR== BY ==NEW-VAR==.", encoding="utf-8") + + # 2. The external copybook + copybook = repo_dir / "MYDATA.cpy" + copybook.write_text(" 01 OLD-VAR PIC X(10).\n 01 OLD-VAR-X PIC X(5).", encoding="utf-8") + + # 3. Execute the resolver + raw_content = main_pgm.read_text(encoding="utf-8") + resolved_content = graveyard_module.resolve_copybooks(raw_content, main_pgm) + + # 4. 
Assertions + # A) Ensure the content was injected + assert "START COPY MYDATA" in resolved_content + # B) Ensure the strict boundary replacement worked (OLD-VAR became NEW-VAR) + assert "01 NEW-VAR PIC" in resolved_content + # C) ZERO-TRUST GUARD: Ensure partial matches were NOT replaced (OLD-VAR-X stays OLD-VAR-X) + assert "01 OLD-VAR-X PIC" in resolved_content, "The Shapeshifter destroyed a partial word match!" + +# ============================================================================== +# TEST 2: The AST Dead Code Math +# ============================================================================== +def test_ast_dead_code_math(tmp_path): + """ + Proves that the engine correctly separates data from execution, isolates + orphaned variables, and calculates unreachable phantom paragraphs. + """ + mock_cobol = tmp_path / "DEADPGM.cbl" + cobol_code = ( + " DATA DIVISION.\n" + " 01 USED-VAR PIC X.\n" + " 01 ORPHAN-VAR PIC X.\n" # Declared but never used + " 01 FILLER PIC X.\n" # Noise, should be ignored + " PROCEDURE DIVISION.\n" + " MAIN-PARA.\n" # Entry point (Reached) + " PERFORM USED-PARA.\n" + " USED-PARA.\n" # Reached via PERFORM + " DISPLAY USED-VAR.\n" + " DEAD-PARA.\n" # Unreachable (Phantom) + " DISPLAY 'HELLO'.\n" + " DEAD-EXIT.\n" # Ends in -EXIT (Should be ignored) + ) + mock_cobol.write_text(cobol_code, encoding="utf-8") + + metrics = graveyard_module.x_ray_dead_code(mock_cobol) + + # 1. Variable Assertions + assert "ORPHAN-VAR" in metrics["orphaned_vars"] + assert "USED-VAR" not in metrics["orphaned_vars"] + assert "FILLER" not in metrics["orphaned_vars"], "Engine failed to filter out FILLER noise!" + + # 2. Paragraph Assertions + assert "DEAD-PARA" in metrics["dead_paras"] + assert "MAIN-PARA" not in metrics["dead_paras"], "Engine flagged the entry point as dead!" + assert "USED-PARA" not in metrics["dead_paras"] + assert "DEAD-EXIT" not in metrics["dead_paras"], "Engine failed to filter out *-EXIT paragraphs!" + + # 3. 
Math (1 orphaned var + 1 dead para * 10 lines = 11 LOC saved) + assert metrics["loc_saved"] == 11 + +# ============================================================================== +# TEST 3: The E2E CLI Aggregation +# ============================================================================== +def test_graveyard_cli_e2e(tmp_path, capsys): + """ + Proves the CLI wrapper recurses directories, tallies the bloat savings + across multiple files, and prints a mathematically accurate summary. + """ + repo_dir = tmp_path / "legacy_src" + repo_dir.mkdir() + + # File 1: Has 1 dead paragraph (10 LOC) + (repo_dir / "PGM1.cbl").write_text( + " DATA DIVISION.\n PROCEDURE DIVISION.\n MAIN.\n DEAD-P.\n", + encoding="utf-8" + ) + + # File 2: Has 2 orphaned vars (2 LOC) + (repo_dir / "PGM2.cbl").write_text( + " DATA DIVISION.\n 01 D1 PIC X.\n 01 D2 PIC X.\n PROCEDURE DIVISION.\n MAIN.\n", + encoding="utf-8" + ) + + test_args = ["cobol_graveyard_finder.py", str(repo_dir)] + with patch.object(sys, 'argv', test_args): + try: + graveyard_module.main() + except SystemExit as e: + assert e.code == 0 + + captured = capsys.readouterr() + + # Assertions on the final CLI output calculations + assert "Files Flagged for Cleanup : 2" in captured.out + assert "Unused Memory Addresses : 2 orphaned variables" in captured.out + assert "Unreachable Logic Blocks : 1 phantom paragraphs" in captured.out + assert "Estimated Bloat Removed : ~12 Lines of Code" in captured.out \ No newline at end of file diff --git a/tests/test_cobol_jcl_auditor.py b/tests/test_cobol_jcl_auditor.py new file mode 100644 index 00000000..98648c56 --- /dev/null +++ b/tests/test_cobol_jcl_auditor.py @@ -0,0 +1,123 @@ +import pytest +import sys +import json +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_jcl_auditor as auditor_module + +# 
============================================================================== +# TEST 1: The Raw Intent Parsing Engine +# ============================================================================== +def test_parse_jcl_intent(tmp_path): + """ + Proves that the JCL parser correctly counts LOC while ignoring comments, + and successfully strips out IBM System Programs and System DDs to isolate + true business intent. + """ + mock_jcl = tmp_path / "LEGACY.jcl" + + # 7 active lines of code, 1 comment line + jcl_content = ( + "//TESTJOB JOB (1234),CLASS=A\n" + "//* THIS IS A COMMENT AND SHOULD BE IGNORED\n" + "//STEP01 EXEC PGM=IEBGENER\n" # System PGM (Should be ignored) + "//SYSOUT DD SYSOUT=*\n" # System DD (Should be ignored) + "//STEP02 EXEC PGM=BUSINESS01\n" # Custom PGM (Should be captured) + "//INPUT DD DSN=PROD.DATA.IN,DISP=SHR\n" # Custom DD (Should be captured) + "//OUTPUT DD DSN=PROD.DATA.OUT,DISP=NEW\n" # Custom DD (Should be captured) + "//SYSUDUMP DD SYSOUT=*\n" # System DD (Should be ignored) + ) + mock_jcl.write_text(jcl_content, encoding="utf-8") + + metrics = auditor_module.parse_jcl_intent(mock_jcl) + + # 1. Assert LOC + assert metrics["lines_of_code"] == 7, "Failed to correctly count active LOC!" + + # 2. Assert Program filtering + assert "BUSINESS01" in metrics["exec_pgms"] + assert "IEBGENER" not in metrics["exec_pgms"], "Failed to filter out IBM System Programs!" + + # 3. Assert DD filtering + assert "INPUT" in metrics["data_definitions"] + assert "OUTPUT" in metrics["data_definitions"] + assert "SYSOUT" not in metrics["data_definitions"], "Failed to filter out System DDs!" 
+ assert "SYSUDUMP" not in metrics["data_definitions"] + +# ============================================================================== +# TEST 2: The Audit Engine and Bloat Math +# ============================================================================== +def test_audit_zero_trust_jcls(tmp_path): + """ + Proves that the core audit loop correctly maps forged JCLs to their legacy + counterparts and accurately calculates Bloat Reduction % and I/O shedding. + """ + legacy_dir = tmp_path / "legacy" + forged_dir = tmp_path / "forged" + legacy_dir.mkdir() + forged_dir.mkdir() + + # LEGACY JCL: 5 Lines of Code, 3 Custom DDs + (legacy_dir / "OLDJOB.txt").write_text( + "//STEP1 EXEC PGM=MYPGM\n" + "//DD1 DD DSN=FILE1\n" + "//DD2 DD DSN=FILE2\n" + "//DD3 DD DSN=FILE3\n" + "//SYSPRINT DD SYSOUT=*\n", + encoding="utf-8" + ) + + # FORGED JCL: 2 Lines of Code, 1 Custom DD + # (We shed 3 LOC and 2 Over-Permissioned DDs) + (forged_dir / "MYPGM.jcl").write_text( + "//STEP1 EXEC PGM=MYPGM\n" + "//DD1 DD DSN=FILE1\n", + encoding="utf-8" + ) + + report = auditor_module.audit_zero_trust_jcls(forged_dir, legacy_dir) + + assert report["audited"] == 1 + assert report["original_loc"] == 5 + assert report["forged_loc"] == 2 + assert report["excess_dds_blocked"] == 2, "Failed to calculate shed DDs!" + + # Bloat Reduction = ((5 - 2) / 5) * 100 = 60.0% + assert report["bloat_reduction_pct"] == 60.0, "Bloat math is mathematically incorrect!" + assert "MYPGM" in report["program_breakdown"] + +# ============================================================================== +# TEST 3: The CI/CD Pipeline Wrapper (--json flag) +# ============================================================================== +def test_auditor_cli_json_output(tmp_path, capsys): + """ + Proves the CLI wrapper correctly handles the --json flag, outputting pure + parseable JSON and exiting successfully without printing ASCII art. 
+ """ + legacy_dir = tmp_path / "legacy" + forged_dir = tmp_path / "forged" + legacy_dir.mkdir() + forged_dir.mkdir() + + (legacy_dir / "OLD.jcl").write_text("//STEP EXEC PGM=PGMA\n//DD1 DD DSN=A\n", encoding="utf-8") + (forged_dir / "NEW.jcl").write_text("//STEP EXEC PGM=PGMA\n//DD1 DD DSN=A\n", encoding="utf-8") + + test_args = ["cobol_jcl_auditor.py", str(forged_dir), str(legacy_dir), "--json"] + + with patch.object(sys, 'argv', test_args): + try: + auditor_module.main() + except SystemExit as e: + assert e.code == 0, "CLI exited with error!" + + captured = capsys.readouterr() + + # 1. Assert no ASCII art or CLI vibes polluted the stdout + assert "GitGalaxy Spoke" not in captured.out + + # 2. Assert the output is pure JSON + parsed_output = json.loads(captured.out) + assert parsed_output["audited"] == 1 + assert parsed_output["bloat_reduction_pct"] == 0.0 \ No newline at end of file diff --git a/tests/test_cobol_jcl_forge.py b/tests/test_cobol_jcl_forge.py new file mode 100644 index 00000000..086b4ab6 --- /dev/null +++ b/tests/test_cobol_jcl_forge.py @@ -0,0 +1,127 @@ +import pytest +import sys +import re +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_jcl_forge as forge_module + +# ============================================================================== +# TEST 1: The Flattener and Intent Extractor +# ============================================================================== +def test_cobol_intent_analysis(tmp_path): + """ + Proves that the engine correctly ignores column-7 comments, extracts + the PROGRAM-ID, cleans DD names, and identifies transactional DB blocks. + """ + mock_cobol = tmp_path / "MOCKPGM.cbl" + + # Notice the strict 6-space margin and the column-7 asterisk + cobol_code = ( + " IDENTIFICATION DIVISION.\n" + " PROGRAM-ID. 'GLB001'.\n" + " *SELECT FAKE-FILE ASSIGN TO FAKEDD. 
(Should be ignored!)\n" + " INPUT-OUTPUT SECTION.\n" + " FILE-CONTROL.\n" + " SELECT IN-FILE ASSIGN TO UT-S-INPUTDD.\n" + " SELECT OUT-FILE ASSIGN TO OUTPUTDD.\n" + " PROCEDURE DIVISION.\n" + " EXEC CICS\n" + " RECEIVE MAP('MAP1')\n" + " END-EXEC.\n" + " EXEC SQL\n" + " SELECT * FROM TABLE\n" + " END-EXEC.\n" + ) + mock_cobol.write_text(cobol_code, encoding="utf-8") + + intent = forge_module.analyze_cobol_intent(mock_cobol) + + # 1. Verify basic extraction + assert intent["program_id"] == "GLB001", "Failed to extract PROGRAM-ID!" + + # 2. Verify file extraction and prefix stripping (UT-S-) + files = {f["internal"]: f["dd_name"] for f in intent["files_requested"]} + assert "IN-FILE" in files and files["IN-FILE"] == "INPUTDD" + assert "OUT-FILE" in files and files["OUT-FILE"] == "OUTPUTDD" + assert "FAKE-FILE" not in files, "Failed to ignore column-7 comment!" + + # 3. Verify transactional/database flags + assert intent["is_cics"] is True + assert intent["cics_calls"] == 1 + assert intent["is_db2"] is True + assert intent["sql_calls"] == 1 + +# ============================================================================== +# TEST 2: The Zero-Trust JCL Generator +# ============================================================================== +def test_zero_trust_jcl_generation(): + """ + Proves that the parsed intent dictionary correctly maps into a formatted, + runnable Mainframe JCL script with the requested architecture boundaries. + """ + mock_intent = { + "program_id": "TESTPGM", + "files_requested": [{"internal": "INFILE", "dd_name": "INPUT01"}], + "is_cics": True, + "is_db2": False + } + + # Force a mock lineage to test the NEW disposition creation + mock_lineage = {"outputs": {"INPUT01"}, "inputs": set()} + + jcl_output = forge_module.generate_zero_trust_jcl( + intent=mock_intent, + job_name="MOCKJOB", + account_code="9999", + lineage=mock_lineage + ) + + # 1. 
Job Card and Base Environment + assert "//MOCKJOB JOB (9999)" in jcl_output + assert "//STEP01 EXEC PGM=TESTPGM" in jcl_output + + # 2. Architecture Flags + assert "ARCHITECTURE REQUIRES: CICS" in jcl_output + assert "DB2" not in jcl_output + + # 3. File Dispositions + assert "//INPUT01 DD DSN=HERC01.DATA.INPUT01" in jcl_output + assert "DISP=(NEW,CATLG,DELETE)" in jcl_output # Because it was passed in the 'outputs' lineage + +# ============================================================================== +# TEST 3: The Hygienic E2E CLI Routing +# ============================================================================== +def test_hygienic_cli_defaults(tmp_path): + """ + Proves the CLI wrapper correctly discovers files, isolates the output into a + timestamped hygienic directory, and successfully writes the JCL payload. + """ + # 1. Setup the physical legacy source directory + src_dir = tmp_path / "legacy_src" + src_dir.mkdir() + + (src_dir / "PROG1.cbl").write_text(" PROGRAM-ID. P1.\n", encoding="utf-8") + (src_dir / "PROG2.cob").write_text(" PROGRAM-ID. P2.\n", encoding="utf-8") + + # 2. Execute the Forge + test_args = ["cobol_jcl_forge.py", str(src_dir)] + with patch.object(sys, 'argv', test_args): + # We don't trap SystemExit because a successful run exits normally + forge_module.main() + + # 3. Verify the Hygienic Output Directory + # Look for a directory matching 'legacy_src_forged_YYYYMMDD_HHMMSS' + directories = [d for d in tmp_path.iterdir() if d.is_dir() and "legacy_src_forged_" in d.name] + assert len(directories) == 1, "The engine failed to create the isolated hygienic directory!" + + hygienic_dir = directories[0] + + # 4. Verify the physical forged files + p1_jcl = hygienic_dir / "P1.jcl" + p2_jcl = hygienic_dir / "P2.jcl" + + assert p1_jcl.exists(), "P1 JCL was not written to the hygienic directory!" + assert p2_jcl.exists(), "P2 JCL was not written to the hygienic directory!" 
+ assert "EXEC PGM=P1" in p1_jcl.read_text(encoding="utf-8") \ No newline at end of file diff --git a/tests/test_cobol_lexical_patcher.py b/tests/test_cobol_lexical_patcher.py new file mode 100644 index 00000000..a7667f3c --- /dev/null +++ b/tests/test_cobol_lexical_patcher.py @@ -0,0 +1,87 @@ +import pytest +from pathlib import Path + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_lexical_patcher as patcher_module + +# ============================================================================== +# TEST 1: The Dialect Sensor +# ============================================================================== +def test_detect_cobol_dialect(): + """ + Proves the sensor correctly dates the compiler era by scanning for + post-1974 structural keywords and scope terminators. + """ + # 1. COBOL-74 Baseline (Strict, no terminators) + assert patcher_module.detect_cobol_dialect("IF A = B NEXT SENTENCE.") == "COBOL-74" + assert patcher_module.detect_cobol_dialect("PERFORM PARA-A THRU PARA-B.") == "COBOL-74" + + # 2. COBOL-85 Modern Signatures + assert patcher_module.detect_cobol_dialect("IF A = B CONTINUE END-IF.") == "COBOL-85" + assert patcher_module.detect_cobol_dialect("EVALUATE WS-STATUS") == "COBOL-85" + assert patcher_module.detect_cobol_dialect("INITIALIZE WS-DATA") == "COBOL-85" + assert patcher_module.detect_cobol_dialect("*> This is an inline comment") == "COBOL-85" + +# ============================================================================== +# TEST 2: The COBOL-85 Modernization Patch +# ============================================================================== +def test_patch_cobol85_modernization(tmp_path): + """ + Proves that in a modern environment, the dangerous NEXT SENTENCE trap + is fully eradicated and replaced with a safe CONTINUE block. 
+ """ + pgm = tmp_path / "PGM85.cbl" + # Contains END-IF, triggering the COBOL-85 sensor + pgm.write_text("IF X = Y NEXT SENTENCE END-IF.", encoding="utf-8") + + was_modified = patcher_module.patch_lexical_traps(pgm) + + assert was_modified is True, "Patcher failed to modify the infected file!" + + content = pgm.read_text(encoding="utf-8") + assert "CONTINUE *> GitGalaxy Patch" in content, "Failed to inject the safe modern patch!" + assert "NEXT SENTENCE" not in content, "The dangerous lexical trap survived!" + +# ============================================================================== +# TEST 3: The COBOL-74 Strict Mode Bypass +# ============================================================================== +def test_patch_cobol74_strict_mode(tmp_path): + """ + Proves that in a legacy environment, the engine normalizes the casing and + spacing of the trap for the AST slicer, but DOES NOT inject modern syntax + that would cause a compiler crash. + """ + pgm = tmp_path / "PGM74.cbl" + # Uses weird casing/spacing to ensure the regex normalization triggers a file write. + # No COBOL-85 terminators present. + pgm.write_text("IF X = Y nExt sEntEnce.", encoding="utf-8") + + was_modified = patcher_module.patch_lexical_traps(pgm) + + assert was_modified is True, "Patcher failed to normalize the spacing/casing!" + + content = pgm.read_text(encoding="utf-8") + assert "NEXT SENTENCE" in content, "Failed to enforce strict mode normalization!" + assert "CONTINUE" not in content, "FATAL: Injected modern code into a COBOL-74 file!" + assert "*>" not in content, "FATAL: Injected modern comment into a COBOL-74 file!" 
+ +# ============================================================================== +# TEST 4: The Fast-Exit Optimization Guard +# ============================================================================== +def test_fast_exit_clean_file(tmp_path): + """ + Proves that files without the lexical trap are instantly skipped, + saving heavy Regex compilation and File I/O overhead. + """ + pgm = tmp_path / "CLEAN.cbl" + pgm.write_text("IF A = B DISPLAY 'SAFE CODE'.", encoding="utf-8") + + # Check the modification timestamp before scanning + initial_mtime = pgm.stat().st_mtime + + was_modified = patcher_module.patch_lexical_traps(pgm) + + assert was_modified is False, "False positive! Patcher modified a clean file." + + # Ensure the file was absolutely not touched on disk + assert pgm.stat().st_mtime == initial_mtime, "Patcher performed an unnecessary disk write!" \ No newline at end of file diff --git a/tests/test_cobol_microservice_slicer.py b/tests/test_cobol_microservice_slicer.py new file mode 100644 index 00000000..d934ca58 --- /dev/null +++ b/tests/test_cobol_microservice_slicer.py @@ -0,0 +1,111 @@ +import pytest +import sys +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_microservice_slicer as slicer_module + +# ============================================================================== +# TEST 1: The Recursive Alias Engine (Taint Tracking) +# ============================================================================== +def test_slicer_recursive_tainting(tmp_path): + """ + Proves that the engine successfully chains taints across multiple operations + (MOVE, ADD, COMPUTE) and extracts exactly the lines that touch the logic. 
+ """ + pgm = tmp_path / "SLICE1.cbl" + cobol_code = ( + " PROCEDURE DIVISION.\n" + " MAIN-ENTRY.\n" + " MOVE TARGET-VAR TO VAR-B.\n" # Taints VAR-B + " ADD 10 TO VAR-B.\n" # Extracted (touches VAR-B) + " COMPUTE VAR-C = VAR-B * 2.\n" # Taints VAR-C + " DISPLAY VAR-C.\n" # Extracted (touches VAR-C) + " DISPLAY NOISE-VAR.\n" # Ignored (Untainted) + ) + pgm.write_text(cobol_code, encoding="utf-8") + + logic, taints = slicer_module.slice_business_logic(pgm, "TARGET-VAR") + + # 1. Verify Taint Graph + assert "TARGET-VAR" in taints + assert "VAR-B" in taints + assert "VAR-C" in taints + assert "NOISE-VAR" not in taints, "Engine hallucinated a taint on a noise variable!" + + # 2. Verify Line Extraction + assert len(logic) == 4, "Failed to slice the exact 4 lines of business logic!" + extracted_statements = [item["statement"] for item in logic] + assert "MOVE TARGET-VAR TO VAR-B." in extracted_statements + assert "COMPUTE VAR-C = VAR-B * 2." in extracted_statements + assert "DISPLAY NOISE-VAR." not in extracted_statements + +# ============================================================================== +# TEST 2: The Ghost Deflector (IR Context Awareness) +# ============================================================================== +def test_slicer_ghost_deflector(tmp_path): + """ + Proves that the slicer uses the IR RAM (dead_paras) to mathematically blind + itself to dead code, preventing false-positive taints and extractions. + """ + pgm = tmp_path / "SLICE2.cbl" + cobol_code = ( + " PROCEDURE DIVISION.\n" + " MAIN-ENTRY.\n" + " MOVE TARGET-VAR TO ALIAS-1.\n" + " DEAD-PARA.\n" # This paragraph is mathematically dead + " MOVE ALIAS-1 TO ALIAS-2.\n" # Should NOT taint ALIAS-2 + " DISPLAY ALIAS-2.\n" # Should NOT be extracted + ) + pgm.write_text(cobol_code, encoding="utf-8") + + logic, taints = slicer_module.slice_business_logic(pgm, "TARGET-VAR", dead_paras={"DEAD-PARA"}) + + # 1. 
Verify the deflector blocked the taint + assert "ALIAS-1" in taints + assert "ALIAS-2" not in taints, "Ghost Deflector failed! ALIAS-2 was tainted by dead code." + + # 2. Verify the deflector blocked the extraction + assert len(logic) == 1 + assert logic[0]["statement"] == "MOVE TARGET-VAR TO ALIAS-1." + assert "DEAD-PARA" not in [item["paragraph"] for item in logic] + +# ============================================================================== +# TEST 3: The Orphaned Memory Abort (Fast Exit) +# ============================================================================== +def test_slicer_orphaned_memory_abort(tmp_path): + """ + Proves that if the Graveyard Reaper identifies the variable as dead memory, + the slicer instantly aborts processing to save CPU cycles. + """ + pgm = tmp_path / "SLICE3.cbl" + pgm.write_text(" PROCEDURE DIVISION.\n MAIN.\n MOVE A TO B.\n", encoding="utf-8") + + logic, taints = slicer_module.slice_business_logic(pgm, "DEAD-VAR", orphaned_vars={"DEAD-VAR"}) + + assert logic == [], "Orphaned memory abort failed to return an empty logic slice!" + assert isinstance(taints, dict) + assert taints["DEAD-VAR"] == "ORPHANED_MEMORY", "Failed to return the abort payload!" + +# ============================================================================== +# TEST 4: The CLI E2E Output +# ============================================================================== +def test_slicer_cli_e2e(tmp_path, capsys): + """ + Proves the CLI wrapper correctly formats the extracted slice into terminal output. 
+ """ + pgm = tmp_path / "SLICE4.cbl" + pgm.write_text(" PROCEDURE DIVISION.\n MAIN-ENTRY.\n MOVE T TO X.\n", encoding="utf-8") + + test_args = ["cobol_microservice_slicer.py", str(pgm), "--var", "T"] + with patch.object(sys, 'argv', test_args): + try: + slicer_module.main() + except SystemExit as e: + assert e.code == 0 + + captured = capsys.readouterr() + assert "TAINTS FOUND: T, X" in captured.out or "TAINTS FOUND: X, T" in captured.out + assert "[MAIN-ENTRY]" in captured.out + assert "MOVE T TO X." in captured.out \ No newline at end of file diff --git a/tests/test_cobol_refractor_controller.py b/tests/test_cobol_refractor_controller.py new file mode 100644 index 00000000..65001613 --- /dev/null +++ b/tests/test_cobol_refractor_controller.py @@ -0,0 +1,88 @@ +import pytest +from pathlib import Path +import sqlite3 + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.cobol_refractor_controller as controller_module + +# ============================================================================== +# TEST 1: The Scale Sensor (OOM Protection) +# ============================================================================== +def test_scale_sensor_calibration(tmp_path): + """ + Proves the orchestrator accurately calculates repository mass and dynamically + toggles the storage medium to prevent Out-Of-Memory (OOM) crashes. + """ + repo_dir = tmp_path / "legacy_repo" + repo_dir.mkdir() + + # Create 3 small mock COBOL files + for i in range(3): + (repo_dir / f"PGM{i}.cbl").write_text("IDENTIFICATION DIVISION.", encoding="utf-8") + + # 1. Test RAM Mode (Thresholds are higher than the payload) + mode, files = controller_module.calibrate_ir_medium(repo_dir, max_files=5, max_mb=10) + assert mode == "RAM", "Failed to default to high-speed RAM!" + assert len(files) == 3 + + # 2. 
Test SQLite Mode (Threshold tripped by file count) + mode, files = controller_module.calibrate_ir_medium(repo_dir, max_files=2, max_mb=10) + assert mode == "SQLITE", "Scale sensor failed to trip the SQLite safety switch!" + +# ============================================================================== +# TEST 2: Hybrid State Manager Parity +# ============================================================================== +def test_ir_state_manager_parity(tmp_path): + """ + Proves that the IR abstraction layer perfectly mirrors data retrieval + whether backed by temporary RAM or a physical SQLite disk database. + """ + # 1. Initialize RAM Manager + ram_mgr = controller_module.IRStateManager("RAM", tmp_path) + ram_mgr.record_dead_code("PGM-ALPHA", dead_paras={"GHOST-PARA"}, orphaned_vars={"DEAD-VAR"}) + + # 2. Initialize SQLite Manager + sql_mgr = controller_module.IRStateManager("SQLITE", tmp_path) + sql_mgr.record_dead_code("PGM-ALPHA", dead_paras={"GHOST-PARA"}, orphaned_vars={"DEAD-VAR"}) + + # 3. Assert Parity + assert ram_mgr.get_dead_paras("PGM-ALPHA") == sql_mgr.get_dead_paras("PGM-ALPHA") + assert ram_mgr.get_orphaned_vars("PGM-ALPHA") == sql_mgr.get_orphaned_vars("PGM-ALPHA") + + # 4. Verify SQLite strictly wrote to disk + assert (tmp_path / "gitgalaxy_ir.db").exists() + sql_mgr.close() + +# ============================================================================== +# TEST 3: Payload Integration Orchestrator +# ============================================================================== +def test_process_payload_integration(tmp_path): + """ + Proves the orchestrator successfully routes a file through the sub-tools + (Graveyard Reaper, Lineage Architect, Schema Forge) and aggregates the state. + """ + cbl_file = tmp_path / "MAINPGM.cbl" + cbl_file.write_text( + " PROGRAM-ID. 
MAINPGM.\n" + " DATA DIVISION.\n" + " 01 DEAD-VAR PIC X.\n" # Will trigger Graveyard Reaper + " PROCEDURE DIVISION.\n" + " MAIN.\n" + " DISPLAY 'HELLO'.\n", + encoding="utf-8" + ) + + mgr = controller_module.IRStateManager("RAM", tmp_path) + ir_state = controller_module.process_payload(cbl_file, mgr) + + # 1. Verify Metadata Extraction + assert ir_state["metadata"]["file_name"] == "MAINPGM.cbl" + + # 2. Verify Graveyard Sub-Tool Integration + assert "DEAD-VAR" in ir_state["analysis"]["graveyard"]["orphaned_vars"], "Orchestrator failed to invoke Graveyard Reaper!" + + # 3. Verify Schema Sub-Tool Integration + assert "schemas" in ir_state["generation"] + + # 4. Verify IR State Manager persistence + assert mgr.get_orphaned_vars("MAINPGM") == {"DEAD-VAR"}, "Orchestrator failed to sync with global IR State Manager!" \ No newline at end of file diff --git a/tests/test_cobol_schema_forge.py b/tests/test_cobol_schema_forge.py new file mode 100644 index 00000000..cb1f1af6 --- /dev/null +++ b/tests/test_cobol_schema_forge.py @@ -0,0 +1,88 @@ +import pytest +from pathlib import Path + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_schema_forge as forge_module + +# ============================================================================== +# TEST 1: The Data Type Translation Engine +# ============================================================================== +def test_parse_cobol_picture(): + """ + Proves the engine mathematically translates legacy COBOL PIC clauses into + precise PostgreSQL boundaries and JSON REST types. + """ + # 1. Strings / Text + assert forge_module.parse_cobol_picture("X(50)") == {"sql": "VARCHAR(50)", "json": "string"} + assert forge_module.parse_cobol_picture("XXX") == {"sql": "VARCHAR(3)", "json": "string"} + + # 2. 
Packed Decimals / Currency + assert forge_module.parse_cobol_picture("9(5)V99") == {"sql": "DECIMAL(7, 2)", "json": "number"} + assert forge_module.parse_cobol_picture("9(5)V9(2)") == {"sql": "DECIMAL(7, 2)", "json": "number"} + assert forge_module.parse_cobol_picture("999.99") == {"sql": "DECIMAL(5, 2)", "json": "number"} + + # 3. Integers (Scaling based on byte boundaries) + assert forge_module.parse_cobol_picture("9(4)") == {"sql": "SMALLINT", "json": "integer"} + assert forge_module.parse_cobol_picture("9(7)") == {"sql": "INTEGER", "json": "integer"} + assert forge_module.parse_cobol_picture("9(12)") == {"sql": "BIGINT", "json": "integer"} + +# ============================================================================== +# TEST 2: The Bloat Cutter (IR Context Synergy) +# ============================================================================== +def test_forge_schemas_bloat_cutter(tmp_path): + """ + Proves that the engine successfully ignores FILLER spaces, 88-level booleans, + and intentionally drops variables proven to be dead memory by the IR RAM. + """ + cpy = tmp_path / "MEMORY.cpy" + cpy.write_text(""" + 01 ROOT-TABLE. + 05 USED-VAR PIC X(10). + 05 DEAD-VAR PIC 9(4). + 05 FILLER PIC X(5). + 88 FLAG-VAR VALUE 'Y'. + """, encoding="utf-8") + + # Pass "DEAD-VAR" into the IR ignore list + schemas = forge_module.forge_schemas(cpy, ignore_vars={"DEAD-VAR"}) + sql_ddl = schemas["sql"] + + # Assertions + assert "USED_VAR" in sql_ddl + assert "DEAD_VAR" not in sql_ddl, "Bloat Cutter failed! Dead memory was migrated to the cloud." + assert "FILLER" not in sql_ddl, "Engine hallucinated a FILLER column!" + assert "FLAG_VAR" not in sql_ddl, "Engine hallucinated an 88-level column!" 
+ +# ============================================================================== +# TEST 3: The E2E Forge & Honesty Sensor +# ============================================================================== +def test_forge_schemas_e2e(tmp_path): + """ + Proves the engine can slice the DATA DIVISION, generate a compliant PostgreSQL + table, build a REST JSON schema, and explicitly flag dangerous legacy patterns. + """ + cbl = tmp_path / "PGM.cbl" + cbl.write_text(""" + DATA DIVISION. + 01 ACCOUNT-RECORD. + 05 ACCT-ID PIC 9(8) COMP-3. + 05 ACCT-NAME PIC X(20) OCCURS 1 TO 5 TIMES DEPENDING ON ACCT-COUNT. + PROCEDURE DIVISION. + """, encoding="utf-8") + + schemas = forge_module.forge_schemas(cbl) + sql_ddl = schemas["sql"] + json_schema = schemas["json"] + + # 1. SQL DDL Verification + assert "CREATE TABLE ACCOUNT_RECORD" in sql_ddl, "Failed to name the table from the 01-level!" + assert "ACCT_ID" in sql_ddl and "ACCT_NAME" in sql_ddl + + # 2. Honesty Sensor Verification + assert "COMP-3 (Packed Decimal)" in sql_ddl, "Failed to tag the legacy COMP-3 footprint!" + assert "WARNING: OCCURS DEPENDING ON detected. Use JSONB." in sql_ddl, "Failed to trap the dynamic array!" + + # 3. 
JSON REST API Schema Verification + assert json_schema["title"] == "ACCOUNT_RECORD" + assert json_schema["properties"]["ACCT_ID"]["type"] == "integer" + assert "Legacy PIC: 9(8)" in json_schema["properties"]["ACCT_ID"]["description"] \ No newline at end of file diff --git a/tests/test_cobol_system_limits_reporter.py b/tests/test_cobol_system_limits_reporter.py new file mode 100644 index 00000000..3f2eb875 --- /dev/null +++ b/tests/test_cobol_system_limits_reporter.py @@ -0,0 +1,105 @@ +import pytest +import sys +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.cobol_to_cobol.cobol_system_limits_reporter as limit_reporter + +# ============================================================================== +# TEST 1: The Dragon Traps & Comment Shield +# ============================================================================== +def test_system_limits_regex_and_comments(tmp_path): + """ + Proves that the regex traps correctly identify all 3 anomalies, + but strictly ignore them if they are commented out in column 7. + """ + # 1. Create a physical mock COBOL file + cobol_file = tmp_path / "DRAGONS.cbl" + + # Notice the 6 spaces before the 7th column for standard COBOL formatting + cobol_code = ( + " IDENTIFICATION DIVISION.\n" + " PROGRAM-ID. DRAGONS.\n" + " * THIS IS A COMMENT: ALTER PARA-A TO PROCEED TO PARA-B\n" # Should be ignored + " PROCEDURE DIVISION.\n" + " PARA-1.\n" + " ALTER PARA-X TO PROCEED TO PARA-Y.\n" # Hit 1 (Line 6) + " DISPLAY 'HELLO'.\n" + " EXEC CICS HANDLE CONDITION ERROR(ERR-RTN).\n" # Hit 2 (Line 8) + " COPY 'MYLIB' REPLACING ==A== BY ==B==.\n" # Hit 3 (Line 9) + ) + cobol_file.write_text(cobol_code, encoding="utf-8") + + # 2. Execute the Scanner directly + anomalies = limit_reporter.scan_system_limits(cobol_file) + + # 3. 
Assertions + assert len(anomalies) == 3, "Failed to catch all 3 active anomalies or failed to ignore the comment!" + + # Join into a single string to easily assert the formatted output + output_str = "\n".join(anomalies) + + assert "Line 0006] CRITICAL LIMIT" in output_str and "dynamically rewritten" in output_str + assert "Line 0008] CRITICAL LIMIT" in output_str and "Asynchronous error routing" in output_str + assert "Line 0009] HIGH LIMIT" in output_str and "Macro substitution" in output_str + +# ============================================================================== +# TEST 2: The Clean Baseline +# ============================================================================== +def test_system_limits_clean_baseline(tmp_path): + """ + Proves that a mathematically deterministic, modern COBOL file + passes the Honesty Protocol without triggering false positives. + """ + clean_file = tmp_path / "CLEAN.cbl" + clean_code = ( + " IDENTIFICATION DIVISION.\n" + " PROGRAM-ID. CLEAN.\n" + " PROCEDURE DIVISION.\n" + " PERFORM PARA-1 THRU PARA-2.\n" + " GOBACK.\n" + ) + clean_file.write_text(clean_code, encoding="utf-8") + + anomalies = limit_reporter.scan_system_limits(clean_file) + assert len(anomalies) == 0, "False positive triggered on clean COBOL code!" + +# ============================================================================== +# TEST 3: E2E Directory Traversal +# ============================================================================== +def test_system_limits_cli_directory_traversal(tmp_path, capsys): + """ + Proves that the CLI wrapper correctly recurses through a directory, + targets ONLY .cbl and .cob files, and aggregates the warnings. 
+ """ + repo_dir = tmp_path / "legacy_repo" + repo_dir.mkdir() + + # File 1: Infected .cbl file + (repo_dir / "PGM1.cbl").write_text(" ALTER P1 TO P2.\n", encoding="utf-8") + + # File 2: Infected .cob file + (repo_dir / "PGM2.cob").write_text(" COPY A REPLACING B.\n", encoding="utf-8") + + # File 3: Irrelevant file (should be ignored) + (repo_dir / "readme.txt").write_text("ALTER P1 TO P2.\n", encoding="utf-8") + + # Execute the CLI tool + test_args = ["cobol_system_limits_reporter.py", str(repo_dir)] + with patch.object(sys, 'argv', test_args): + # We catch SystemExit in case something fails, but a normal run exits gracefully + try: + limit_reporter.main() + except SystemExit as e: + if e.code != 0: + pytest.fail(f"CLI exited with unexpected error code: {e.code}") + + # Capture the print statements sent to stdout + captured = capsys.readouterr() + + # Verify the results + assert "scanning 2 files" in captured.out, "Failed to properly filter .cbl and .cob files!" + assert "PGM1.cbl : Line 0001" in captured.out + assert "PGM2.cob : Line 0001" in captured.out + assert "WARNING: Found 2 structural anomalies" in captured.out \ No newline at end of file diff --git a/tests/test_pii_leak_hunter.py b/tests/test_pii_leak_hunter.py new file mode 100644 index 00000000..562530db --- /dev/null +++ b/tests/test_pii_leak_hunter.py @@ -0,0 +1,78 @@ +import pytest +import sys +from pathlib import Path +from unittest.mock import patch + +# IMPORTANT: Adjust this path to match exactly where your file is located +import gitgalaxy.tools.terabyte_log_scanning.pii_leak_hunter as pii_module + +# ============================================================================== +# TEST 1: The Masking Engine (Data Destruction Verification) +# ============================================================================== +def test_pii_masking_engine(): + """ + Mathematically verifies that the regex engine correctly intercepts and + destroys sensitive PII data while preserving the safe 
formatting. + """ + # 1. VISA Test (Destroy 12 digits, keep last 4) + assert pii_module.mask_pii("Card: 4123456789012345") == "Card: VISA-MASKED-2345" + + # 2. MASTERCARD Test (Destroy 12 digits, keep last 4) + assert pii_module.mask_pii("Card: 5123456789012345") == "Card: MC-MASKED-2345" + + # 3. SSN Test (Destroy first 5 digits, keep last 4) + assert pii_module.mask_pii("ID: 123-45-6789") == "ID: XXX-XX-6789" + + # 4. AWS KEY Test (Keep prefix and last 4, destroy the 12-char middle) + assert pii_module.mask_pii("Key: AKIAIOSFODNN7EXAMPLE") == "Key: AKIA-XXXX-MPLE" + + # 5. The Combo Test (Multiple leaks in a single log line) + combo_log = "User AKIAIOSFODNN7EXAMPLE charged 4123456789012345" + assert pii_module.mask_pii(combo_log) == "User AKIA-XXXX-MPLE charged VISA-MASKED-2345" + +# ============================================================================== +# TEST 2: The E2E Stream Filter (File I/O and Isolation) +# ============================================================================== +def test_pii_leak_hunter_e2e(tmp_path): + """ + End-to-End test simulating a live log stream. + Proves that clean lines are dropped, PII lines are safely written, + and no raw sensitive data ever touches the output evidence log. + """ + # 1. Setup the physical mock log file + log_dir = tmp_path / "logs" + log_dir.mkdir() + target_log = log_dir / "production_dump.log" + + # Inject a mix of clean lines and highly sensitive data + target_log.write_text( + "2026-05-11T09:00 [INFO] System boot sequence normal\n" + "2026-05-11T10:00 [DEBUG] Transaction 4111111111111111 processed\n" + "2026-05-11T11:00 [ERROR] Failed AWS auth with AKIAIOSFODNN7EXAMPLE\n" + "2026-05-11T12:00 [WARN] Input SSN 999-99-9999 failed validation\n", + encoding="utf-8" + ) + + # 2. Execute the CLI tool + test_args = ["pii_leak_hunter.py", str(target_log)] + with patch.object(sys, 'argv', test_args): + pii_module.main() + + # 3. 
Verify the Evidence Log + evidence_file = log_dir / "production_dump_pii_leak_evidence.log" + assert evidence_file.exists(), "The hunter failed to generate the safe evidence log!" + + content = evidence_file.read_text(encoding="utf-8") + + # A) Ensure the clean lines were ignored (Saving disk space/CPU) + assert "System boot sequence normal" not in content + + # B) Ensure the masked data made it to the file + assert "VISA-MASKED-1111" in content + assert "AKIA-XXXX-MPLE" in content + assert "XXX-XX-9999" in content + + # C) ZERO-TRUST GUARANTEE: Ensure the raw PII was completely obliterated + assert "4111111111111111" not in content, "CRITICAL LEAK: Raw VISA card written to disk!" + assert "AKIAIOSFODNN7EXAMPLE" not in content, "CRITICAL LEAK: Raw AWS Key written to disk!" + assert "999-99-9999" not in content, "CRITICAL LEAK: Raw SSN written to disk!" \ No newline at end of file