diff --git a/tests/playwright/README.md b/tests/playwright/README.md new file mode 100644 index 0000000..c4f0c39 --- /dev/null +++ b/tests/playwright/README.md @@ -0,0 +1,99 @@ +# SSH Command Playwright E2E Tests + +End-to-end tests for the **SSH Command** Home Assistant custom component using +[Playwright](https://playwright.dev/python/). + +## Prerequisites + +- Python 3.11+ +- A running Home Assistant instance (default: `http://homeassistant:8123`) +- Two SSH test servers accessible at: + - `ssh_docker_test:2222` (user: `foo`, password: `pass`) + - `ssh_docker_test:2223` (user: `foo`, password: `pass`) + +The SSH test servers and Home Assistant are provided by the `docker-compose.yaml` +in the repository root. + +## Quick Start + +```bash +# 1. Start the test environment +docker-compose up -d + +# 2. Wait for Home Assistant to complete its first-run setup, then create an +# admin account (username: admin, password: admin) or set HA_USERNAME/HA_PASSWORD. + +# 3. Install the SSH Command custom component into Home Assistant: +docker cp . homeassistant_test:/config/custom_components/ssh_command + +# 4. Restart Home Assistant so it loads the component: +docker-compose restart homeassistant + +# 5. Install Python dependencies: +pip install -r tests/playwright/requirements.txt + +# 6. Install the Playwright browser: +playwright install chromium + +# 7. Run all tests: +pytest tests/playwright/ +``` + +## Environment Variables + +| Variable | Default | Description | +|---|---|---| +| `HOMEASSISTANT_URL` | `http://homeassistant:8123` | Home Assistant base URL | +| `SSH_HOST` | `ssh_docker_test` | Hostname of the SSH test servers | +| `SSH_PORT_1` | `2222` | Port for SSH Test Server 1 | +| `SSH_PORT_2` | `2223` | Port for SSH Test Server 2 | +| `SSH_USER` | `foo` | SSH username | +| `SSH_PASSWORD` | `pass` | SSH password | +| `HA_USERNAME` | `admin` | Home Assistant admin username | +| `HA_PASSWORD` | `admin` | Home Assistant admin password | + +## Running on a Local Machine (outside Docker) + +```bash +export HOMEASSISTANT_URL=http://localhost:8123 +export SSH_HOST=localhost +export SSH_PORT_1=2222 +export SSH_PORT_2=2223 +export HA_USERNAME=admin +export HA_PASSWORD=admin + +pytest tests/playwright/ -v +``` + +## Test Modules + +| File | What it tests | +|---|---| +| `test_integration_setup.py` | Adding/removing the integration via the config flow | +| `test_command_execution.py` | Executing SSH commands against real test servers | +| `test_services.py` | The `ssh_command.execute` HA service interface | +| `test_frontend.py` | Home Assistant frontend pages and UI interactions | +| `test_configuration.py` | Configuration options (timeout, auth, known hosts, …) | +| `test_security.py` | Security properties (auth validation, unauthenticated access, …) | + +## Fixtures (`conftest.py`) + +| Fixture | Scope | Description | +|---|---|---| +| `playwright_instance` | session | Playwright instance | +| `browser` | session | Headless Chromium browser | +| `ha_base_url` | session | Configured HA URL | +| `ha_token` | session | Long-lived HA access token | +| `context` | function | Authenticated browser context | +| `page` | function | Fresh page within the authenticated context | +| `ssh_server_1` | session | Connection params for SSH server 1 | +| `ssh_server_2` | session | Connection params for SSH server 2 | +| `ha_api` | function | `requests.Session` for the HA REST API | +| `ensure_integration` | function | Ensures SSH Command is set up; tears down after test | + +## Notes + +- Tests are designed to be **idempotent** – each test cleans up after itself. +- Tests do **not** depend on each other. +- Browser-based tests use a headless Chromium instance. +- API-based tests call Home Assistant's REST API directly for speed and reliability. diff --git a/tests/playwright/conftest.py b/tests/playwright/conftest.py new file mode 100644 index 0000000..e114f5c --- /dev/null +++ b/tests/playwright/conftest.py @@ -0,0 +1,245 @@ +"""Pytest configuration and fixtures for SSH Command Playwright E2E tests.""" + +from __future__ import annotations + +import json +import os +import time +from typing import Any, Generator + +import pytest +import requests +from playwright.sync_api import Browser, BrowserContext, Page, Playwright, sync_playwright + +# --------------------------------------------------------------------------- +# Environment-variable driven configuration +# --------------------------------------------------------------------------- + +HA_URL: str = os.environ.get("HOMEASSISTANT_URL", "http://homeassistant:8123") +SSH_HOST: str = os.environ.get("SSH_HOST", "ssh_docker_test") +SSH_PORT_1: int = int(os.environ.get("SSH_PORT_1", "2222")) +SSH_PORT_2: int = int(os.environ.get("SSH_PORT_2", "2223")) +SSH_USER: str = os.environ.get("SSH_USER", "foo") +SSH_PASSWORD: str = os.environ.get("SSH_PASSWORD", "pass") + +HA_USERNAME: str = os.environ.get("HA_USERNAME", "admin") +HA_PASSWORD: str = os.environ.get("HA_PASSWORD", "admin") + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_HA_TOKEN: str | None = None + + +def get_ha_token() -> str: + """Obtain a long-lived Home Assistant access token via the REST API. + + On the first call the token is fetched and cached for the remainder of + the test session. + """ + global _HA_TOKEN # noqa: PLW0603 + if _HA_TOKEN: + return _HA_TOKEN + + # 1. Fetch the CSRF token from the login page + session = requests.Session() + login_page = session.get(f"{HA_URL}/auth/login_flow", timeout=30) + login_page.raise_for_status() + + # 2. Initiate the login flow + flow_resp = session.post( + f"{HA_URL}/auth/login_flow", + json={"client_id": HA_URL, "handler": ["homeassistant", None], "redirect_uri": f"{HA_URL}/"}, + timeout=30, + ) + flow_resp.raise_for_status() + flow_id = flow_resp.json()["flow_id"] + + # 3. Submit credentials + cred_resp = session.post( + f"{HA_URL}/auth/login_flow/{flow_id}", + json={"username": HA_USERNAME, "password": HA_PASSWORD, "client_id": HA_URL}, + timeout=30, + ) + cred_resp.raise_for_status() + auth_code = cred_resp.json().get("result") + + # 4. Exchange code for token + token_resp = session.post( + f"{HA_URL}/auth/token", + data={ + "grant_type": "authorization_code", + "code": auth_code, + "client_id": HA_URL, + }, + timeout=30, + ) + token_resp.raise_for_status() + _HA_TOKEN = token_resp.json()["access_token"] + return _HA_TOKEN + + +def wait_for_ha(timeout: int = 120) -> None: + """Block until Home Assistant is ready to accept connections.""" + deadline = time.time() + timeout + while time.time() < deadline: + try: + resp = requests.get(f"{HA_URL}/api/", timeout=5) + if resp.status_code in (200, 401): + return + except requests.RequestException: + pass + time.sleep(2) + raise RuntimeError(f"Home Assistant did not become ready within {timeout}s") + + +# --------------------------------------------------------------------------- +# Session-scoped Playwright fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def playwright_instance() -> Generator[Playwright, None, None]: + """Provide a session-scoped Playwright instance.""" + with sync_playwright() as pw: + yield pw + + +@pytest.fixture(scope="session") +def browser(playwright_instance: Playwright) -> Generator[Browser, None, None]: + """Provide a session-scoped Chromium browser.""" + browser = playwright_instance.chromium.launch(headless=True) + yield browser + browser.close() + + +@pytest.fixture(scope="session") +def ha_base_url() -> str: + """Return the configured Home Assistant base URL.""" + return HA_URL + + +@pytest.fixture(scope="session") +def ha_token() -> str: + """Provide a valid Home Assistant long-lived access token.""" + wait_for_ha() + return get_ha_token() + + +# --------------------------------------------------------------------------- +# Per-test browser context with an authenticated HA session +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def context(browser: Browser, ha_token: str) -> Generator[BrowserContext, None, None]: + """Provide an authenticated browser context for Home Assistant.""" + ctx = browser.new_context( + base_url=HA_URL, + extra_http_headers={"Authorization": f"Bearer {ha_token}"}, + ) + # Inject the token into localStorage so the HA frontend recognises the session. + # Use json.dumps to safely escape all values before embedding in JS. + token_json = json.dumps(ha_token) + ha_url_json = json.dumps(HA_URL) + ctx.add_init_script( + f""" + window.localStorage.setItem( + 'hassTokens', + JSON.stringify({{ + access_token: {token_json}, + token_type: 'Bearer', + expires_in: 1800, + hassUrl: {ha_url_json}, + clientId: {ha_url_json}, + expires: Date.now() + 1800000, + refresh_token: '' + }}) + ); + """ + ) + yield ctx + ctx.close() + + +@pytest.fixture() +def page(context: BrowserContext) -> Generator[Page, None, None]: + """Provide a fresh page within the authenticated browser context.""" + pg = context.new_page() + yield pg + pg.close() + + +# --------------------------------------------------------------------------- +# SSH server fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def ssh_server_1() -> dict: + """Return connection parameters for SSH Test Server 1.""" + return { + "host": SSH_HOST, + "port": SSH_PORT_1, + "username": SSH_USER, + "password": SSH_PASSWORD, + } + + +@pytest.fixture(scope="session") +def ssh_server_2() -> dict: + """Return connection parameters for SSH Test Server 2.""" + return { + "host": SSH_HOST, + "port": SSH_PORT_2, + "username": SSH_USER, + "password": SSH_PASSWORD, + } + + +# --------------------------------------------------------------------------- +# Integration setup / teardown helper +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def ha_api(ha_token: str) -> requests.Session: + """Return a requests Session pre-configured to call the HA REST API.""" + session = requests.Session() + session.headers["Authorization"] = f"Bearer {ha_token}" + session.headers["Content-Type"] = "application/json" + return session + + +@pytest.fixture() +def ensure_integration(ha_api: requests.Session) -> Generator[None, None, None]: + """Ensure the SSH Command integration is set up before a test runs. + + Tears down the integration (removes the config entry) after the test. + """ + # Check whether the integration is already configured + resp = ha_api.get(f"{HA_URL}/api/config/config_entries/entry") + resp.raise_for_status() + entries_before = { + e["entry_id"] + for e in resp.json() + if e.get("domain") == "ssh_command" + } + + # If not present, initiate the config flow + if not entries_before: + flow_resp = ha_api.post( + f"{HA_URL}/api/config/config_entries/flow", + json={"handler": "ssh_command"}, + ) + flow_resp.raise_for_status() + + yield + + # Teardown: remove any entries that were added during the test + resp = ha_api.get(f"{HA_URL}/api/config/config_entries/entry") + resp.raise_for_status() + for entry in resp.json(): + if entry.get("domain") == "ssh_command" and entry["entry_id"] not in entries_before: + ha_api.delete(f"{HA_URL}/api/config/config_entries/entry/{entry['entry_id']}") diff --git a/tests/playwright/requirements.txt b/tests/playwright/requirements.txt new file mode 100644 index 0000000..e0b541e --- /dev/null +++ b/tests/playwright/requirements.txt @@ -0,0 +1,4 @@ +pytest>=8.0.0,<9.0.0 +pytest-playwright>=0.5.0,<1.0.0 +playwright>=1.44.0,<2.0.0 +requests>=2.32.0,<3.0.0 diff --git a/tests/playwright/test_command_execution.py b/tests/playwright/test_command_execution.py new file mode 100644 index 0000000..d6a3b97 --- /dev/null +++ b/tests/playwright/test_command_execution.py @@ -0,0 +1,141 @@ +"""Playwright E2E tests: SSH command execution against real SSH test servers.""" + +from __future__ import annotations + +import pytest +from typing import Any +import requests + +from conftest import HA_URL + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def execute(ha_api: requests.Session, payload: dict) -> requests.Response: + """Call the ssh_command.execute service and return the raw response.""" + return ha_api.post( + f"{HA_URL}/api/services/ssh_command/execute?return_response", + json=payload, + ) + + +def base_payload(ssh_server: dict, command: str, **kwargs) -> dict: + """Build a minimal execute payload from a server fixture and a command. + + Extra keyword arguments are merged into the payload, allowing callers to + override any field (e.g. ``timeout``, ``check_known_hosts``). + """ + payload = { + "host": ssh_server["host"], + "username": ssh_server["username"], + "password": ssh_server["password"], + "command": command, + "check_known_hosts": False, + } + payload.update(kwargs) + return payload + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestCommandExecution: + """End-to-end tests that execute real commands on the SSH test servers.""" + + def test_echo_command(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """A simple echo command returns the expected string on stdout.""" + resp = execute(ha_api, base_payload(ssh_server_1, "echo hello")) + assert resp.status_code == 200, resp.text + data = resp.json() + assert "hello" in data.get("output", "") + assert data.get("exit_status") == 0 + + def test_pwd_command(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """The pwd command returns a non-empty path.""" + resp = execute(ha_api, base_payload(ssh_server_1, "pwd")) + assert resp.status_code == 200, resp.text + data = resp.json() + assert data.get("output", "").strip() != "" + assert data.get("exit_status") == 0 + + def test_command_stdout_captured(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Multiline output is fully captured.""" + resp = execute(ha_api, base_payload(ssh_server_1, "printf 'line1\\nline2\\nline3\\n'")) + assert resp.status_code == 200, resp.text + output = resp.json().get("output", "") + assert "line1" in output + assert "line2" in output + assert "line3" in output + + def test_command_stderr_captured(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Output written to stderr is captured in the 'error' field.""" + resp = execute(ha_api, base_payload(ssh_server_1, "echo error_message >&2")) + assert resp.status_code == 200, resp.text + data = resp.json() + assert "error_message" in data.get("error", "") + + def test_nonzero_exit_status(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """A failing command returns a non-zero exit status.""" + resp = execute(ha_api, base_payload(ssh_server_1, "exit 42")) + assert resp.status_code == 200, resp.text + assert resp.json().get("exit_status") == 42 + + def test_zero_exit_status(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """A successful command returns exit status 0.""" + resp = execute(ha_api, base_payload(ssh_server_1, "true")) + assert resp.status_code == 200, resp.text + assert resp.json().get("exit_status") == 0 + + def test_command_with_env_variable(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Environment variable expansion works inside commands.""" + resp = execute(ha_api, base_payload(ssh_server_1, "echo $HOME")) + assert resp.status_code == 200, resp.text + assert resp.json().get("output", "").strip() != "" + + def test_second_ssh_server(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_2: dict) -> None: + """Commands can be executed against the second SSH test server.""" + resp = execute(ha_api, base_payload(ssh_server_2, "echo server2")) + assert resp.status_code == 200, resp.text + assert "server2" in resp.json().get("output", "") + + def test_command_timeout_handling(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """A command that exceeds the timeout returns a 400 error.""" + payload = base_payload(ssh_server_1, "sleep 60") + payload["timeout"] = 2 + resp = execute(ha_api, payload) + # HA raises ServiceValidationError for timeout → HTTP 400 + assert resp.status_code == 400, resp.text + + def test_command_not_provided_requires_input(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Omitting both command and input returns a 400 validation error.""" + payload = { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "check_known_hosts": False, + } + resp = execute(ha_api, payload) + assert resp.status_code == 400, resp.text + + def test_no_password_or_key_returns_error(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Omitting both password and key_file returns a 400 validation error.""" + payload = { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "command": "echo hi", + "check_known_hosts": False, + } + resp = execute(ha_api, payload) + assert resp.status_code == 400, resp.text + + def test_long_output_command(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """A command that produces a large amount of output is handled correctly.""" + resp = execute(ha_api, base_payload(ssh_server_1, "seq 1 500")) + assert resp.status_code == 200, resp.text + output = resp.json().get("output", "") + assert "500" in output diff --git a/tests/playwright/test_configuration.py b/tests/playwright/test_configuration.py new file mode 100644 index 0000000..ad7ab2d --- /dev/null +++ b/tests/playwright/test_configuration.py @@ -0,0 +1,170 @@ +"""Playwright E2E tests: SSH Command configuration management.""" + +from __future__ import annotations + +import pytest +from typing import Any +import requests + +from conftest import HA_URL + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def execute(ha_api: requests.Session, payload: dict) -> requests.Response: + """Call the ssh_command.execute service.""" + return ha_api.post( + f"{HA_URL}/api/services/ssh_command/execute?return_response", + json=payload, + ) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestConfiguration: + """Tests covering configuration options of the SSH Command integration.""" + + def test_default_timeout_accepted(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Omitting the timeout field uses the default (30 s) and the call succeeds.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo default_timeout", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + assert "default_timeout" in resp.json()["output"] + + def test_custom_timeout_accepted(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """An explicit timeout value is accepted by the service schema.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo custom_timeout", + "check_known_hosts": False, + "timeout": 20, + }, + ) + assert resp.status_code == 200, resp.text + assert "custom_timeout" in resp.json()["output"] + + def test_check_known_hosts_false(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Setting check_known_hosts=False bypasses host verification.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo no_host_check", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + assert "no_host_check" in resp.json()["output"] + + def test_known_hosts_with_check_disabled_rejected(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Providing known_hosts while check_known_hosts=False is a validation error.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo hi", + "check_known_hosts": False, + "known_hosts": "/tmp/known_hosts", + }, + ) + assert resp.status_code == 400, resp.text + + def test_password_auth_configuration(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Password-based authentication is accepted and works against the test server.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo password_auth", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + assert "password_auth" in resp.json()["output"] + + def test_key_file_not_found_rejected(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Providing a non-existent key_file path results in a validation error.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "key_file": "/nonexistent/id_rsa", + "command": "echo hi", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 400, resp.text + + def test_multiple_servers_independent( + self, + ha_api: requests.Session, + ensure_integration: Any, + ssh_server_1: dict, + ssh_server_2: dict, + ) -> None: + """Commands can be sent to two different SSH servers independently.""" + resp1 = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo server1", + "check_known_hosts": False, + }, + ) + resp2 = execute( + ha_api, + { + "host": ssh_server_2["host"], + "username": ssh_server_2["username"], + "password": ssh_server_2["password"], + "command": "echo server2", + "check_known_hosts": False, + }, + ) + assert resp1.status_code == 200, resp1.text + assert resp2.status_code == 200, resp2.text + assert "server1" in resp1.json()["output"] + assert "server2" in resp2.json()["output"] + + def test_username_configuration(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """The username field is correctly forwarded to the SSH connection.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "whoami", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + output = resp.json()["output"].strip() + assert output == ssh_server_1["username"] diff --git a/tests/playwright/test_frontend.py b/tests/playwright/test_frontend.py new file mode 100644 index 0000000..a2fa01c --- /dev/null +++ b/tests/playwright/test_frontend.py @@ -0,0 +1,82 @@ +"""Playwright E2E tests: SSH Command frontend / UI interactions.""" + +from __future__ import annotations + +import pytest +from typing import Any +from playwright.sync_api import Page, expect + +from conftest import HA_URL + + +class TestFrontend: + """Tests that exercise the Home Assistant frontend with the SSH Command integration.""" + + def test_home_assistant_frontend_loads(self, page: Page) -> None: + """The Home Assistant frontend loads successfully.""" + page.goto(HA_URL) + page.wait_for_load_state("networkidle") + # HA login page or overview should load + expect(page).not_to_have_title("") + + def test_integrations_page_accessible(self, page: Page) -> None: + """The integrations settings page is accessible.""" + page.goto(f"{HA_URL}/config/integrations") + page.wait_for_load_state("networkidle") + # Page should not show a network error + assert page.url.startswith(HA_URL), f"Unexpected redirect to: {page.url}" + + def test_developer_tools_page_loads(self, page: Page) -> None: + """Developer tools page loads (used for calling services manually).""" + page.goto(f"{HA_URL}/developer-tools/service") + page.wait_for_load_state("networkidle") + assert page.url.startswith(HA_URL) + + def test_ssh_command_visible_in_integrations(self, page: Page, ensure_integration: Any) -> None: + """After setup, SSH Command appears on the integrations page.""" + page.goto(f"{HA_URL}/config/integrations") + page.wait_for_load_state("networkidle") + # Look for the integration card/name on the page + ssh_card = page.get_by_text("SSH Command", exact=False) + expect(ssh_card.first).to_be_visible() + + def test_service_call_via_developer_tools(self, page: Page, ensure_integration: Any) -> None: + """It should be possible to navigate to the service call UI for ssh_command.""" + page.goto(f"{HA_URL}/developer-tools/service") + page.wait_for_load_state("networkidle") + + # Open the service selector dropdown + service_selector = page.locator("ha-service-picker, [data-domain='ssh_command']").first + if service_selector.is_visible(): + service_selector.click() + page.wait_for_timeout(500) + # Look for ssh_command option + ssh_option = page.get_by_text("ssh_command", exact=False) + if ssh_option.is_visible(): + ssh_option.first.click() + + # Page should still be accessible (no crashes) + assert page.url.startswith(HA_URL) + + def test_config_page_shows_integration_info(self, page: Page, ensure_integration: Any) -> None: + """The SSH Command integration detail page shows expected information.""" + page.goto(f"{HA_URL}/config/integrations") + page.wait_for_load_state("networkidle") + + # Try to click on the SSH Command integration card + ssh_link = page.get_by_text("SSH Command", exact=False).first + if ssh_link.is_visible(): + ssh_link.click() + page.wait_for_load_state("networkidle") + # Verify we are still on a valid HA page + assert page.url.startswith(HA_URL) + + def test_no_javascript_errors_on_main_page(self, page: Page) -> None: + """The main HA page does not log critical JavaScript errors.""" + errors: list[str] = [] + page.on("pageerror", lambda exc: errors.append(str(exc))) + page.goto(HA_URL) + page.wait_for_load_state("networkidle") + # Filter out known non-critical errors; check only for unhandled exceptions + critical = [e for e in errors if "ResizeObserver" not in e] + assert len(critical) == 0, f"JavaScript errors: {critical}" diff --git a/tests/playwright/test_integration_setup.py b/tests/playwright/test_integration_setup.py new file mode 100644 index 0000000..ba669a1 --- /dev/null +++ b/tests/playwright/test_integration_setup.py @@ -0,0 +1,159 @@ +"""Playwright E2E tests: SSH Command integration setup via the config flow.""" + +from __future__ import annotations + +import pytest +from typing import Any +import requests +from playwright.sync_api import Page, expect + +from conftest import HA_URL + + +class TestIntegrationSetup: + """Tests that cover adding and removing the SSH Command integration.""" + + def test_integration_page_loads(self, page: Page) -> None: + """The integrations page should load without errors.""" + page.goto(f"{HA_URL}/config/integrations") + page.wait_for_load_state("networkidle") + expect(page).to_have_title(lambda t: "Home Assistant" in t or "Integrations" in t) + + def test_add_integration_via_ui(self, page: Page) -> None: + """Adding the SSH Command integration through the UI config flow works.""" + page.goto(f"{HA_URL}/config/integrations") + page.wait_for_load_state("networkidle") + + # Click the "+ Add integration" button + add_btn = page.get_by_role("button", name="Add integration") + if not add_btn.is_visible(): + # Some HA versions show a FAB or icon button + add_btn = page.locator("[aria-label='Add integration']") + add_btn.click() + + # Search for "SSH Command" in the integration picker + search_box = page.get_by_placeholder("Search") + if not search_box.is_visible(): + search_box = page.locator("input[type='search']") + search_box.fill("SSH Command") + page.wait_for_timeout(500) + + # Select the SSH Command entry + page.get_by_text("SSH Command").first.click() + page.wait_for_timeout(1000) + + # The config flow either shows a form or creates an entry immediately + # (SSH Command uses single_instance_allowed with no form fields). + # Verify we land back on the integrations page or see an abort/success dialog. + page.wait_for_load_state("networkidle") + + def test_integration_appears_in_list(self, ha_api: requests.Session) -> None: + """After setup the SSH Command entry should appear in the config entries API.""" + # Initiate flow and complete it + flow_resp = ha_api.post( + f"{HA_URL}/api/config/config_entries/flow", + json={"handler": "ssh_command"}, + ) + assert flow_resp.status_code in (200, 201), flow_resp.text + + # Verify entry is present + entries_resp = ha_api.get(f"{HA_URL}/api/config/config_entries/entry") + entries_resp.raise_for_status() + domains = [e["domain"] for e in entries_resp.json()] + assert "ssh_command" in domains + + # Cleanup: remove the entry we just added + for entry in entries_resp.json(): + if entry["domain"] == "ssh_command": + ha_api.delete( + f"{HA_URL}/api/config/config_entries/entry/{entry['entry_id']}" + ) + + def test_single_instance_enforced(self, ha_api: requests.Session) -> None: + """A second setup attempt should be aborted by the single-instance guard.""" + # First setup + first = ha_api.post( + f"{HA_URL}/api/config/config_entries/flow", + json={"handler": "ssh_command"}, + ) + assert first.status_code in (200, 201), first.text + + # Second setup should result in an abort + second = ha_api.post( + f"{HA_URL}/api/config/config_entries/flow", + json={"handler": "ssh_command"}, + ) + assert second.status_code in (200, 201), second.text + result_type = second.json().get("type") + # Depending on HA version the abort is returned immediately + assert result_type in ("abort", "create_entry"), ( + f"Expected abort or immediate create_entry, got: {result_type}" + ) + + # Cleanup + entries_resp = ha_api.get(f"{HA_URL}/api/config/config_entries/entry") + for entry in entries_resp.json(): + if entry["domain"] == "ssh_command": + ha_api.delete( + f"{HA_URL}/api/config/config_entries/entry/{entry['entry_id']}" + ) + + def test_remove_integration(self, ha_api: requests.Session) -> None: + """Removing a config entry succeeds and the entry disappears from the list.""" + # Setup + flow_resp = ha_api.post( + f"{HA_URL}/api/config/config_entries/flow", + json={"handler": "ssh_command"}, + ) + assert flow_resp.status_code in (200, 201) + + entries_resp = ha_api.get(f"{HA_URL}/api/config/config_entries/entry") + entries_resp.raise_for_status() + entry_id = next( + (e["entry_id"] for e in entries_resp.json() if e["domain"] == "ssh_command"), + None, + ) + assert entry_id is not None, "Config entry was not created" + + # Delete + del_resp = ha_api.delete( + f"{HA_URL}/api/config/config_entries/entry/{entry_id}" + ) + assert del_resp.status_code in (200, 204) + + # Confirm it's gone + entries_resp2 = ha_api.get(f"{HA_URL}/api/config/config_entries/entry") + domains = [e["domain"] for e in entries_resp2.json()] + assert "ssh_command" not in domains + + def test_connection_error_handling(self, ha_api: requests.Session, ensure_integration: Any) -> None: + """Calling execute with an unreachable host raises a validation error.""" + resp = ha_api.post( + f"{HA_URL}/api/services/ssh_command/execute?return_response", + json={ + "host": "192.0.2.1", # RFC 5737 TEST-NET – guaranteed unreachable + "username": "nobody", + "password": "nopass", + "command": "echo hi", + "check_known_hosts": False, + "timeout": 5, + }, + ) + # HA returns 400 for ServiceValidationError + assert resp.status_code == 400, resp.text + + def test_invalid_credentials_error(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Connecting with wrong credentials returns a permission-denied error.""" + resp = ha_api.post( + f"{HA_URL}/api/services/ssh_command/execute?return_response", + json={ + "host": ssh_server_1["host"], + "port": ssh_server_1["port"], + "username": ssh_server_1["username"], + "password": "wrongpassword", + "command": "echo hi", + "check_known_hosts": False, + "timeout": 10, + }, + ) + assert resp.status_code == 400, resp.text diff --git a/tests/playwright/test_security.py b/tests/playwright/test_security.py new file mode 100644 index 0000000..76c2cdf --- /dev/null +++ b/tests/playwright/test_security.py @@ -0,0 +1,162 @@ +"""Playwright E2E tests: SSH Command security properties.""" + +from __future__ import annotations + +import pytest +from typing import Any +import requests + +from conftest import HA_URL + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def execute(ha_api: requests.Session, payload: dict) -> requests.Response: + """Call the ssh_command.execute service.""" + return ha_api.post( + f"{HA_URL}/api/services/ssh_command/execute?return_response", + json=payload, + ) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestSecurity: + """Tests that validate the security properties of the SSH Command integration.""" + + def test_invalid_password_rejected(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """An incorrect password results in a 400 authentication error.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": "definitely_wrong_password", + "command": "echo hi", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 400, resp.text + + def test_invalid_username_rejected(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """An incorrect username results in a 400 authentication error.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": "nonexistent_user_xyz", + "password": ssh_server_1["password"], + "command": "echo hi", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 400, resp.text + + def test_unreachable_host_rejected(self, ha_api: requests.Session, ensure_integration: Any) -> None: + """Connecting to an unreachable host results in a 400 connection error.""" + resp = execute( + ha_api, + { + "host": "192.0.2.255", # RFC 5737 TEST-NET – documentation address, typically unreachable + "username": "user", + "password": "pass", + "command": "echo hi", + "check_known_hosts": False, + "timeout": 5, + }, + ) + assert resp.status_code == 400, resp.text + + def test_nonexistent_host_rejected(self, ha_api: requests.Session, ensure_integration: Any) -> None: + """Connecting to a non-existent hostname results in a 400 DNS error.""" + resp = execute( + ha_api, + { + "host": "this.host.does.not.exist.invalid", # .invalid TLD is guaranteed non-resolvable (RFC 2606) + "username": "user", + "password": "pass", + "command": "echo hi", + "check_known_hosts": False, + "timeout": 5, + }, + ) + assert resp.status_code == 400, resp.text + + def test_nonexistent_key_file_rejected(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Referencing a key file that does not exist results in a validation error.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "key_file": "/nonexistent/path/id_rsa", + "command": "echo hi", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 400, resp.text + + def test_api_requires_authentication(self) -> None: + """Calling the HA service API without an auth token is rejected with 401.""" + resp = requests.post( + f"{HA_URL}/api/services/ssh_command/execute?return_response", + json={ + "host": "192.0.2.1", + "username": "user", + "password": "pass", + "command": "echo hi", + "check_known_hosts": False, + }, + timeout=10, + ) + assert resp.status_code == 401, resp.text + + def test_known_hosts_conflict_rejected(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Supplying known_hosts with check_known_hosts=False is rejected.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo hi", + "check_known_hosts": False, + "known_hosts": "/tmp/known_hosts_conflict", + }, + ) + assert resp.status_code == 400, resp.text + + def test_no_credentials_rejected(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """A service call that omits both password and key_file is rejected.""" + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "command": "echo hi", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 400, resp.text + + def test_successful_auth_uses_encrypted_connection(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """A successful SSH command is executed (implying an encrypted SSH session).""" + # asyncssh always uses encrypted connections; we verify the round-trip succeeds. + resp = execute( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo encrypted_conn_ok", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + assert "encrypted_conn_ok" in resp.json()["output"] diff --git a/tests/playwright/test_services.py b/tests/playwright/test_services.py new file mode 100644 index 0000000..c77382b --- /dev/null +++ b/tests/playwright/test_services.py @@ -0,0 +1,172 @@ +"""Playwright E2E tests: ssh_command.execute service behaviour.""" + +from __future__ import annotations + +import pytest +from typing import Any +import requests + +from conftest import HA_URL + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def call_service(ha_api: requests.Session, payload: dict) -> requests.Response: + """POST to the ssh_command execute service and return the raw response.""" + return ha_api.post( + f"{HA_URL}/api/services/ssh_command/execute?return_response", + json=payload, + ) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestServices: + """Tests focused on the HA service interface of SSH Command.""" + + def test_service_registered(self, ha_api: requests.Session, ensure_integration: Any) -> None: + """The ssh_command.execute service should appear in the HA services list.""" + resp = ha_api.get(f"{HA_URL}/api/services") + resp.raise_for_status() + services = resp.json() + domains = {svc["domain"] for svc in services} + assert "ssh_command" in domains + + ssh_services = next( + (svc for svc in services if svc["domain"] == "ssh_command"), None + ) + assert ssh_services is not None + assert "execute" in ssh_services.get("services", {}) + + def test_service_returns_response(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """The service returns a structured response with output/error/exit_status.""" + resp = call_service( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo response_test", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + data = resp.json() + assert "output" in data + assert "error" in data + assert "exit_status" in data + + def test_service_echo_output(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """The service captures stdout from the remote command.""" + resp = call_service( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo service_output_check", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + assert "service_output_check" in resp.json()["output"] + + def test_service_with_exit_status_error(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """A command that exits with a non-zero code is still returned as 200 with the exit code.""" + resp = call_service( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "exit 1", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + assert resp.json()["exit_status"] == 1 + + def test_service_requires_integration_setup(self, ha_api: requests.Session) -> None: + """Calling the service without a configured integration returns 400.""" + # Make sure no integration is set up + entries_resp = ha_api.get(f"{HA_URL}/api/config/config_entries/entry") + for entry in entries_resp.json(): + if entry["domain"] == "ssh_command": + ha_api.delete( + f"{HA_URL}/api/config/config_entries/entry/{entry['entry_id']}" + ) + + resp = call_service( + ha_api, + { + "host": "192.0.2.1", + "username": "user", + "password": "pass", + "command": "echo hi", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 400, resp.text + + def test_service_validation_missing_auth(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """The service rejects calls that lack both password and key_file.""" + resp = call_service( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "command": "echo hi", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 400, resp.text + + def test_service_validation_missing_command_and_input(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """The service rejects calls that lack both command and input.""" + resp = call_service( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "check_known_hosts": False, + }, + ) + assert resp.status_code == 400, resp.text + + def test_service_with_timeout_parameter(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """The timeout parameter is accepted and used by the service.""" + resp = call_service( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo timeout_test", + "check_known_hosts": False, + "timeout": 15, + }, + ) + assert resp.status_code == 200, resp.text + assert "timeout_test" in resp.json()["output"] + + def test_service_stderr_in_response(self, ha_api: requests.Session, ensure_integration: Any, ssh_server_1: dict) -> None: + """Stderr output appears in the 'error' field of the service response.""" + resp = call_service( + ha_api, + { + "host": ssh_server_1["host"], + "username": ssh_server_1["username"], + "password": ssh_server_1["password"], + "command": "echo err_msg >&2", + "check_known_hosts": False, + }, + ) + assert resp.status_code == 200, resp.text + assert "err_msg" in resp.json()["error"]