A high-performance PostgreSQL Change Data Capture (CDC) tool that streams database changes in real time to multiple destination databases via logical replication.

| Destination | Driver | Feature Flag |
|---|---|---|
| MySQL | SQLx | mysql (default) |
| SQL Server | Tiberius (TDS) | sqlserver (default) |
| SQLite | SQLx | sqlite (default) |
| Kafka | rdkafka (librdkafka) | kafka |
- Crash-safe persistence - File-based producer-consumer with automatic crash recovery
- Resumable processing - Restart from the exact position within large transaction files
- Streaming transactions - Handles PostgreSQL protocol v2+ in-progress transaction streaming
- SQL compression - Optional gzip compression for transaction files with streaming decompression
- Schema mapping - Configurable PostgreSQL schema to destination database/schema translation
- Prometheus metrics - Built-in HTTP metrics endpoint (feature: metrics)
- Graceful shutdown - Coordinated producer-consumer shutdown with LSN persistence

PostgreSQL with logical replication enabled:
ALTER SYSTEM SET wal_level = logical;
-- Restart PostgreSQL after this change
CREATE PUBLICATION my_publication FOR ALL TABLES;
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;

git clone https://github.com/isdaniel/pg2any
cd pg2any
docker-compose up -d
make build && RUST_LOG=info make run

use pg2any_lib::{load_config_from_env, run_cdc_app};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = load_config_from_env()?;
    run_cdc_app(config, None).await?;
    Ok(())
}

use pg2any_lib::{Config, DestinationType};

let config = Config::builder()
    .source_connection_string("postgresql://user:pass@localhost:5432/db?replication=database")
    .destination_type(DestinationType::MySQL)
    .destination_connection_string("mysql://root:pass@localhost:3306/replica_db")
    .replication_slot_name("cdc_slot")
    .publication_name("cdc_pub")
    .streaming(true)
    .build()?;

pg2any uses a file-based producer-consumer pattern for reliable, crash-safe transaction processing:

PostgreSQL WAL Stream
      |
      v
+-----------+       +------------------+       +-----------+
| Producer  | ----> |   File System    | ----> | Consumer  |
| (WAL      |       |                  |       | (SQL      |
|  Reader)  |       | sql_data_tx/     |       |  Executor)|
|           |       | sql_received_tx/ |       |           |
+-----------+       | sql_pending_tx/  |       +-----------+
                    +------------------+             |
                                                     v
                                              Destination DB

- BEGIN - Create .sql file in sql_data_tx/ and .meta in sql_received_tx/
- Events - Append SQL commands to .sql file via buffered writes (8 MB buffer)
- COMMIT - Move .meta from sql_received_tx/ to sql_pending_tx/; notify consumer
- Execute - Consumer reads pending .meta, executes SQL from .sql in batches
- Cleanup - Delete both .meta and .sql files on success; update flush LSN

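The file movements in the steps above can be sketched with the standard library alone. This is a minimal illustration, not pg2any's actual implementation: `TxDirs`, `write_and_commit`, `cleanup`, and the `tx_id=...` content written to the .meta file are hypothetical names and formats chosen for the example. Only the three directory names and the 8 MB buffer size come from the description above.

```rust
use std::fs::{self, File};
use std::io::{self, BufWriter, Write};
use std::path::{Path, PathBuf};

/// Hypothetical holder for the three directories named in the steps above.
struct TxDirs {
    data: PathBuf,     // sql_data_tx/
    received: PathBuf, // sql_received_tx/
    pending: PathBuf,  // sql_pending_tx/
}

impl TxDirs {
    /// Creates the directory layout under `base` if it does not exist yet.
    fn create(base: &Path) -> io::Result<Self> {
        let dirs = TxDirs {
            data: base.join("sql_data_tx"),
            received: base.join("sql_received_tx"),
            pending: base.join("sql_pending_tx"),
        };
        for dir in [&dirs.data, &dirs.received, &dirs.pending] {
            fs::create_dir_all(dir)?;
        }
        Ok(dirs)
    }
}

/// Walks one transaction through BEGIN, events, and COMMIT.
/// Returns the path of the .meta file once it sits in sql_pending_tx/.
fn write_and_commit(dirs: &TxDirs, tx_id: u32, statements: &[&str]) -> io::Result<PathBuf> {
    let sql_path = dirs.data.join(format!("{}.sql", tx_id));
    let received_meta = dirs.received.join(format!("{}.meta", tx_id));
    let pending_meta = dirs.pending.join(format!("{}.meta", tx_id));

    // BEGIN: create the .meta marker (content is an assumption) and the .sql file.
    fs::write(&received_meta, format!("tx_id={}\n", tx_id))?;
    let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, File::create(&sql_path)?);

    // Events: append one SQL command per line through the buffered writer.
    for statement in statements {
        writeln!(writer, "{}", statement)?;
    }

    // COMMIT: flush buffered SQL first, then move the .meta file.
    // The rename is what marks the transaction as committed on disk.
    writer.flush()?;
    fs::rename(&received_meta, &pending_meta)?;
    Ok(pending_meta)
}

/// Cleanup: remove both files after the destination has applied the transaction.
fn cleanup(dirs: &TxDirs, tx_id: u32) -> io::Result<()> {
    fs::remove_file(dirs.pending.join(format!("{}.meta", tx_id)))?;
    fs::remove_file(dirs.data.join(format!("{}.sql", tx_id)))?;
    Ok(())
}
```

Flushing before the rename matters in this sketch: a .meta file in sql_pending_tx/ is the only signal that the matching .sql file is complete, so the SQL must already be written out when the marker moves.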
On restart, pg2any:

- Cleans up incomplete transactions from sql_received_tx/ (uncommitted)
- Replays committed transactions from sql_pending_tx/ (committed but not yet applied)
- Resumes from last_executed_command_index within partially-executed transaction files
- Starts replication from the persisted flush_lsn

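The resume step can be illustrated with a small slice computation. This sketch assumes that `last_executed_command_index` is a zero-based index of the last command already applied, and that it is absent when nothing has run yet. `PendingMeta` and `remaining_commands` are hypothetical names, and the real .meta format may differ.

```rust
/// Hypothetical in-memory view of the resume field in a pending .meta file.
struct PendingMeta {
    /// Zero-based index of the last command already applied to the
    /// destination, or `None` if no command from this file has run.
    last_executed_command_index: Option<usize>,
}

/// Returns the commands that still have to be executed after a restart.
fn remaining_commands<'a>(meta: &PendingMeta, commands: &'a [String]) -> &'a [String] {
    let start = match meta.last_executed_command_index {
        // Resume one past the last applied command.
        Some(index) => index.saturating_add(1),
        // Nothing applied yet: replay the whole file.
        None => 0,
    };
    // Clamp so a stale or oversized index yields an empty slice instead of a panic.
    &commands[start.min(commands.len())..]
}
```

With four commands and an index of 1, the function returns the last two commands, so work that already reached the destination is not repeated.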
sequenceDiagram
participant PG as PostgreSQL WAL
participant Producer as Producer Task
participant FS as File System
participant Channel as Notification Channel
participant Consumer as Consumer Task
participant Dest as Destination DB
participant LSN as LSN Tracker
Note over PG,LSN: Transaction Processing Flow
PG->>Producer: BEGIN (tx_id: 12345)
Producer->>FS: Create 12345.sql in sql_data_tx/
Producer->>FS: Create 12345.meta in sql_received_tx/
PG->>Producer: INSERT event
Producer->>Producer: Generate SQL
Producer->>FS: Append to buffer (8MB)
PG->>Producer: UPDATE event
Producer->>Producer: Generate SQL
Producer->>FS: Append to buffer
PG->>Producer: DELETE event
Producer->>Producer: Generate SQL
Producer->>FS: Append to buffer
Note over Producer,FS: Buffer reaches 8MB
Producer->>FS: Flush buffer to 12345.sql
PG->>Producer: COMMIT (LSN: 0/1A2B3C4D)
Producer->>FS: Flush remaining buffer
Producer->>FS: Move 12345.meta to sql_pending_tx/
Producer->>Channel: Send notification
Channel->>Consumer: Transaction ready
Consumer->>FS: Read 12345.meta from sql_pending_tx/
Consumer->>FS: Read SQL commands from 12345.sql
loop For each SQL batch
Consumer->>Dest: BEGIN TRANSACTION
Consumer->>Dest: Execute SQL batch
Consumer->>Dest: COMMIT TRANSACTION
Consumer->>FS: Update pending .meta progress
end
Consumer->>FS: Delete 12345.meta and 12345.sql
Consumer->>LSN: Update and persist flush_lsn
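The batch loop in the diagram above can be approximated as follows. This is a sketch under stated assumptions: `run_in_batches` is a hypothetical helper, the closure stands in for "BEGIN, execute batch, COMMIT" on the destination, and the `progress` value stands in for whatever the consumer writes to the pending .meta file after each batch.

```rust
/// Runs `commands` in batches of `batch_size`, recording progress after
/// each batch that succeeds.
///
/// `progress` holds the zero-based index of the last command that was
/// applied. It is updated only after a batch succeeds, so it stays valid
/// if a later batch fails.
fn run_in_batches<E, F>(
    commands: &[String],
    batch_size: usize,
    progress: &mut Option<usize>,
    mut execute_batch: F,
) -> Result<(), E>
where
    F: FnMut(&[String]) -> Result<(), E>,
{
    let mut applied = 0;
    // `chunks` panics on a size of zero, so treat zero as one.
    for batch in commands.chunks(batch_size.max(1)) {
        // Stand-in for BEGIN TRANSACTION / execute / COMMIT on the destination.
        execute_batch(batch)?;
        applied += batch.len();
        // Stand-in for updating progress in the pending .meta file.
        *progress = Some(applied - 1);
    }
    Ok(())
}
```

Because progress advances only after a successful batch, a failure in the second of three batches leaves `progress` pointing at the end of the first one, which is the position a restart would resume from.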
graph TB
Crash([System Crash]) --> Restart[Restart pg2any]
Restart --> LoadMeta[Load LSN Metadata]
LoadMeta --> CheckMeta{Metadata Exists?}
CheckMeta -->|No| StartFresh[Start from Latest]
CheckMeta -->|Yes| GetLSN[Extract flush_lsn]
GetLSN --> CleanReceived[Cleanup sql_received_tx/]
CleanReceived --> ProcessPending[Process sql_pending_tx/]
ProcessPending --> SortByTimestamp[Sort by commit_timestamp]
SortByTimestamp --> ForEachPending{For Each .meta}
ForEachPending -->|More files| CheckResume{Has resume index?}
ForEachPending -->|Done| SetStartLSN[Set start_lsn = flush_lsn]
CheckResume -->|Yes| ResumeFromIndex[Resume from last_executed_command_index + 1]
CheckResume -->|No| ReadAll[Read All SQL Commands]
ResumeFromIndex --> Execute[Execute Remaining Commands]
ReadAll --> Execute
Execute --> UpdateLSN[Update LSN Metadata]
UpdateLSN --> DeleteFiles[Delete .meta and .sql]
DeleteFiles --> ForEachPending
SetStartLSN --> StartReplication[Start Replication]
StartFresh --> StartReplication
StartReplication --> NormalOperation([Normal Operation])
style Crash fill:#ff6b6b
style NormalOperation fill:#51cf66
style ResumeFromIndex fill:#ffd43b
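The flow above persists and compares LSN values such as 0/1A2B3C4D. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash, representing the high and low 32 bits of a 64-bit position. The helpers below are a hypothetical sketch of converting between that text form and a `u64`; they do not describe how pg2any stores its LSN metadata.

```rust
/// Parses the textual form "HIGH/LOW" (both hexadecimal) into a 64-bit LSN.
/// Returns `None` if the slash is missing or either half is not valid hex.
fn parse_lsn(text: &str) -> Option<u64> {
    let (high, low) = text.split_once('/')?;
    let high = u32::from_str_radix(high, 16).ok()?;
    let low = u32::from_str_radix(low, 16).ok()?;
    Some((u64::from(high) << 32) | u64::from(low))
}

/// Formats a 64-bit LSN back into the "HIGH/LOW" uppercase hex form.
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}
```

Once parsed, positions compare as plain integers, which makes a check like "is this transaction's commit LSN beyond flush_lsn" a single comparison.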
All configuration is via environment variables (ideal for containers) or the ConfigBuilder API.
| Variable | Description | Example |
|---|---|---|
| CDC_SOURCE_CONNECTION_STRING | PostgreSQL connection string | postgresql://user:pass@host:5432/db?replication=database |
| CDC_DEST_TYPE | Target database type | MySQL, SqlServer, SQLite, Kafka |
| CDC_DEST_URI | Destination connection string | See format table below |

| Database | Format | Example |
|---|---|---|
| MySQL | mysql://user:pass@host:port/db | mysql://root:pass@localhost:3306/mydb |
| SQL Server | sqlserver://user:pass@host:port/db | sqlserver://sa:pass@localhost:1433/master |
| SQLite | File path | ./replica.db or /data/replica.db |
| Kafka | Broker list | broker1:9092,broker2:9092 |

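A deployment script might want to sanity-check CDC_DEST_TYPE and CDC_DEST_URI against the formats in the table before starting the process. The following is a hypothetical pre-flight check, not pg2any's own validation: the `Dest` enum, both function names, and the case-insensitive matching of the type name are assumptions made for the example.

```rust
/// Hypothetical mirror of the four destination types listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Dest {
    MySql,
    SqlServer,
    Sqlite,
    Kafka,
}

/// Maps a CDC_DEST_TYPE value to a destination.
/// Case-insensitive matching is an assumption of this sketch.
fn parse_dest_type(value: &str) -> Option<Dest> {
    match value.to_ascii_lowercase().as_str() {
        "mysql" => Some(Dest::MySql),
        "sqlserver" => Some(Dest::SqlServer),
        "sqlite" => Some(Dest::Sqlite),
        "kafka" => Some(Dest::Kafka),
        _ => None,
    }
}

/// Checks only the rough shape of a destination URI, following the
/// format table: URL scheme, file path, or broker list.
fn uri_matches(dest: Dest, uri: &str) -> bool {
    match dest {
        Dest::MySql => uri.starts_with("mysql://"),
        Dest::SqlServer => uri.starts_with("sqlserver://"),
        // Any non-empty string is accepted as a file path.
        Dest::Sqlite => !uri.trim().is_empty(),
        // Every comma-separated entry must look like host:port.
        Dest::Kafka => uri.split(',').all(|broker| match broker.rsplit_once(':') {
            Some((host, port)) => !host.is_empty() && port.parse::<u16>().is_ok(),
            None => false,
        }),
    }
}
```

A check like this catches a mismatched pair, for example a sqlserver:// URI combined with the MySQL type, before any connection attempt is made.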
| Variable | Default | Description |
|---|---|---|
| CDC_REPLICATION_SLOT | cdc_slot | PostgreSQL replication slot name |
| CDC_PUBLICATION | cdc_pub | PostgreSQL publication name |
| CDC_PROTOCOL_VERSION | 1 | Replication protocol version (1-4) |
| CDC_STREAMING | true | Stream in-progress transactions (requires protocol v2+) |
| CDC_SCHEMA_MAPPING | | Schema translation, e.g. public:cdc_db,sales:sales_db |
| CDC_BUFFER_SIZE | 500 | Transaction channel capacity |
| CDC_TRANSACTION_SEGMENT_SIZE_MB | 64 | Max segment file size in MB |
| CDC_CONNECTION_TIMEOUT | 30 | Connection timeout (seconds) |
| CDC_QUERY_TIMEOUT | 10 | Query timeout (seconds) |
| CDC_LAST_LSN_FILE | ./pg2any_last_lsn | Base path for LSN metadata file |
| CDC_TRANSACTION_FILE_BASE_PATH | ./ | Base directory for transaction files |
| PG2ANY_ENABLE_COMPRESSION | false | Enable gzip compression for SQL files |
| RUST_LOG | pg2any=debug | Log level |

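The CDC_SCHEMA_MAPPING value is a comma-separated list of source:destination pairs. A parser for that shape could look like the sketch below. `parse_schema_mapping` is a hypothetical helper, and its handling of whitespace, empty input, and malformed pairs is an assumption rather than pg2any's documented behavior.

```rust
use std::collections::HashMap;

/// Parses a value such as "public:cdc_db,sales:sales_db" into a map from
/// PostgreSQL schema name to destination database/schema name.
fn parse_schema_mapping(raw: &str) -> Result<HashMap<String, String>, String> {
    let mut mapping = HashMap::new();
    // Skip empty entries so an unset or trailing-comma value is tolerated.
    for pair in raw.split(',').map(str::trim).filter(|p| !p.is_empty()) {
        let (source, dest) = pair
            .split_once(':')
            .ok_or_else(|| format!("expected source:destination, got '{}'", pair))?;
        let (source, dest) = (source.trim(), dest.trim());
        if source.is_empty() || dest.is_empty() {
            return Err(format!("empty schema name in '{}'", pair));
        }
        mapping.insert(source.to_string(), dest.to_string());
    }
    Ok(mapping)
}
```

For the example value in the table, the result maps public to cdc_db and sales to sales_db. An empty string yields an empty map, which this sketch treats as "no translation".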
Enable with the metrics feature flag. When enabled, pg2any exposes Prometheus-compatible metrics over HTTP on port 8080.

# Key metrics
pg2any_events_processed_total
pg2any_transactions_processed_total
pg2any_replication_lag_seconds
pg2any_events_per_second
pg2any_errors_total
pg2any_source_connection_status
pg2any_destination_connection_status

The Docker Compose setup includes a full observability stack: Prometheus (:9090), Node Exporter, PostgreSQL Exporter, and MySQL Exporter with predefined alert rules.

make build # Build the application
make test # Run full test suite
make check # Cargo check + validation
make format # Format code with rustfmt
make before-git-push # Pre-commit validation
# Docker environment
make docker-start # Start databases + monitoring
make docker-stop # Stop all services
# Chaos & integration tests
make chaos-test-mysql-full
make chaos-test-sqlserver-full
make chaos-test-sqlite-full
make chaos-test-kafka-full
make pgbench-test-mysql-full

[features]
default = ["mysql", "sqlserver", "sqlite"]
mysql = ["sqlx/mysql"]
sqlserver = ["tiberius"]
sqlite = ["sqlx/sqlite"]
kafka = ["rdkafka", "futures-util", "base64"]
metrics = ["hyper", "hyper-util", "http-body-util", "prometheus"]

| Crate | Purpose |
|---|---|
| pg_walstream | PostgreSQL logical replication protocol |
| tokio | Async runtime |
| sqlx | MySQL + SQLite async driver |
| tiberius | SQL Server TDS protocol |
| rdkafka | Kafka producer (librdkafka wrapper) |
| serde / serde_json | Serialization |
| prometheus | Metrics collection |
| thiserror | Error handling |
| flate2 / async-compression | SQL file compression |

Apache-2.0
- PostgreSQL Logical Replication Protocol
- PostgreSQL WAL Internals
- pg_walstream - Underlying replication library