diff --git a/src/agentready/assessors/documentation.py b/src/agentready/assessors/documentation.py index 4c4125c8..218a661b 100644 --- a/src/agentready/assessors/documentation.py +++ b/src/agentready/assessors/documentation.py @@ -644,7 +644,7 @@ def _check_template_compliance(self, sample_files: list) -> int: required_sections = ["status", "context", "decision", "consequences"] total_points = 0 - max_points_per_file = 20 // len(sample_files) + max_points_per_file = 20 / len(sample_files) for adr_file in sample_files: try: @@ -662,7 +662,7 @@ def _check_template_compliance(self, sample_files: list) -> int: except OSError: continue # Skip unreadable files - return int(total_points) + return round(total_points) def _create_remediation(self) -> Remediation: """Create remediation guidance for missing/inadequate ADRs.""" diff --git a/src/agentready/models/assessment.py b/src/agentready/models/assessment.py index 228acc39..02458ed0 100644 --- a/src/agentready/models/assessment.py +++ b/src/agentready/models/assessment.py @@ -78,6 +78,40 @@ def __post_init__(self): f"attributes total ({self.attributes_total})" ) + @classmethod + def from_dict(cls, data: dict) -> "Assessment": + """Create Assessment from dictionary. + + Handles the key mapping where to_dict() serializes + 'attributes_not_assessed' as 'attributes_skipped'. + """ + from datetime import datetime + + metadata_data = data.get("metadata") + config_data = data.get("config") + + return cls( + repository=Repository.from_dict(data["repository"]), + timestamp=datetime.fromisoformat(data["timestamp"]), + overall_score=data["overall_score"], + certification_level=data["certification_level"], + attributes_assessed=data["attributes_assessed"], + attributes_not_assessed=data.get( + "attributes_skipped", data.get("attributes_not_assessed", 0) + ), + attributes_total=data["attributes_total"], + findings=[Finding.from_dict(f) for f in data.get("findings", [])], + config=Config.model_validate(config_data) if config_data else None, + duration_seconds=data.get("duration_seconds", 0.0), + discovered_skills=[ + DiscoveredSkill.from_dict(s) for s in data.get("discovered_skills", []) + ], + metadata=( + AssessmentMetadata.from_dict(metadata_data) if metadata_data else None + ), + schema_version=data.get("schema_version", cls.CURRENT_SCHEMA_VERSION), + ) + def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { diff --git a/src/agentready/models/attribute.py b/src/agentready/models/attribute.py index a407146f..7a66c113 100644 --- a/src/agentready/models/attribute.py +++ b/src/agentready/models/attribute.py @@ -38,6 +38,19 @@ def __post_init__(self): f"Default weight must be in range [0.0, 1.0]: {self.default_weight}" ) + @classmethod + def from_dict(cls, data: dict) -> "Attribute": + """Create Attribute from dictionary.""" + return cls( + id=data["id"], + name=data["name"], + category=data["category"], + tier=data["tier"], + description=data["description"], + criteria=data["criteria"], + default_weight=data["default_weight"], + ) + def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { diff --git a/src/agentready/models/citation.py b/src/agentready/models/citation.py index be6f5f18..b49bfce7 100644 --- a/src/agentready/models/citation.py +++ b/src/agentready/models/citation.py @@ -30,6 +30,16 @@ def __post_init__(self): if not self.relevance: raise ValueError("Citation relevance must be non-empty") + @classmethod + def from_dict(cls, data: dict) -> "Citation": + """Create Citation from dictionary.""" + return cls( + source=data["source"], + title=data["title"], + url=data.get("url"), + relevance=data["relevance"], + ) + def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { diff --git a/src/agentready/models/discovered_skill.py b/src/agentready/models/discovered_skill.py index a7e3bc33..35add2f2 100644 --- a/src/agentready/models/discovered_skill.py +++ b/src/agentready/models/discovered_skill.py @@ -76,6 +76,22 @@ def __post_init__(self): if not self.pattern_summary: raise ValueError("Pattern summary must be non-empty") + @classmethod + def from_dict(cls, data: dict) -> "DiscoveredSkill": + """Create DiscoveredSkill from dictionary.""" + return cls( + skill_id=data["skill_id"], + name=data["name"], + description=data["description"], + confidence=data["confidence"], + source_attribute_id=data["source_attribute_id"], + reusability_score=data["reusability_score"], + impact_score=data["impact_score"], + pattern_summary=data["pattern_summary"], + code_examples=data.get("code_examples", []), + citations=[Citation.from_dict(c) for c in data.get("citations", [])], + ) + def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { diff --git a/src/agentready/models/finding.py b/src/agentready/models/finding.py index 25232df1..82e08273 100644 --- a/src/agentready/models/finding.py +++ b/src/agentready/models/finding.py @@ -34,6 +34,21 @@ def __post_init__(self): if not self.steps: raise ValueError("Remediation must have at least one step") + @classmethod + def from_dict(cls, data: dict) -> "Remediation": + """Create Remediation from dictionary.""" + steps = data.get("steps") + if not steps: + steps = [data["summary"]] + return cls( + summary=data["summary"], + steps=steps, + tools=data.get("tools", []), + commands=data.get("commands", []), + examples=data.get("examples", []), + citations=[Citation.from_dict(c) for c in data.get("citations", [])], + ) + def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { @@ -88,6 +103,23 @@ def __post_init__(self): if self.status == "error" and not self.error_message: raise ValueError("Error message required for status error") + @classmethod + def from_dict(cls, data: dict) -> "Finding": + """Create Finding from dictionary.""" + remediation_data = data.get("remediation") + return cls( + attribute=Attribute.from_dict(data["attribute"]), + status=data["status"], + score=data.get("score"), + measured_value=data.get("measured_value"), + threshold=data.get("threshold"), + evidence=data.get("evidence", []), + remediation=( + Remediation.from_dict(remediation_data) if remediation_data else None + ), + error_message=data.get("error_message"), + ) + def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { diff --git a/src/agentready/models/metadata.py b/src/agentready/models/metadata.py index fd34dc27..bb76e22f 100644 --- a/src/agentready/models/metadata.py +++ b/src/agentready/models/metadata.py @@ -32,6 +32,19 @@ class AssessmentMetadata: command: str working_directory: str + @classmethod + def from_dict(cls, data: dict) -> "AssessmentMetadata": + """Create AssessmentMetadata from dictionary.""" + return cls( + agentready_version=data["agentready_version"], + research_version=data["research_version"], + assessment_timestamp=data["assessment_timestamp"], + assessment_timestamp_human=data["assessment_timestamp_human"], + executed_by=data["executed_by"], + command=data["command"], + working_directory=data["working_directory"], + ) + def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { diff --git a/src/agentready/models/repository.py b/src/agentready/models/repository.py index 35f2c185..02436814 100644 --- a/src/agentready/models/repository.py +++ b/src/agentready/models/repository.py @@ -74,6 +74,25 @@ def get_short_commit_hash(self) -> str: """ return shorten_commit_hash(self.commit_hash) + @classmethod + def from_dict(cls, data: dict) -> "Repository": + """Create Repository from dictionary. + + Skips filesystem validation so cached assessments remain readable + even if the original repository path no longer exists on disk. + """ + repo = object.__new__(cls) + repo.path = Path(data["path"]) + repo.name = data["name"] + repo.url = data.get("url") + repo.branch = data["branch"] + repo.commit_hash = data["commit_hash"] + repo.languages = data.get("languages", {}) + repo.total_files = data.get("total_files", 0) + repo.total_lines = data.get("total_lines", 0) + repo.config = None + return repo + @property def primary_language(self) -> str: """Get the primary programming language (most files). diff --git a/src/agentready/services/assessment_cache.py b/src/agentready/services/assessment_cache.py index d820e9e6..f565ce21 100644 --- a/src/agentready/services/assessment_cache.py +++ b/src/agentready/services/assessment_cache.py @@ -6,6 +6,8 @@ from pathlib import Path from typing import Optional +from pydantic import ValidationError + from ..models import Assessment @@ -108,7 +110,14 @@ def get(self, repository_url: str, commit_hash: str) -> Optional[Assessment]: assessment_data = json.loads(assessment_json) return self._deserialize_assessment(assessment_data) - except (sqlite3.Error, json.JSONDecodeError, ValueError): + except ( + sqlite3.Error, + json.JSONDecodeError, + ValueError, + KeyError, + TypeError, + ValidationError, + ): return None def set( @@ -250,15 +259,4 @@ def _deserialize_assessment(data: dict) -> Assessment: Returns: Assessment object """ - # This is a simplified deserialization - # In practice, you'd need full deserialization logic - # For now, we'll use a placeholder that assumes the cached JSON - # has the correct structure - - # Note: This is a simplified approach. In production, you'd need - # proper deserialization that reconstructs all nested objects - raise NotImplementedError( - "Full assessment deserialization not yet implemented. " - "Consider caching assessment JSON directly and providing " - "a proper deserializer factory." - ) + return Assessment.from_dict(data) diff --git a/tests/unit/test_cache_deserialization.py b/tests/unit/test_cache_deserialization.py new file mode 100644 index 00000000..245ebbc1 --- /dev/null +++ b/tests/unit/test_cache_deserialization.py @@ -0,0 +1,364 @@ +"""Regression tests for assessment cache deserialization. + +Tests that AssessmentCache can round-trip Assessment objects through +SQLite storage, verifying the fix for the NotImplementedError in +_deserialize_assessment(). +""" + +import json +import sqlite3 +from datetime import datetime, timedelta +from pathlib import Path +from tempfile import TemporaryDirectory + +import pytest + +from agentready.models.assessment import Assessment +from agentready.models.attribute import Attribute +from agentready.models.citation import Citation +from agentready.models.discovered_skill import DiscoveredSkill +from agentready.models.finding import Finding, Remediation +from agentready.models.metadata import AssessmentMetadata +from agentready.models.repository import Repository +from agentready.services.assessment_cache import AssessmentCache + + +@pytest.fixture +def git_repo(tmp_path): + """Create a minimal git repo directory for Repository validation.""" + (tmp_path / ".git").mkdir() + return tmp_path + + +@pytest.fixture +def sample_attribute(): + return Attribute( + id="test_attr", + name="Test Attribute", + category="Testing", + tier=1, + description="A test attribute", + criteria="Must pass", + default_weight=0.04, + ) + + +@pytest.fixture +def sample_citation(): + return Citation( + source="Test Source", + title="Test Title", + url="https://example.com", + relevance="Relevant for testing", + ) + + +@pytest.fixture +def sample_remediation(sample_citation): + return Remediation( + summary="Fix the issue", + steps=["Step 1", "Step 2"], + tools=["tool1"], + commands=["cmd1"], + examples=["example1"], + citations=[sample_citation], + ) + + +@pytest.fixture +def sample_finding(sample_attribute, sample_remediation): + return Finding( + attribute=sample_attribute, + status="pass", + score=85.0, + measured_value="85%", + threshold=">80%", + evidence=["evidence1"], + remediation=sample_remediation, + error_message=None, + ) + + +@pytest.fixture +def sample_assessment(git_repo, sample_finding): + repo = Repository( + path=git_repo, + name="test-repo", + url="https://github.com/test/repo", + branch="main", + commit_hash="abc123def456", + languages={"Python": 42}, + total_files=100, + total_lines=5000, + ) + return Assessment( + repository=repo, + timestamp=datetime(2026, 3, 26, 12, 0, 0), + overall_score=85.0, + certification_level="Gold", + attributes_assessed=1, + attributes_not_assessed=0, + attributes_total=1, + findings=[sample_finding], + config=None, + duration_seconds=1.5, + ) + + +class TestModelFromDict: + """Test from_dict() class methods on all models.""" + + def test_citation_round_trip(self, sample_citation): + result = Citation.from_dict(sample_citation.to_dict()) + assert result.source == sample_citation.source + assert result.title == sample_citation.title + assert result.url == sample_citation.url + assert result.relevance == sample_citation.relevance + + def test_citation_without_url(self): + c = Citation(source="S", title="T", url=None, relevance="R") + result = Citation.from_dict(c.to_dict()) + assert result.url is None + + def test_attribute_round_trip(self, sample_attribute): + result = Attribute.from_dict(sample_attribute.to_dict()) + assert result.id == sample_attribute.id + assert result.tier == sample_attribute.tier + assert result.default_weight == sample_attribute.default_weight + + def test_remediation_round_trip(self, sample_remediation): + result = Remediation.from_dict(sample_remediation.to_dict()) + assert result.summary == sample_remediation.summary + assert len(result.steps) == 2 + assert len(result.citations) == 1 + assert result.citations[0].source == "Test Source" + + def test_finding_round_trip(self, sample_finding): + result = Finding.from_dict(sample_finding.to_dict()) + assert result.status == "pass" + assert result.score == 85.0 + assert result.attribute.id == "test_attr" + assert result.remediation is not None + assert result.remediation.summary == "Fix the issue" + + def test_finding_without_remediation(self, sample_attribute): + f = Finding( + attribute=sample_attribute, + status="pass", + score=90.0, + measured_value="90%", + threshold=">80%", + evidence=[], + remediation=None, + error_message=None, + ) + result = Finding.from_dict(f.to_dict()) + assert result.remediation is None + + def test_finding_error_status(self, sample_attribute): + f = Finding.error(sample_attribute, "Something went wrong") + result = Finding.from_dict(f.to_dict()) + assert result.status == "error" + assert result.error_message == "Something went wrong" + + def test_metadata_round_trip(self): + m = AssessmentMetadata( + agentready_version="2.31.1", + research_version="1.0", + assessment_timestamp="2026-03-26T12:00:00", + assessment_timestamp_human="March 26, 2026 at 12:00 PM", + executed_by="test@host", + command="agentready assess .", + working_directory="/tmp/test", + ) + result = AssessmentMetadata.from_dict(m.to_dict()) + assert result.agentready_version == "2.31.1" + assert result.executed_by == "test@host" + + def test_discovered_skill_round_trip(self, sample_citation): + s = DiscoveredSkill( + skill_id="test-skill", + name="Test Skill", + description="A test skill", + confidence=90.0, + source_attribute_id="test_attr", + reusability_score=80.0, + impact_score=70.0, + pattern_summary="A pattern", + code_examples=["example"], + citations=[sample_citation], + ) + result = DiscoveredSkill.from_dict(s.to_dict()) + assert result.skill_id == "test-skill" + assert result.confidence == 90.0 + assert len(result.citations) == 1 + + def test_repository_round_trip(self, git_repo): + r = Repository( + path=git_repo, + name="test-repo", + url="https://github.com/test/repo", + branch="main", + commit_hash="abc123", + languages={"Python": 10}, + total_files=15, + total_lines=500, + ) + result = Repository.from_dict(r.to_dict()) + assert result.name == "test-repo" + assert result.branch == "main" + assert result.languages == {"Python": 10} + + def test_repository_from_dict_skips_filesystem_validation(self): + """from_dict should not validate the path exists on disk, + so cached assessments remain readable after the repo is moved.""" + data = { + "path": "/nonexistent/repo/path", + "name": "gone-repo", + "url": "https://github.com/test/gone", + "branch": "main", + "commit_hash": "abc123", + "languages": {"Python": 5}, + "total_files": 10, + "total_lines": 200, + } + result = Repository.from_dict(data) + assert result.name == "gone-repo" + assert str(result.path) == "/nonexistent/repo/path" + + def test_remediation_from_dict_missing_steps_uses_summary(self): + """If steps is missing or empty, from_dict should fall back to + using the summary as a single step to satisfy the invariant.""" + data = { + "summary": "Fix the issue", + "tools": [], + "commands": [], + "examples": [], + "citations": [], + } + result = Remediation.from_dict(data) + assert result.steps == ["Fix the issue"] + + def test_assessment_round_trip(self, sample_assessment): + data = sample_assessment.to_dict() + result = Assessment.from_dict(data) + assert result.overall_score == 85.0 + assert result.certification_level == "Gold" + assert result.attributes_assessed == 1 + assert result.attributes_not_assessed == 0 + assert len(result.findings) == 1 + assert result.findings[0].score == 85.0 + + def test_assessment_attributes_skipped_key_mapping(self, sample_assessment): + """Verify that 'attributes_skipped' in serialized JSON maps to + 'attributes_not_assessed' in the model (the key mapping bug).""" + data = sample_assessment.to_dict() + # to_dict() serializes as 'attributes_skipped' + assert "attributes_skipped" in data + assert "attributes_not_assessed" not in data + # from_dict() should correctly map it back + result = Assessment.from_dict(data) + assert result.attributes_not_assessed == 0 + + +class TestCacheRoundTrip: + """Test full cache set/get round-trip — the core regression test.""" + + def test_cache_set_and_get(self, sample_assessment): + """Regression test: cache.get() must return an Assessment, not raise + NotImplementedError or return None.""" + with TemporaryDirectory() as tmpdir: + cache = AssessmentCache(Path(tmpdir)) + url = "https://github.com/test/repo" + commit = "abc123" + + assert cache.set(url, commit, sample_assessment) + result = cache.get(url, commit) + + assert result is not None + assert isinstance(result, Assessment) + assert result.overall_score == sample_assessment.overall_score + assert result.certification_level == sample_assessment.certification_level + assert len(result.findings) == len(sample_assessment.findings) + + def test_cache_miss_returns_none(self): + """Cache miss should return None, not raise.""" + with TemporaryDirectory() as tmpdir: + cache = AssessmentCache(Path(tmpdir)) + result = cache.get("https://github.com/test/repo", "nonexistent") + assert result is None + + def test_cache_preserves_finding_details(self, sample_assessment): + """Ensure nested model data survives the cache round-trip.""" + with TemporaryDirectory() as tmpdir: + cache = AssessmentCache(Path(tmpdir)) + url = "https://github.com/test/repo" + commit = "abc123" + + cache.set(url, commit, sample_assessment) + result = cache.get(url, commit) + + finding = result.findings[0] + assert finding.attribute.id == "test_attr" + assert finding.attribute.tier == 1 + assert finding.remediation is not None + assert finding.remediation.summary == "Fix the issue" + assert len(finding.remediation.citations) == 1 + + def test_cache_invalidate_then_miss(self, sample_assessment): + """After invalidation, get should return None.""" + with TemporaryDirectory() as tmpdir: + cache = AssessmentCache(Path(tmpdir)) + url = "https://github.com/test/repo" + commit = "abc123" + + cache.set(url, commit, sample_assessment) + cache.invalidate(url, commit) + assert cache.get(url, commit) is None + + def test_cache_expired_entry_returns_none(self, sample_assessment): + """Expired entries should be deleted and return None.""" + with TemporaryDirectory() as tmpdir: + cache = AssessmentCache(Path(tmpdir), ttl_days=0) + + url = "https://github.com/test/repo" + commit = "abc123" + + # Manually insert with an already-expired timestamp + assessment_json = json.dumps(sample_assessment.to_dict()) + expired = (datetime.now() - timedelta(hours=1)).isoformat() + + with sqlite3.connect(cache.db_path) as conn: + conn.execute( + """INSERT INTO assessments + (repository_url, commit_hash, overall_score, assessment_json, expires_at) + VALUES (?, ?, ?, ?, ?)""", + (url, commit, 85.0, assessment_json, expired), + ) + conn.commit() + + result = cache.get(url, commit) + assert result is None + + def test_cache_malformed_json_returns_none(self): + """Malformed cache entries should return None, not crash.""" + with TemporaryDirectory() as tmpdir: + cache = AssessmentCache(Path(tmpdir)) + url = "https://github.com/test/repo" + commit = "abc123" + + # Insert malformed JSON missing required keys + malformed = json.dumps({"overall_score": 50.0}) + expires = (datetime.now() + timedelta(days=7)).isoformat() + + with sqlite3.connect(cache.db_path) as conn: + conn.execute( + """INSERT INTO assessments + (repository_url, commit_hash, overall_score, assessment_json, expires_at) + VALUES (?, ?, ?, ?, ?)""", + (url, commit, 50.0, malformed, expires), + ) + conn.commit() + + result = cache.get(url, commit) + assert result is None diff --git a/tests/unit/test_doc_scoring_precision.py b/tests/unit/test_doc_scoring_precision.py new file mode 100644 index 00000000..0276ea99 --- /dev/null +++ b/tests/unit/test_doc_scoring_precision.py @@ -0,0 +1,67 @@ +"""Regression test for integer division bug in documentation scoring. + +Tests that _check_template_compliance() uses float division so that +repositories with any number of ADR files can achieve the maximum +score of 20/20 when all required sections are present. +""" + +import pytest + +from agentready.assessors.documentation import ArchitectureDecisionsAssessor + + +@pytest.fixture +def assessor(): + """Create an ArchitectureDecisionsAssessor instance.""" + return ArchitectureDecisionsAssessor() + + +def _make_adr_files(tmp_path, count): + """Create ADR files that contain all required sections.""" + files = [] + for i in range(count): + f = tmp_path / f"adr-{i:03d}.md" + f.write_text( + "# ADR\n\n## Status\nAccepted\n\n## Context\n" + "Background info\n\n## Decision\nWe decided X\n\n" + "## Consequences\nResult of decision\n" + ) + files.append(f) + return files + + +class TestCheckTemplateCompliancePrecision: + """Regression tests for the integer division bug.""" + + @pytest.mark.parametrize("file_count", [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + def test_perfect_adrs_score_20(self, assessor, tmp_path, file_count): + """With all required sections present in every file, the score + must be 20/20 regardless of file count. + + Previously, integer division (20 // n) caused precision loss + for non-divisor counts (e.g., 7 files -> max 14/20). + """ + files = _make_adr_files(tmp_path, file_count) + score = assessor._check_template_compliance(files) + assert ( + score == 20 + ), f"Expected 20/20 for {file_count} perfect ADR files, got {score}/20" + + def test_empty_files_score_zero(self, assessor): + """Empty file list should return 0.""" + assert assessor._check_template_compliance([]) == 0 + + def test_partial_sections_score_proportionally(self, assessor, tmp_path): + """Files with half the required sections should score ~half.""" + f = tmp_path / "adr-001.md" + # Only "status" and "context" present (2 of 4 required sections) + f.write_text("# ADR\n\n## Status\nAccepted\n\n## Context\nInfo\n") + score = assessor._check_template_compliance([f]) + # 2/4 sections * 20 points = 10 + assert score == 10 + + def test_seven_files_no_longer_capped_at_14(self, assessor, tmp_path): + """Specific regression: 7 files was previously capped at 14/20.""" + files = _make_adr_files(tmp_path, 7) + score = assessor._check_template_compliance(files) + assert score == 20, f"7-file regression: expected 20, got {score}"