Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ jobs:
run: uv tool run ty check --python-version 3.10

- name: Build the stack
run: docker compose up -d --wait mysql postgres presto trino clickhouse
# --profile full unlocks profile-gated clickhouse; only named services start
run: docker compose --profile full up -d --wait mysql postgres clickhouse

- name: Run tests
env:
Expand Down
45 changes: 45 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Developer convenience targets for bringing up databases, testing, and demoing data-diff.
# NOTE(review): recipe lines in this capture show no leading whitespace — GNU Make
# requires a hard TAB before every recipe command; confirm tabs in the real file.
.PHONY: up up-full down test-unit test demo

## Start PostgreSQL + MySQL (lightweight, fast startup)
up:
docker compose up -d --wait postgres mysql

## Start all services including ClickHouse, Presto, Trino, Vertica
# --profile full includes the profile-gated services that plain `docker compose up` skips
up-full:
docker compose --profile full up -d --wait

## Stop all services and remove volumes
# --profile full here too, so profile-gated containers are also stopped; -v drops data volumes
down:
docker compose --profile full down -v

## Run unit tests (no database required)
# -x stops at the first failure for a fast feedback loop
test-unit:
uv run pytest tests/test_query.py tests/test_utils.py -x

## Run full test suite against PG + MySQL (starts containers if needed)
## To also test Presto/Trino/Vertica, run `make up-full` first and set:
## export DATADIFF_PRESTO_URI="presto://test@localhost:8080/memory/default"
## export DATADIFF_TRINO_URI="trino://test@localhost:8081/memory/default"
## export DATADIFF_VERTICA_URI="vertica://vertica:Password1@localhost:5433/vertica"
# depends on `up` so postgres+mysql are running before pytest starts;
# -o addopts=... overrides any configured pytest addopts to enforce a timeout and short tracebacks
test: up
uv run pytest tests/ \
-o addopts="--timeout=300 --tb=short" \
--ignore=tests/test_database_types.py \
--ignore=tests/test_dbt_config_validators.py \
--ignore=tests/test_main.py

## Run data-diff against seed data to showcase diffing
# Diffs the seeded ratings_source/ratings_target tables (see dev/seed/) on each backend;
# @ prefix suppresses echoing of the echo commands themselves
demo: up
@echo "=== PostgreSQL: ratings_source vs ratings_target ==="
uv run python -m data_diff \
postgresql://postgres:Password1@localhost/postgres \
ratings_source ratings_target \
--key-columns id \
--columns rating
@echo ""
@echo "=== MySQL: ratings_source vs ratings_target ==="
uv run python -m data_diff \
mysql://mysql:Password1@localhost/mysql \
ratings_source ratings_target \
--key-columns id \
--columns rating
31 changes: 17 additions & 14 deletions data_diff/databases/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
CHECKSUM_HEXDIGITS,
CHECKSUM_OFFSET,
MD5_HEXDIGITS,
TIMESTAMP_PRECISION_POS,
BaseDialect,
ConnectError,
ThreadedDatabase,
Expand Down Expand Up @@ -115,8 +114,14 @@ def md5_as_hex(self, s: str) -> str:
return f"md5({s})"

def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
def _add_padding(coltype: TemporalType, timestamp6: str):
return f"RPAD(LEFT({timestamp6}, {TIMESTAMP_PRECISION_POS + coltype.precision}), {TIMESTAMP_PRECISION_POS + 6}, '0')"
def _truncate_and_pad(coltype: TemporalType, timestamp6: str):
"""Truncate a 6-digit-precision timestamp to target precision, then zero-pad back to 6 digits."""
truncated = f"LEFT({timestamp6}, length({timestamp6}) - (6 - {coltype.precision}))"
return f"RPAD({truncated}, length({timestamp6}), '0')"

def _zero_pad(coltype: TemporalType, already_truncated: str):
"""Zero-pad an already-truncated timestamp back to 6 fractional digits."""
return f"RPAD({already_truncated}, length({already_truncated}) + (6 - {coltype.precision}), '0')"

try:
is_date = coltype.is_date
Expand All @@ -141,30 +146,28 @@ def _add_padding(coltype: TemporalType, timestamp6: str):
null_case_end = "END"

