Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion gitgalaxy/tools/cobol_to_cobol/cobol_compiler_forge.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,19 @@ def generate_build_jcl(source_text: str, prog_name: str, files: set, dialect: st
jcl.append(f"// DCB=(RECFM=U,BLKSIZE=32760,DSORG=PO)")

for f in files:
clean_f = re.sub(r'[^A-Z0-9]', '', f.upper())
clean_f = f.upper().strip()
# 1. Strip IBM prefixes first
clean_f = re.sub(r'^(?:UT|UR)-S-', '', clean_f)
# 2. Strip non-alphanumeric characters
clean_f = re.sub(r'[^A-Z0-9]', '', clean_f)
# 3. Enforce 8-character Mainframe limit
if len(clean_f) > 8: clean_f = clean_f[-8:]

if clean_f:
jcl.append(f"//{clean_f} DD DSN=HERC01.DATA.{clean_f},DISP=(MOD,CATLG,DELETE),")
jcl.append(f"// UNIT=SYSDA,SPACE=(TRK,(10,10),RLSE),")
jcl.append(f"// DCB=(LRECL=80,RECFM=FB,BLKSIZE=800)")

if clean_f:
jcl.append(f"//{clean_f} DD DSN=HERC01.DATA.{clean_f},DISP=(MOD,CATLG,DELETE),")
jcl.append(f"// UNIT=SYSDA,SPACE=(TRK,(10,10),RLSE),")
Expand Down
6 changes: 3 additions & 3 deletions gitgalaxy/tools/cobol_to_cobol/cobol_graveyard_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def resolve_copybooks(content: str, source_path: Path) -> str:
Handles dynamic variable swapping via the REPLACING clause.
"""
# Matches: COPY NAME. or COPY NAME REPLACING ==A== BY ==B==.
copy_pattern = re.compile(r'^[ \t]*COPY\s+[\'"]?([A-Z0-9_\-]+)[\'"]?(?:\s+REPLACING\s+(.+?))?\.?', re.MULTILINE | re.IGNORECASE)
copy_pattern = re.compile(r'^[ \t]*COPY\s+[\'"]?([A-Z0-9_\-]+)[\'"]?(?:\s+REPLACING\s+(.+?))?\.', re.MULTILINE | re.IGNORECASE)

def replacer(match):
copy_name = match.group(1).upper()
Expand All @@ -33,8 +33,8 @@ def replacer(match):
# Extracts pairs, ignoring the optional == delimiters
pairs = re.findall(r'(?:==)?([A-Z0-9_\-]+)(?:==)?\s+BY\s+(?:==)?([A-Z0-9_\-]+)(?:==)?', replacing_clause, re.IGNORECASE)
for old_val, new_val in pairs:
# Use word boundaries (\b) so we don't accidentally replace partial words
cpy_content = re.sub(r'\b' + re.escape(old_val) + r'\b', new_val, cpy_content)
# Use negative lookarounds so we don't accidentally replace partial words with hyphens
cpy_content = re.sub(r'(?<![A-Z0-9_\-])' + re.escape(old_val) + r'(?![A-Z0-9_\-])', new_val, cpy_content)

return f"*> --- START COPY {copy_name} ---\n{cpy_content}\n*> --- END COPY {copy_name} ---"

Expand Down
2 changes: 1 addition & 1 deletion gitgalaxy/tools/cobol_to_cobol/cobol_lexical_patcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def patch_lexical_traps(filepath: Path) -> bool:
return False

# Fast check before engaging heavy regex
if "NEXT SENTENCE" not in content.upper():
if not re.search(r'\bNEXT\s+SENTENCE\b', content, re.IGNORECASE):
return False

# 1. Sense the Environment
Expand Down
9 changes: 5 additions & 4 deletions gitgalaxy/tools/cobol_to_cobol/cobol_microservice_slicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ def slice_business_logic(filepath: Path, initial_var: str, dead_paras: set = Non
comp_match = re.search(r'COMPUTE\s+([A-Z0-9\-]+)\s*=', clean_line)
if comp_match:
var1 = comp_match.group(1)
if var1 in tainted_vars:
# Taint every variable inside the math equation
vars_in_eq = re.findall(r'([A-Z][A-Z0-9\-]+)', clean_line.split('=')[1])
vars_in_eq = re.findall(r'([A-Z][A-Z0-9\-]+)', clean_line.split('=')[1])
# Taint forwards and backwards!
if var1 in tainted_vars or any(v in tainted_vars for v in vars_in_eq):
tainted_vars.add(var1)
tainted_vars.update(vars_in_eq)

# ==========================================================================
# PASS 2: Extraction
# ==========================================================================
Expand Down
3 changes: 2 additions & 1 deletion gitgalaxy/tools/cobol_to_cobol/cobol_schema_forge.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def forge_schemas(filepath: Path, ignore_vars: set = None, corporate_header: str
r'^[ \t]*(?P<level>0[1-9]|[1-4][0-9]|77)[ \t]+'
r'(?P<name>[A-Z0-9\-]+)'
r'(?:[ \t]+PIC(?:TURE)?[ \t]+(?P<pic>[A-Z0-9\(\)V\.\-]+))?'
r'(?:[ \t]+(?:IS[ \t]+)?(?P<usage>COMP(?:-[1-5])?|BINARY|PACKED-DECIMAL))?',
r'(?:[ \t]+(?:IS[ \t]+)?(?P<usage>COMP(?:-[1-5])?|BINARY|PACKED-DECIMAL))?'
r'.*$',
re.MULTILINE
)

Expand Down
117 changes: 117 additions & 0 deletions tests/test_cobol_agent_task_forge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import pytest

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'pytest' is not used.
import json
from pathlib import Path

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'Path' is not used.

# IMPORTANT: Adjust this path to match exactly where your file is located
import gitgalaxy.tools.cobol_to_cobol.cobol_agent_task_forge as forge_module

# ==============================================================================
# TEST 1: The Context Merger (Ticket Generation)
# ==============================================================================
def test_generate_agent_ticket_merging(tmp_path):
    """
    Checks the ticket returned by generate_agent_ticket: the fixed job
    metadata, anomaly strings without their "[PGM1.cbl ..." prefix, and the
    lineage lists taken from the IR payload.
    """
    source_file = tmp_path / "PGM1.cbl"

    anomalies = [
        "[PGM1.cbl : Line 0010] CRITICAL LIMIT - ALTER detected",
        "[PGM1.cbl : Line 0020] HIGH LIMIT - COPY REPLACING detected",
    ]
    lineage = {
        "inputs": ["FILE-IN"],
        "outputs": ["FILE-OUT"],
        "unresolved_calls": ["SUBPROG"],
    }
    ir_payload = {"analysis": {"lineage": lineage}}

    ticket = forge_module.generate_agent_ticket("PGM1.cbl", source_file, anomalies, ir_payload)

    # Top-level ticket fields
    assert ticket["job_id"] == "PGM1_REMEDIATION"
    assert ticket["task_type"] == "STRUCTURAL_ANOMALY_RESOLUTION"
    assert ticket["target_file"] == str(source_file.resolve())

    # Anomaly text must survive, the bracketed file prefix must not
    assert "CRITICAL LIMIT - ALTER detected" in ticket["context"]["detected_anomalies"]
    assert "[PGM1.cbl" not in ticket["context"]["detected_anomalies"][0], "Failed to strip the prefix!"

    # Lineage from the IR is surfaced under the context keys
    context = ticket["context"]
    assert context["inputs_required"] == ["FILE-IN"]
    assert context["outputs_produced"] == ["FILE-OUT"]
    assert context["external_calls"] == ["SUBPROG"]

# ==============================================================================
# TEST 2: The E2E Job Dispatcher (Grouping & File I/O)
# ==============================================================================
def test_forge_agent_jobs_e2e(tmp_path):
    """
    Drives forge_agent_jobs with two flags for the same source file and
    checks that one job is reported and that a single JSON ticket holding
    both anomalies is written under 06_ai_agent_jobs.
    """
    output_root = tmp_path / "clean_room"
    legacy_src = tmp_path / "legacy_src"
    legacy_src.mkdir()

    # The flagged program has to exist on disk for the forge to pick it up
    program = legacy_src / "PGM2.cbl"
    program.write_text("IDENTIFICATION DIVISION.", encoding="utf-8")

    flags = ["[PGM2.cbl] ERROR 1", "[PGM2.cbl] ERROR 2"]

    ticket_count = forge_module.forge_agent_jobs(output_root, legacy_src, flags)
    assert ticket_count == 1, "Failed to group 2 flags into 1 job ticket!"

    # Expected location of the ticket on disk
    jobs_folder = output_root / "06_ai_agent_jobs"
    ticket_path = jobs_folder / "PGM2_agent_job.json"
    assert jobs_folder.exists(), "Failed to create the 06_ai_agent_jobs directory!"
    assert ticket_path.exists(), "Failed to write the physical JSON ticket!"

    # Contents of the written ticket
    ticket = json.loads(ticket_path.read_text(encoding="utf-8"))
    assert ticket["job_id"] == "PGM2_REMEDIATION"
    assert len(ticket["context"]["detected_anomalies"]) == 2

# ==============================================================================
# TEST 3: Graceful Degradation (Missing IR & Missing Source)
# ==============================================================================
def test_forge_agent_jobs_graceful_degradation(tmp_path):
    """
    Checks two fallback paths of forge_agent_jobs: a flagged file with no
    source on disk yields no ticket, and a ticket built without any IR dump
    carries empty input/output lists.
    """
    output_root = tmp_path / "clean_room"
    legacy_src = tmp_path / "legacy_src"
    legacy_src.mkdir()

    # Only PGM3 gets a source file. No IR dump is created for it, and
    # PGM4 is deliberately left absent from the source directory.
    program = legacy_src / "PGM3.cbl"
    program.write_text("IDENTIFICATION DIVISION.", encoding="utf-8")

    flags = ["[PGM3.cbl] ERROR 1", "[PGM4.cbl] ERROR 2"]

    ticket_count = forge_module.forge_agent_jobs(output_root, legacy_src, flags)

    # PGM3 alone should produce a ticket
    assert ticket_count == 1, "Failed to skip the missing source file!"

    ticket_path = output_root / "06_ai_agent_jobs" / "PGM3_agent_job.json"
    assert ticket_path.exists()

    # With no IR available the lineage lists are expected to be empty
    ticket = json.loads(ticket_path.read_text(encoding="utf-8"))
    assert ticket["context"]["inputs_required"] == [], "Graceful fallback for missing IR inputs failed!"
    assert ticket["context"]["outputs_produced"] == [], "Graceful fallback for missing IR outputs failed!"
104 changes: 104 additions & 0 deletions tests/test_cobol_compiler_forge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import pytest

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'pytest' is not used.
import sys
from pathlib import Path

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'Path' is not used.
from unittest.mock import patch

# IMPORTANT: Adjust this path to match exactly where your file is located
import gitgalaxy.tools.cobol_to_cobol.cobol_compiler_forge as forge_module

# ==============================================================================
# TEST 1: The Dialect Compiler Router
# ==============================================================================
def test_dialect_router_jcl_generation():
    """
    Checks that generate_build_jcl emits the COBUCL step for the COBOL-74
    dialect and the IGYWCL step for COBOL-85, and never the other one.
    """
    source = "PROGRAM-ID. HELLO."

    # (dialect, program name, step that must appear, step that must not, failure message)
    cases = (
        (
            "COBOL-74",
            "PGM1",
            "EXEC COBUCL",
            "EXEC IGYWCL",
            "Failed to route COBOL-74 to the legacy compiler!",
        ),
        (
            "COBOL-85",
            "PGM2",
            "EXEC IGYWCL",
            "EXEC COBUCL",
            "Failed to route COBOL-85 to the modern enterprise compiler!",
        ),
    )

    for dialect, prog_name, wanted_step, unwanted_step, failure_msg in cases:
        jcl = forge_module.generate_build_jcl(source, prog_name, set(), dialect)
        assert wanted_step in jcl, failure_msg
        assert unwanted_step not in jcl

# ==============================================================================
# TEST 2: The Infinite Loop Failsafe
# ==============================================================================
def test_flatten_copybooks_cyclic_failsafe(tmp_path):
    """
    Feeds flatten_copybooks two copybooks that COPY each other and checks
    that the call returns, keeps the root program text, and contains at
    least one CYCLE-A inline marker.
    """
    cyclic_repo = tmp_path / "cyclic_repo"
    cyclic_repo.mkdir()

    # Each copybook consists solely of a COPY of the other one
    copybooks = {
        "CYCLE-A.cpy": "COPY CYCLE-B.",
        "CYCLE-B.cpy": "COPY CYCLE-A.",
    }
    for filename, body in copybooks.items():
        (cyclic_repo / filename).write_text(body, encoding="utf-8")

    # Root program that pulls in the first half of the cycle
    root_program = "PROGRAM-ID. BOOM.\nCOPY CYCLE-A."

    flattened = forge_module.flatten_copybooks(root_program, cyclic_repo)

    # Reaching this point means no RecursionError escaped the flattener.
    marker_hits = flattened.count("INLINED COPYBOOK: CYCLE-A")
    assert marker_hits >= 1, "Failed to recurse at all!"
    assert "PROGRAM-ID. BOOM." in flattened, "Root AST was destroyed by the cycle!"

# ==============================================================================
# TEST 3: The E2E Flattener & JCL Provisioning
# ==============================================================================
def test_compiler_forge_e2e(tmp_path):
    """
    Runs forge_module.main() against a temporary source tree and inspects
    the JCL file it writes.

    The tree holds one copybook (MYDATA.cpy) and one program (MAINPGM.cbl)
    that COPYs it, assigns a file to UT-S-INPUT01 and uses END-IF. The test
    expects BUILD_MAINPGM.jcl in the output directory, containing an IEFBR14
    step, a DD statement for INPUT01, the IGYWCL step and the inlined
    copybook text.
    """
    src_dir = tmp_path / "src"
    out_dir = tmp_path / "out"
    src_dir.mkdir()

    # 1. The copybook
    (src_dir / "MYDATA.cpy").write_text(" 01 MY-VAR PIC X.", encoding="utf-8")

    # 2. The main program; END-IF is what should select the COBOL-85 route
    (src_dir / "MAINPGM.cbl").write_text(
        " PROGRAM-ID. MAINPGM.\n"
        " SELECT FILE-IN ASSIGN TO UT-S-INPUT01.\n"
        " DATA DIVISION.\n"
        " COPY MYDATA.\n"
        " PROCEDURE DIVISION.\n"
        " IF 1 = 1 CONTINUE END-IF.",
        encoding="utf-8",
    )

    # 3. Execute the forge as if invoked from the command line
    cli_args = ["cobol_compiler_forge.py", str(src_dir), str(out_dir)]
    with patch.object(sys, "argv", cli_args):
        try:
            forge_module.main()
        except SystemExit as exit_info:
            # A bare sys.exit() carries code None, which is a successful
            # exit just like 0; only other codes indicate failure.
            assert exit_info.code in (0, None), f"main() exited with code {exit_info.code!r}"

    # 4. Verify the output file
    jcl_file = out_dir / "BUILD_MAINPGM.jcl"
    assert jcl_file.exists(), "Forge failed to create the target JCL file!"

    jcl_content = jcl_file.read_text(encoding="utf-8")

    # A) Dataset provisioning step and the DD derived from the ASSIGN clause
    assert "EXEC PGM=IEFBR14" in jcl_content
    assert "//INPUT01 DD DSN=HERC01.DATA.INPUT01" in jcl_content, "Failed to map SELECT ASSIGN to DSN!"

    # B) Dialect routing
    assert "EXEC IGYWCL" in jcl_content, "Failed to dynamically route to modern compiler!"

    # C) Copybook inlining
    assert "INLINED COPYBOOK: MYDATA" in jcl_content
    assert "01 MY-VAR PIC X." in jcl_content, "Failed to inline the actual copybook data!"
Loading
Loading