diff --git a/src/specify_cli/integrations/manifest.py b/src/specify_cli/integrations/manifest.py index 258c536e5b..7c2b750957 100644 --- a/src/specify_cli/integrations/manifest.py +++ b/src/specify_cli/integrations/manifest.py @@ -115,6 +115,7 @@ def __init__(self, key: str, project_root: Path, version: str = "") -> None: self.project_root = project_root.resolve() self.version = version self._files: dict[str, str] = {} # rel_path → sha256 hex + self._recovered_files: set[str] = set() self._installed_at: str = "" # -- Manifest file location ------------------------------------------- @@ -146,15 +147,60 @@ def record_file(self, rel_path: str | Path, content: bytes | str) -> Path: self._files[normalized] = hashlib.sha256(content).hexdigest() return abs_path - def record_existing(self, rel_path: str | Path) -> None: - """Record the hash of an already-existing file at *rel_path*. - - Raises ``ValueError`` if *rel_path* resolves outside the project root. + def record_existing(self, rel_path: str | Path, *, recovered: bool = False) -> None: + """Record the hash of an already-existing regular file at *rel_path*. + + When ``recovered=True``, the path is also marked in the manifest's + ``recovered_files`` list to signal that the file's on-disk hash was + *observed* during install (because the file already existed and was not + overwritten), not *produced* by the install. Future ``refresh_managed`` + runs should consult ``is_recovered`` before treating the recorded hash + as a managed baseline. + + Raises: + ValueError: if *rel_path* resolves outside the project root, is + a symlink, or is not a regular file. A directory or other + non-file path cannot be silently recorded — its hash would + be meaningless and ``check_modified``/``uninstall`` would + treat the entry as permanently broken. """ rel = Path(rel_path) + # Cheap lexical pre-check first so absolute / parent-traversal paths + # don't trigger a filesystem stat outside the project root before + # ``_validate_rel_path`` raises. ``_validate_rel_path`` produces the + # canonical error messages used elsewhere. + if rel.is_absolute() or ".." in rel.parts: + _validate_rel_path(rel, self.project_root) + # _validate_rel_path raised for any actually-escaping path. If we reach + # here the path normalizes inside root (e.g. ``dir/../file.txt``). + # Reject anyway: manifest keys must be canonical so ``check_modified`` + # and ``uninstall`` cannot key the same file under two paths. + raise ValueError( + f"Manifest paths must be canonical; '..' segments are not " + f"allowed (got {rel})" + ) + # Walk each path component before resolution so a symlinked ancestor + # (e.g. ``linked_dir/file.txt`` where ``linked_dir`` is a symlink) + # cannot be silently followed by ``_validate_rel_path().resolve()`` + # down to a target outside the project root. ``_ensure_safe_manifest_directory`` + # uses the same pattern. + _walk = self.project_root + for part in rel.parts: + _walk = _walk / part + if _walk.is_symlink(): + raise ValueError( + f"Refusing to record symlinked manifest path: {rel} " + f"(symlinked at {_walk.relative_to(self.project_root).as_posix()})" + ) abs_path = _validate_rel_path(rel, self.project_root) + if not abs_path.is_file(): + raise ValueError( + f"Manifest path is not a regular file: {rel}" + ) normalized = abs_path.relative_to(self.project_root).as_posix() self._files[normalized] = _sha256(abs_path) + if recovered: + self._recovered_files.add(normalized) # -- Querying --------------------------------------------------------- @@ -163,6 +209,24 @@ def files(self) -> dict[str, str]: """Return a copy of the ``{rel_path: sha256}`` mapping.""" return dict(self._files) + @property + def recovered_files(self) -> set[str]: + """Return a copy of the set of paths recorded with ``recovered=True``. + + These entries had their hashes observed (not produced) during install + because the file already existed on disk and the install skipped it. + Their on-disk bytes may be user customizations — callers that would + overwrite based on hash equality (e.g. ``refresh_managed``) MUST check + ``is_recovered`` first. + """ + return set(self._recovered_files) + + def is_recovered(self, rel_path: str | Path) -> bool: + """Return True if *rel_path* was recorded via ``record_existing(recovered=True)``.""" + rel = Path(rel_path) + normalized = rel.as_posix() + return normalized in self._recovered_files + def check_modified(self) -> list[str]: """Return relative paths of tracked files whose content changed on disk.""" modified: list[str] = [] @@ -269,6 +333,11 @@ def save(self) -> Path: "version": self.version, "installed_at": self._installed_at, "files": self._files, + **( + {"recovered_files": sorted(self._recovered_files)} + if self._recovered_files + else {} + ), } path = self.manifest_path content = json.dumps(data, indent=2) + "\n" @@ -320,6 +389,16 @@ def load(cls, key: str, project_root: Path) -> IntegrationManifest: inst._installed_at = data.get("installed_at", "") inst._files = files + recovered = data.get("recovered_files", []) + if not isinstance(recovered, list) or not all( + isinstance(p, str) for p in recovered + ): + raise ValueError( + f"Integration manifest 'recovered_files' at {path} must be a " + "list of string paths" + ) + inst._recovered_files = set(recovered) + stored_key = data.get("integration", "") if stored_key and stored_key != key: raise ValueError( diff --git a/src/specify_cli/shared_infra.py b/src/specify_cli/shared_infra.py index 35bf02e644..8071df69a5 100644 --- a/src/specify_cli/shared_infra.py +++ b/src/specify_cli/shared_infra.py @@ -359,6 +359,23 @@ def _ensure_or_bucket_dir(directory: Path) -> bool: preserved_user_files.append(rel) else: skipped_files.append(rel) + # Record the existing-on-disk file in the manifest so a + # fresh manifest run against an already-populated + # ``.specify/`` tree does not silently drop it (#2107). + # ``prior_hashes`` is the function-scope snapshot taken + # at entry, so this membership check is O(1) and avoids + # the repeated ``dict(self._files)`` copy that + # ``manifest.files`` performs on every access. + if dst_path.is_file() and rel not in prior_hashes: + try: + manifest.record_existing(rel, recovered=True) + except (OSError, ValueError) as exc: + # Tolerate races / permission issues / non-file + # collisions so one weird path does not abort + # the whole install. + console.print( + f"[yellow]⚠[/yellow] could not record {rel} in manifest: {exc}" + ) continue if not _ensure_or_bucket_dir(dst_path.parent): @@ -383,6 +400,23 @@ def _ensure_or_bucket_dir(directory: Path) -> bool: preserved_user_files.append(rel) else: skipped_files.append(rel) + # Record the existing-on-disk template in the manifest so a + # fresh manifest run against an already-populated + # ``.specify/`` tree does not silently drop it (#2107). + # ``prior_hashes`` is the function-scope snapshot taken at + # entry, so this membership check is O(1) and avoids the + # repeated ``dict(self._files)`` copy that ``manifest.files`` + # performs on every access. + if dst.is_file() and rel not in prior_hashes: + try: + manifest.record_existing(rel, recovered=True) + except (OSError, ValueError) as exc: + # Tolerate races / permission issues / non-file + # collisions so one weird path does not abort + # the whole install. + console.print( + f"[yellow]⚠[/yellow] could not record {rel} in manifest: {exc}" + ) continue content = src.read_text(encoding="utf-8") @@ -401,7 +435,7 @@ def _ensure_or_bucket_dir(directory: Path) -> bool: if skipped_files: console.print( - f"[yellow]⚠[/yellow] {len(skipped_files)} shared infrastructure file(s) already exist and were not updated:" + f"[yellow]⚠[/yellow] {len(skipped_files)} shared infrastructure path(s) already exist and were not updated:" ) for path in skipped_files: console.print(f" {path}") diff --git a/tests/integrations/test_integration_claude.py b/tests/integrations/test_integration_claude.py index 142db0dd92..b9948a4585 100644 --- a/tests/integrations/test_integration_claude.py +++ b/tests/integrations/test_integration_claude.py @@ -3,6 +3,7 @@ import codecs import json import os +from pathlib import Path from unittest.mock import patch import yaml @@ -556,3 +557,204 @@ def test_post_process_injects_all_claude_flags(self): assert "user-invocable: true" in result assert "disable-model-invocation: false" in result assert "replace dots" in result + + +class TestSpeckitManifestRecordsSkippedFiles: + """Regression test for issue #2107. + + ``install_shared_infra`` must record every shared-infrastructure file + under ``.specify/`` in ``speckit.manifest.json``, including files that + were *skipped* because they already existed on disk and ``force=False``. + + Before the fix, the skip branches in the scripts and templates loops + appended to ``skipped_files`` without calling ``manifest.record_existing``. + So when ``install_shared_infra`` ran with a fresh (or lost) manifest + against an already-populated ``.specify/`` tree, every file went down the + skip path, ``planned_copies`` and ``planned_templates`` stayed empty, and + ``manifest.save()`` wrote an empty ``files`` field — leaving the + integration believing nothing was installed. + + Reproduction (without the fix) using ``install_shared_infra`` directly: + + install_shared_infra(p, "sh", ..., force=False) # 1st run → 10 files + (p / ".specify/integrations/speckit.manifest.json").unlink() + install_shared_infra(p, "sh", ..., force=False) # 2nd run → 0 files + # ^^ BUG: empty + """ + + def _read_manifest_files(self, project_path: Path) -> dict: + manifest_path = ( + project_path / ".specify" / "integrations" / "speckit.manifest.json" + ) + assert manifest_path.exists(), ( + f"speckit.manifest.json not written at {manifest_path}" + ) + data = json.loads(manifest_path.read_text(encoding="utf-8")) + # ``IntegrationManifest.save`` serialises a ``files`` dict — assert + # the schema explicitly so a regression to a different key (e.g. + # the internal ``_files`` attribute name) fails loudly instead of + # being masked by a silent fallback. + assert isinstance(data, dict), ( + f"manifest root is not a dict, got {type(data).__name__}" + ) + assert "files" in data, ( + f"manifest missing 'files' key, got keys: {sorted(data.keys())}" + ) + files = data["files"] + assert isinstance(files, dict), ( + f"manifest 'files' is not a dict, got {type(files).__name__}" + ) + return files + + def test_install_shared_infra_records_skipped_files(self, tmp_path): + """With ``force=False`` and ``.specify/`` already populated, the + manifest must still record every file — the skip branches are not + allowed to drop files from the manifest.""" + from rich.console import Console + from specify_cli.shared_infra import install_shared_infra + + # Resolve the project's own packaged sources by walking up from this + # test file to the repo root (which contains ``scripts/`` and + # ``templates/`` that ``shared_scripts_source`` looks for). + repo_root = Path(__file__).resolve().parents[2] + console = Console(quiet=True) + + # First run — fresh project, manifest gets populated normally. + install_shared_infra( + tmp_path, + "sh", + version="0.0.0", + core_pack=None, + repo_root=repo_root, + console=console, + force=False, + ) + first_files = self._read_manifest_files(tmp_path) + assert first_files, "first install produced an empty manifest" + + # Simulate a lost manifest while ``.specify/`` is still on disk + # (e.g. the manifest was deleted, corrupted, or the layout was + # extracted out-of-band). + manifest_path = ( + tmp_path / ".specify" / "integrations" / "speckit.manifest.json" + ) + manifest_path.unlink() + + # Second run — every file already exists, so every iteration takes + # the skip branch. With the fix, those files are still recorded. + install_shared_infra( + tmp_path, + "sh", + version="0.0.0", + core_pack=None, + repo_root=repo_root, + console=console, + force=False, + ) + second_files = self._read_manifest_files(tmp_path) + assert second_files, ( + "speckit.manifest.json files dict is empty after install with " + "skipped files (issue #2107) — every file went down the skip " + "branch but none were recorded" + ) + + # The recovered manifest must cover everything the first run tracked. + missing = set(first_files) - set(second_files) + assert not missing, ( + f"these files were tracked on the first install but missing after " + f"the skipped-files re-install: {sorted(missing)[:5]}" + ) + + def test_install_shared_infra_handles_directory_at_script_destination( + self, tmp_path + ): + """A non-file (directory) at a script's destination must NOT crash + ``install_shared_infra`` and must NOT be recorded in the manifest — + the path still appears in the user-visible skipped-paths warning. + """ + from io import StringIO + from rich.console import Console + from specify_cli.shared_infra import install_shared_infra + + repo_root = Path(__file__).resolve().parents[2] + output = StringIO() + console = Console(file=output, force_terminal=False, width=200) + + # Pre-create the .specify/scripts/bash tree, then plant a directory + # where a script file is expected so the skip branch hits a + # non-regular-file path. + bash_dir = tmp_path / ".specify" / "scripts" / "bash" + bash_dir.mkdir(parents=True) + (bash_dir / "common.sh").mkdir() # collision: dir where file expected + + # Must not crash. + install_shared_infra( + tmp_path, + "sh", + version="0.0.0", + core_pack=None, + repo_root=repo_root, + console=console, + force=False, + ) + + files = self._read_manifest_files(tmp_path) + assert ".specify/scripts/bash/common.sh" not in files, ( + "directory at script dst must not be recorded in the manifest" + ) + text = output.getvalue() + assert "common.sh" in text, ( + "directory-at-script-dst path must surface in the skipped warning" + ) + + def test_install_shared_infra_handles_directory_at_template_destination( + self, tmp_path + ): + """Symmetric coverage for the templates loop: a directory at a + template's destination must NOT crash install nor be recorded.""" + from io import StringIO + from rich.console import Console + from specify_cli.shared_infra import install_shared_infra + + repo_root = Path(__file__).resolve().parents[2] + output = StringIO() + console = Console(file=output, force_terminal=False, width=200) + + templates_dir = tmp_path / ".specify" / "templates" + templates_dir.mkdir(parents=True) + + src_templates = repo_root / "templates" + real_template = next( + ( + p.name + for p in src_templates.iterdir() + if p.is_file() + and not p.name.startswith(".") + and p.name != "vscode-settings.json" + ), + None, + ) + assert real_template, ( + "no real template found in repo to collide against" + ) + (templates_dir / real_template).mkdir() # collision + + install_shared_infra( + tmp_path, + "sh", + version="0.0.0", + core_pack=None, + repo_root=repo_root, + console=console, + force=False, + ) + + files = self._read_manifest_files(tmp_path) + template_rel = f".specify/templates/{real_template}" + assert template_rel not in files, ( + "directory at template dst must not be recorded in manifest" + ) + text = output.getvalue() + assert real_template in text, ( + "directory-at-template-dst path must surface in the skipped warning" + ) diff --git a/tests/integrations/test_manifest.py b/tests/integrations/test_manifest.py index 596397d4f7..e4d7c5ce77 100644 --- a/tests/integrations/test_manifest.py +++ b/tests/integrations/test_manifest.py @@ -34,6 +34,57 @@ def test_record_existing(self, tmp_path): assert m.files["existing.txt"] == _sha256(f) +class TestManifestRecordExistingErrors: + """Error-case coverage for ``record_existing`` symlink + non-file guards. + + Added in #2483 — Copilot review flagged these as un-tested regressions + after the ``is_symlink``/``is_file`` guards were introduced. + """ + + def test_rejects_symlink_target(self, tmp_path): + target = tmp_path / "target.txt" + target.write_text("target content", encoding="utf-8") + link = tmp_path / "link.txt" + link.symlink_to(target) + m = IntegrationManifest("test", tmp_path) + with pytest.raises(ValueError, match="symlinked"): + m.record_existing("link.txt") + + def test_rejects_dangling_symlink(self, tmp_path): + # A symlink pointing nowhere should still be rejected before the + # ``is_file()`` check (which would itself be False on a dangler). + link = tmp_path / "dangler.txt" + link.symlink_to(tmp_path / "no-such-target.txt") + m = IntegrationManifest("test", tmp_path) + with pytest.raises(ValueError, match="symlinked"): + m.record_existing("dangler.txt") + + def test_rejects_directory_path(self, tmp_path): + (tmp_path / "a_dir").mkdir() + m = IntegrationManifest("test", tmp_path) + with pytest.raises(ValueError, match="not a regular file"): + m.record_existing("a_dir") + + def test_rejects_missing_path(self, tmp_path): + # ``is_file()`` is False for non-existent paths too; the same error + # surface keeps callers from having to distinguish "missing" from + # "wrong kind" — both mean "cannot hash this". + m = IntegrationManifest("test", tmp_path) + with pytest.raises(ValueError, match="not a regular file"): + m.record_existing("never-existed.txt") + + def test_lexical_prevalidation_for_absolute_path(self, tmp_path): + # ``record_existing`` must reject absolute paths via the lexical + # pre-check, NOT via the filesystem-touching ``is_symlink()`` call. + # Verified by passing an absolute path that points to a directory + # outside the project root — the canonical "Absolute paths" error + # must surface before any stat on the absolute path. + m = IntegrationManifest("test", tmp_path) + abs_path = "C:\\tmp\\escape.txt" if sys.platform == "win32" else "/tmp/escape.txt" + with pytest.raises(ValueError, match="Absolute paths"): + m.record_existing(abs_path) + + class TestManifestPathTraversal: def test_record_file_rejects_parent_traversal(self, tmp_path): m = IntegrationManifest("test", tmp_path) @@ -245,3 +296,83 @@ def test_load_invalid_json_raises(self, tmp_path): path.write_text("{not valid json", encoding="utf-8") with pytest.raises(ValueError, match="invalid JSON"): IntegrationManifest.load("bad", tmp_path) + + +class TestManifestRecoveredFiles: + """Coverage for the ``recovered_files`` channel added in #2483. + + When ``shared_infra`` skips an existing file (because the user already has + it on disk) it now records the file with ``recovered=True``. The path + appears in ``manifest.recovered_files`` and ``is_recovered(path)`` returns + True. ``refresh_managed`` (out of scope for this PR) consults this list + before treating the recorded hash as a managed baseline, defending against + silent overwrite of user customizations after manifest loss. + """ + + def test_record_existing_default_is_not_recovered(self, tmp_path): + (tmp_path / "f.txt").write_text("x", encoding="utf-8") + m = IntegrationManifest("test", tmp_path) + m.record_existing("f.txt") + assert m.is_recovered("f.txt") is False + assert m.recovered_files == set() + + def test_record_existing_with_recovered_flag(self, tmp_path): + (tmp_path / "f.txt").write_text("x", encoding="utf-8") + m = IntegrationManifest("test", tmp_path) + m.record_existing("f.txt", recovered=True) + assert m.is_recovered("f.txt") is True + assert m.recovered_files == {"f.txt"} + # File still hashed normally so check_modified/uninstall keep working + assert m.files["f.txt"] == _sha256(tmp_path / "f.txt") + + def test_recovered_files_round_trips_through_save_load(self, tmp_path): + (tmp_path / "a.txt").write_text("aaa", encoding="utf-8") + (tmp_path / "b.txt").write_text("bbb", encoding="utf-8") + m = IntegrationManifest("test", tmp_path, version="9.9") + m.record_existing("a.txt", recovered=True) + m.record_existing("b.txt") # not recovered + m.save() + loaded = IntegrationManifest.load("test", tmp_path) + assert loaded.is_recovered("a.txt") is True + assert loaded.is_recovered("b.txt") is False + assert loaded.recovered_files == {"a.txt"} + + def test_save_omits_empty_recovered_files(self, tmp_path): + m = IntegrationManifest("test", tmp_path) + m.record_file("f.txt", "x") + path = m.save() + data = json.loads(path.read_text(encoding="utf-8")) + assert "recovered_files" not in data + + def test_load_rejects_non_list_recovered_files(self, tmp_path): + path = tmp_path / ".specify" / "integrations" / "bad.manifest.json" + path.parent.mkdir(parents=True) + path.write_text( + json.dumps({"files": {}, "recovered_files": "not-a-list"}), + encoding="utf-8", + ) + with pytest.raises(ValueError, match="recovered_files"): + IntegrationManifest.load("bad", tmp_path) + + +class TestRecordExistingNewGuards: + """Coverage for the two new guards added by Copilot's 2026-05-18 review.""" + + def test_rejects_symlinked_ancestor(self, tmp_path): + real_dir = tmp_path / "real_dir" + real_dir.mkdir() + (real_dir / "file.txt").write_text("payload", encoding="utf-8") + (tmp_path / "linked_dir").symlink_to(real_dir, target_is_directory=True) + m = IntegrationManifest("test", tmp_path) + with pytest.raises(ValueError, match="symlinked"): + m.record_existing("linked_dir/file.txt") + + def test_rejects_inside_root_dotdot_with_explicit_message(self, tmp_path): + # ``dir/../file.txt`` normalizes inside root, so the old "escapes + # project root" message was misleading. The new message names the + # actual reason: canonicalization. + (tmp_path / "dir").mkdir() + (tmp_path / "file.txt").write_text("x", encoding="utf-8") + m = IntegrationManifest("test", tmp_path) + with pytest.raises(ValueError, match=r"canonical|'\.\.' segments"): + m.record_existing("dir/../file.txt")