Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions CODE_MAP.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# CODE_MAP.md — data-diff Codebase Navigation

> Quick-reference map for AI agents and new contributors. ~165 lines, dense by design.

## Package Layout

```
data_diff/
├── __init__.py # Public API: connect_to_table(), diff_tables(), Algorithm
├── __main__.py # CLI entry point (Click): `data-diff` command
├── version.py # __version__ = "1.0.0"
├── config.py # TOML config parsing (--conf flag)
├── errors.py # Custom exceptions (all dbt-integration and config-related)
├── schema.py # create_schema() factory, RawColumnInfo, Schema type alias
├── utils.py # CaseAwareMapping, CaseInsensitiveDict, ArithString, safezip
├── _compat.py # Compatibility shims (tomllib)
├── queries/ # SQL query builder (AST-based)
│ ├── api.py # User-facing: table(), cte(), join(), leftjoin(), or_()
│ ├── ast_classes.py # AST nodes: Select, Join, Cte, Column, BinOp, DDL stmts
│ ├── base.py # SqeletonError, SKIP sentinel, args_as_tuple()
│ └── extras.py # Checksum, NormalizeAsString (diff-specific query helpers)
├── abcs/ # Abstract base classes
│ ├── database_types.py # DbPath, ColType hierarchy, Collation
│ └── compiler.py # AbstractCompiler, Compilable protocol
├── databases/ # Database drivers (one file per backend)
│ ├── base.py # Database ABC, BaseDialect, connection pooling
│ ├── _connect.py # connect(dsn) → Database instance
│ ├── postgresql.py # PostgreSQL (psycopg2)
│ ├── mysql.py # MySQL (mysql-connector-python)
│ ├── snowflake.py # Snowflake (snowflake-connector-python)
│ ├── bigquery.py # BigQuery (google-cloud-bigquery)
│ ├── redshift.py # Redshift (extends PostgreSQL)
│ ├── databricks.py # Databricks (databricks-sql-connector)
│ ├── duckdb.py # DuckDB (duckdb)
│ ├── clickhouse.py # ClickHouse (clickhouse-driver)
│ ├── mssql.py # SQL Server (pyodbc)
│ ├── oracle.py # Oracle (oracledb)
│ ├── trino.py # Trino (trino)
│ ├── presto.py # Presto (presto-python-client)
│ └── vertica.py # Vertica (vertica-python)
├── diff_tables.py # Algorithm enum, TableDiffer ABC, DiffResultWrapper
├── hashdiff_tables.py # HashDiffer: cross-DB bisection diff (checksum + download)
├── joindiff_tables.py # JoinDiffer: same-DB outer-join diff (single query)
├── table_segment.py # TableSegment: key ranges, split_key_space(), checksums
├── info_tree.py # InfoTree: hierarchical diff metadata tracking
├── thread_utils.py # PriorityThreadPoolExecutor, ThreadedYielder
├── query_utils.py # drop_table(), append_to_table() helpers
├── format.py # Output formatting (JSONL, human-readable)
├── parse_time.py # Relative time parsing ("5min", "1day")
├── lexicographic_space.py # String key range splitting
├── dbt.py # dbt integration: dbt_diff()
├── dbt_parser.py # DbtParser: manifest/profile parsing
└── dbt_config_validators.py # dbt config validation
```

## Entry Points

| Entry Point | Location | Description |
|-------------|----------|-------------|
| CLI | `__main__.py:main()` | Click command: `data-diff db1 table1 db2 table2 -k id` |
| Python API | `__init__.py:diff_tables()` | Primary function: takes two `TableSegment`s, returns diff iterator |
| Python API | `__init__.py:connect_to_table()` | Convenience: DSN string → `TableSegment` |
| pyproject.toml | `[project.scripts]` | `data-diff = "data_diff.__main__:main"` |

## Core Data Flow

```
CLI / API call
connect(dsn) → Database instance
db.query_table_schema() → Schema (column names + types)
TableSegment(db, path, key_columns, schema)
Algorithm selection (AUTO → JOINDIFF if same-db, else HASHDIFF)
├─── HASHDIFF (cross-database) ──────────────────────────┐
│ 1. Checksum full table on both sides │
│ 2. If mismatch → bisect key range (factor=32) │
│ 3. Recurse until segment < threshold (16384 rows) │
│ 4. Download small segments, compare locally │
│ 5. diff_sets() → yield ("+", row) / ("-", row) │
│ │
├─── JOINDIFF (same database) ───────────────────────────┐
│ 1. FULL OUTER JOIN on key columns │
│ 2. CASE WHEN to detect exclusive/changed rows │
│ 3. Optional: materialize results to temp table │
│ 4. Stream results → yield ("+", row) / ("-", row) │
│ │
DiffResultWrapper (streaming iterator + stats)
Output: human-readable / JSONL / stats summary
```

## Query Builder Architecture

```
api.py (user-facing functions)
│ table(), cte(), join(), select(), where()
ast_classes.py (immutable AST nodes)
│ Select, Join, Cte, Column, BinOp, Code, ...
Database.dialect.compile(compiler, node)
│ Each driver overrides compilation for its SQL dialect
Raw SQL string → db.query(sql)
```

