Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 33 additions & 108 deletions src/specify_cli/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from packaging import version as pkg_version
from packaging.specifiers import SpecifierSet, InvalidSpecifier

from .catalogs import CatalogEntry as BaseCatalogEntry, CatalogStackBase

_FALLBACK_CORE_COMMAND_NAMES = frozenset({
"analyze",
"checklist",
Expand Down Expand Up @@ -107,13 +109,8 @@ def normalize_priority(value: Any, default: int = 10) -> int:


@dataclass
class CatalogEntry:
class CatalogEntry(BaseCatalogEntry):
"""Represents a single catalog entry in the catalog stack."""
url: str
name: str
priority: int
install_allowed: bool
description: str = ""


class ExtensionManifest:
Expand Down Expand Up @@ -1666,12 +1663,16 @@ def register_commands_for_claude(
return self.register_commands_for_agent("claude", manifest, extension_dir, project_root)


class ExtensionCatalog:
class ExtensionCatalog(CatalogStackBase):
"""Manages extension catalog fetching, caching, and searching."""

DEFAULT_CATALOG_URL = "https://raw.githubusercontent.com/github/spec-kit/main/extensions/catalog.json"
COMMUNITY_CATALOG_URL = "https://raw.githubusercontent.com/github/spec-kit/main/extensions/catalog.community.json"
CACHE_DURATION = 3600 # 1 hour in seconds
CONFIG_FILENAME = "extension-catalogs.yml"
ENTRY_CLASS = CatalogEntry
ERROR_TYPE = ValidationError
VALIDATION_ERROR_TYPE = ValidationError

def __init__(self, project_root: Path):
"""Initialize extension catalog manager.
Expand All @@ -1685,27 +1686,6 @@ def __init__(self, project_root: Path):
self.cache_file = self.cache_dir / "catalog.json"
self.cache_metadata_file = self.cache_dir / "catalog-metadata.json"

def _validate_catalog_url(self, url: str) -> None:
"""Validate that a catalog URL uses HTTPS (localhost HTTP allowed).

Args:
url: URL to validate

Raises:
ValidationError: If URL is invalid or uses non-HTTPS scheme
"""
from urllib.parse import urlparse

parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost):
raise ValidationError(
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
"HTTP is only allowed for localhost."
)
if not parsed.netloc:
raise ValidationError("Catalog URL must be a valid URL with a host.")

def _make_request(self, url: str):
"""Build a urllib Request, adding auth headers when a provider matches.

Expand All @@ -1722,81 +1702,6 @@ def _open_url(self, url: str, timeout: int = 10):
from specify_cli.authentication.http import open_url
return open_url(url, timeout)

def _load_catalog_config(self, config_path: Path) -> Optional[List[CatalogEntry]]:
"""Load catalog stack configuration from a YAML file.

Args:
config_path: Path to extension-catalogs.yml

Returns:
Ordered list of CatalogEntry objects, or None if file doesn't exist.

Raises:
ValidationError: If any catalog entry has an invalid URL,
the file cannot be parsed, a priority value is invalid,
or the file exists but contains no valid catalog entries
(fail-closed for security).
"""
if not config_path.exists():
return None
try:
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeError) as e:
raise ValidationError(
f"Failed to read catalog config {config_path}: {e}"
)
catalogs_data = data.get("catalogs", [])
if not catalogs_data:
# File exists but has no catalogs key or empty list - fail closed
raise ValidationError(
f"Catalog config {config_path} exists but contains no 'catalogs' entries. "
f"Remove the file to use built-in defaults, or add valid catalog entries."
)
if not isinstance(catalogs_data, list):
raise ValidationError(
f"Invalid catalog config: 'catalogs' must be a list, got {type(catalogs_data).__name__}"
)
entries: List[CatalogEntry] = []
skipped_entries: List[int] = []
for idx, item in enumerate(catalogs_data):
if not isinstance(item, dict):
raise ValidationError(
f"Invalid catalog entry at index {idx}: expected a mapping, got {type(item).__name__}"
)
url = str(item.get("url", "")).strip()
if not url:
skipped_entries.append(idx)
continue
self._validate_catalog_url(url)
try:
priority = int(item.get("priority", idx + 1))
except (TypeError, ValueError):
raise ValidationError(
f"Invalid priority for catalog '{item.get('name', idx + 1)}': "
f"expected integer, got {item.get('priority')!r}"
)
raw_install = item.get("install_allowed", False)
if isinstance(raw_install, str):
install_allowed = raw_install.strip().lower() in ("true", "yes", "1")
else:
install_allowed = bool(raw_install)
entries.append(CatalogEntry(
url=url,
name=str(item.get("name", f"catalog-{idx + 1}")),
priority=priority,
install_allowed=install_allowed,
description=str(item.get("description", "")),
))
entries.sort(key=lambda e: e.priority)
if not entries:
# All entries were invalid (missing URLs) - fail closed for security
raise ValidationError(
f"Catalog config {config_path} contains {len(catalogs_data)} entries but none have valid URLs "
f"(entries at indices {skipped_entries} were skipped). "
f"Each catalog entry must have a 'url' field."
)
return entries

def get_active_catalogs(self) -> List[CatalogEntry]:
"""Get the ordered list of active catalogs.

Expand Down Expand Up @@ -1826,24 +1731,44 @@ def get_active_catalogs(self) -> List[CatalogEntry]:
file=sys.stderr,
)
self._non_default_catalog_warning_shown = True
return [CatalogEntry(url=catalog_url, name="custom", priority=1, install_allowed=True, description="Custom catalog via SPECKIT_CATALOG_URL")]
return [
self._entry(
url=catalog_url,
name="custom",
priority=1,
install_allowed=True,
description="Custom catalog via SPECKIT_CATALOG_URL",
)
]

# 2. Project-level config overrides all defaults
project_config_path = self.project_root / ".specify" / "extension-catalogs.yml"
project_config_path = self.project_root / ".specify" / self.CONFIG_FILENAME
catalogs = self._load_catalog_config(project_config_path)
if catalogs is not None:
return catalogs

# 3. User-level config
user_config_path = Path.home() / ".specify" / "extension-catalogs.yml"
user_config_path = Path.home() / ".specify" / self.CONFIG_FILENAME
catalogs = self._load_catalog_config(user_config_path)
if catalogs is not None:
return catalogs

# 4. Built-in default stack
return [
CatalogEntry(url=self.DEFAULT_CATALOG_URL, name="default", priority=1, install_allowed=True, description="Built-in catalog of installable extensions"),
CatalogEntry(url=self.COMMUNITY_CATALOG_URL, name="community", priority=2, install_allowed=False, description="Community-contributed extensions (discovery only)"),
self._entry(
url=self.DEFAULT_CATALOG_URL,
name="default",
priority=1,
install_allowed=True,
description="Built-in catalog of installable extensions",
),
self._entry(
url=self.COMMUNITY_CATALOG_URL,
name="community",
priority=2,
install_allowed=False,
description="Community-contributed extensions (discovery only)",
),
]

def get_catalog_url(self) -> str:
Expand Down
109 changes: 107 additions & 2 deletions tests/test_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1846,7 +1846,7 @@ def test_unregister_skill_removes_parent_directory(self, project_dir, temp_dir):
registrar = CommandRegistrar()
from specify_cli.extensions import ExtensionManifest
manifest = ExtensionManifest(ext_dir / "extension.yml")
registered = registrar.register_commands_for_agent("codex", manifest, ext_dir, project_dir)
registrar.register_commands_for_agent("codex", manifest, ext_dir, project_dir)

skill_subdir = skills_dir / "speckit-cleanup-ext-run"
assert skill_subdir.exists(), "Skill subdirectory should exist after registration"
Expand Down Expand Up @@ -2580,7 +2580,8 @@ def fake_open(req, timeout=None):
def test_download_extension_sends_auth_header(self, temp_dir, monkeypatch):
"""download_extension passes Authorization header when a provider is configured."""
from unittest.mock import patch, MagicMock
import zipfile, io
import zipfile
import io

monkeypatch.setenv("GITHUB_TOKEN", "ghp_testtoken")
self._inject_github_config(monkeypatch, token_env="GITHUB_TOKEN")
Expand Down Expand Up @@ -2854,6 +2855,110 @@ def test_load_catalog_config_localhost_allowed(self, temp_dir):
assert len(entries) == 1
assert entries[0].url == "http://localhost:8000/catalog.json"

@pytest.mark.parametrize(
"config_content", ["[]\n", "false\n", "0\n", "''\n", "- item\n"]
)
def test_load_catalog_config_rejects_non_mapping_roots(
self, temp_dir, config_content
):
"""Malformed roots raise ValidationError, not fallback or AttributeError."""
project_dir = self._make_project(temp_dir)
config_path = project_dir / ".specify" / "extension-catalogs.yml"
config_path.write_text(config_content, encoding="utf-8")

catalog = ExtensionCatalog(project_dir)

with pytest.raises(
ValidationError, match="expected a YAML mapping at the root"
) as exc_info:
catalog.get_active_catalogs()
assert str(config_path) in str(exc_info.value)

def test_load_catalog_config_rejects_boolean_priority(self, temp_dir):
"""Boolean priorities are rejected instead of being coerced to 1 or 0."""
import yaml as yaml_module

project_dir = self._make_project(temp_dir)
config_path = project_dir / ".specify" / "extension-catalogs.yml"
config_path.write_text(
yaml_module.dump(
{
"catalogs": [
{
"name": "bad-priority",
"url": "https://example.com/catalog.json",
"priority": True,
}
]
}
),
encoding="utf-8",
)

catalog = ExtensionCatalog(project_dir)

with pytest.raises(
ValidationError, match="Invalid priority|expected integer"
) as exc_info:
catalog.get_active_catalogs()
assert str(config_path) in str(exc_info.value)

def test_load_catalog_config_defaults_blank_names(self, temp_dir):
"""Blank and null names normalize by valid catalog order."""
import yaml as yaml_module

project_dir = self._make_project(temp_dir)
config_path = project_dir / ".specify" / "extension-catalogs.yml"
config_path.write_text(
yaml_module.dump(
{
"catalogs": [
{"name": "skipped", "url": " "},
{"name": None, "url": "https://one.example.com/catalog.json"},
{"name": " ", "url": "https://two.example.com/catalog.json"},
]
}
),
encoding="utf-8",
)

catalog = ExtensionCatalog(project_dir)

assert [entry.name for entry in catalog.get_active_catalogs()] == [
"catalog-1",
"catalog-2",
]

@pytest.mark.parametrize(
("url", "expected_detail"),
[
("relative/catalog.json", "HTTPS"),
("https:///no-host", "valid URL with a host"),
],
)
def test_load_catalog_config_invalid_url_includes_context(
self, temp_dir, url, expected_detail
):
"""Invalid catalog URLs include the config path and entry index."""
import yaml as yaml_module

project_dir = self._make_project(temp_dir)
config_path = project_dir / ".specify" / "extension-catalogs.yml"
config_path.write_text(
yaml_module.dump({"catalogs": [{"name": "bad", "url": url}]}),
encoding="utf-8",
)

catalog = ExtensionCatalog(project_dir)

with pytest.raises(ValidationError) as exc_info:
catalog.get_active_catalogs()
message = str(exc_info.value)
assert "Invalid catalog URL" in message
assert str(config_path) in message
assert "index 0" in message
assert expected_detail in message

# --- Merge conflict resolution ---

def test_merge_conflict_higher_priority_wins(self, temp_dir):
Expand Down
Loading