# 294277 or 4714 BC would be out of range, make sure we can't round to that
# TODO test timezones for overflow?
max_timestamp = "294276-12-31 23:59:59.0000"
min_timestamp = "4713-01-01 00:00:00.00 BC"
timestamp = f"least('{max_timestamp}'::timestamp(6), {value}::timestamp(6))"
timestamp = f"greatest('{min_timestamp}'::timestamp(6), {timestamp})"
ts_type = "timestamptz(6)" if isinstance(coltype, TimestampTZ) else "timestamp(6)"
timestamp = f"least('{max_timestamp}'::{ts_type}, {value}::{ts_type})"
timestamp = f"greatest('{min_timestamp}'::{ts_type}, {timestamp})"

interval = format((0.5 * (10 ** (-coltype.precision))), f".{coltype.precision + 1}f")

rounded_timestamp = (
f"left(to_char(least('{max_timestamp}'::timestamp, {timestamp})"
f"left(to_char(least('{max_timestamp}'::{ts_type}, {timestamp})"
f"+ interval '{interval}', 'YYYY-mm-dd HH24:MI:SS.US'),"
f"length(to_char(least('{max_timestamp}'::timestamp, {timestamp})"
f"length(to_char(least('{max_timestamp}'::{ts_type}, {timestamp})"
f"+ interval '{interval}', 'YYYY-mm-dd HH24:MI:SS.US')) - (6-{coltype.precision}))"
)

padded = _add_padding(coltype, rounded_timestamp)
padded = _zero_pad(coltype, rounded_timestamp)
return f"{null_case_begin} {padded} {null_case_end}"

# TODO years with > 4 digits not padded correctly
# current w/ precision 6: 294276-12-31 23:59:59.0000
# should be 294276-12-31 23:59:59.000000
else:
rounded_timestamp = f"to_char({value}::timestamp(6), 'YYYY-mm-dd HH24:MI:SS.US')"
padded = _add_padding(coltype, rounded_timestamp)
ts_type = "timestamptz(6)" if isinstance(coltype, TimestampTZ) else "timestamp(6)"
rounded_timestamp = f"to_char({value}::{ts_type}, 'YYYY-mm-dd HH24:MI:SS.US')"
padded = _truncate_and_pad(coltype, rounded_timestamp)
return padded

def normalize_number(self, value: str, coltype: FractionalType) -> str:
Expand Down
22 changes: 18 additions & 4 deletions data_diff/queries/ast_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,8 @@ class Join(ExprNode, ITable, Root):
def schema(self) -> Schema:
if not self.columns:
raise ValueError("Join must specify columns explicitly (SELECT * not yet implemented).")
s = self.source_tables[0].schema # TODO validate types match between both tables
# No cross-table type validation needed: join combines columns from both tables rather than unioning rows
s = self.source_tables[0].schema
return type(s)({c.name: c.type for c in self.columns})

def on(self, *exprs) -> Self:
Expand Down Expand Up @@ -553,15 +554,28 @@ class TableOp(ExprNode, ITable, Root):

@property
def type(self):
# TODO ensure types of both tables are compatible
return self.table1.type
t1 = self.table1.type
t2 = self.table2.type
if t1 is None or t2 is None:
return None
if type(t1) is not type(t2):
raise QueryBuilderError(f"Type mismatch in {self.op}: got {type(t1).__name__} and {type(t2).__name__}")
return t1

@property
def schema(self) -> Schema:
s1 = self.table1.schema
s2 = self.table2.schema
if s1 is None or s2 is None:
raise QueryBuilderError(f"Cannot validate {self.op}: one or both tables have no schema defined")
if len(s1) != len(s2):
raise ValueError(f"TableOp requires tables with matching schema lengths, got {len(s1)} and {len(s2)}.")
raise QueryBuilderError(f"Schema length mismatch in {self.op}: got {len(s1)} and {len(s2)} columns")
for (name1, type1), (name2, type2) in zip(s1.items(), s2.items()):
if type(type1) is not type(type2):
raise QueryBuilderError(
f"Type mismatch in {self.op}: column {name1!r} is {type(type1).__name__} "
f"but column {name2!r} is {type(type2).__name__}"
)
return s1


Expand Down
59 changes: 59 additions & 0 deletions dev/seed/mysql/01_seed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
-- Seed data for demonstrating data-diff capabilities.
-- Auto-executed by MySQL on first container startup.
-- Creates two near-identical tables (ratings_source / ratings_target) that differ by
-- exactly 5 deleted rows, 5 extra rows, and 10 updated rows (20 diffs total).