Key pattern: Most AST nodes use `@attrs.define(frozen=True)` (some like `ExprNode`, `_ResolveColumn` are mutable). Where immutability holds, `attrs.evolve()` returns modified copies.

## Test Organization

```
tests/
├── test_query.py # Query AST construction + CTE schema tests
├── test_sql.py # SQL generation across dialects
├── test_database.py # DB integration tests (skip with --ignore)
├── test_diff_tables.py # Diff framework + threading
├── test_joindiff.py # JoinDiffer algorithm
├── test_utils.py # Utility functions (UUID, case-aware dicts)
├── test_thread_utils.py # PriorityThreadPoolExecutor
├── test_api.py # Public API surface
├── test_cli.py # CLI argument parsing
├── test_duckdb.py # DuckDB-specific
├── test_postgresql.py # PostgreSQL-specific
├── test_mssql.py # SQL Server-specific
├── test_parse_time.py # Time parsing
├── test_datetime_parsing.py # Datetime parsing
├── test_format.py # Output formatting
├── test_config.py # TOML config
├── test_dbt*.py # dbt integration (3 files)
├── test_mesh.py # Multi-dim segmentation
├── test_main.py # CLI main function
├── test_database_types.py # Column type system
├── common.py # Shared fixtures
└── conftest.py # pytest configuration
```

Run unit tests: `uv run pytest tests/ -x -q --ignore=tests/test_database.py`
Run query tests only: `uv run pytest tests/test_query.py -x -q`

## Key Types

| Type | Location | Purpose |
|------|----------|---------|
| `Schema` | `schema.py` | Type alias for `CaseAwareMapping` (used as `CaseAwareMapping[str, ColType]`) |
| `TableSegment` | `table_segment.py` | Table + key columns + range bounds |
| `DbPath` | `abcs/database_types.py` | `tuple[str, ...]` — schema-qualified table path |
| `Database` | `databases/base.py` | ABC for all database drivers |
| `ExprNode` | `queries/ast_classes.py` | Base class for all query AST nodes |
| `ITable` | `queries/ast_classes.py` | Interface for table-like query nodes |
37 changes: 27 additions & 10 deletions data_diff/queries/ast_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from data_diff.abcs.database_types import DbPath
from data_diff.queries.base import SKIP, SqeletonError, args_as_tuple
from data_diff.schema import Schema
from data_diff.utils import ArithString
from data_diff.utils import ArithString, safezip


class QueryBuilderError(SqeletonError):
Expand Down Expand Up @@ -482,10 +482,12 @@ class Join(ExprNode, ITable, Root):
@property
def schema(self) -> Schema:
if not self.columns:
raise ValueError("Join must specify columns explicitly (SELECT * not yet implemented).")
raise QueryBuilderError("Join must specify columns explicitly (SELECT * not yet implemented).")
# No cross-table type validation needed: join combines columns from both tables rather than unioning rows
s = self.source_tables[0].schema
return type(s)({c.name: c.type for c in self.columns})
if s is None:
raise QueryBuilderError("Cannot resolve Join schema: source table has no schema defined")
return s.new({c.name: c.type for c in self.columns})

def on(self, *exprs) -> Self:
"""Add an ON clause, for filtering the result of the cartesian product (i.e. the JOIN)"""
Expand Down Expand Up @@ -596,7 +598,7 @@ def schema(self) -> Schema:
s = self.table.schema
if s is None or self.columns is None:
return s
return type(s)({c.name: c.type for c in self.columns})
return s.new({c.name: c.type for c in self.columns})

@classmethod
def make(cls, table: ITable, distinct: bool = SKIP, optimizer_hints: str = SKIP, **kwargs):
Expand Down Expand Up @@ -641,24 +643,39 @@ def make(cls, table: ITable, distinct: bool = SKIP, optimizer_hints: str = SKIP,
class Cte(ExprNode, ITable):
table: Expr
name: str | None = None
params: Sequence[str] | None = None
params: Sequence[str] | None = attrs.field(default=None)

@params.validator
def _validate_params(self, attribute, value):
if value is not None:
for i, p in enumerate(value):
if not isinstance(p, str):
raise QB_TypeError(f"CTE params[{i}] must be str, got {type(p).__name__}")

@property
def source_table(self) -> "ITable":
return self.table

@property
def schema(self) -> Schema:
s = self.table.schema
def schema(self) -> Schema | None:
try:
s = self.table.schema
except (QueryBuilderError, ValueError) as exc:
# ValueError caught because some ITable.schema implementations (e.g. TableOp)
# still raise ValueError for validation errors pre-dating QueryBuilderError.
name_hint = f" '{self.name}'" if self.name else ""
raise QueryBuilderError(f"Failed to resolve schema for CTE{name_hint}: {exc}") from exc
if not self.params:
return s
if s is None:
raise QueryBuilderError(f"CTE params were provided ({self.params!r}) but the source table has no schema")
if len(self.params) != len(s):
try:
pairs = dict(safezip(self.params, s.values()))
except ValueError as e:
raise QueryBuilderError(
f"CTE params length ({len(self.params)}) does not match source schema length ({len(s)})"
)
result = type(s)(dict(zip(self.params, s.values())))
) from e
result = s.new(pairs)
if len(result) != len(s):
raise QueryBuilderError(f"CTE params contain duplicate column names: {self.params!r}")
return result
Expand Down
Loading