From ff2d4bc8bf7423056f3b4314ff255ba90fc9ef68 Mon Sep 17 00:00:00 2001 From: TheNewAutonomy Date: Thu, 26 Feb 2026 08:55:46 +0000 Subject: [PATCH] p2p: harden WAN joins with dial backoff and rebroadcast Add a bootstrap dial manager with exponential backoff + jitter to maintain min peer connectivity. Also add multi-hop rebroadcast + dedup to the simple TCP transport and align it with the versioned envelope wire wrapper. Made-with: Cursor --- crates/catalyst-network/src/service.rs | 87 ++++++++-- crates/catalyst-network/src/simple.rs | 215 +++++++++++++++++++++---- 2 files changed, 262 insertions(+), 40 deletions(-) diff --git a/crates/catalyst-network/src/service.rs b/crates/catalyst-network/src/service.rs index 5917adf..2d0a18d 100644 --- a/crates/catalyst-network/src/service.rs +++ b/crates/catalyst-network/src/service.rs @@ -42,6 +42,18 @@ const PER_PEER_MAX_MSGS_PER_SEC: u32 = 200; const PER_PEER_MAX_BYTES_PER_SEC: usize = 8 * 1024 * 1024; // 8 MiB/s per peer const IDENTIFY_PROTOCOL_VERSION: &str = "catalyst/1"; +#[derive(Debug, Clone)] +struct DialBackoff { + attempts: u32, + next_at: Instant, +} + +impl DialBackoff { + fn can_attempt(&self, now: Instant) -> bool { + now >= self.next_at + } +} + #[derive(Debug, Clone)] struct PeerBudget { window_start: Instant, @@ -166,7 +178,7 @@ impl NetworkService { // Gossipsub let gossipsub_config = gossipsub::ConfigBuilder::default() .validation_mode(gossipsub::ValidationMode::Permissive) - .heartbeat_interval(Duration::from_secs(1)) + .heartbeat_interval(config.gossip.heartbeat_interval) .build() .map_err(|e| NetworkError::ConfigError(e.to_string()))?; @@ -176,7 +188,7 @@ impl NetworkService { ) .map_err(|e| NetworkError::ConfigError(e.to_string()))?; - let topic = gossipsub::IdentTopic::new("catalyst/envelope/1"); + let topic = gossipsub::IdentTopic::new(config.gossip.topic_name.clone()); gossipsub .subscribe(&topic) .map_err(|e| NetworkError::ConfigError(e.to_string()))?; @@ -248,23 +260,53 @@ impl NetworkService { let peer_conns = self.peer_conns.clone(); let topic = self.topic.clone(); - // Bootstrap dials (best-effort). - let bootstrap_addrs: Vec = self - .config - .peer - .bootstrap_peers - .iter() - .map(|(_pid, addr)| addr.clone()) - .collect(); - for addr in bootstrap_addrs { - let _ = swarm.dial(addr); - } + // Bootstrap dial manager (WAN-hardening): retry with backoff+jitter until we meet `min_peers`. + let bootstrap: Vec<(PeerId, Multiaddr)> = self.config.peer.bootstrap_peers.clone(); + let min_peers = self.config.peer.min_peers; + let max_attempts = self.config.peer.max_retry_attempts; + let base_backoff = self.config.peer.retry_backoff; + let mut bootstrap_tick = tokio::time::interval(self.config.discovery.bootstrap_interval); + bootstrap_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let mut dial_backoff: HashMap = HashMap::new(); + let mut incompatible: HashSet = HashSet::new(); let handle = tokio::spawn(async move { let start = Instant::now(); let mut budgets: HashMap = HashMap::new(); loop { tokio::select! { + _ = bootstrap_tick.tick() => { + let connected = peer_conns.read().await.len(); + if connected >= min_peers { + continue; + } + let now = Instant::now(); + for (pid, addr) in &bootstrap { + if incompatible.contains(pid) { + continue; + } + if let Some(st) = dial_backoff.get(pid) { + if !st.can_attempt(now) { + continue; + } + if max_attempts > 0 && st.attempts >= max_attempts { + continue; + } + } + + // Schedule next attempt before dialing to avoid tight loops. + let attempts = dial_backoff.get(pid).map(|s| s.attempts).unwrap_or(0) + 1; + let backoff = compute_backoff(base_backoff, attempts).unwrap_or(base_backoff); + let jitter = jitter_ms(pid, attempts); + dial_backoff.insert(*pid, DialBackoff { + attempts, + next_at: now + backoff + Duration::from_millis(jitter), + }); + + swarm.behaviour_mut().gossipsub.add_explicit_peer(pid); + let _ = swarm.dial(addr.clone()); + } + } ev = swarm.select_next_some() => { match ev { SwarmEvent::Behaviour(BehaviourEvent::Mdns(e)) => match e { @@ -330,12 +372,14 @@ impl NetworkService { pv, IDENTIFY_PROTOCOL_VERSION ); + incompatible.insert(peer_id); swarm.disconnect_peer_id(peer_id); } } } SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => { swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + dial_backoff.remove(&peer_id); { let mut m = peer_conns.write().await; *m.entry(peer_id).or_insert(0) += 1; @@ -427,6 +471,23 @@ impl NetworkService { } } +fn compute_backoff(base: Duration, attempts: u32) -> Option { + let pow = attempts.saturating_sub(1).min(10); + let mult = 1u64.checked_shl(pow)?; + let ms = base.as_millis().saturating_mul(mult as u128); + let ms = ms.min(60_000); + Some(Duration::from_millis(ms as u64)) +} + +fn jitter_ms(peer_id: &PeerId, attempts: u32) -> u64 { + use std::hash::{Hash, Hasher}; + let mut h = std::collections::hash_map::DefaultHasher::new(); + peer_id.hash(&mut h); + attempts.hash(&mut h); + let v = h.finish(); + (v % 250) as u64 +} + fn load_or_generate_keypair(path: &Path) -> NetworkResult { if let Ok(bytes) = std::fs::read(path) { if let Ok(kp) = identity::Keypair::from_protobuf_encoding(&bytes) { diff --git a/crates/catalyst-network/src/simple.rs b/crates/catalyst-network/src/simple.rs index 83edb1c..9b27374 100644 --- a/crates/catalyst-network/src/simple.rs +++ b/crates/catalyst-network/src/simple.rs @@ -12,11 +12,13 @@ use crate::config::NetworkConfig; use crate::error::{NetworkError, NetworkResult}; use catalyst_utils::logging::*; -use catalyst_utils::network::MessageEnvelope; +use catalyst_utils::network::{ + decode_envelope_wire, encode_envelope_wire, EnvelopeWireError, MessageEnvelope, RoutingInfo, +}; use futures::{SinkExt, StreamExt}; use libp2p::Multiaddr; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; @@ -71,17 +73,27 @@ pub struct NetworkService { stats: Arc>, event_tx: Arc>>>, tasks: Arc>>>, + local_id: String, + seen: Arc>>, } impl NetworkService { pub async fn new(config: NetworkConfig) -> NetworkResult { config.validate()?; + let local_id = config + .peer + .listen_addresses + .get(0) + .map(|a| a.to_string()) + .unwrap_or_else(|| "unknown".to_string()); Ok(Self { config, peers: Arc::new(Mutex::new(HashMap::new())), stats: Arc::new(RwLock::new(NetworkStats::default())), event_tx: Arc::new(RwLock::new(Vec::new())), tasks: Arc::new(Mutex::new(Vec::new())), + local_id, + seen: Arc::new(Mutex::new(HashMap::new())), }) } @@ -115,28 +127,12 @@ impl NetworkService { self.tasks.lock().await.push(handle); } - // Dial bootstrap peers (best-effort). - for (_peer_id, addr) in &self.config.peer.bootstrap_peers { - if let Some(socket) = multiaddr_to_socketaddr(addr) { - let svc = self.clone(); - let handle = tokio::spawn(async move { - match TcpStream::connect(socket).await { - Ok(stream) => { - let _ = svc.emit(NetworkEvent::PeerConnected { addr: socket }).await; - svc.spawn_connection(socket, stream).await; - } - Err(e) => { - let _ = svc.emit(NetworkEvent::Error { - error: NetworkError::Timeout { duration: std::time::Duration::from_secs(0) }, - }) - .await; - log_warn!(LogCategory::Network, "Failed to connect to bootstrap {}: {}", socket, e); - } - } - }); - self.tasks.lock().await.push(handle); - } - } + // Dial bootstrap peers with retry/backoff/jitter until we meet `min_peers`. + let svc = self.clone(); + let handle = tokio::spawn(async move { + svc.bootstrap_dial_loop().await; + }); + self.tasks.lock().await.push(handle); Ok(()) } @@ -180,7 +176,7 @@ impl NetworkService { /// Broadcast a message envelope to all connected peers. pub async fn broadcast_envelope(&self, envelope: &MessageEnvelope) -> NetworkResult<()> { let bytes = - bincode::serialize(envelope).map_err(|e| NetworkError::SerializationFailed(e.to_string()))?; + encode_envelope_wire(envelope).map_err(|e| NetworkError::SerializationFailed(e.to_string()))?; let peers = self.peers.lock().await; for (_addr, tx) in peers.iter() { @@ -197,6 +193,9 @@ impl NetworkService { let peers = self.peers.clone(); let stats = self.stats.clone(); let svc = self.clone(); + let local_id = self.local_id.clone(); + let seen = self.seen.clone(); + let dedup_window = self.config.gossip.duplicate_detection_window; let (out_tx, mut out_rx) = mpsc::unbounded_channel::>(); { @@ -231,11 +230,31 @@ impl NetworkService { if !budget.allow(now, bytes.len()) { continue; } - let env: MessageEnvelope = match bincode::deserialize(&bytes) { + let env: MessageEnvelope = match decode_envelope_wire(&bytes) { Ok(e) => e, + Err(EnvelopeWireError::UnsupportedVersion { got, local }) => { + log_warn!( + LogCategory::Network, + "Dropping frame from {} due to unsupported envelope version (got={} local={})", + peer_addr, + got, + local + ); + continue; + } Err(_) => continue, }; + // Dedup by envelope id within a sliding window. + { + let mut s = seen.lock().await; + s.retain(|_, t| now.duration_since(*t) <= dedup_window); + if s.contains_key(&env.id) { + continue; + } + s.insert(env.id.clone(), now); + } + { let mut st = stats.write().await; st.messages_received += 1; @@ -243,10 +262,28 @@ impl NetworkService { let _ = svc .emit(NetworkEvent::MessageReceived { - envelope: env, + envelope: env.clone(), from: peer_addr, }) .await; + + // Multi-hop rebroadcast: forward broadcast envelopes to all peers except sender + // with hop/loop limits. + if env.target.is_none() && should_forward(&env, &local_id) { + if let Some(fwd) = forwarded(env, &local_id) { + let bytes = match encode_envelope_wire(&fwd) { + Ok(b) => b, + Err(_) => continue, + }; + let peers = peers.lock().await; + for (addr, tx) in peers.iter() { + if *addr == peer_addr { + continue; + } + let _ = tx.send(bytes.clone()); + } + } + } } { @@ -262,6 +299,78 @@ impl NetworkService { self.tasks.lock().await.push(reader); } + async fn bootstrap_dial_loop(&self) { + let bootstrap: Vec = self + .config + .peer + .bootstrap_peers + .iter() + .filter_map(|(_peer_id, addr)| multiaddr_to_socketaddr(addr)) + .collect(); + + if bootstrap.is_empty() { + return; + } + + let mut backoff: HashMap = HashMap::new(); + let mut incompatible: HashSet = HashSet::new(); + + let base = self.config.peer.retry_backoff; + let max_attempts = self.config.peer.max_retry_attempts; + let mut tick = tokio::time::interval(self.config.discovery.bootstrap_interval); + tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + tick.tick().await; + + let connected = self.peers.lock().await.len(); + if connected >= self.config.peer.min_peers { + continue; + } + + for addr in &bootstrap { + if incompatible.contains(addr) { + continue; + } + + let now = std::time::Instant::now(); + let (attempts, next_at) = backoff.get(addr).cloned().unwrap_or((0, now)); + if now < next_at { + continue; + } + if max_attempts > 0 && attempts >= max_attempts { + continue; + } + + let socket = *addr; + let svc = self.clone(); + let attempts_next = attempts.saturating_add(1); + let delay = compute_backoff(base, attempts_next).unwrap_or(base); + let jitter = jitter_ms(socket, attempts_next); + backoff.insert( + *addr, + ( + attempts_next, + now + delay + std::time::Duration::from_millis(jitter), + ), + ); + + let handle = tokio::spawn(async move { + match TcpStream::connect(socket).await { + Ok(stream) => { + let _ = svc.emit(NetworkEvent::PeerConnected { addr: socket }).await; + svc.spawn_connection(socket, stream).await; + } + Err(e) => { + log_warn!(LogCategory::Network, "bootstrap dial failed {}: {}", socket, e); + } + } + }); + self.tasks.lock().await.push(handle); + } + } + } + async fn emit(&self, event: NetworkEvent) -> NetworkResult<()> { let senders = self.event_tx.read().await; for tx in senders.iter() { @@ -279,8 +388,60 @@ impl Clone for NetworkService { stats: self.stats.clone(), event_tx: self.event_tx.clone(), tasks: self.tasks.clone(), + local_id: self.local_id.clone(), + seen: self.seen.clone(), + } + } +} + +fn should_forward(env: &MessageEnvelope, local_id: &str) -> bool { + if env.is_expired() { + return false; + } + if let Some(r) = &env.routing_info { + if r.hop_count >= r.max_hops { + return false; + } + if r.visited_nodes.iter().any(|v| v == local_id) { + return false; } } + true +} + +fn forwarded(mut env: MessageEnvelope, local_id: &str) -> Option { + if env.is_expired() { + return None; + } + let mut r = env.routing_info.take().unwrap_or_else(|| RoutingInfo::new(10)); + if r.hop_count >= r.max_hops { + return None; + } + if r.visited_nodes.iter().any(|v| v == local_id) { + return None; + } + r.visited_nodes.push(local_id.to_string()); + r.hop_count = r.hop_count.saturating_add(1); + env.routing_info = Some(r); + Some(env) +} + +fn compute_backoff(base: std::time::Duration, attempts: u32) -> Option { + // base * 2^(attempts-1), clamped to 60s + let pow = attempts.saturating_sub(1).min(10); // 2^10 = 1024x + let mult = 1u64.checked_shl(pow)?; + let ms = base.as_millis().saturating_mul(mult as u128); + let ms = ms.min(60_000); + Some(std::time::Duration::from_millis(ms as u64)) +} + +fn jitter_ms(addr: SocketAddr, attempts: u32) -> u64 { + use std::hash::{Hash, Hasher}; + let mut h = std::collections::hash_map::DefaultHasher::new(); + addr.hash(&mut h); + attempts.hash(&mut h); + let v = h.finish(); + (v % 250) as u64 } fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Option {