Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions concore_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,22 @@ def build(workflow_file, source, output, type, auto_build, compose):
@cli.command()
@click.argument("workflow_file", type=click.Path(exists=True))
@click.option("--source", "-s", default="src", help="Source directory")
def validate(workflow_file, source):
@click.option(
"--format",
"output_format",
default="text",
type=click.Choice(["text", "json"]),
help="Validation output format",
)
def validate(workflow_file, source, output_format):
"""Validate a workflow file"""
try:
ok = validate_workflow(workflow_file, source, console)
ok = validate_workflow(
workflow_file,
source,
console,
output_format=output_format,
)
if not ok:
sys.exit(1)
except Exception as e:
Expand Down
185 changes: 178 additions & 7 deletions concore_cli/commands/validate.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,167 @@
import json
from pathlib import Path
from bs4 import BeautifulSoup
from rich.panel import Panel
import re
import xml.etree.ElementTree as ET


def validate_workflow(workflow_file, source_dir, console):
def _classify_message(message, bucket_name):
if bucket_name == "info":
if message.startswith("Found ") and "node(s)" in message:
return {"info_type": "node_count"}
if message.startswith("Found ") and "edge(s)" in message:
return {"info_type": "edge_count"}
if message.startswith("ZMQ-based edges:"):
return {"info_type": "zmq_edges"}
if message.startswith("File-based edges:"):
return {"info_type": "file_edges"}
return {"info_type": "info"}

if message == "File is empty":
return {"error_type": "empty_file"}
if message.startswith("Invalid XML:"):
return {"error_type": "invalid_xml"}
if message == "Not a valid GraphML file - missing <graphml> root element":
return {"error_type": "invalid_graphml"}
if message == "Missing <graph> element":
return {"error_type": "missing_graph_element"}
if message == "Graph missing required 'edgedefault' attribute":
return {"error_type": "missing_edgedefault"}
if message.startswith("Invalid edgedefault value"):
return {"error_type": "invalid_edgedefault"}
if message == "No nodes found in workflow":
return {"error_type": "no_nodes"}
if message == "No edges found in workflow":
return {"error_type": "no_edges"}
if message.startswith("Source directory not found:"):
return {"error_type": "missing_source_dir"}
if message == "Node missing required 'id' attribute":
return {"error_type": "missing_node_id"}
if message.startswith("Node '") and message.endswith(
"contains unsafe shell characters"
):
return {"error_type": "unsafe_node_label"}
if message.startswith("Node '") and "missing format 'ID:filename'" in message:
return {"error_type": "invalid_node_label_format"}
if message.startswith("Node '") and message.endswith("has invalid format"):
return {"error_type": "invalid_node_label_format"}
if message.startswith("Node '") and message.endswith("has no filename"):
return {"error_type": "missing_node_filename"}
if message.startswith("Node '") and message.endswith("has unusual file extension"):
return {"error_type": "unusual_file_extension"}
if message.startswith("Missing source file:"):
return {"error_type": "missing_source_file"}
if message.startswith("Node ") and message.endswith(" has no label"):
return {"error_type": "missing_node_label"}
if message.startswith("Error parsing node:"):
return {"error_type": "node_parse_error"}
if message.startswith("Duplicate node label:"):
return {"error_type": "duplicate_node_label"}
if message == "Edge missing source or target":
return {"error_type": "missing_edge_endpoint"}
if message.startswith("Edge references non-existent source node:"):
return {"error_type": "missing_edge_source"}
if message.startswith("Edge references non-existent target node:"):
return {"error_type": "missing_edge_target"}
if message == "Workflow contains cycles (expected for control loops)":
return {"error_type": "cycle_detected"}
if message.startswith("Invalid port number:"):
return {"error_type": "invalid_port_number"}
if message.startswith("Port conflict:"):
return {"error_type": "port_conflict"}
if message.startswith("Port ") and "is in reserved range" in message:
return {"error_type": "reserved_port"}
if message.startswith("File not found:"):
return {"error_type": "file_not_found"}
if message.startswith("Validation failed:"):
return {"error_type": "validation_exception"}
return {"error_type": "validation_message"}


def _build_entries(bucket_name, messages, source_nodes):
    """Convert raw validation messages into structured entry dicts.

    Each entry carries the original ``message``, a machine-readable type tag
    (via ``_classify_message``), and — where it can be recovered from the
    message text — the ``node_id`` the message refers to.

    Args:
        bucket_name: "errors", "warnings", or "info"; selects the tag key.
        messages: list of human-readable validation message strings.
        source_nodes: mapping of source filename -> node id, used to resolve
            "Missing source file:" messages back to their node.

    Returns:
        list of dicts, one per message, in input order.
    """
    prefix = "Node "
    suffix = " has no label"

    entries = []
    for message in messages:
        entry = {"message": message}
        entry.update(_classify_message(message, bucket_name))

        if message.startswith("Missing source file:"):
            filename = message.split(":", 1)[1].strip()
            node_id = source_nodes.get(filename)
            if node_id:
                entry["node_id"] = node_id
        elif message.startswith(prefix) and message.endswith(suffix):
            # Bug fix: the previous fixed slice [5:-9] removed only 9 of the
            # 13 suffix characters, leaving " has" glued onto the node id
            # (e.g. "Node n3 has no label" -> "n3 has"). Strip by exact
            # prefix/suffix lengths instead.
            entry["node_id"] = message[len(prefix):-len(suffix)]
        elif message.startswith("Edge references non-existent source node:"):
            entry["node_id"] = message.split(":", 1)[1].strip()
        elif message.startswith("Edge references non-existent target node:"):
            entry["node_id"] = message.split(":", 1)[1].strip()

        entries.append(entry)
    return entries


