diff --git a/CODE_MAP.md b/CODE_MAP.md new file mode 100644 index 00000000..206ab061 --- /dev/null +++ b/CODE_MAP.md @@ -0,0 +1,165 @@ +# CODE_MAP.md — data-diff Codebase Navigation + +> Quick-reference map for AI agents and new contributors. ~150 lines, dense by design. + +## Package Layout + +``` +data_diff/ +├── __init__.py # Public API: connect_to_table(), diff_tables(), Algorithm +├── __main__.py # CLI entry point (Click): `data-diff` command +├── version.py # __version__ = "1.0.0" +├── config.py # TOML config parsing (--conf flag) +├── errors.py # Custom exceptions (all dbt-integration and config-related) +├── schema.py # create_schema() factory, RawColumnInfo, Schema type alias +├── utils.py # CaseAwareMapping, CaseInsensitiveDict, ArithString, safezip +├── _compat.py # Compatibility shims (tomllib) +│ +├── queries/ # SQL query builder (AST-based) +│ ├── api.py # User-facing: table(), cte(), join(), leftjoin(), or_() +│ ├── ast_classes.py # AST nodes: Select, Join, Cte, Column, BinOp, DDL stmts +│ ├── base.py # SqeletonError, SKIP sentinel, args_as_tuple() +│ └── extras.py # Checksum, NormalizeAsString (diff-specific query helpers) +│ +├── abcs/ # Abstract base classes +│ ├── database_types.py # DbPath, ColType hierarchy, Collation +│ └── compiler.py # AbstractCompiler, Compilable protocol +│ +├── databases/ # Database drivers (one file per backend) +│ ├── base.py # Database ABC, BaseDialect, connection pooling +│ ├── _connect.py # connect(dsn) → Database instance +│ ├── postgresql.py # PostgreSQL (psycopg2) +│ ├── mysql.py # MySQL (mysql-connector-python) +│ ├── snowflake.py # Snowflake (snowflake-connector-python) +│ ├── bigquery.py # BigQuery (google-cloud-bigquery) +│ ├── redshift.py # Redshift (extends PostgreSQL) +│ ├── databricks.py # Databricks (databricks-sql-connector) +│ ├── duckdb.py # DuckDB (duckdb) +│ ├── clickhouse.py # ClickHouse (clickhouse-driver) +│ ├── mssql.py # SQL Server (pyodbc) +│ ├── oracle.py # Oracle (oracledb) +│ ├── trino.py # Trino (trino) +│ ├── presto.py # Presto (presto-python-client) +│ └── vertica.py # Vertica (vertica-python) +│ +├── diff_tables.py # Algorithm enum, TableDiffer ABC, DiffResultWrapper +├── hashdiff_tables.py # HashDiffer: cross-DB bisection diff (checksum + download) +├── joindiff_tables.py # JoinDiffer: same-DB outer-join diff (single query) +├── table_segment.py # TableSegment: key ranges, split_key_space(), checksums +├── info_tree.py # InfoTree: hierarchical diff metadata tracking +│ +├── thread_utils.py # PriorityThreadPoolExecutor, ThreadedYielder +├── query_utils.py # drop_table(), append_to_table() helpers +├── format.py # Output formatting (JSONL, human-readable) +├── parse_time.py # Relative time parsing ("5min", "1day") +├── lexicographic_space.py # String key range splitting +│ +├── dbt.py # dbt integration: dbt_diff() +├── dbt_parser.py # DbtParser: manifest/profile parsing +└── dbt_config_validators.py # dbt config validation +``` + +## Entry Points + +| Entry Point | Location | Description | +|-------------|----------|-------------| +| CLI | `__main__.py:main()` | Click command: `data-diff db1 table1 db2 table2 -k id` | +| Python API | `__init__.py:diff_tables()` | Primary function: takes two `TableSegment`s, returns diff iterator | +| Python API | `__init__.py:connect_to_table()` | Convenience: DSN string → `TableSegment` | +| pyproject.toml | `[project.scripts]` | `data-diff = "data_diff.__main__:main"` | + +## Core Data Flow + +``` +CLI / API call + │ + ▼ +connect(dsn) → Database instance + │ + ▼ +db.query_table_schema() → Schema (column names + types) + │ + ▼ +TableSegment(db, path, key_columns, schema) + │ + ▼ +Algorithm selection (AUTO → JOINDIFF if same-db, else HASHDIFF) + │ + ├─── HASHDIFF (cross-database) ──────────────────────────┐ + │ 1. Checksum full table on both sides │ + │ 2. If mismatch → bisect key range (factor=32) │ + │ 3. Recurse until segment < threshold (16384 rows) │ + │ 4. Download small segments, compare locally │ + │ 5. diff_sets() → yield ("+", row) / ("-", row) │ + │ │ + ├─── JOINDIFF (same database) ───────────────────────────┐ + │ 1. FULL OUTER JOIN on key columns │ + │ 2. CASE WHEN to detect exclusive/changed rows │ + │ 3. Optional: materialize results to temp table │ + │ 4. Stream results → yield ("+", row) / ("-", row) │ + │ │ + ▼ +DiffResultWrapper (streaming iterator + stats) + │ + ▼ +Output: human-readable / JSONL / stats summary +``` + +## Query Builder Architecture + +``` +api.py (user-facing functions) + │ table(), cte(), join(), select(), where() + ▼ +ast_classes.py (immutable AST nodes) + │ Select, Join, Cte, Column, BinOp, Code, ... + ▼ +Database.dialect.compile(compiler, node) + │ Each driver overrides compilation for its SQL dialect + ▼ +Raw SQL string → db.query(sql) +``` + +Key pattern: Most AST nodes use `@attrs.define(frozen=True)` (some like `ExprNode`, `_ResolveColumn` are mutable). Where immutability holds, `attrs.evolve()` returns modified copies. + +## Test Organization + +``` +tests/ +├── test_query.py # Query AST construction + CTE schema tests +├── test_sql.py # SQL generation across dialects +├── test_database.py # DB integration tests (skip with --ignore) +├── test_diff_tables.py # Diff framework + threading +├── test_joindiff.py # JoinDiffer algorithm +├── test_utils.py # Utility functions (UUID, case-aware dicts) +├── test_thread_utils.py # PriorityThreadPoolExecutor +├── test_api.py # Public API surface +├── test_cli.py # CLI argument parsing +├── test_duckdb.py # DuckDB-specific +├── test_postgresql.py # PostgreSQL-specific +├── test_mssql.py # SQL Server-specific +├── test_parse_time.py # Time parsing +├── test_datetime_parsing.py # Datetime parsing +├── test_format.py # Output formatting +├── test_config.py # TOML config +├── test_dbt*.py # dbt integration (3 files) +├── test_mesh.py # Multi-dim segmentation +├── test_main.py # CLI main function +├── test_database_types.py # Column type system +├── common.py # Shared fixtures +└── conftest.py # pytest configuration +``` + +Run unit tests: `uv run pytest tests/ -x -q --ignore=tests/test_database.py` +Run query tests only: `uv run pytest tests/test_query.py -x -q` + +## Key Types + +| Type | Location | Purpose | +|------|----------|---------| +| `Schema` | `schema.py` | Type alias for `CaseAwareMapping` (used as `CaseAwareMapping[str, ColType]`) | +| `TableSegment` | `table_segment.py` | Table + key columns + range bounds | +| `DbPath` | `abcs/database_types.py` | `tuple[str, ...]` — schema-qualified table path | +| `Database` | `databases/base.py` | ABC for all database drivers | +| `ExprNode` | `queries/ast_classes.py` | Base class for all query AST nodes | +| `ITable` | `queries/ast_classes.py` | Interface for table-like query nodes | diff --git a/data_diff/queries/ast_classes.py b/data_diff/queries/ast_classes.py index a1d81992..2df227dc 100644 --- a/data_diff/queries/ast_classes.py +++ b/data_diff/queries/ast_classes.py @@ -8,7 +8,7 @@ from data_diff.abcs.database_types import DbPath from data_diff.queries.base import SKIP, SqeletonError, args_as_tuple from data_diff.schema import Schema -from data_diff.utils import ArithString +from data_diff.utils import ArithString, safezip class QueryBuilderError(SqeletonError): @@ -482,10 +482,12 @@ class Join(ExprNode, ITable, Root): @property def schema(self) -> Schema: if not self.columns: - raise ValueError("Join must specify columns explicitly (SELECT * not yet implemented).") + raise QueryBuilderError("Join must specify columns explicitly (SELECT * not yet implemented).") # No cross-table type validation needed: join combines columns from both tables rather than unioning rows s = self.source_tables[0].schema - return type(s)({c.name: c.type for c in self.columns}) + if s is None: + raise QueryBuilderError("Cannot resolve Join schema: source table has no schema defined") + return s.new({c.name: c.type for c in self.columns}) def on(self, *exprs) -> Self: """Add an ON clause, for filtering the result of the cartesian product (i.e. the JOIN)""" @@ -596,7 +598,7 @@ def schema(self) -> Schema: s = self.table.schema if s is None or self.columns is None: return s - return type(s)({c.name: c.type for c in self.columns}) + return s.new({c.name: c.type for c in self.columns}) @classmethod def make(cls, table: ITable, distinct: bool = SKIP, optimizer_hints: str = SKIP, **kwargs): @@ -641,24 +643,39 @@ def make(cls, table: ITable, distinct: bool = SKIP, optimizer_hints: str = SKIP, class Cte(ExprNode, ITable): table: Expr name: str | None = None - params: Sequence[str] | None = None + params: Sequence[str] | None = attrs.field(default=None) + + @params.validator + def _validate_params(self, attribute, value): + if value is not None: + for i, p in enumerate(value): + if not isinstance(p, str): + raise QB_TypeError(f"CTE params[{i}] must be str, got {type(p).__name__}") @property def source_table(self) -> "ITable": return self.table @property - def schema(self) -> Schema: - s = self.table.schema + def schema(self) -> Schema | None: + try: + s = self.table.schema + except (QueryBuilderError, ValueError) as exc: + # ValueError caught because some ITable.schema implementations (e.g. TableOp) + # still raise ValueError for validation errors pre-dating QueryBuilderError. + name_hint = f" '{self.name}'" if self.name else "" + raise QueryBuilderError(f"Failed to resolve schema for CTE{name_hint}: {exc}") from exc if not self.params: return s if s is None: raise QueryBuilderError(f"CTE params were provided ({self.params!r}) but the source table has no schema") - if len(self.params) != len(s): + try: + pairs = dict(safezip(self.params, s.values())) + except ValueError as e: raise QueryBuilderError( f"CTE params length ({len(self.params)}) does not match source schema length ({len(s)})" - ) - result = type(s)(dict(zip(self.params, s.values()))) + ) from e + result = s.new(pairs) if len(result) != len(s): raise QueryBuilderError(f"CTE params contain duplicate column names: {self.params!r}") return result