Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ All notable changes to this project will be documented in this file.

- Activator
- Fix duplicate tunnel underlay pairs after restart by registering `device.public_ip` as in-use for legacy users with unset `tunnel_endpoint` during allocation reload
- Add flex-algo topology event handler: when a `TopologyInfo` account is created or updated, calls `BackfillTopology` on all devices with an activated VPNv4 loopback; enabled via `--enable-flex-algo` flag (default: off)
- Client
- Rank devices and tunnel endpoints by minimum observed latency (`min_latency_ns`) instead of average when selecting a connection target, preferring paths with the best achievable round-trip time
- Tools
Expand All @@ -44,6 +45,8 @@ All notable changes to this project will be documented in this file.
- SDK
- Deserialize `agent_version` and `agent_commit` from device latency samples in Go, TypeScript, and Python SDKs
- Add `BGPStatus` type (Unknown/Up/Down) and `SetUserBGPStatus` executor instruction to the Go serviceability SDK
- Python serviceability SDK deserializes `TopologyConstraint`, `TopologyInfo`, and `FlexAlgoNodeSegment`; reads `flex_algo_node_segments` from V2 interface accounts
- TypeScript serviceability SDK deserializes `FlexAlgoNodeSegment` from V2 interface accounts; adds configurable request timeout to the RPC client
- Smartcontract
- Add `agent_version` (`[u8; 16]`) and `agent_commit` (`[u8; 8]`) fields to `DeviceLatencySamplesHeader`, carved from the existing reserved region; accept both fields in the `InitializeDeviceLatencySamples` instruction via incremental deserialization (fully backward compatible)
- Implement `SetUserBGPStatus` processor: validates metrics publisher authorization, updates `bgp_status`, `last_bgp_reported_at`, and `last_bgp_up_at` fields on the user account
Expand Down
22 changes: 16 additions & 6 deletions activator/src/activator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub async fn run_activator(
websocket_url: Option<String>,
program_id: Option<String>,
keypair: Option<PathBuf>,
enable_flex_algo: bool,
) -> eyre::Result<()> {
let client = create_client(rpc_url, websocket_url, program_id, keypair)?;

Expand All @@ -34,13 +35,20 @@ pub async fn run_activator(

let use_onchain_allocation = read_onchain_allocation_flag(client.as_ref())?;

run_activator_with_client(client, async_client_factory, use_onchain_allocation).await
run_activator_with_client(
client,
async_client_factory,
use_onchain_allocation,
enable_flex_algo,
)
.await
}

async fn run_activator_with_client<C, F, R, A>(
client: Arc<C>,
async_client_factory: F,
use_onchain_allocation: bool,
enable_flex_algo: bool,
) -> eyre::Result<()>
where
C: DoubleZeroClient + Send + Sync + 'static,
Expand All @@ -49,15 +57,16 @@ where
A: AsyncDoubleZeroClient + Send + Sync + 'static,
{
if use_onchain_allocation {
run_activator_stateless(client, async_client_factory).await
run_activator_stateless(client, async_client_factory, enable_flex_algo).await
} else {
run_activator_stateful(client, async_client_factory).await
run_activator_stateful(client, async_client_factory, enable_flex_algo).await
}
}

async fn run_activator_stateful<C, F, R, A>(
client: Arc<C>,
async_client_factory: F,
enable_flex_algo: bool,
) -> eyre::Result<()>
where
C: DoubleZeroClient + Send + Sync + 'static,
Expand All @@ -69,7 +78,7 @@ where
info!("Activator handler loop started (stateful mode)");

let (tx, rx) = mpsc::channel(128);
let mut processor = Processor::new(rx, client.clone())?;
let mut processor = Processor::new(rx, client.clone(), enable_flex_algo)?;

let shutdown = Arc::new(AtomicBool::new(false));

Expand Down Expand Up @@ -105,6 +114,7 @@ where
async fn run_activator_stateless<C, F, R, A>(
client: Arc<C>,
async_client_factory: F,
enable_flex_algo: bool,
) -> eyre::Result<()>
where
C: DoubleZeroClient + Send + Sync + 'static,
Expand All @@ -116,7 +126,7 @@ where
info!("Activator handler loop started stateless mode (onchain allocation)");

let (tx, rx) = mpsc::channel(128);
let mut processor = ProcessorStateless::new(rx, client.clone())?;
let mut processor = ProcessorStateless::new(rx, client.clone(), enable_flex_algo)?;

let shutdown = Arc::new(AtomicBool::new(false));

Expand Down Expand Up @@ -489,7 +499,7 @@ mod tests {
libc::raise(libc::SIGTERM);
}
});
let result = run_activator_with_client(client, client_factory, false).await;
let result = run_activator_with_client(client, client_factory, false, false).await;
assert!(result.is_ok());
}
}
6 changes: 6 additions & 0 deletions activator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ struct AppArgs {

#[arg(long, default_value = "info")]
log_level: String,

/// Enable flex-algo topology automation: automatically backfill FlexAlgoNodeSegments
/// when new topologies are created or Vpnv4 loopbacks are activated.
#[arg(long, default_value_t = false)]
enable_flex_algo: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -80,6 +85,7 @@ async fn main() -> eyre::Result<()> {
Some(ws_url.clone()),
Some(program_id.clone()),
Some(keypair.clone()),
args.enable_flex_algo,
)
.await?;

Expand Down
41 changes: 35 additions & 6 deletions activator/src/process/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub fn process_device_event(
device: &Device,
segment_routing_ids: &mut IDAllocator,
link_ips: &mut IPBlockAllocator,
enable_flex_algo: bool,
) {
match device.status {
DeviceStatus::Pending => {
Expand Down Expand Up @@ -58,7 +59,12 @@ pub fn process_device_event(
info!("{log_msg}");
}
DeviceStatus::DeviceProvisioning | DeviceStatus::LinkProvisioning => {
let mut mgr = InterfaceMgr::new(client, Some(segment_routing_ids), link_ips);
let mut mgr = InterfaceMgr::new(
client,
Some(segment_routing_ids),
link_ips,
enable_flex_algo,
);
mgr.process_device_interfaces(pubkey, device);

match devices.entry(*pubkey) {
Expand All @@ -76,7 +82,12 @@ pub fn process_device_event(
}
}
DeviceStatus::Activated => {
let mut mgr = InterfaceMgr::new(client, Some(segment_routing_ids), link_ips);
let mut mgr = InterfaceMgr::new(
client,
Some(segment_routing_ids),
link_ips,
enable_flex_algo,
);
mgr.process_device_interfaces(pubkey, device);

match devices.entry(*pubkey) {
Expand Down Expand Up @@ -167,6 +178,7 @@ pub fn process_device_event_stateless(
pubkey: &Pubkey,
devices: &mut DeviceMapStateless,
device: &Device,
enable_flex_algo: bool,
) {
match device.status {
DeviceStatus::Pending => {
Expand Down Expand Up @@ -200,7 +212,7 @@ pub fn process_device_event_stateless(
info!("{log_msg}");
}
DeviceStatus::DeviceProvisioning | DeviceStatus::LinkProvisioning => {
let mgr = InterfaceMgrStateless::new(client);
let mgr = InterfaceMgrStateless::new(client, enable_flex_algo);
mgr.process_device_interfaces(pubkey, device);

match devices.entry(*pubkey) {
Expand All @@ -215,7 +227,7 @@ pub fn process_device_event_stateless(
}
}
DeviceStatus::Activated => {
let mgr = InterfaceMgrStateless::new(client);
let mgr = InterfaceMgrStateless::new(client, enable_flex_algo);
mgr.process_device_interfaces(pubkey, device);

match devices.entry(*pubkey) {
Expand Down Expand Up @@ -396,6 +408,7 @@ mod tests {
&device,
&mut segment_ids,
&mut ip_block_allocator,
false,
);

assert!(devices.contains_key(&device_pubkey));
Expand Down Expand Up @@ -447,6 +460,7 @@ mod tests {
&device,
&mut segment_ids,
&mut ip_block_allocator,
false,
);

device.status = DeviceStatus::Deleting;
Expand Down Expand Up @@ -538,6 +552,7 @@ mod tests {
&device,
&mut segment_ids,
&mut ip_block_allocator,
false,
);
assert!(!devices.contains_key(&device_pubkey));

Expand Down Expand Up @@ -669,6 +684,7 @@ mod tests {
&device,
&mut segment_ids,
&mut ip_block_allocator,
false,
);

assert!(devices.contains_key(&pubkey));
Expand Down Expand Up @@ -697,6 +713,7 @@ mod tests {
&device,
&mut segment_ids,
&mut ip_block_allocator,
false,
);

assert!(devices.contains_key(&pubkey));
Expand Down Expand Up @@ -769,7 +786,13 @@ mod tests {
)
.returning(|_, _| Ok(Signature::new_unique()));

super::process_device_event_stateless(&client, &device_pubkey, &mut devices, &device);
super::process_device_event_stateless(
&client,
&device_pubkey,
&mut devices,
&device,
false,
);

assert!(devices.contains_key(&device_pubkey));

Expand Down Expand Up @@ -896,7 +919,13 @@ mod tests {
)
.returning(|_, _| Ok(Signature::new_unique()));

super::process_device_event_stateless(&client, &device_pubkey, &mut devices, &device);
super::process_device_event_stateless(
&client,
&device_pubkey,
&mut devices,
&device,
false,
);

assert!(!devices.contains_key(&device_pubkey));

Expand Down
58 changes: 47 additions & 11 deletions activator/src/process/iface_mgr.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{idallocator::IDAllocator, ipblockallocator::IPBlockAllocator};
use crate::{
idallocator::IDAllocator, ipblockallocator::IPBlockAllocator,
process::topology::backfill_all_topologies_for_device,
};
use doublezero_program_common::types::NetworkV4;
use doublezero_sdk::{
commands::device::interface::{
Expand All @@ -15,11 +18,15 @@ use solana_sdk::pubkey::Pubkey;
/// Does not use local allocators - all allocation is handled by the smart contract.
pub struct InterfaceMgrStateless<'a> {
client: &'a dyn DoubleZeroClient,
enable_flex_algo: bool,
}

impl<'a> InterfaceMgrStateless<'a> {
pub fn new(client: &'a dyn DoubleZeroClient) -> Self {
Self { client }
pub fn new(client: &'a dyn DoubleZeroClient, enable_flex_algo: bool) -> Self {
Self {
client,
enable_flex_algo,
}
}

/// Process all interfaces for a device based on their current state
Expand Down Expand Up @@ -67,13 +74,19 @@ impl<'a> InterfaceMgrStateless<'a> {
device: &Device,
iface: &CurrentInterfaceVersion,
) {
self.activate(
let activated = self.activate(
device_pubkey,
&device.code,
&iface.name,
&NetworkV4::default(),
0,
);

// Gap 2: after activating a Vpnv4 loopback, backfill all existing topologies
// so this device receives FlexAlgoNodeSegments for every topology.
if activated && self.enable_flex_algo && iface.loopback_type == LoopbackType::Vpnv4 {
backfill_all_topologies_for_device(self.client, device_pubkey);
}
}

/// Handle interface deletion (stateless mode - no local deallocation)
Expand All @@ -94,7 +107,7 @@ impl<'a> InterfaceMgrStateless<'a> {
name: &str,
ip_net: &NetworkV4,
node_segment_idx: u16,
) {
) -> bool {
let cmd = ActivateDeviceInterfaceCommand {
pubkey: *pubkey,
name: name.to_string(),
Expand All @@ -103,8 +116,15 @@ impl<'a> InterfaceMgrStateless<'a> {
use_onchain_allocation: true,
};

if let Err(e) = cmd.execute(self.client) {
error!("Failed to activate interface {name} on {context}: {e}");
match cmd.execute(self.client) {
Ok(signature) => {
info!("Activated interface {name} on {context}: {signature}");
true
}
Err(e) => {
error!("Failed to activate interface {name} on {context}: {e}");
false
}
}
}

Expand Down Expand Up @@ -147,18 +167,21 @@ pub struct InterfaceMgr<'a> {
// Optional because it's not required for process_link_event
segment_routing_ids: Option<&'a mut IDAllocator>,
link_ips: &'a mut IPBlockAllocator,
enable_flex_algo: bool,
}

impl<'a> InterfaceMgr<'a> {
pub fn new(
client: &'a dyn DoubleZeroClient,
segment_routing_ids: Option<&'a mut IDAllocator>,
link_ips: &'a mut IPBlockAllocator,
enable_flex_algo: bool,
) -> Self {
Self {
client,
segment_routing_ids,
link_ips,
enable_flex_algo,
}
}

Expand Down Expand Up @@ -247,13 +270,19 @@ impl<'a> InterfaceMgr<'a> {
}

// Activate with allocated resources
self.activate(
let activated = self.activate(
device_pubkey,
&device.code,
&iface.name,
&iface.ip_net,
iface.node_segment_idx,
);

// Gap 2: after activating a Vpnv4 loopback, backfill all existing topologies
// so this device receives FlexAlgoNodeSegments for every topology.
if activated && self.enable_flex_algo && iface.loopback_type == LoopbackType::Vpnv4 {
backfill_all_topologies_for_device(self.client, device_pubkey);
}
}

/// Handle interface deletion and resource cleanup
Expand Down Expand Up @@ -292,7 +321,7 @@ impl<'a> InterfaceMgr<'a> {
name: &str,
ip_net: &NetworkV4,
node_segment_idx: u16,
) {
) -> bool {
let cmd = ActivateDeviceInterfaceCommand {
pubkey: *pubkey,
name: name.to_string(),
Expand All @@ -301,8 +330,15 @@ impl<'a> InterfaceMgr<'a> {
use_onchain_allocation: false,
};

if let Err(e) = cmd.execute(self.client) {
error!("Failed to activate interface {name} on {context}: {e}");
match cmd.execute(self.client) {
Ok(signature) => {
info!("Activated interface {name} on {context}: {signature}");
true
}
Err(e) => {
error!("Failed to activate interface {name} on {context}: {e}");
false
}
}
}

Expand Down
Loading
Loading