def _build_payload(workflow_path, source_root, errors, warnings, info, source_nodes):
    """Assemble the JSON-serializable validation report.

    Mirrors the text output: structured entries per bucket plus a summary
    holding counts and the ordered, de-duplicated node ids mentioned by
    errors or warnings. ``valid`` is True iff there are no errors.
    """
    buckets = {
        name: _build_entries(name, messages, source_nodes)
        for name, messages in (
            ("errors", errors),
            ("warnings", warnings),
            ("info", info),
        )
    }

    # De-duplicate affected node ids while keeping first-seen order.
    seen = {}
    for entry in buckets["errors"] + buckets["warnings"]:
        node_id = entry.get("node_id")
        if node_id:
            seen.setdefault(node_id, True)
    nodes_affected = list(seen)

    # Key order is significant: json.dumps preserves insertion order, so the
    # emitted report keeps the established field layout.
    return {
        "workflow": workflow_path.name,
        "source_dir": str(source_root),
        "valid": not errors,
        "errors": buckets["errors"],
        "warnings": buckets["warnings"],
        "info": buckets["info"],
        "summary": {
            "error_count": len(buckets["errors"]),
            "warning_count": len(buckets["warnings"]),
            "info_count": len(buckets["info"]),
            "nodes_affected": nodes_affected,
        },
    }


def validate_workflow(workflow_file, source_dir, console, output_format="text"):
workflow_path = Path(workflow_file)
source_root = workflow_path.parent / source_dir

console.print(f"[cyan]Validating:[/cyan] {workflow_path.name}")
console.print()
if output_format == "text":
console.print(f"[cyan]Validating:[/cyan] {workflow_path.name}")
console.print()

errors = []
warnings = []
info = []
source_nodes = {}

def finalize():
show_results(console, errors, warnings, info)
if output_format == "json":
print(
json.dumps(
_build_payload(
workflow_path,
source_root,
errors,
warnings,
info,
source_nodes,
),
indent=2,
)
)
else:
show_results(console, errors, warnings, info)
return len(errors) == 0

try:
with open(workflow_path, "r") as f:
with open(workflow_path, "r", encoding="utf-8") as f:
content = f.read()

if not content.strip():
Expand Down Expand Up @@ -109,6 +249,7 @@ def finalize():
warnings.append(f"Node '{label}' has invalid format")
else:
nodeId_part, filename = parts
source_nodes[filename] = node_id
if not filename:
errors.append(f"Node '{label}' has no filename")
elif not any(
Expand Down Expand Up @@ -177,10 +318,40 @@ def finalize():
return finalize()

except FileNotFoundError:
console.print(f"[red]Error:[/red] File not found: {workflow_path}")
if output_format == "json":
print(
json.dumps(
_build_payload(
workflow_path,
source_root,
[f"File not found: {workflow_path}"],
[],
[],
source_nodes,
),
indent=2,
)
)
else:
console.print(f"[red]Error:[/red] File not found: {workflow_path}")
return False
except Exception as e:
console.print(f"[red]Validation failed:[/red] {str(e)}")
if output_format == "json":
print(
json.dumps(
_build_payload(
workflow_path,
source_root,
[f"Validation failed: {str(e)}"],
[],
[],
source_nodes,
),
indent=2,
)
)
else:
console.print(f"[red]Validation failed:[/red] {str(e)}")
return False


Expand Down
38 changes: 38 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,44 @@ def test_validate_missing_node_file(self):
self.assertNotEqual(result.exit_code, 0)
self.assertIn("Missing source file", result.output)

def test_validate_json_output_for_valid_file(self):
    """A freshly scaffolded project validates cleanly with --format json."""
    with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
        init_result = self.runner.invoke(cli, ["init", "test-project"])
        self.assertEqual(init_result.exit_code, 0)

        validate_args = [
            "validate",
            "test-project/workflow.graphml",
            "--format",
            "json",
        ]
        validate_result = self.runner.invoke(cli, validate_args)
        self.assertEqual(validate_result.exit_code, 0)

        report = json.loads(validate_result.output)
        self.assertTrue(report["valid"])
        self.assertEqual(report["summary"]["error_count"], 0)
        self.assertEqual(report["workflow"], "workflow.graphml")
        self.assertIn("src", report["source_dir"])

def test_validate_json_output_for_missing_source_file(self):
    """Deleting a node's script surfaces a missing_source_file error in JSON."""
    with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
        init_result = self.runner.invoke(cli, ["init", "test-project"])
        self.assertEqual(init_result.exit_code, 0)

        # Remove the script that node n1 points at so validation must fail.
        script = Path("test-project/src/script.py")
        if script.exists():
            script.unlink()

        validate_result = self.runner.invoke(
            cli,
            ["validate", "test-project/workflow.graphml", "--format", "json"],
        )
        self.assertNotEqual(validate_result.exit_code, 0)

        report = json.loads(validate_result.output)
        self.assertFalse(report["valid"])
        self.assertEqual(report["summary"]["error_count"], 1)
        first_error = report["errors"][0]
        self.assertEqual(first_error["error_type"], "missing_source_file")
        self.assertEqual(first_error["node_id"], "n1")

def test_status_command(self):
result = self.runner.invoke(cli, ["status"])
self.assertEqual(result.exit_code, 0)
Expand Down
Loading