CREATE TABLE ratings_source (
id INT PRIMARY KEY,
user_id INT NOT NULL,
movie_id INT NOT NULL,
rating DECIMAL(2,1) NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Same shape as ratings_source so the two tables are directly diffable.
CREATE TABLE ratings_target (
id INT PRIMARY KEY,
user_id INT NOT NULL,
movie_id INT NOT NULL,
rating DECIMAL(2,1) NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Populate source with 1000 rows via stored procedure (MySQL lacks generate_series)
-- DELIMITER lets the multi-statement procedure body parse without ';' ending it early.
DELIMITER //
CREATE PROCEDURE seed_ratings()
BEGIN
DECLARE i INT DEFAULT 1;
WHILE i <= 1000 DO
-- Modulo spreads rows over 200 users, 50 movies, and integer ratings 1..5;
-- created_at advances one minute per row from 2025-01-01.
INSERT INTO ratings_source (id, user_id, movie_id, rating, created_at)
VALUES (
i,
1 + (i % 200),
1 + (i % 50),
1 + (i % 5),
DATE_ADD('2025-01-01', INTERVAL i MINUTE)
);
SET i = i + 1;
END WHILE;
END //
DELIMITER ;

-- Run once, then drop: the procedure is only needed at seed time.
CALL seed_ratings();
DROP PROCEDURE seed_ratings;

-- Copy all rows into target
INSERT INTO ratings_target SELECT * FROM ratings_source;

-- Introduce diffs:
-- 5 deleted rows (IDs 10-14 missing from target)
DELETE FROM ratings_target WHERE id BETWEEN 10 AND 14;

-- 5 extra rows in target only (IDs 1001-1005)
INSERT INTO ratings_target (id, user_id, movie_id, rating, created_at) VALUES
(1001, 201, 51, 4.0, '2025-06-01 00:00:00'),
(1002, 202, 52, 3.0, '2025-06-02 00:00:00'),
(1003, 203, 53, 5.0, '2025-06-03 00:00:00'),
(1004, 204, 54, 2.0, '2025-06-04 00:00:00'),
(1005, 205, 55, 1.0, '2025-06-05 00:00:00');

-- 10 updated ratings (IDs 100-109 have different ratings in target)
-- Order matters: the first statement bumps every rating below 5.0 by 0.5 (max becomes 4.5,
-- still < 5.0), then the second resets the original 5.0 rows, so all 10 rows end up changed.
UPDATE ratings_target SET rating = rating + 0.5 WHERE id BETWEEN 100 AND 109 AND rating < 5.0;
UPDATE ratings_target SET rating = 1.0 WHERE id BETWEEN 100 AND 109 AND rating >= 5.0;
47 changes: 47 additions & 0 deletions dev/seed/postgres/01_seed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-- Seed data for demonstrating data-diff capabilities.
-- Auto-executed by PostgreSQL on first container startup.
-- Creates two near-identical tables (ratings_source / ratings_target) that differ by
-- exactly 5 deleted rows, 5 extra rows, and 10 updated rows (20 diffs total);
-- mirrors the MySQL seed so demos behave the same on both backends.

CREATE TABLE ratings_source (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
movie_id INTEGER NOT NULL,
rating NUMERIC(2,1) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now()
);

-- Same shape as ratings_source so the two tables are directly diffable.
CREATE TABLE ratings_target (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
movie_id INTEGER NOT NULL,
rating NUMERIC(2,1) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now()
);

-- Populate source with 1000 rows
-- Modulo spreads rows over 200 users, 50 movies, and integer ratings 1..5;
-- created_at advances one minute per row from 2025-01-01.
INSERT INTO ratings_source (id, user_id, movie_id, rating, created_at)
SELECT
g AS id,
1 + (g % 200) AS user_id,
1 + (g % 50) AS movie_id,
(1 + (g % 5))::NUMERIC(2,1) AS rating,
'2025-01-01'::TIMESTAMP + (g || ' minutes')::INTERVAL AS created_at
FROM generate_series(1, 1000) AS g;

-- Copy all rows into target
INSERT INTO ratings_target SELECT * FROM ratings_source;

-- Introduce diffs:
-- 5 deleted rows (IDs 10-14 missing from target)
DELETE FROM ratings_target WHERE id BETWEEN 10 AND 14;

-- 5 extra rows in target only (IDs 1001-1005)
INSERT INTO ratings_target (id, user_id, movie_id, rating, created_at) VALUES
(1001, 201, 51, 4.0, '2025-06-01 00:00:00'),
(1002, 202, 52, 3.0, '2025-06-02 00:00:00'),
(1003, 203, 53, 5.0, '2025-06-03 00:00:00'),
(1004, 204, 54, 2.0, '2025-06-04 00:00:00'),
(1005, 205, 55, 1.0, '2025-06-05 00:00:00');

-- 10 updated ratings (IDs 100-109 have different ratings in target)
-- Order matters: the first statement bumps every rating below 5.0 by 0.5 (max becomes 4.5,
-- still < 5.0), then the second resets the original 5.0 rows, so all 10 rows end up changed.
UPDATE ratings_target SET rating = rating + 0.5 WHERE id BETWEEN 100 AND 109 AND rating < 5.0;
UPDATE ratings_target SET rating = 1.0 WHERE id BETWEEN 100 AND 109 AND rating >= 5.0;
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
restart: always
volumes:
- postgresql-data:/var/lib/postgresql/data:delegated
- ./dev/seed/postgres:/docker-entrypoint-initdb.d:ro
ports:
- '5432:5432'
expose:
Expand Down Expand Up @@ -42,6 +43,7 @@ services:
restart: always
volumes:
- mysql-data:/var/lib/mysql:delegated
- ./dev/seed/mysql:/docker-entrypoint-initdb.d:ro
user: mysql
ports:
- '3306:3306'
Expand All @@ -61,6 +63,7 @@ services:
clickhouse:
container_name: dd-clickhouse
image: clickhouse/clickhouse-server:24.3
profiles: [full]
restart: always
volumes:
- clickhouse-data:/var/lib/clickhouse:delegated
Expand Down Expand Up @@ -88,6 +91,7 @@ services:

# prestodb.dbapi.connect(host="127.0.0.1", user="presto").cursor().execute('SELECT * FROM system.runtime.nodes')
presto:
profiles: [full]
container_name: dd-presto
build:
context: ./dev
Expand All @@ -101,6 +105,7 @@ services:
- local

trino:
profiles: [full]
container_name: dd-trino
image: 'trinodb/trino:439'
hostname: trino
Expand All @@ -118,6 +123,7 @@ services:

vertica:
container_name: dd-vertica
profiles: [full]
image: vertica/vertica-ce:24.1.0-0
restart: always
volumes:
Expand Down
10 changes: 4 additions & 6 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@
TEST_BIGQUERY_CONN_STRING: str = os.environ.get("DATADIFF_BIGQUERY_URI") or None
TEST_REDSHIFT_CONN_STRING: str = os.environ.get("DATADIFF_REDSHIFT_URI") or None
TEST_ORACLE_CONN_STRING: str = None
TEST_DATABRICKS_CONN_STRING: str = os.environ.get("DATADIFF_DATABRICKS_URI")
TEST_DATABRICKS_CONN_STRING: str = os.environ.get("DATADIFF_DATABRICKS_URI") or None
TEST_TRINO_CONN_STRING: str = os.environ.get("DATADIFF_TRINO_URI") or None
# clickhouse uri for provided docker - "clickhouse://clickhouse:Password1@localhost:9000/clickhouse"
TEST_CLICKHOUSE_CONN_STRING: str = os.environ.get("DATADIFF_CLICKHOUSE_URI")
# vertica uri provided for docker - "vertica://vertica:Password1@localhost:5433/vertica"
TEST_VERTICA_CONN_STRING: str = os.environ.get("DATADIFF_VERTICA_URI")
TEST_CLICKHOUSE_CONN_STRING: str = os.environ.get("DATADIFF_CLICKHOUSE_URI") or None
TEST_VERTICA_CONN_STRING: str = os.environ.get("DATADIFF_VERTICA_URI") or None
TEST_DUCKDB_CONN_STRING: str = "duckdb://main:@:memory:"
TEST_MSSQL_CONN_STRING: str = os.environ.get("DATADIFF_MSSQL_URI")
TEST_MSSQL_CONN_STRING: str = os.environ.get("DATADIFF_MSSQL_URI") or None


DEFAULT_N_SAMPLES = 50
Expand Down
Loading
Loading