diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..d5ed23d --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,25 @@ +name: CI + +on: + push: + branches: + - main + - tna_testnet + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Rust cache + uses: Swatinem/rust-cache@v2 + + - name: Test (workspace) + run: cargo test --workspace --locked + diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..5471107 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,47 @@ +name: Release + +on: + push: + tags: + - "v*" + workflow_dispatch: + +permissions: + contents: write + +jobs: + build-linux: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Rust cache + uses: Swatinem/rust-cache@v2 + + - name: Build catalyst-cli (release) + run: cargo build -p catalyst-cli --release --locked + + - name: Package + checksums + shell: bash + run: | + set -euo pipefail + mkdir -p dist + cp target/release/catalyst-cli dist/ + sha256sum dist/catalyst-cli > dist/SHA256SUMS + VERSION="${GITHUB_REF_NAME:-manual}" + TAR="catalyst-cli-${VERSION}-x86_64-unknown-linux-gnu.tar.gz" + tar -czf "$TAR" -C dist catalyst-cli SHA256SUMS + sha256sum "$TAR" > "${TAR}.sha256" + + - name: Upload release assets (tag builds) + if: startsWith(github.ref, 'refs/tags/') + uses: softprops/action-gh-release@v2 + with: + files: | + catalyst-cli-${{ github.ref_name }}-x86_64-unknown-linux-gnu.tar.gz + catalyst-cli-${{ github.ref_name }}-x86_64-unknown-linux-gnu.tar.gz.sha256 + diff --git a/Cargo.lock b/Cargo.lock index 1f8c6dd..c9f9f84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1177,6 +1177,7 @@ dependencies = [ "serde_json", "sha2 0.10.9", "tar", + "tempfile", 
"thiserror 1.0.69", "tokio", "tokio-util", @@ -1451,6 +1452,7 @@ name = "catalyst-utils" version = "0.1.0" dependencies = [ "async-trait", + "bincode", "hex", "libp2p", "prometheus", diff --git a/crates/catalyst-cli/Cargo.toml b/crates/catalyst-cli/Cargo.toml index 44a4928..4a75275 100644 --- a/crates/catalyst-cli/Cargo.toml +++ b/crates/catalyst-cli/Cargo.toml @@ -54,6 +54,9 @@ bytes = "1.11.1" tokio-util = { workspace = true, features = ["io"] } hickory-resolver = "0.25.2" +[dev-dependencies] +tempfile = "3" + [features] default = [] dev = [] diff --git a/crates/catalyst-cli/src/commands.rs b/crates/catalyst-cli/src/commands.rs index 2f7cf99..4057f65 100644 --- a/crates/catalyst-cli/src/commands.rs +++ b/crates/catalyst-cli/src/commands.rs @@ -325,6 +325,52 @@ pub async fn db_backup(data_dir: &Path, out_dir: &Path, archive: Option<&Path>) Ok(()) } +/// Print RocksDB/storage statistics for an existing node data directory. +pub async fn db_stats(data_dir: &Path) -> Result<()> { + let mut cfg = StorageConfigLib::default(); + cfg.data_dir = data_dir.to_path_buf(); + let store = StorageManager::new(cfg).await?; + + let st = store.get_statistics().await?; + println!( + "state_root: {}", + st.current_state_root + .map(|h| format!("0x{}", hex::encode(h))) + .unwrap_or_else(|| "null".to_string()) + ); + println!("pending_transactions: {}", st.pending_transactions); + + // Column-family key counts are a simple proxy for growth hotspots. + let mut cfs: Vec<(String, u64)> = st.column_family_stats.into_iter().collect(); + cfs.sort_by(|a, b| a.0.cmp(&b.0)); + for (cf, n) in cfs { + println!("cf_keys.{}: {}", cf, n); + } + + // Memory usage estimates. 
+ let mut mem: Vec<(String, u64)> = st.memory_usage.into_iter().collect(); + mem.sort_by(|a, b| a.0.cmp(&b.0)); + for (k, v) in mem { + println!("mem.{}: {}", k, v); + } + + if let Some(stats) = st.database_stats { + println!("rocksdb_stats:\n{}", stats); + } + Ok(()) +} + +/// Run maintenance (flush + manual compaction + snapshot cleanup). +pub async fn db_maintenance(data_dir: &Path) -> Result<()> { + let mut cfg = StorageConfigLib::default(); + cfg.data_dir = data_dir.to_path_buf(); + let store = StorageManager::new(cfg).await?; + + store.maintenance().await?; + println!("maintenance_ok: true"); + Ok(()) +} + pub async fn db_restore(data_dir: &Path, from_dir: &Path) -> Result<()> { // Optional pre-flight: if metadata is present, load it for post-restore verification. let meta_path = from_dir.join("catalyst_snapshot.json"); diff --git a/crates/catalyst-cli/src/config.rs b/crates/catalyst-cli/src/config.rs index 03817b7..c50a419 100644 --- a/crates/catalyst-cli/src/config.rs +++ b/crates/catalyst-cli/src/config.rs @@ -178,6 +178,40 @@ pub struct StorageConfig { /// Enable database compression pub compression_enabled: bool, + + /// Enable pruning of historical RPC/indexer metadata (blocks/tx history). + /// + /// When enabled, the node will delete old `metadata` keys for cycles older than + /// `history_keep_cycles` behind the applied head. Authenticated account state is not affected. + #[serde(default)] + pub history_prune_enabled: bool, + + /// Number of cycles (seconds) of history to retain behind head when pruning is enabled. + /// + /// `0` means "keep all history" (no pruning), even if `history_prune_enabled=true`. + #[serde(default = "default_history_keep_cycles")] + pub history_keep_cycles: u64, + + /// Minimum time between prune runs. + #[serde(default = "default_history_prune_interval_seconds")] + pub history_prune_interval_seconds: u64, + + /// Maximum number of cycles to prune per run (bounds runtime overhead). 
+ #[serde(default = "default_history_prune_batch_cycles")] + pub history_prune_batch_cycles: u64, +} + +fn default_history_keep_cycles() -> u64 { + // Default retention window for pruned nodes: 7 days at 1 cycle/sec. + 7 * 24 * 60 * 60 +} + +fn default_history_prune_interval_seconds() -> u64 { + 300 +} + +fn default_history_prune_batch_cycles() -> u64 { + 1_000 } /// Consensus configuration @@ -433,6 +467,10 @@ impl Default for NodeConfig { write_buffer_size_mb: 64, max_open_files: 1000, compression_enabled: true, + history_prune_enabled: false, + history_keep_cycles: default_history_keep_cycles(), + history_prune_interval_seconds: default_history_prune_interval_seconds(), + history_prune_batch_cycles: default_history_prune_batch_cycles(), }, consensus: ConsensusConfig { cycle_duration_seconds: 60, @@ -649,6 +687,18 @@ impl NodeConfig { if self.storage.capacity_gb == 0 && self.storage.enabled { return Err(anyhow::anyhow!("Storage capacity must be > 0 when storage is enabled")); } + if self.storage.history_prune_enabled { + if self.storage.history_prune_interval_seconds == 0 { + return Err(anyhow::anyhow!( + "storage.history_prune_interval_seconds must be > 0 when pruning is enabled" + )); + } + if self.storage.history_prune_batch_cycles == 0 { + return Err(anyhow::anyhow!( + "storage.history_prune_batch_cycles must be > 0 when pruning is enabled" + )); + } + } // Validate RPC configuration if self.rpc.enabled && self.rpc.port == 0 { diff --git a/crates/catalyst-cli/src/main.rs b/crates/catalyst-cli/src/main.rs index fdc1b23..11c408f 100644 --- a/crates/catalyst-cli/src/main.rs +++ b/crates/catalyst-cli/src/main.rs @@ -11,6 +11,7 @@ mod commands; mod config; mod tx; mod sync; +mod pruning; mod dfs_store; mod identity; mod evm; @@ -154,6 +155,18 @@ enum Commands { #[arg(long)] from_dir: PathBuf, }, + /// Show local DB stats for a data directory + DbStats { + /// Data directory (same as config.storage.data_dir) + #[arg(long)] + data_dir: PathBuf, + }, + /// Run DB 
maintenance (flush, compact, snapshot cleanup) + DbMaintenance { + /// Data directory (same as config.storage.data_dir) + #[arg(long)] + data_dir: PathBuf, + }, /// Publish snapshot metadata into the node DB (served via RPC for fast-sync tooling) SnapshotPublish { /// Data directory (same as config.storage.data_dir) of the RPC node @@ -413,6 +426,9 @@ async fn main() -> Result<()> { .collect(); } + // Enforce bounded, safe parameters before starting. + node_config.validate()?; + start_node(node_config, generate_txs, tx_interval_ms).await?; } Commands::GenerateIdentity { output } => { @@ -441,6 +457,12 @@ async fn main() -> Result<()> { Commands::DbRestore { data_dir, from_dir } => { commands::db_restore(&data_dir, &from_dir).await?; } + Commands::DbStats { data_dir } => { + commands::db_stats(&data_dir).await?; + } + Commands::DbMaintenance { data_dir } => { + commands::db_maintenance(&data_dir).await?; + } Commands::SnapshotPublish { data_dir, snapshot_dir, archive_url, archive_path, ttl_seconds } => { commands::snapshot_publish(&data_dir, &snapshot_dir, &archive_url, &archive_path, ttl_seconds).await?; } diff --git a/crates/catalyst-cli/src/node.rs b/crates/catalyst-cli/src/node.rs index 8c14ac8..8f81b9c 100644 --- a/crates/catalyst-cli/src/node.rs +++ b/crates/catalyst-cli/src/node.rs @@ -703,6 +703,12 @@ async fn apply_lsu_to_storage_without_root_check( } } + // Prune persisted mempool against updated nonces. + prune_persisted_mempool(store).await; + + // Opportunistic disk-bounding: prune old historical metadata (if enabled). + crate::pruning::maybe_prune_history(store).await; + // Flush without recomputing. for cf_name in store.engine().cf_names() { let _ = store.engine().flush_cf(&cf_name); @@ -1775,6 +1781,9 @@ async fn apply_lsu_to_storage( // Prune persisted mempool against updated nonces. prune_persisted_mempool(store).await; + // Opportunistic disk-bounding: prune old historical metadata (if enabled). 
+ crate::pruning::maybe_prune_history(store).await; + Ok(state_root) } @@ -2111,6 +2120,36 @@ impl CatalystNode { .map_err(|e| anyhow::anyhow!("genesis/identity initialization failed: {e}"))?; } + // Persist storage pruning knobs into DB metadata so apply paths can enforce bounded disk + // growth without needing to thread config through every callsite. + if let Some(store) = &storage { + let enabled: u8 = if self.config.storage.history_prune_enabled { 1 } else { 0 }; + let _ = store + .set_metadata("storage:history_prune_enabled", &[enabled]) + .await; + let _ = store + .set_metadata( + "storage:history_keep_cycles", + &self.config.storage.history_keep_cycles.to_le_bytes(), + ) + .await; + let _ = store + .set_metadata( + "storage:history_prune_interval_seconds", + &self.config + .storage + .history_prune_interval_seconds + .to_le_bytes(), + ) + .await; + let _ = store + .set_metadata( + "storage:history_prune_batch_cycles", + &self.config.storage.history_prune_batch_cycles.to_le_bytes(), + ) + .await; + } + // Auto-register as a worker (on-chain) for validator nodes. 
if self.config.validator { if let Some(store) = &storage { diff --git a/crates/catalyst-cli/src/pruning.rs b/crates/catalyst-cli/src/pruning.rs new file mode 100644 index 0000000..666a8d0 --- /dev/null +++ b/crates/catalyst-cli/src/pruning.rs @@ -0,0 +1,309 @@ +use anyhow::Result; +use catalyst_storage::StorageManager; +use catalyst_utils::state::state_keys; + +const META_PRUNE_ENABLED: &str = "storage:history_prune_enabled"; +const META_KEEP_CYCLES: &str = "storage:history_keep_cycles"; +const META_PRUNE_INTERVAL_SECS: &str = "storage:history_prune_interval_seconds"; +const META_PRUNE_BATCH_CYCLES: &str = "storage:history_prune_batch_cycles"; +const META_LAST_PRUNE_RUN_MS: &str = "storage:history_prune_last_run_ms"; +const META_PRUNED_UP_TO_CYCLE: &str = "storage:history_pruned_up_to_cycle"; + +#[derive(Debug, Clone)] +struct PruneCfg { + enabled: bool, + keep_cycles: u64, + interval_secs: u64, + batch_cycles: u64, +} + +#[derive(Debug, Clone)] +pub struct PruneReport { + pub pruned_from_cycle: u64, + pub pruned_to_cycle: u64, + pub deleted_cycles: u64, + pub deleted_tx_entries: u64, + pub deleted_keys: u64, +} + +fn decode_u64_le(bytes: &[u8]) -> Option { + if bytes.len() != 8 { + return None; + } + let mut arr = [0u8; 8]; + arr.copy_from_slice(bytes); + Some(u64::from_le_bytes(arr)) +} + +fn decode_bool(bytes: &[u8]) -> Option { + if bytes.len() != 1 { + return None; + } + Some(bytes[0] != 0) +} + +fn now_ms() -> u64 { + catalyst_utils::utils::current_timestamp_ms() +} + +fn meta_delete(store: &StorageManager, key: &str) { + let k = state_keys::metadata_key(key); + let _ = store.engine().delete("metadata", &k); +} + +fn consensus_lsu_key(cycle: u64) -> String { + format!("consensus:lsu:{cycle}") +} +fn consensus_lsu_hash_key(cycle: u64) -> String { + format!("consensus:lsu_hash:{cycle}") +} +fn consensus_lsu_cid_key(cycle: u64) -> String { + format!("consensus:lsu_cid:{cycle}") +} +fn consensus_lsu_state_root_key(cycle: u64) -> String { + 
format!("consensus:lsu_state_root:{cycle}") +} +fn cycle_by_lsu_hash_key(lsu_hash: &[u8; 32]) -> String { + format!("consensus:cycle_by_lsu_hash:{}", hex::encode(lsu_hash)) +} +fn cycle_txids_key(cycle: u64) -> String { + format!("tx:cycle:{cycle}:txids") +} +fn tx_raw_key(txid: &[u8; 32]) -> String { + format!("tx:raw:{}", hex::encode(txid)) +} +fn tx_meta_key(txid: &[u8; 32]) -> String { + format!("tx:meta:{}", hex::encode(txid)) +} + +async fn load_prune_cfg(store: &StorageManager) -> PruneCfg { + let enabled = store + .get_metadata(META_PRUNE_ENABLED) + .await + .ok() + .flatten() + .and_then(|b| decode_bool(&b)) + .unwrap_or(false); + let keep_cycles = store + .get_metadata(META_KEEP_CYCLES) + .await + .ok() + .flatten() + .and_then(|b| decode_u64_le(&b)) + .unwrap_or(0); + let interval_secs = store + .get_metadata(META_PRUNE_INTERVAL_SECS) + .await + .ok() + .flatten() + .and_then(|b| decode_u64_le(&b)) + .unwrap_or(300); + let batch_cycles = store + .get_metadata(META_PRUNE_BATCH_CYCLES) + .await + .ok() + .flatten() + .and_then(|b| decode_u64_le(&b)) + .unwrap_or(1_000); + PruneCfg { + enabled, + keep_cycles, + interval_secs: interval_secs.max(1), + batch_cycles: batch_cycles.max(1), + } +} + +async fn prune_once( + store: &StorageManager, + keep_cycles: u64, + batch_cycles: u64, +) -> Result> { + if keep_cycles == 0 { + return Ok(None); + } + + let head = store + .get_metadata("consensus:last_applied_cycle") + .await + .ok() + .flatten() + .and_then(|b| decode_u64_le(&b)) + .unwrap_or(0); + + // Keep the most recent `keep_cycles` cycles (inclusive of head). 
+ let prune_up_to = head.saturating_sub(keep_cycles).saturating_sub(1); + if prune_up_to == 0 { + return Ok(None); + } + + let start = store + .get_metadata(META_PRUNED_UP_TO_CYCLE) + .await + .ok() + .flatten() + .and_then(|b| decode_u64_le(&b)) + .map(|last| last.saturating_add(1)) + .unwrap_or(0); + + if start == 0 && prune_up_to == 0 { + return Ok(None); + } + if start > prune_up_to { + return Ok(None); + } + + let end = prune_up_to.min(start.saturating_add(batch_cycles.saturating_sub(1))); + + let mut deleted_keys: u64 = 0; + let mut deleted_tx_entries: u64 = 0; + + for cycle in start..=end { + // Load txids BEFORE deleting the per-cycle index. + let txids: Vec<[u8; 32]> = store + .get_metadata(&cycle_txids_key(cycle)) + .await + .ok() + .flatten() + .and_then(|b| bincode::deserialize::>(&b).ok()) + .unwrap_or_default(); + + for txid in &txids { + meta_delete(store, &tx_raw_key(txid)); + meta_delete(store, &tx_meta_key(txid)); + deleted_tx_entries = deleted_tx_entries.saturating_add(1); + deleted_keys = deleted_keys.saturating_add(2); + } + meta_delete(store, &cycle_txids_key(cycle)); + deleted_keys = deleted_keys.saturating_add(1); + + // Consensus history keys. + let lsu_hash = store + .get_metadata(&consensus_lsu_hash_key(cycle)) + .await + .ok() + .flatten() + .and_then(|b| { + if b.len() == 32 { + let mut h = [0u8; 32]; + h.copy_from_slice(&b); + Some(h) + } else { + None + } + }); + + meta_delete(store, &consensus_lsu_key(cycle)); + meta_delete(store, &consensus_lsu_hash_key(cycle)); + meta_delete(store, &consensus_lsu_cid_key(cycle)); + meta_delete(store, &consensus_lsu_state_root_key(cycle)); + deleted_keys = deleted_keys.saturating_add(4); + + if let Some(h) = lsu_hash { + meta_delete(store, &cycle_by_lsu_hash_key(&h)); + deleted_keys = deleted_keys.saturating_add(1); + } + } + + // Persist progress (best-effort). 
+ let _ = store + .set_metadata(META_PRUNED_UP_TO_CYCLE, &end.to_le_bytes()) + .await; + + Ok(Some(PruneReport { + pruned_from_cycle: start, + pruned_to_cycle: end, + deleted_cycles: end.saturating_sub(start).saturating_add(1), + deleted_tx_entries, + deleted_keys, + })) +} + +/// Opportunistically prune historical RPC/indexer metadata to keep disk usage bounded. +/// +/// This is controlled by metadata keys written from the node config on startup. +pub async fn maybe_prune_history(store: &StorageManager) { + let cfg = load_prune_cfg(store).await; + if !cfg.enabled { + return; + } + + let now = now_ms(); + let last = store + .get_metadata(META_LAST_PRUNE_RUN_MS) + .await + .ok() + .flatten() + .and_then(|b| decode_u64_le(&b)) + .unwrap_or(0); + + if now.saturating_sub(last) < cfg.interval_secs.saturating_mul(1000) { + return; + } + + // Set last-run first to avoid repeated work under heavy apply loops (best-effort). + let _ = store + .set_metadata(META_LAST_PRUNE_RUN_MS, &now.to_le_bytes()) + .await; + + let _ = prune_once(store, cfg.keep_cycles, cfg.batch_cycles).await; +} + +#[cfg(test)] +mod tests { + use super::*; + use catalyst_storage::StorageConfig as StorageConfigLib; + use tempfile::TempDir; + + #[tokio::test] + async fn test_prune_once_deletes_expected_keys() { + let td = TempDir::new().unwrap(); + let mut cfg = StorageConfigLib::default(); + cfg.data_dir = td.path().to_path_buf(); + let store = StorageManager::new(cfg).await.unwrap(); + + // Fake head at 100, keep last 10 -> prune up to 89. + store.set_metadata("consensus:last_applied_cycle", &100u64.to_le_bytes()) + .await + .unwrap(); + + // Populate a couple cycles of history in the prune range. 
+ for cycle in 50u64..=52u64 { + store.set_metadata(&consensus_lsu_key(cycle), b"lsu").await.unwrap(); + let h = [cycle as u8; 32]; + store.set_metadata(&consensus_lsu_hash_key(cycle), &h).await.unwrap(); + store.set_metadata(&cycle_by_lsu_hash_key(&h), &cycle.to_le_bytes()) + .await + .unwrap(); + store.set_metadata(&consensus_lsu_cid_key(cycle), b"cid").await.unwrap(); + store.set_metadata(&consensus_lsu_state_root_key(cycle), &[0u8; 32]) + .await + .unwrap(); + + // One tx per cycle. + let txid = [cycle as u8; 32]; + let txids = vec![txid]; + store.set_metadata(&cycle_txids_key(cycle), &bincode::serialize(&txids).unwrap()) + .await + .unwrap(); + store.set_metadata(&tx_raw_key(&txid), b"raw").await.unwrap(); + store.set_metadata(&tx_meta_key(&txid), b"meta").await.unwrap(); + } + + let rep = prune_once(&store, 10, 3).await.unwrap().unwrap(); + assert_eq!(rep.pruned_from_cycle, 0); // first run starts at 0 + assert_eq!(rep.pruned_to_cycle, 2); // batch of 3 cycles (0..2) + + // Now set progress to 49 and prune the populated cycles. + store.set_metadata(META_PRUNED_UP_TO_CYCLE, &49u64.to_le_bytes()) + .await + .unwrap(); + let rep2 = prune_once(&store, 10, 10).await.unwrap().unwrap(); + assert_eq!(rep2.pruned_from_cycle, 50); + assert_eq!(rep2.pruned_to_cycle, 59.min(89)); // limited by batch + + // Ensure one known key was deleted. 
+ let k = store.get_metadata(&consensus_lsu_key(50)).await.unwrap(); + assert!(k.is_none()); + } +} + diff --git a/crates/catalyst-core/tests/wire_vectors.rs b/crates/catalyst-core/tests/wire_vectors.rs new file mode 100644 index 0000000..cc2402a --- /dev/null +++ b/crates/catalyst-core/tests/wire_vectors.rs @@ -0,0 +1,77 @@ +use catalyst_core::protocol as corep; + +fn hex32(s: &str) -> [u8; 32] { + let s = s.trim().strip_prefix("0x").unwrap_or(s.trim()); + let bytes = hex::decode(s).expect("hex decode"); + assert_eq!(bytes.len(), 32); + let mut out = [0u8; 32]; + out.copy_from_slice(&bytes); + out +} + +#[test] +fn tx_v2_wire_and_txid_vectors_are_stable() { + // If you intentionally change any canonical serialization rules for `CTX2` or the v2 signing + // payload, update these golden vectors in the same PR and explain why in #207. + + // Fixed chain domain. + let chain_id: u64 = 200820092; + let genesis_hash = hex32("0x1111111111111111111111111111111111111111111111111111111111111111"); + + // Fixed keys and payload. + let pk_sender = hex32("0x2222222222222222222222222222222222222222222222222222222222222222"); + let pk_recipient = hex32("0x3333333333333333333333333333333333333333333333333333333333333333"); + + let tx = corep::Transaction { + core: corep::TransactionCore { + tx_type: corep::TransactionType::NonConfidentialTransfer, + entries: vec![ + corep::TransactionEntry { + public_key: pk_sender, + amount: corep::EntryAmount::NonConfidential(-123), + }, + corep::TransactionEntry { + public_key: pk_recipient, + amount: corep::EntryAmount::NonConfidential(123), + }, + ], + nonce: 7, + lock_time: 0, + fees: 1, + data: vec![], + }, + signature_scheme: corep::sig_scheme::SCHNORR_V1, + // Signature bytes are part of the wire tx. Use a fixed blob (not a real signature). 
+ signature: corep::AggregatedSignature(vec![0x44; 64]), + sender_pubkey: None, + timestamp: 1700000000000, + }; + + let wire = corep::encode_wire_tx_v2(&tx).expect("encode v2"); + let wire_hex = format!("0x{}", hex::encode(&wire)); + + // NOTE: This value is expected to change only when the canonical encoding rules change. + const EXPECT_WIRE_HEX: &str = "0x43545832000200000022222222222222222222222222222222222222222222222222222222222222220085ffffffffffffff3333333333333333333333333333333333333333333333333333333333333333007b00000000000000070000000000000000000000010000000000000000000000004000000044444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444000068e5cf8b010000"; + assert_eq!( + wire_hex, EXPECT_WIRE_HEX, + "CTX2 wire bytes changed; update golden vectors intentionally" + ); + + let txid = corep::tx_id_v2(&tx).expect("tx_id_v2"); + let txid_hex = format!("0x{}", hex::encode(txid)); + const EXPECT_TXID_HEX: &str = + "0x548c6ac95792c2ee8f12f13f7113115837a5732769184ed2c61ca5c0b1732427"; + assert_eq!(txid_hex, EXPECT_TXID_HEX, "tx_id_v2 changed"); + + // Signing payload v2 vector. 
+ let payload = tx + .signing_payload_v2(chain_id, genesis_hash) + .or_else(|_| tx.signing_payload_v1(chain_id, genesis_hash)) + .expect("signing payload"); + let payload_hex = format!("0x{}", hex::encode(payload)); + const EXPECT_PAYLOAD_HEX: &str = "0x434154414c5953545f5349475f56327c45f80b0000000011111111111111111111111111111111111111111111111111111111111111110000000200000022222222222222222222222222222222222222222222222222222222222222220085ffffffffffffff3333333333333333333333333333333333333333333333333333333333333333007b000000000000000700000000000000000000000100000000000000000000000068e5cf8b010000"; + assert_eq!( + payload_hex, EXPECT_PAYLOAD_HEX, + "v2 signing payload changed; update golden vectors intentionally" + ); +} diff --git a/crates/catalyst-dfs/benches/dfs_benchmarks.rs b/crates/catalyst-dfs/benches/dfs_benchmarks.rs index fb2e16e..cd0a396 100644 --- a/crates/catalyst-dfs/benches/dfs_benchmarks.rs +++ b/crates/catalyst-dfs/benches/dfs_benchmarks.rs @@ -1,349 +1,92 @@ -//! Benchmark tests for Catalyst DFS performance -//! -//! Run with: cargo bench +//! Benchmark tests for Catalyst DFS performance. +//! +//! Run with: +//! 
- `cargo bench -p catalyst-dfs` -use catalyst_dfs::{DfsConfig, DfsFactory, ContentCategory, CategorizedStorage, DistributedFileSystem}; -use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId}; +use catalyst_dfs::{ContentId, DfsConfig, DfsFactory, DistributedFileSystem}; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; use tempfile::TempDir; use tokio::runtime::Runtime; -use std::sync::Arc; -// Helper function to create DFS instance for benchmarks -async fn create_test_dfs() -> Box { - group.finish(); -} - -fn bench_content_addressing(c: &mut Criterion) { - let mut group = c.benchmark_group("content_addressing"); - - let sizes = vec![1024, 10240, 102400, 1048576]; - - for size in sizes { - group.benchmark_with_input( - BenchmarkId::new("cid_generation", size), - &size, - |b, &size| { - let data = vec![0u8; size]; - b.iter(|| { - black_box(catalyst_dfs::ContentId::from_data(&data).unwrap()) - }); - }, - ); - } - - group.finish(); +async fn create_test_dfs() -> (TempDir, Box) { + let temp_dir = TempDir::new().unwrap(); + let config = DfsConfig::test_config(temp_dir.path().to_path_buf()); + let dfs = DfsFactory::create(&config).await.unwrap(); + (temp_dir, dfs) } -fn bench_metadata_operations(c: &mut Criterion) { +fn bench_put(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - - c.bench_function("metadata_lookup", |b| { - let dfs = rt.block_on(create_test_dfs()); - let data = vec![0u8; 1024]; - let cid = rt.block_on(dfs.put(data)).unwrap(); - - b.to_async(&rt).iter(|| async { - black_box(dfs.metadata(&cid).await.unwrap()) - }); - }); - - c.bench_function("has_content", |b| { - let dfs = rt.block_on(create_test_dfs()); - let data = vec![0u8; 1024]; - let cid = rt.block_on(dfs.put(data)).unwrap(); - - b.to_async(&rt).iter(|| async { - black_box(dfs.has(&cid).await.unwrap()) - }); - }); - - c.bench_function("pin_unpin", |b| { - let dfs = rt.block_on(create_test_dfs()); - let data = vec![0u8; 1024]; - 
let cid = rt.block_on(dfs.put(data)).unwrap(); - - b.to_async(&rt).iter(|| async { - dfs.pin(&cid).await.unwrap(); - black_box(dfs.unpin(&cid).await.unwrap()) - }); - }); -} + let mut group = c.benchmark_group("dfs_put"); -fn bench_categorized_storage(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - let mut group = c.benchmark_group("categorized"); - - let categories = vec![ - ContentCategory::Contract, - ContentCategory::LedgerUpdate, - ContentCategory::AppData, - ContentCategory::Media, - ContentCategory::File, - ]; - - for category in categories { - group.benchmark_with_input( - BenchmarkId::new("put_categorized", format!("{:?}", category)), - &category, - |b, category| { - let dfs = rt.block_on(create_test_dfs()); - b.to_async(&rt).iter(|| async { - let data = vec![0u8; 1024]; - black_box(dfs.put_categorized(data, category.clone()).await.unwrap()) - }); - }, - ); + for size in [1024usize, 10 * 1024, 100 * 1024, 1024 * 1024] { + group.bench_with_input(BenchmarkId::new("put", size), &size, |b, &size| { + b.to_async(&rt).iter_with_setup( + || rt.block_on(create_test_dfs()), + |(_td, dfs)| async move { + let data = vec![0u8; size]; + black_box(dfs.put(data).await.unwrap()) + }, + ); + }); } - - group.finish(); -} -fn bench_concurrent_operations(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - let mut group = c.benchmark_group("concurrent"); - - let thread_counts = vec![1, 2, 4, 8, 16]; - - for thread_count in thread_counts { - group.benchmark_with_input( - BenchmarkId::new("concurrent_puts", thread_count), - &thread_count, - |b, &thread_count| { - b.to_async(&rt).iter(|| async { - let dfs = Arc::new(create_test_dfs().await); - let mut handles = Vec::new(); - - for i in 0..thread_count { - let dfs_clone = Arc::clone(&dfs); - let handle = tokio::spawn(async move { - let data = vec![i as u8; 1024]; - dfs_clone.put(data).await.unwrap() - }); - handles.push(handle); - } - - let results = futures::future::join_all(handles).await; - 
black_box(results) - }); - }, - ); - } - group.finish(); } -fn bench_garbage_collection(c: &mut Criterion) { +fn bench_get(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - - c.bench_function("gc_empty", |b| { - let mut config = DfsConfig::test_config(TempDir::new().unwrap().path().to_path_buf()); - config.enable_gc = true; - - b.to_async(&rt).iter(|| async { - let dfs = DfsFactory::create(&config).await.unwrap(); - black_box(dfs.gc().await.unwrap()) - }); - }); - - c.bench_function("gc_with_content", |b| { - b.to_async(&rt).iter(|| async { - let mut config = DfsConfig::test_config(TempDir::new().unwrap().path().to_path_buf()); - config.enable_gc = true; - let dfs = DfsFactory::create(&config).await.unwrap(); - - // Add some content - for i in 0..100 { - let data = vec![i as u8; 1024]; - dfs.put(data).await.unwrap(); - } - - black_box(dfs.gc().await.unwrap()) - }); - }); -} + let mut group = c.benchmark_group("dfs_get"); -fn bench_list_operations(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - let mut group = c.benchmark_group("list_operations"); - - let content_counts = vec![10, 100, 1000]; - - for count in content_counts { - group.benchmark_with_input( - BenchmarkId::new("list_all", count), - &count, - |b, &count| { - b.to_async(&rt).iter_with_setup( - || { - rt.block_on(async { - let dfs = create_test_dfs().await; - - // Populate with content - for i in 0..count { - let data = vec![i as u8; 100]; - dfs.put(data).await.unwrap(); - } - - dfs - }) - }, - |dfs| async move { - black_box(dfs.list().await.unwrap()) - }, - ); - }, - ); - - group.benchmark_with_input( - BenchmarkId::new("list_by_category", count), - &count, - |b, &count| { - b.to_async(&rt).iter_with_setup( - || { - rt.block_on(async { - let dfs = create_test_dfs().await; - - // Populate with categorized content - for i in 0..count { - let data = vec![i as u8; 100]; - dfs.put_categorized(data, ContentCategory::AppData).await.unwrap(); - } - - dfs - }) - }, - |dfs| async move { - 
black_box(dfs.list_by_category(ContentCategory::AppData).await.unwrap()) - }, - ); - }, - ); + for size in [1024usize, 10 * 1024, 100 * 1024] { + group.bench_with_input(BenchmarkId::new("get", size), &size, |b, &size| { + b.to_async(&rt).iter_with_setup( + || { + rt.block_on(async { + let (td, dfs) = create_test_dfs().await; + let data = vec![0u8; size]; + let cid: ContentId = dfs.put(data).await.unwrap(); + (td, dfs, cid) + }) + }, + |(_td, dfs, cid)| async move { black_box(dfs.get(&cid).await.unwrap()) }, + ); + }); } - + group.finish(); } -fn bench_stats_operations(c: &mut Criterion) { +fn bench_has(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - - c.bench_function("stats_empty", |b| { - let dfs = rt.block_on(create_test_dfs()); - b.to_async(&rt).iter(|| async { - black_box(dfs.stats().await.unwrap()) - }); - }); - - c.bench_function("stats_with_content", |b| { + + c.bench_function("dfs_has", |b| { b.to_async(&rt).iter_with_setup( || { rt.block_on(async { - let dfs = create_test_dfs().await; - - // Add content - for i in 0..100 { - let data = vec![i as u8; 1024]; - dfs.put(data).await.unwrap(); - } - - dfs + let (td, dfs) = create_test_dfs().await; + let cid = dfs.put(vec![1u8; 1024]).await.unwrap(); + (td, dfs, cid) }) }, - |dfs| async move { - black_box(dfs.stats().await.unwrap()) - }, + |(_td, dfs, cid)| async move { black_box(dfs.has(&cid).await.unwrap()) }, ); }); } -fn bench_provider_operations(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - c.bench_function("provider_announce", |b| { - let provider = catalyst_dfs::DfsContentProvider::new(); - let cid = catalyst_dfs::ContentId::from_data(b"test").unwrap(); - - b.to_async(&rt).iter(|| async { - black_box(provider.provide(&cid).await.unwrap()) - }); - }); - - c.bench_function("provider_find", |b| { - let provider = catalyst_dfs::DfsContentProvider::new(); - let cid = catalyst_dfs::ContentId::from_data(b"test").unwrap(); - rt.block_on(provider.provide(&cid)).unwrap(); - - 
b.to_async(&rt).iter(|| async { - black_box(provider.find_providers(&cid).await.unwrap()) - }); - }); -} - -criterion_group!( - benches, - bench_content_storage, - bench_content_retrieval, - bench_content_addressing, - bench_metadata_operations, - bench_categorized_storage, - bench_concurrent_operations, - bench_garbage_collection, - bench_list_operations, - bench_stats_operations, - bench_provider_operations -); - -criterion_main!(benches);let temp_dir = TempDir::new().unwrap(); - let config = DfsConfig::test_config(temp_dir.path().to_path_buf()); - DfsFactory::create(&config).await.unwrap() -} +fn bench_cid_from_data(c: &mut Criterion) { + let mut group = c.benchmark_group("dfs_cid"); -fn bench_content_storage(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - let mut group = c.benchmark_group("storage"); - - // Test different content sizes - let sizes = vec![1024, 10240, 102400, 1048576]; // 1KB, 10KB, 100KB, 1MB - - for size in sizes { - group.benchmark_with_input( - BenchmarkId::new("put", size), - &size, - |b, &size| { - let dfs = rt.block_on(create_test_dfs()); - b.to_async(&rt).iter(|| async { - let data = vec![0u8; size]; - black_box(dfs.put(data).await.unwrap()) - }); - }, - ); + for size in [32usize, 1024, 10 * 1024, 100 * 1024] { + group.bench_with_input(BenchmarkId::new("from_data", size), &size, |b, &size| { + let data = vec![0u8; size]; + b.iter(|| black_box(ContentId::from_data(&data).unwrap())); + }); } - + group.finish(); } -fn bench_content_retrieval(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - let mut group = c.benchmark_group("retrieval"); - - let sizes = vec![1024, 10240, 102400, 1048576]; - - for size in sizes { - group.benchmark_with_input( - BenchmarkId::new("get", size), - &size, - |b, &size| { - let dfs = rt.block_on(create_test_dfs()); - let data = vec![0u8; size]; - let cid = rt.block_on(dfs.put(data)).unwrap(); - - b.to_async(&rt).iter(|| async { - black_box(dfs.get(&cid).await.unwrap()) - }); - }, - ); - } \ 
No newline at end of file +criterion_group!(benches, bench_put, bench_get, bench_has, bench_cid_from_data); +criterion_main!(benches); + diff --git a/crates/catalyst-network/benches/network_benchmarks.rs b/crates/catalyst-network/benches/network_benchmarks.rs index 2faa039..1667c96 100644 --- a/crates/catalyst-network/benches/network_benchmarks.rs +++ b/crates/catalyst-network/benches/network_benchmarks.rs @@ -1,558 +1,62 @@ -// ==================== benches/network_benchmarks.rs ==================== -//! Network performance benchmarks +//! Network performance benchmarks (message creation/serialization). +//! +//! Run with: +//! - `cargo bench -p catalyst-network` -use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId}; -use catalyst_network::{ - NetworkService, NetworkConfig, - messaging::{NetworkMessage, RoutingStrategy}, +use catalyst_network::messaging::{NetworkMessage, RoutingStrategy}; +use catalyst_utils::{ + network::{BasicMessage, MessageType}, + serialization::{CatalystDeserialize, CatalystSerialize}, }; -use catalyst_utils::network::PingMessage; -use libp2p::PeerId; -use tokio::runtime::Runtime; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; fn benchmark_message_creation(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - c.bench_function("message_creation", |b| { + c.bench_function("network_message_creation", |b| { b.iter(|| { - let ping = PingMessage { - timestamp: catalyst_utils::utils::current_timestamp(), - data: vec![0u8; 1024], + let msg = BasicMessage { + content: "hello".to_string(), + message_type: MessageType::ConsensusSync, }; - - let message = NetworkMessage::new( - black_box(&ping), - "sender".to_string(), - None, - RoutingStrategy::Broadcast, - ).unwrap(); - - black_box(message); + let nm = NetworkMessage::new(&msg, "sender".to_string(), None, RoutingStrategy::Broadcast) + .unwrap(); + black_box(nm); }); }); } fn benchmark_message_serialization(c: &mut Criterion) { 
- let rt = Runtime::new().unwrap(); - - let mut group = c.benchmark_group("message_serialization"); - - for size in [100, 1000, 10000, 100000].iter() { - group.bench_with_input(BenchmarkId::new("serialize", size), size, |b, &size| { - let ping = PingMessage { - timestamp: catalyst_utils::utils::current_timestamp(), - data: vec![0u8; size], + let mut group = c.benchmark_group("network_message_serialization"); + for size in [32usize, 1024, 10 * 1024, 100 * 1024] { + group.bench_with_input(BenchmarkId::new("serialize", size), &size, |b, &size| { + let msg = BasicMessage { + content: "x".repeat(size), + message_type: MessageType::ConsensusSync, }; - - let message = NetworkMessage::new( - &ping, - "sender".to_string(), - None, - RoutingStrategy::Broadcast, - ).unwrap(); - + let nm = NetworkMessage::new(&msg, "sender".to_string(), None, RoutingStrategy::Broadcast) + .unwrap(); b.iter(|| { - let serialized = catalyst_utils::serialization::CatalystSerialize::serialize(black_box(&message)).unwrap(); - black_box(serialized); + let bytes = CatalystSerialize::serialize(black_box(&nm)).unwrap(); + black_box(bytes); }); }); - - group.bench_with_input(BenchmarkId::new("deserialize", size), size, |b, &size| { - let ping = PingMessage { - timestamp: catalyst_utils::utils::current_timestamp(), - data: vec![0u8; size], + + group.bench_with_input(BenchmarkId::new("deserialize", size), &size, |b, &size| { + let msg = BasicMessage { + content: "x".repeat(size), + message_type: MessageType::ConsensusSync, }; - - let message = NetworkMessage::new( - &ping, - "sender".to_string(), - None, - RoutingStrategy::Broadcast, - ).unwrap(); - - let serialized = catalyst_utils::serialization::CatalystSerialize::serialize(&message).unwrap(); - + let nm = NetworkMessage::new(&msg, "sender".to_string(), None, RoutingStrategy::Broadcast) + .unwrap(); + let bytes = CatalystSerialize::serialize(&nm).unwrap(); b.iter(|| { - let deserialized = 
catalyst_utils::serialization::CatalystDeserialize::deserialize(black_box(&serialized)).unwrap(); - black_box(deserialized); + let decoded = NetworkMessage::deserialize(black_box(&bytes)).unwrap(); + black_box(decoded); }); }); } - group.finish(); } -fn benchmark_peer_discovery(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - c.bench_function("peer_discovery_add", |b| { - b.to_async(&rt).iter(|| async { - let config = catalyst_network::config::DiscoveryConfig::default(); - let discovery = catalyst_network::discovery::PeerDiscovery::new(&config); - - let peer_id = PeerId::random(); - let addresses = vec!["/ip4/127.0.0.1/tcp/8080".parse().unwrap()]; - - let event = discovery.add_peer( - black_box(peer_id), - black_box(addresses), - catalyst_network::discovery::DiscoveryMethod::Manual, - ).await; - - black_box(event); - }); - }); -} - -fn benchmark_reputation_updates(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - c.bench_function("reputation_update", |b| { - b.to_async(&rt).iter(|| async { - let config = catalyst_network::config::ReputationConfig::default(); - let reputation = catalyst_network::reputation::ReputationManager::new(&config); - - let peer_id = PeerId::random(); - - reputation.update_peer_score( - black_box(peer_id), - black_box(5.0), - "benchmark test", - ).await; - }); - }); -} - -criterion_group!( - benches, - benchmark_message_creation, - benchmark_message_serialization, - benchmark_peer_discovery, - benchmark_reputation_updates -); +criterion_group!(benches, benchmark_message_creation, benchmark_message_serialization); criterion_main!(benches); -// ==================== tests/integration_tests.rs ==================== -//! 
Integration tests for catalyst-network - -use catalyst_network::{ - NetworkService, NetworkConfig, - messaging::{NetworkMessage, RoutingStrategy, MessageHandler}, - discovery::{PeerDiscovery, DiscoveryMethod}, - reputation::ReputationManager, -}; -use catalyst_utils::{ - network::PingMessage, - CatalystResult, -}; -use libp2p::PeerId; -use std::time::Duration; -use tokio::time::timeout; - -#[tokio::test] -async fn test_network_service_lifecycle() { - let config = NetworkConfig::default(); - let service = NetworkService::new(config).await.unwrap(); - - // Test initial state - assert_eq!(service.get_state().await, catalyst_network::service::ServiceState::Stopped); - - // Start service - service.start().await.unwrap(); - assert_eq!(service.get_state().await, catalyst_network::service::ServiceState::Running); - - // Stop service - service.stop().await.unwrap(); - assert_eq!(service.get_state().await, catalyst_network::service::ServiceState::Stopped); -} - -#[tokio::test] -async fn test_message_routing() { - use catalyst_network::messaging::{MessageRouter, PingHandler}; - - let router = MessageRouter::new(); - router.register_handler(PingHandler).await.unwrap(); - - let ping = PingMessage { - timestamp: catalyst_utils::utils::current_timestamp(), - data: Vec::new(), - }; - - let message = NetworkMessage::new( - &ping, - "sender".to_string(), - None, - RoutingStrategy::Broadcast, - ).unwrap(); - - let sender = PeerId::random(); - let responses = router.route_message(message, sender).await.unwrap(); - - // Should get a response from the ping handler - assert!(!responses.is_empty()); -} - -#[tokio::test] -async fn test_peer_discovery() { - let config = catalyst_network::config::DiscoveryConfig::default(); - let discovery = PeerDiscovery::new(&config); - - let peer_id = PeerId::random(); - let addresses = vec!["/ip4/127.0.0.1/tcp/8080".parse().unwrap()]; - - // Add peer - let event = discovery.add_peer( - peer_id, - addresses.clone(), - DiscoveryMethod::Manual, - ).await; - 
- // Verify event - match event { - catalyst_network::discovery::DiscoveryEvent::PeerDiscovered { peer_id: discovered_id, .. } => { - assert_eq!(discovered_id, peer_id); - } - _ => panic!("Expected PeerDiscovered event"), - } - - // Get peer - let peer_info = discovery.get_peer(&peer_id).await.unwrap(); - assert_eq!(peer_info.peer_id, peer_id); - assert!(!peer_info.addresses.is_empty()); -} - -#[tokio::test] -async fn test_reputation_management() { - let config = catalyst_network::config::ReputationConfig::default(); - let reputation = ReputationManager::new(&config); - - let peer_id = PeerId::random(); - - // Initial score should be default - let initial_score = reputation.get_peer_score(&peer_id).await; - assert_eq!(initial_score, config.initial_reputation); - - // Update score - reputation.update_peer_score(peer_id, 10.0, "test increase").await; - let updated_score = reputation.get_peer_score(&peer_id).await; - assert_eq!(updated_score, initial_score + 10.0); - - // Check connection decision - assert!(reputation.should_connect_to_peer(&peer_id).await); - - // Lower score below threshold - reputation.update_peer_score(peer_id, -100.0, "test decrease").await; - assert!(!reputation.should_connect_to_peer(&peer_id).await); -} - -#[tokio::test] -async fn test_bandwidth_management() { - let config = catalyst_network::config::BandwidthConfig::default(); - let bandwidth = catalyst_network::bandwidth::BandwidthManager::new(&config); - - // Test bandwidth checking - assert!(bandwidth.can_send(1024).await); - assert!(bandwidth.can_receive(1024).await); - - // Record usage - bandwidth.record_upload(1024).await; - bandwidth.record_download(2048).await; - - let usage = bandwidth.get_usage().await; - assert_eq!(usage.total_uploaded, 1024); - assert_eq!(usage.total_downloaded, 2048); -} - -#[tokio::test] -async fn test_security_validation() { - let config = catalyst_network::config::SecurityConfig::default(); - let security = 
catalyst_network::security::SecurityManager::new(&config); - - let peer_id = PeerId::random(); - let ip = "127.0.0.1".parse().unwrap(); - - // Should allow by default - assert!(security.validate_peer_connection(&peer_id, &ip).await.is_ok()); - - // Block peer - security.block_peer(peer_id).await; - - // Should now reject - assert!(security.validate_peer_connection(&peer_id, &ip).await.is_err()); -} - -#[tokio::test] -async fn test_health_monitoring() { - let config = catalyst_network::config::MonitoringConfig::default(); - let monitor = catalyst_network::monitoring::HealthMonitor::new(&config); - - // Initial health check - monitor.check_health().await.unwrap(); - - let health = monitor.get_health().await; - assert!(health.connection_health >= 0.0 && health.connection_health <= 1.0); - assert!(health.bandwidth_health >= 0.0 && health.bandwidth_health <= 1.0); - assert!(health.latency_health >= 0.0 && health.latency_health <= 1.0); -} - -#[tokio::test] -async fn test_gossip_protocol() { - let config = catalyst_network::config::GossipConfig::default(); - let gossip = catalyst_network::gossip::GossipProtocol::new(&config); - - let ping = PingMessage { - timestamp: catalyst_utils::utils::current_timestamp(), - data: vec![1, 2, 3], - }; - - let message = NetworkMessage::new( - &ping, - "sender".to_string(), - None, - RoutingStrategy::Gossip, - ).unwrap(); - - // Publish message - gossip.publish_message(message).await.unwrap(); - - let stats = gossip.get_stats().await; - assert_eq!(stats.messages_published, 1); -} - -#[tokio::test] -async fn test_network_configuration() { - let mut config = NetworkConfig::default(); - - // Should be valid by default - assert!(config.validate().is_ok()); - - // Make invalid - config.peer.min_peers = 100; - config.peer.max_peers = 50; - - // Should now be invalid - assert!(config.validate().is_err()); -} - -#[tokio::test] -async fn test_message_serialization_roundtrip() { - let ping = PingMessage { - timestamp: 
catalyst_utils::utils::current_timestamp(), - data: vec![1, 2, 3, 4, 5], - }; - - let original = NetworkMessage::new( - &ping, - "sender".to_string(), - Some("target".to_string()), - RoutingStrategy::Direct("target".to_string()), - ).unwrap(); - - // Serialize - let serialized = catalyst_utils::serialization::CatalystSerialize::serialize(&original).unwrap(); - - // Deserialize - let deserialized = catalyst_utils::serialization::CatalystDeserialize::deserialize(&serialized).unwrap(); - - // Compare - assert_eq!(original.routing.hop_count, deserialized.routing.hop_count); - assert_eq!(original.transport.size_bytes, deserialized.transport.size_bytes); - assert_eq!(original.message_type(), deserialized.message_type()); -} - -// Test helper for creating test networks -pub struct TestNetwork { - pub services: Vec, - pub configs: Vec, -} - -impl TestNetwork { - pub async fn new(node_count: usize) -> Self { - let mut services = Vec::new(); - let mut configs = Vec::new(); - - for i in 0..node_count { - let mut config = NetworkConfig::default(); - - // Use different ports for each node - config.peer.listen_addresses = vec![ - format!("/ip4/127.0.0.1/tcp/{}", 9000 + i).parse().unwrap(), - ]; - - let service = NetworkService::new(config.clone()).await.unwrap(); - - services.push(service); - configs.push(config); - } - - Self { services, configs } - } - - pub async fn start_all(&self) { - for service in &self.services { - service.start().await.unwrap(); - } - - // Wait for services to start - tokio::time::sleep(Duration::from_millis(100)).await; - } - - pub async fn stop_all(&self) { - for service in &self.services { - service.stop().await.unwrap(); - } - } - - pub async fn connect_peers(&self) { - // Connect each peer to the next one in a ring - for i in 0..self.services.len() { - let next = (i + 1) % self.services.len(); - - if let Some(addr) = self.configs[next].peer.listen_addresses.first() { - // Would connect services[i] to services[next] at addr - // This requires 
implementing peer connection in the test - } - } - } -} - -#[tokio::test] -async fn test_multi_node_network() { - let network = TestNetwork::new(3).await; - - // Start all nodes - network.start_all().await; - - // Connect peers - network.connect_peers().await; - - // Wait for connections to establish - tokio::time::sleep(Duration::from_millis(500)).await; - - // Test message propagation - let ping = PingMessage { - timestamp: catalyst_utils::utils::current_timestamp(), - data: vec![1, 2, 3], - }; - - // Send from first node - if let Err(e) = network.services[0].send_message(&ping, None).await { - println!("Message send failed: {}", e); - // This is expected in the test environment - } - - // Stop all nodes - network.stop_all().await; -} - -// ==================== tests/property_tests.rs ==================== -//! Property-based tests using proptest - -use proptest::prelude::*; -use catalyst_network::{ - messaging::{NetworkMessage, RoutingStrategy}, - discovery::{PeerDiscovery, DiscoveryMethod}, -}; -use catalyst_utils::network::PingMessage; -use libp2p::PeerId; - -proptest! 
{ - #[test] - fn test_message_serialization_roundtrip( - data in prop::collection::vec(any::(), 0..10000), - timestamp in any::(), - ) { - let rt = tokio::runtime::Runtime::new().unwrap(); - - rt.block_on(async { - let ping = PingMessage { - timestamp, - data: data.clone(), - }; - - let original = NetworkMessage::new( - &ping, - "sender".to_string(), - None, - RoutingStrategy::Broadcast, - ).unwrap(); - - // Serialize and deserialize - let serialized = catalyst_utils::serialization::CatalystSerialize::serialize(&original).unwrap(); - let deserialized = catalyst_utils::serialization::CatalystDeserialize::deserialize(&serialized).unwrap(); - - // Properties that should hold - prop_assert_eq!(original.routing.hop_count, deserialized.routing.hop_count); - prop_assert_eq!(original.transport.size_bytes, deserialized.transport.size_bytes); - prop_assert_eq!(original.message_type(), deserialized.message_type()); - }); - } - - #[test] - fn test_peer_discovery_invariants( - peer_count in 1..100usize, - score_adjustments in prop::collection::vec(-10.0..10.0f64, 0..50), - ) { - let rt = tokio::runtime::Runtime::new().unwrap(); - - rt.block_on(async { - let config = catalyst_network::config::DiscoveryConfig::default(); - let discovery = PeerDiscovery::new(&config); - - let mut peer_ids = Vec::new(); - - // Add peers - for _ in 0..peer_count { - let peer_id = PeerId::random(); - let addresses = vec!["/ip4/127.0.0.1/tcp/8080".parse().unwrap()]; - - discovery.add_peer(peer_id, addresses, DiscoveryMethod::Manual).await; - peer_ids.push(peer_id); - } - - let peers = discovery.get_peers().await; - prop_assert_eq!(peers.len(), peer_count); - - // Apply score adjustments - for (i, &adjustment) in score_adjustments.iter().enumerate() { - if let Some(&peer_id) = peer_ids.get(i % peer_ids.len()) { - discovery.update_peer_score(&peer_id, adjustment).await; - } - } - - // Check that all peer scores are within valid range - let peers_after = discovery.get_peers().await; - for peer_info in 
peers_after.values() { - prop_assert!(peer_info.score >= 0.0); - prop_assert!(peer_info.score <= 100.0); - } - }); - } - - #[test] - fn test_reputation_score_bounds( - initial_score in 0.0..100.0f64, - adjustments in prop::collection::vec(-50.0..50.0f64, 0..100), - ) { - let rt = tokio::runtime::Runtime::new().unwrap(); - - rt.block_on(async { - let mut config = catalyst_network::config::ReputationConfig::default(); - config.initial_reputation = initial_score; - - let reputation = catalyst_network::reputation::ReputationManager::new(&config); - let peer_id = PeerId::random(); - - // Apply all adjustments - for adjustment in adjustments { - reputation.update_peer_score(peer_id, adjustment, "property test").await; - } - - let final_score = reputation.get_peer_score(&peer_id).await; - - // Score should always be within bounds - prop_assert!(final_score >= 0.0); - prop_assert!(final_score <= config.max_reputation); - }); - } -} \ No newline at end of file diff --git a/crates/catalyst-network/src/service.rs b/crates/catalyst-network/src/service.rs index bcbc57c..5917adf 100644 --- a/crates/catalyst-network/src/service.rs +++ b/crates/catalyst-network/src/service.rs @@ -11,7 +11,7 @@ use crate::{ }; use catalyst_utils::logging::*; -use catalyst_utils::network::MessageEnvelope; +use catalyst_utils::network::{decode_envelope_wire, encode_envelope_wire, EnvelopeWireError, MessageEnvelope}; use futures::StreamExt; use libp2p::{ @@ -40,6 +40,7 @@ use tokio::sync::{mpsc, Mutex, RwLock}; const MAX_GOSSIP_MESSAGE_BYTES: usize = 8 * 1024 * 1024; // 8 MiB hard cap per message const PER_PEER_MAX_MSGS_PER_SEC: u32 = 200; const PER_PEER_MAX_BYTES_PER_SEC: usize = 8 * 1024 * 1024; // 8 MiB/s per peer +const IDENTIFY_PROTOCOL_VERSION: &str = "catalyst/1"; #[derive(Debug, Clone)] struct PeerBudget { @@ -186,7 +187,7 @@ impl NetworkService { // Identify + Ping let identify = identify::Behaviour::new(identify::Config::new( - "catalyst/1.0".to_string(), + 
IDENTIFY_PROTOCOL_VERSION.to_string(), id_keys.public(), )); let ping = ping::Behaviour::new(ping::Config::new()); @@ -296,8 +297,18 @@ impl NetworkService { if !b.allow(now, message.data.len()) { continue; } - let env: MessageEnvelope = match bincode::deserialize(&message.data) { + let env: MessageEnvelope = match decode_envelope_wire(&message.data) { Ok(e) => e, + Err(EnvelopeWireError::UnsupportedVersion { got, local }) => { + log_warn!( + LogCategory::Network, + "Dropping message from {} due to unsupported envelope version (got={} local={})", + propagation_source, + got, + local + ); + continue; + } Err(_) => continue, }; { @@ -308,6 +319,21 @@ impl NetworkService { let _ = emit(&event_tx, NetworkEvent::MessageReceived { envelope: env, from: propagation_source }).await; } } + SwarmEvent::Behaviour(BehaviourEvent::Identify(e)) => { + if let identify::Event::Received { peer_id, info } = e { + let pv = info.protocol_version; + if pv != IDENTIFY_PROTOCOL_VERSION { + log_warn!( + LogCategory::Network, + "Disconnecting peer {}: incompatible protocol_version={} (local={})", + peer_id, + pv, + IDENTIFY_PROTOCOL_VERSION + ); + swarm.disconnect_peer_id(peer_id); + } + } + } SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. 
} => { swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); { @@ -388,7 +414,8 @@ impl NetworkService { } pub async fn broadcast_envelope(&self, envelope: &MessageEnvelope) -> NetworkResult<()> { - let bytes = bincode::serialize(envelope).map_err(|e| NetworkError::SerializationFailed(e.to_string()))?; + let bytes = encode_envelope_wire(envelope) + .map_err(|e| NetworkError::SerializationFailed(e.to_string()))?; let _ = self.cmd_tx.send(Cmd::Publish(bytes)); Ok(()) } diff --git a/crates/catalyst-storage/src/manager.rs b/crates/catalyst-storage/src/manager.rs index bab1f23..f9b56c4 100644 --- a/crates/catalyst-storage/src/manager.rs +++ b/crates/catalyst-storage/src/manager.rs @@ -69,6 +69,37 @@ impl StorageManager { // Run any pending migrations migration_manager.run_migrations().await?; + + // Record storage version marker (best-effort) to make upgrades easier to reason about. + // This is informational today (we don't hard-fail), but it helps operators detect when a + // data directory was created by a different storage version. 
+ { + const META_STORAGE_VERSION: &str = "storage:version"; + let key = state_keys::metadata_key(META_STORAGE_VERSION); + let existing = engine + .get("metadata", &key) + .ok() + .flatten() + .and_then(|b| { + if b.len() != 4 { + return None; + } + let mut arr = [0u8; 4]; + arr.copy_from_slice(&b); + Some(u32::from_le_bytes(arr)) + }); + if let Some(v) = existing { + if v != crate::STORAGE_VERSION { + log_warn!( + LogCategory::Storage, + "Storage version mismatch (db={} code={}); consider snapshot restore if you see unexpected behavior", + v, + crate::STORAGE_VERSION + ); + } + } + let _ = engine.put("metadata", &key, &crate::STORAGE_VERSION.to_le_bytes()); + } // Create transaction semaphore let transaction_semaphore = Arc::new(Semaphore::new( diff --git a/crates/catalyst-utils/Cargo.toml b/crates/catalyst-utils/Cargo.toml index d668c49..2a90ffd 100644 --- a/crates/catalyst-utils/Cargo.toml +++ b/crates/catalyst-utils/Cargo.toml @@ -16,6 +16,7 @@ async-trait = "0.1" # Serialization serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +bincode = { workspace = true } # Utilities hex = "0.4" rand = "0.8" diff --git a/crates/catalyst-utils/src/network.rs b/crates/catalyst-utils/src/network.rs index 1fc0413..46e8684 100644 --- a/crates/catalyst-utils/src/network.rs +++ b/crates/catalyst-utils/src/network.rs @@ -56,6 +56,60 @@ pub trait NetworkMessage: Send + Sync + Clone { /// for all `NetworkMessage` implementations unless overridden. pub const PROTOCOL_VERSION: u32 = 1; +/// Magic prefix for the canonical `MessageEnvelope` wire encoding. +/// +/// This helps nodes reject random/invalid payloads cleanly and allows versioned decoding. +pub const ENVELOPE_WIRE_MAGIC: [u8; 4] = *b"CENV"; + +/// Versioned wire encoding for `MessageEnvelope`. 
+/// +/// Layout: +/// - magic: 4 bytes (`ENVELOPE_WIRE_MAGIC`) +/// - version: u32 LE (currently `PROTOCOL_VERSION`) +/// - payload: `bincode(MessageEnvelope)` +pub fn encode_envelope_wire(envelope: &MessageEnvelope) -> CatalystResult<Vec<u8>> { + let payload = bincode::serialize(envelope) + .map_err(|e| CatalystError::Serialization(format!("envelope encode failed: {e}")))?; + + let mut out = Vec::with_capacity(ENVELOPE_WIRE_MAGIC.len() + 4 + payload.len()); + out.extend_from_slice(&ENVELOPE_WIRE_MAGIC); + out.extend_from_slice(&PROTOCOL_VERSION.to_le_bytes()); + out.extend_from_slice(&payload); + Ok(out) +} + +#[derive(Debug, thiserror::Error)] +pub enum EnvelopeWireError { + #[error("wire bytes too short (len={len})")] + TooShort { len: usize }, + #[error("invalid envelope magic")] + BadMagic, + #[error("unsupported envelope version (got={got}, local={local})")] + UnsupportedVersion { got: u32, local: u32 }, + #[error("envelope decode failed: {0}")] + DecodeFailed(String), +} + +pub fn decode_envelope_wire(bytes: &[u8]) -> Result<MessageEnvelope, EnvelopeWireError> { + if bytes.len() < 8 { + return Err(EnvelopeWireError::TooShort { len: bytes.len() }); + } + if bytes[0..4] != ENVELOPE_WIRE_MAGIC { + return Err(EnvelopeWireError::BadMagic); + } + let mut ver = [0u8; 4]; + ver.copy_from_slice(&bytes[4..8]); + let got = u32::from_le_bytes(ver); + if got != PROTOCOL_VERSION { + return Err(EnvelopeWireError::UnsupportedVersion { + got, + local: PROTOCOL_VERSION, + }); + } + bincode::deserialize(&bytes[8..]) + .map_err(|e| EnvelopeWireError::DecodeFailed(e.to_string())) +} + /// Message types used in the Catalyst network protocol #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum MessageType { @@ -569,6 +623,44 @@ mod tests { assert_eq!(envelope.payload, payload); assert!(!envelope.is_expired()); } + + #[test] + fn test_envelope_wire_roundtrip() { + let env = MessageEnvelope::new( + MessageType::ConsensusSync, + "node_a".to_string(), + None, + b"hello".to_vec(), + ); + let wire
= encode_envelope_wire(&env).unwrap(); + let decoded = decode_envelope_wire(&wire).unwrap(); + assert_eq!(decoded.message_type, env.message_type); + assert_eq!(decoded.sender, env.sender); + assert_eq!(decoded.target, env.target); + assert_eq!(decoded.payload, env.payload); + assert_eq!(decoded.version, PROTOCOL_VERSION); + } + + #[test] + fn test_envelope_wire_version_mismatch_rejected() { + let env = MessageEnvelope::new( + MessageType::ConsensusSync, + "node_a".to_string(), + None, + b"hello".to_vec(), + ); + let mut wire = encode_envelope_wire(&env).unwrap(); + // Overwrite version with an unsupported one. + wire[4..8].copy_from_slice(&(PROTOCOL_VERSION + 1).to_le_bytes()); + let err = decode_envelope_wire(&wire).unwrap_err(); + match err { + EnvelopeWireError::UnsupportedVersion { got, local } => { + assert_eq!(got, PROTOCOL_VERSION + 1); + assert_eq!(local, PROTOCOL_VERSION); + } + other => panic!("unexpected error: {other:?}"), + } + } #[test] fn test_routing_info() { diff --git a/docs/node-operator-guide.md b/docs/node-operator-guide.md index a5314d4..4dd0486 100644 --- a/docs/node-operator-guide.md +++ b/docs/node-operator-guide.md @@ -232,6 +232,75 @@ mv /var/lib/catalyst//data "/var/lib/catalyst//data.old.$ts" Then start EU first, then the other validators. +## Storage growth / history pruning (recommended for long-running networks) + +By default, nodes retain historical block + transaction metadata indefinitely (useful for explorers), +which means disk usage grows without bound over time. + +For “regular validator nodes” on long-running public networks, it’s recommended to run in **pruned** +mode and keep only a sliding window of recent history. 
This does **not** affect current balances or +EVM state (authenticated state lives in the RocksDB `accounts` column family); it only prunes old +RPC/indexer metadata such as: +- `consensus:lsu:*` per-cycle history +- `tx:*` per-cycle tx indices and raw/meta blobs + +Example config (keep last 7 days at 1 cycle/sec): + +```toml +[storage] +history_prune_enabled = true +history_keep_cycles = 604800 # 7 * 24 * 60 * 60 +history_prune_interval_seconds = 300 # run at most every 5 minutes +history_prune_batch_cycles = 1000 # prune up to 1000 cycles per run +``` + +Notes: +- If you run a public explorer/indexer, keep at least one **archival** node (set `history_prune_enabled = false`). +- In pruned mode, very old `catalyst_getBlockByNumber` / `catalyst_getTransactionByHash` calls may return `null`. + +### Useful maintenance commands + +Inspect local DB growth hot-spots (key counts per column family + RocksDB stats): + +```bash +./target/release/catalyst-cli db-stats --data-dir /var/lib/catalyst//data +``` + +Force a maintenance pass (flush + manual compaction + snapshot cleanup): + +```bash +./target/release/catalyst-cli db-maintenance --data-dir /var/lib/catalyst//data +``` + +## Upgrades, backups, and rollback safety + +For long-running public networks, treat upgrades as potentially stateful operations. + +Recommended workflow: + +- **Before upgrading**: + - take a snapshot backup: + +```bash +./target/release/catalyst-cli db-backup \ + --data-dir /var/lib/catalyst//data \ + --out-dir "/var/lib/catalyst//backup.$(date +%s)" +``` + +- **If an upgrade goes wrong**: + - stop the service + - restore from the backup directory: + +```bash +./target/release/catalyst-cli db-restore \ + --data-dir /var/lib/catalyst//data \ + --from-dir "/var/lib/catalyst//backup." +``` + +Notes: +- The storage layer records an informational on-disk version marker (`storage:version`) to help detect mismatches across upgrades. 
+- For public networks, prefer snapshot-based recovery rather than ad-hoc manual edits to the DB directory. + ## Troubleshooting ### “Insufficient data collected: got 1, required 2” diff --git a/docs/protocol-params.md b/docs/protocol-params.md new file mode 100644 index 0000000..abb19bf --- /dev/null +++ b/docs/protocol-params.md @@ -0,0 +1,45 @@ +# Protocol parameters (safety + governance notes) + +This document describes how Catalyst parameters are currently configured and validated, and what rules operators should follow when changing them. + +## Parameter sources + +Today, parameters come from three places: + +- **Compile-time defaults**: `NodeConfig::default()` in `crates/catalyst-cli/src/config.rs`. +- **Config file**: `catalyst.toml` (or `--config `), parsed into `NodeConfig`. +- **CLI flags**: e.g. `catalyst-cli start --validator --rpc ...`, which override config fields at runtime. + +There is no on-chain governance or parameter voting in this implementation snapshot. + +## Safety rule: chain identity is immutable + +These identify a chain: +- `protocol.chain_id` +- `protocol.network_id` +- `protocol.genesis_hash` (persisted at genesis) + +Changing `protocol.chain_id` or `protocol.network_id` for a running network creates a **new chain**. Do not change them except for intentional resets. + +## Validation and safe bounds + +On startup, the node validates the config via `NodeConfig::validate()` (called from `catalyst-cli start`). + +Examples of enforced bounds: +- `protocol.chain_id > 0` +- `protocol.network_id` non-empty +- `network.max_peers >= network.min_peers` +- `network.listen_addresses` non-empty +- `consensus.cycle_duration_seconds >= 10` +- `consensus.min_producer_count >= 1` +- `consensus.max_producer_count >= consensus.min_producer_count` +- pruning knobs are validated when enabled (e.g. non-zero interval and batch size) + +If validation fails, the node refuses to start with an actionable error. 
+ +## Recommended operational policy + +- **Public networks**: treat changes to consensus parameters as coordinated upgrades. +- **Defaults**: keep conservative defaults, and use explicit overrides only when measured needs justify them. +- **Rollback**: before changing parameters, take a DB snapshot backup (see `docs/node-operator-guide.md`). + diff --git a/docs/release-process.md b/docs/release-process.md new file mode 100644 index 0000000..79c16bb --- /dev/null +++ b/docs/release-process.md @@ -0,0 +1,71 @@ +# Release process (checksums + compatibility gates) + +This project uses Git tags and GitHub releases to produce deployable artifacts for operators. + +## Goals + +- Provide **checksummed** binaries for deployment. +- Ensure protocol/wire compatibility changes are **intentional** and reviewed. +- Keep upgrades operationally safe via snapshot backup/restore workflows. + +## Versioning policy (current) + +- **Tags**: `vX.Y.Z` (project release version). +- **Binary version**: reported by `catalyst_version` and `catalyst-cli --version`. +- **Protocol changes**: + - must be accompanied by updated docs (e.g. `docs/wallet-interop.md`) + - must update golden vectors (see below) in the same PR, with rationale in the PR description. + +## CI + +GitHub Actions runs: + +- `cargo test --workspace --locked` + +See `.github/workflows/ci.yml`. + +## Wire-format compatibility gate + +`crates/catalyst-core/tests/wire_vectors.rs` contains golden vectors for: +- `CTX2` wire encoding (`encode_wire_tx_v2`) +- `tx_id_v2` +- v2 signing payload (`CATALYST_SIG_V2`) + +Any change to canonical serialization will fail CI until the vectors are updated intentionally. 
+ +## Network wire versioning (MessageEnvelope) + +`MessageEnvelope` is encoded on the network using a **versioned** wrapper: + +- **magic**: `CENV` +- **version**: `PROTOCOL_VERSION` (u32 LE) +- **payload**: `bincode(MessageEnvelope)` + +Nodes reject unknown versions, and libp2p identify advertises a matching protocol string (`catalyst/1`). + +Version bump policy (current): +- **Bump `PROTOCOL_VERSION`** only for intentionally breaking, network-wide changes to envelope decoding. +- Keep the identify protocol string in sync with the major version (`catalyst/<major>`). + +## Creating a release + +1) Ensure `main` is green and merged. +2) Create and push a tag: + +```bash +git tag -a vX.Y.Z -m "vX.Y.Z" +git push origin vX.Y.Z +``` + +3) The release workflow builds `catalyst-cli` and uploads: +- `catalyst-cli-vX.Y.Z-x86_64-unknown-linux-gnu.tar.gz` +- `catalyst-cli-vX.Y.Z-x86_64-unknown-linux-gnu.tar.gz.sha256` + +See `.github/workflows/release.yml`. + +## Upgrade / rollback safety + +For production upgrades, always take a snapshot backup first and be prepared to restore. + +See `docs/node-operator-guide.md` for `db-backup` / `db-restore` workflows.