Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 74 additions & 13 deletions crates/catalyst-network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ const PER_PEER_MAX_MSGS_PER_SEC: u32 = 200;
const PER_PEER_MAX_BYTES_PER_SEC: usize = 8 * 1024 * 1024; // 8 MiB/s per peer
const IDENTIFY_PROTOCOL_VERSION: &str = "catalyst/1";

#[derive(Debug, Clone)]
struct DialBackoff {
attempts: u32,
next_at: Instant,
}

impl DialBackoff {
fn can_attempt(&self, now: Instant) -> bool {
now >= self.next_at
}
}

#[derive(Debug, Clone)]
struct PeerBudget {
window_start: Instant,
Expand Down Expand Up @@ -166,7 +178,7 @@ impl NetworkService {
// Gossipsub
let gossipsub_config = gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Permissive)
.heartbeat_interval(Duration::from_secs(1))
.heartbeat_interval(config.gossip.heartbeat_interval)
.build()
.map_err(|e| NetworkError::ConfigError(e.to_string()))?;

Expand All @@ -176,7 +188,7 @@ impl NetworkService {
)
.map_err(|e| NetworkError::ConfigError(e.to_string()))?;

let topic = gossipsub::IdentTopic::new("catalyst/envelope/1");
let topic = gossipsub::IdentTopic::new(config.gossip.topic_name.clone());
gossipsub
.subscribe(&topic)
.map_err(|e| NetworkError::ConfigError(e.to_string()))?;
Expand Down Expand Up @@ -248,23 +260,53 @@ impl NetworkService {
let peer_conns = self.peer_conns.clone();
let topic = self.topic.clone();

// Bootstrap dials (best-effort).
let bootstrap_addrs: Vec<Multiaddr> = self
.config
.peer
.bootstrap_peers
.iter()
.map(|(_pid, addr)| addr.clone())
.collect();
for addr in bootstrap_addrs {
let _ = swarm.dial(addr);
}
// Bootstrap dial manager (WAN-hardening): retry with backoff+jitter until we meet `min_peers`.
let bootstrap: Vec<(PeerId, Multiaddr)> = self.config.peer.bootstrap_peers.clone();
let min_peers = self.config.peer.min_peers;
let max_attempts = self.config.peer.max_retry_attempts;
let base_backoff = self.config.peer.retry_backoff;
let mut bootstrap_tick = tokio::time::interval(self.config.discovery.bootstrap_interval);
bootstrap_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut dial_backoff: HashMap<PeerId, DialBackoff> = HashMap::new();
let mut incompatible: HashSet<PeerId> = HashSet::new();

let handle = tokio::spawn(async move {
let start = Instant::now();
let mut budgets: HashMap<PeerId, PeerBudget> = HashMap::new();
loop {
tokio::select! {
_ = bootstrap_tick.tick() => {
let connected = peer_conns.read().await.len();
if connected >= min_peers {
continue;
}
let now = Instant::now();
for (pid, addr) in &bootstrap {
if incompatible.contains(pid) {
continue;
}
if let Some(st) = dial_backoff.get(pid) {
if !st.can_attempt(now) {
continue;
}
if max_attempts > 0 && st.attempts >= max_attempts {
continue;
}
}

// Schedule next attempt before dialing to avoid tight loops.
let attempts = dial_backoff.get(pid).map(|s| s.attempts).unwrap_or(0) + 1;
let backoff = compute_backoff(base_backoff, attempts).unwrap_or(base_backoff);
let jitter = jitter_ms(pid, attempts);
dial_backoff.insert(*pid, DialBackoff {
attempts,
next_at: now + backoff + Duration::from_millis(jitter),
});

swarm.behaviour_mut().gossipsub.add_explicit_peer(pid);
let _ = swarm.dial(addr.clone());
}
}
ev = swarm.select_next_some() => {
match ev {
SwarmEvent::Behaviour(BehaviourEvent::Mdns(e)) => match e {
Expand Down Expand Up @@ -330,12 +372,14 @@ impl NetworkService {
pv,
IDENTIFY_PROTOCOL_VERSION
);
incompatible.insert(peer_id);
swarm.disconnect_peer_id(peer_id);
}
}
}
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
dial_backoff.remove(&peer_id);
{
let mut m = peer_conns.write().await;
*m.entry(peer_id).or_insert(0) += 1;
Expand Down Expand Up @@ -427,6 +471,23 @@ impl NetworkService {
}
}

fn compute_backoff(base: Duration, attempts: u32) -> Option<Duration> {
let pow = attempts.saturating_sub(1).min(10);
let mult = 1u64.checked_shl(pow)?;
let ms = base.as_millis().saturating_mul(mult as u128);
let ms = ms.min(60_000);
Some(Duration::from_millis(ms as u64))
}

fn jitter_ms(peer_id: &PeerId, attempts: u32) -> u64 {
use std::hash::{Hash, Hasher};
let mut h = std::collections::hash_map::DefaultHasher::new();
peer_id.hash(&mut h);
attempts.hash(&mut h);
let v = h.finish();
(v % 250) as u64
}

fn load_or_generate_keypair(path: &Path) -> NetworkResult<identity::Keypair> {
if let Ok(bytes) = std::fs::read(path) {
if let Ok(kp) = identity::Keypair::from_protobuf_encoding(&bytes) {
Expand Down
Loading