From b8c2869d55283c3d4d298e2ef3fe16e3aa731bb1 Mon Sep 17 00:00:00 2001 From: Daniel Song Date: Tue, 3 Mar 2026 20:21:43 -0800 Subject: [PATCH 1/3] fix: harden schema properties and add CODE_MAP.md Resolves #22 and #23: harden schema property error handling in Cte, Join, and Select classes. Replace type(s)(...) with s.new(...), use safezip for length validation, add CTE params type validator, and wrap inner schema resolution with error context. Add CODE_MAP.md for AI agent and contributor codebase navigation. Co-Authored-By: Claude Opus 4.6 --- CODE_MAP.md | 165 +++++++++++++++++++++++++++++++ data_diff/queries/ast_classes.py | 29 ++++-- 2 files changed, 185 insertions(+), 9 deletions(-) create mode 100644 CODE_MAP.md diff --git a/CODE_MAP.md b/CODE_MAP.md new file mode 100644 index 00000000..7e79fc08 --- /dev/null +++ b/CODE_MAP.md @@ -0,0 +1,165 @@ +# CODE_MAP.md — data-diff Codebase Navigation + +> Quick-reference map for AI agents and new contributors. ~150 lines, dense by design. + +## Package Layout + +``` +data_diff/ +├── __init__.py # Public API: connect_to_table(), diff_tables(), Algorithm +├── __main__.py # CLI entry point (Click): `data-diff` command +├── version.py # __version__ = "1.0.0" +├── config.py # TOML config parsing (--conf flag) +├── errors.py # Custom exceptions (dbt + core) +├── schema.py # create_schema() factory, RawColumnInfo, Schema type alias +├── utils.py # CaseAwareMapping, CaseInsensitiveDict, ArithString, safezip +├── _compat.py # Compatibility shims (tomllib) +│ +├── queries/ # SQL query builder (AST-based) +│ ├── api.py # User-facing: table(), cte(), join(), leftjoin(), or_() +│ ├── ast_classes.py # AST nodes: Select, Join, Cte, Column, BinOp, DDL stmts +│ ├── base.py # SqeletonError, SKIP sentinel, args_as_tuple() +│ └── extras.py # Checksum, NormalizeAsString (diff-specific query helpers) +│ +├── abcs/ # Abstract base classes +│ ├── database_types.py # DbPath, ColType hierarchy, Collation +│ └── compiler.py # AbstractCompiler, Compilable protocol +│ +├── databases/ # Database drivers (one file per backend) +│ ├── base.py # Database ABC, BaseDialect, connection pooling +│ ├── _connect.py # connect(dsn) → Database instance +│ ├── postgresql.py # PostgreSQL (psycopg2) +│ ├── mysql.py # MySQL (mysql-connector-python) +│ ├── snowflake.py # Snowflake (snowflake-connector-python) +│ ├── bigquery.py # BigQuery (google-cloud-bigquery) +│ ├── redshift.py # Redshift (extends PostgreSQL) +│ ├── databricks.py # Databricks (databricks-sql-connector) +│ ├── duckdb.py # DuckDB (duckdb) +│ ├── clickhouse.py # ClickHouse (clickhouse-driver) +│ ├── mssql.py # SQL Server (pyodbc) +│ ├── oracle.py # Oracle (oracledb) +│ ├── trino.py # Trino (trino) +│ ├── presto.py # Presto (presto-python-client) +│ └── vertica.py # Vertica (vertica-python) +│ +├── diff_tables.py # Algorithm enum, TableDiffer ABC, DiffResultWrapper +├── hashdiff_tables.py # HashDiffer: cross-DB bisection diff (checksum + download) +├── joindiff_tables.py # JoinDiffer: same-DB outer-join diff (single query) +├── table_segment.py # TableSegment: key ranges, split_key_space(), checksums +├── info_tree.py # InfoTree: hierarchical diff metadata tracking +│ +├── thread_utils.py # PriorityThreadPoolExecutor, ThreadedYielder +├── query_utils.py # drop_table(), append_to_table() helpers +├── format.py # Output formatting (JSONL, human-readable) +├── parse_time.py # Relative time parsing ("5min", "1day") +├── lexicographic_space.py # String key range splitting +│ +├── dbt.py # dbt integration: dbt_diff() +├── dbt_parser.py # DbtParser: manifest/profile parsing +└── dbt_config_validators.py # dbt config validation +``` + +## Entry Points + +| Entry Point | Location | Description | +|-------------|----------|-------------| +| CLI | `__main__.py:main()` | Click command: `data-diff db1 table1 db2 table2 -k id` | +| Python API | `__init__.py:diff_tables()` | Primary function: takes two `TableSegment`s, returns diff iterator | +| Python API | `__init__.py:connect_to_table()` | Convenience: DSN string → `TableSegment` | +| pyproject.toml | `[project.scripts]` | `data-diff = "data_diff.__main__:main"` | + +## Core Data Flow + +``` +CLI / API call + │ + ▼ +connect(dsn) → Database instance + │ + ▼ +db.query_table_schema() → Schema (column names + types) + │ + ▼ +TableSegment(db, path, key_columns, schema) + │ + ▼ +Algorithm selection (AUTO → JOINDIFF if same-db, else HASHDIFF) + │ + ├─── HASHDIFF (cross-database) ──────────────────────────┐ + │ 1. Checksum full table on both sides │ + │ 2. If mismatch → bisect key range (factor=32) │ + │ 3. Recurse until segment < threshold (16384 rows) │ + │ 4. Download small segments, compare locally │ + │ 5. diff_sets() → yield ("+", row) / ("-", row) │ + │ │ + ├─── JOINDIFF (same database) ───────────────────────────┐ + │ 1. FULL OUTER JOIN on key columns │ + │ 2. CASE WHEN to detect exclusive/changed rows │ + │ 3. Optional: materialize results to temp table │ + │ 4. Stream results → yield ("+", row) / ("-", row) │ + │ │ + ▼ +DiffResultWrapper (streaming iterator + stats) + │ + ▼ +Output: human-readable / JSONL / stats summary +``` + +## Query Builder Architecture + +``` +api.py (user-facing functions) + │ table(), cte(), join(), select(), where() + ▼ +ast_classes.py (immutable AST nodes) + │ Select, Join, Cte, Column, BinOp, Code, ... + ▼ +Database.dialect.compile(compiler, node) + │ Each driver overrides compilation for its SQL dialect + ▼ +Raw SQL string → db.query(sql) +``` + +Key pattern: AST nodes are `@attrs.define(frozen=True)` — modifications return new instances via `attrs.evolve()`. + +## Test Organization + +``` +tests/ +├── test_query.py # Query AST construction + CTE schema tests +├── test_sql.py # SQL generation across dialects +├── test_database.py # DB integration tests (skip with --ignore) +├── test_diff_tables.py # Diff framework + threading +├── test_joindiff.py # JoinDiffer algorithm +├── test_utils.py # Utility functions (UUID, case-aware dicts) +├── test_thread_utils.py # PriorityThreadPoolExecutor +├── test_api.py # Public API surface +├── test_cli.py # CLI argument parsing +├── test_duckdb.py # DuckDB-specific +├── test_postgresql.py # PostgreSQL-specific +├── test_mssql.py # SQL Server-specific +├── test_parse_time.py # Time parsing +├── test_datetime_parsing.py # Datetime parsing +├── test_format.py # Output formatting +├── test_config.py # TOML config +├── test_dbt*.py # dbt integration (3 files) +├── test_mesh.py # Multi-dim segmentation +├── test_main.py # CLI main function +├── test_database_types.py # Column type system +├── common.py # Shared fixtures +└── conftest.py # pytest configuration +``` + +Run unit tests: `uv run pytest tests/ -x -q --ignore=tests/test_database.py` +Run query tests only: `uv run pytest tests/test_query.py -x -q` + +## Key Types + +| Type | Location | Purpose | +|------|----------|---------| +| `Schema` | `schema.py` | Type alias for `CaseAwareMapping[str, ColType]` | +| `TableSegment` | `table_segment.py` | Table + key columns + range bounds | +| `DbPath` | `abcs/database_types.py` | `tuple[str, ...]` — schema-qualified table path | +| `Database` | `databases/base.py` | ABC for all database drivers | +| `ExprNode` | `queries/ast_classes.py` | Base class for all query AST nodes | +| `ITable` | `queries/ast_classes.py` | Interface for table-like query nodes | diff --git a/data_diff/queries/ast_classes.py b/data_diff/queries/ast_classes.py index a1d81992..458c215e 100644 --- a/data_diff/queries/ast_classes.py +++ b/data_diff/queries/ast_classes.py @@ -8,7 +8,7 @@ from data_diff.abcs.database_types import DbPath from data_diff.queries.base import SKIP, SqeletonError, args_as_tuple from data_diff.schema import Schema -from data_diff.utils import ArithString +from data_diff.utils import ArithString, safezip class QueryBuilderError(SqeletonError): @@ -485,7 +485,7 @@ def schema(self) -> Schema: raise ValueError("Join must specify columns explicitly (SELECT * not yet implemented).") # No cross-table type validation needed: join combines columns from both tables rather than unioning rows s = self.source_tables[0].schema - return type(s)({c.name: c.type for c in self.columns}) + return s.new({c.name: c.type for c in self.columns}) def on(self, *exprs) -> Self: """Add an ON clause, for filtering the result of the cartesian product (i.e. the JOIN)""" @@ -596,7 +596,7 @@ def schema(self) -> Schema: s = self.table.schema if s is None or self.columns is None: return s - return type(s)({c.name: c.type for c in self.columns}) + return s.new({c.name: c.type for c in self.columns}) @classmethod def make(cls, table: ITable, distinct: bool = SKIP, optimizer_hints: str = SKIP, **kwargs): @@ -641,24 +641,35 @@ def make(cls, table: ITable, distinct: bool = SKIP, optimizer_hints: str = SKIP, class Cte(ExprNode, ITable): table: Expr name: str | None = None - params: Sequence[str] | None = None + params: Sequence[str] | None = attrs.field(default=None) + + @params.validator + def _validate_params(self, attribute, value): + if value is not None: + for i, p in enumerate(value): + if not isinstance(p, str): + raise TypeError(f"CTE params[{i}] must be str, got {type(p).__name__}") @property def source_table(self) -> "ITable": return self.table @property - def schema(self) -> Schema: - s = self.table.schema + def schema(self) -> Schema | None: + try: + s = self.table.schema + except QueryBuilderError as exc: + raise QueryBuilderError(f"Failed to resolve schema for CTE: {exc}") from exc if not self.params: return s if s is None: raise QueryBuilderError(f"CTE params were provided ({self.params!r}) but the source table has no schema") - if len(self.params) != len(s): + try: + result = s.new(dict(safezip(self.params, s.values()))) + except ValueError as e: raise QueryBuilderError( f"CTE params length ({len(self.params)}) does not match source schema length ({len(s)})" - ) - result = type(s)(dict(zip(self.params, s.values()))) + ) from e if len(result) != len(s): raise QueryBuilderError(f"CTE params contain duplicate column names: {self.params!r}") return result From 032f90222b5fab411eba49dcf94668fb37d06d7b Mon Sep 17 00:00:00 2001 From: Daniel Song Date: Tue, 3 Mar 2026 20:27:09 -0800 Subject: [PATCH 2/3] fix: address PR review findings - Add null guard to Join.schema to prevent AttributeError on None schema - Change Join's ValueError to QueryBuilderError for consistent error hierarchy - Use QB_TypeError in CTE params validator instead of plain TypeError - Widen Cte.schema catch to include ValueError from inner schema resolution - Include CTE name in wrapped error message for better diagnostics - Narrow try block around safezip to avoid masking errors from s.new() - Fix CODE_MAP.md inaccuracies: frozen claim, Schema type alias, errors.py Co-Authored-By: Claude Opus 4.6 --- CODE_MAP.md | 6 +++--- data_diff/queries/ast_classes.py | 14 +++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/CODE_MAP.md b/CODE_MAP.md index 7e79fc08..206ab061 100644 --- a/CODE_MAP.md +++ b/CODE_MAP.md @@ -10,7 +10,7 @@ data_diff/ ├── __main__.py # CLI entry point (Click): `data-diff` command ├── version.py # __version__ = "1.0.0" ├── config.py # TOML config parsing (--conf flag) -├── errors.py # Custom exceptions (dbt + core) +├── errors.py # Custom exceptions (all dbt-integration and config-related) ├── schema.py # create_schema() factory, RawColumnInfo, Schema type alias ├── utils.py # CaseAwareMapping, CaseInsensitiveDict, ArithString, safezip ├── _compat.py # Compatibility shims (tomllib) @@ -120,7 +120,7 @@ Database.dialect.compile(compiler, node) Raw SQL string → db.query(sql) ``` -Key pattern: AST nodes are `@attrs.define(frozen=True)` — modifications return new instances via `attrs.evolve()`. +Key pattern: Most AST nodes use `@attrs.define(frozen=True)` (some like `ExprNode`, `_ResolveColumn` are mutable). Where immutability holds, `attrs.evolve()` returns modified copies. ## Test Organization @@ -157,7 +157,7 @@ Run query tests only: `uv run pytest tests/test_query.py -x -q` | Type | Location | Purpose | |------|----------|---------| -| `Schema` | `schema.py` | Type alias for `CaseAwareMapping[str, ColType]` | +| `Schema` | `schema.py` | Type alias for `CaseAwareMapping` (used as `CaseAwareMapping[str, ColType]`) | | `TableSegment` | `table_segment.py` | Table + key columns + range bounds | | `DbPath` | `abcs/database_types.py` | `tuple[str, ...]` — schema-qualified table path | | `Database` | `databases/base.py` | ABC for all database drivers | diff --git a/data_diff/queries/ast_classes.py b/data_diff/queries/ast_classes.py index 458c215e..e8e6dea8 100644 --- a/data_diff/queries/ast_classes.py +++ b/data_diff/queries/ast_classes.py @@ -482,9 +482,11 @@ class Join(ExprNode, ITable, Root): @property def schema(self) -> Schema: if not self.columns: - raise ValueError("Join must specify columns explicitly (SELECT * not yet implemented).") + raise QueryBuilderError("Join must specify columns explicitly (SELECT * not yet implemented).") # No cross-table type validation needed: join combines columns from both tables rather than unioning rows s = self.source_tables[0].schema + if s is None: + raise QueryBuilderError("Cannot resolve Join schema: source table has no schema defined") return s.new({c.name: c.type for c in self.columns}) def on(self, *exprs) -> Self: @@ -648,7 +650,7 @@ def _validate_params(self, attribute, value): if value is not None: for i, p in enumerate(value): if not isinstance(p, str): - raise TypeError(f"CTE params[{i}] must be str, got {type(p).__name__}") + raise QB_TypeError(f"CTE params[{i}] must be str, got {type(p).__name__}") @property def source_table(self) -> "ITable": @@ -658,18 +660,20 @@ def source_table(self) -> "ITable": def schema(self) -> Schema | None: try: s = self.table.schema - except QueryBuilderError as exc: - raise QueryBuilderError(f"Failed to resolve schema for CTE: {exc}") from exc + except (QueryBuilderError, ValueError) as exc: + name_hint = f" '{self.name}'" if self.name else "" + raise QueryBuilderError(f"Failed to resolve schema for CTE{name_hint}") from exc if not self.params: return s if s is None: raise QueryBuilderError(f"CTE params were provided ({self.params!r}) but the source table has no schema") try: - result = s.new(dict(safezip(self.params, s.values()))) + pairs = dict(safezip(self.params, s.values())) except ValueError as e: raise QueryBuilderError( f"CTE params length ({len(self.params)}) does not match source schema length ({len(s)})" ) from e + result = s.new(pairs) if len(result) != len(s): raise QueryBuilderError(f"CTE params contain duplicate column names: {self.params!r}") return result From 9b02ec62ad4a0fceb148ac616a48968fb368d406 Mon Sep 17 00:00:00 2001 From: Daniel Song Date: Tue, 3 Mar 2026 20:50:19 -0800 Subject: [PATCH 3/3] fix: include original error in CTE schema wrapping message Add the original exception text to the wrapped QueryBuilderError message so callers inspecting str(e) see the root cause without needing to walk __cause__. Also document why ValueError is in the catch tuple. Co-Authored-By: Claude Opus 4.6 --- data_diff/queries/ast_classes.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/data_diff/queries/ast_classes.py b/data_diff/queries/ast_classes.py index e8e6dea8..2df227dc 100644 --- a/data_diff/queries/ast_classes.py +++ b/data_diff/queries/ast_classes.py @@ -661,8 +661,10 @@ def schema(self) -> Schema | None: try: s = self.table.schema except (QueryBuilderError, ValueError) as exc: + # ValueError caught because some ITable.schema implementations (e.g. TableOp) + # still raise ValueError for validation errors pre-dating QueryBuilderError. name_hint = f" '{self.name}'" if self.name else "" - raise QueryBuilderError(f"Failed to resolve schema for CTE{name_hint}") from exc + raise QueryBuilderError(f"Failed to resolve schema for CTE{name_hint}: {exc}") from exc if not self.params: return s if s is None: