diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ddc87f136..2132e4feef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ All notable changes to this project will be documented in this file. - Activator - Fix duplicate tunnel underlay pairs after restart by registering `device.public_ip` as in-use for legacy users with unset `tunnel_endpoint` during allocation reload + - Add flex-algo topology event handler: when a `TopologyInfo` account is created or updated, calls `BackfillTopology` on all devices with an activated VPNv4 loopback; enabled via `--enable-flex-algo` flag (default: off) - Client - Rank devices and tunnel endpoints by minimum observed latency (`min_latency_ns`) instead of average when selecting a connection target, preferring paths with the best achievable round-trip time - Tools @@ -51,6 +52,8 @@ All notable changes to this project will be documented in this file. - SDK - Deserialize `agent_version` and `agent_commit` from device latency samples in Go, TypeScript, and Python SDKs - Add `BGPStatus` type (Unknown/Up/Down) and `SetUserBGPStatus` executor instruction to the Go serviceability SDK + - Python serviceability SDK deserializes `TopologyConstraint`, `TopologyInfo`, and `FlexAlgoNodeSegment`; reads `flex_algo_node_segments` from V2 interface accounts + - TypeScript serviceability SDK deserializes `FlexAlgoNodeSegment` from V2 interface accounts; adds configurable request timeout to the RPC client - Smartcontract - Add `agent_version` (`[u8; 16]`) and `agent_commit` (`[u8; 8]`) fields to `DeviceLatencySamplesHeader`, carved from the existing reserved region; accept both fields in the `InitializeDeviceLatencySamples` instruction via incremental deserialization (fully backward compatible) - Implement `SetUserBGPStatus` processor: validates metrics publisher authorization, updates `bgp_status`, `last_bgp_reported_at`, and `last_bgp_up_at` fields on the user account diff --git a/activator/src/activator.rs b/activator/src/activator.rs index 4bbcc93bc6..e41cbefb90 100644 --- a/activator/src/activator.rs +++ b/activator/src/activator.rs @@ -21,6 +21,7 @@ pub async fn run_activator( websocket_url: Option, program_id: Option, keypair: Option, + enable_flex_algo: bool, ) -> eyre::Result<()> { let client = create_client(rpc_url, websocket_url, program_id, keypair)?; @@ -34,13 +35,20 @@ pub async fn run_activator( let use_onchain_allocation = read_onchain_allocation_flag(client.as_ref())?; - run_activator_with_client(client, async_client_factory, use_onchain_allocation).await + run_activator_with_client( + client, + async_client_factory, + use_onchain_allocation, + enable_flex_algo, + ) + .await } async fn run_activator_with_client( client: Arc, async_client_factory: F, use_onchain_allocation: bool, + enable_flex_algo: bool, ) -> eyre::Result<()> where C: DoubleZeroClient + Send + Sync + 'static, @@ -49,15 +57,16 @@ where A: AsyncDoubleZeroClient + Send + Sync + 'static, { if use_onchain_allocation { - run_activator_stateless(client, async_client_factory).await + run_activator_stateless(client, async_client_factory, enable_flex_algo).await } else { - run_activator_stateful(client, async_client_factory).await + run_activator_stateful(client, async_client_factory, enable_flex_algo).await } } async fn run_activator_stateful( client: Arc, async_client_factory: F, + enable_flex_algo: bool, ) -> eyre::Result<()> where C: DoubleZeroClient + Send + Sync + 'static, @@ -69,7 +78,7 @@ where info!("Activator handler loop started (stateful mode)"); let (tx, rx) = mpsc::channel(128); - let mut processor = Processor::new(rx, client.clone())?; + let mut processor = Processor::new(rx, client.clone(), enable_flex_algo)?; let shutdown = Arc::new(AtomicBool::new(false)); @@ -105,6 +114,7 @@ where async fn run_activator_stateless( client: Arc, async_client_factory: F, + enable_flex_algo: bool, ) -> eyre::Result<()> where C: DoubleZeroClient + Send + Sync + 'static, @@ -116,7 +126,7 @@ where info!("Activator handler loop started stateless mode (onchain allocation)"); let (tx, rx) = mpsc::channel(128); - let mut processor = ProcessorStateless::new(rx, client.clone())?; + let mut processor = ProcessorStateless::new(rx, client.clone(), enable_flex_algo)?; let shutdown = Arc::new(AtomicBool::new(false)); @@ -489,7 +499,7 @@ mod tests { libc::raise(libc::SIGTERM); } }); - let result = run_activator_with_client(client, client_factory, false).await; + let result = run_activator_with_client(client, client_factory, false, false).await; assert!(result.is_ok()); } } diff --git a/activator/src/main.rs b/activator/src/main.rs index 3879918596..23eced2b55 100644 --- a/activator/src/main.rs +++ b/activator/src/main.rs @@ -40,6 +40,11 @@ struct AppArgs { #[arg(long, default_value = "info")] log_level: String, + + /// Enable flex-algo topology automation: automatically backfill FlexAlgoNodeSegments + /// when new topologies are created or Vpnv4 loopbacks are activated. + #[arg(long, default_value_t = false)] + enable_flex_algo: bool, } #[tokio::main] @@ -80,6 +85,7 @@ async fn main() -> eyre::Result<()> { Some(ws_url.clone()), Some(program_id.clone()), Some(keypair.clone()), + args.enable_flex_algo, ) .await?; diff --git a/activator/src/process/device.rs b/activator/src/process/device.rs index 1dcb4addcf..fd653b19dc 100644 --- a/activator/src/process/device.rs +++ b/activator/src/process/device.rs @@ -25,6 +25,7 @@ pub fn process_device_event( device: &Device, segment_routing_ids: &mut IDAllocator, link_ips: &mut IPBlockAllocator, + enable_flex_algo: bool, ) { match device.status { DeviceStatus::Pending => { @@ -58,7 +59,12 @@ pub fn process_device_event( info!("{log_msg}"); } DeviceStatus::DeviceProvisioning | DeviceStatus::LinkProvisioning => { - let mut mgr = InterfaceMgr::new(client, Some(segment_routing_ids), link_ips); + let mut mgr = InterfaceMgr::new( + client, + Some(segment_routing_ids), + link_ips, + enable_flex_algo, + ); mgr.process_device_interfaces(pubkey, device); match devices.entry(*pubkey) { @@ -76,7 +82,12 @@ pub fn process_device_event( } } DeviceStatus::Activated => { - let mut mgr = InterfaceMgr::new(client, Some(segment_routing_ids), link_ips); + let mut mgr = InterfaceMgr::new( + client, + Some(segment_routing_ids), + link_ips, + enable_flex_algo, + ); mgr.process_device_interfaces(pubkey, device); match devices.entry(*pubkey) { @@ -167,6 +178,7 @@ pub fn process_device_event_stateless( pubkey: &Pubkey, devices: &mut DeviceMapStateless, device: &Device, + enable_flex_algo: bool, ) { match device.status { DeviceStatus::Pending => { @@ -200,7 +212,7 @@ pub fn process_device_event_stateless( info!("{log_msg}"); } DeviceStatus::DeviceProvisioning | DeviceStatus::LinkProvisioning => { - let mgr = InterfaceMgrStateless::new(client); + let mgr = InterfaceMgrStateless::new(client, enable_flex_algo); mgr.process_device_interfaces(pubkey, device); match devices.entry(*pubkey) { @@ -215,7 +227,7 @@ pub fn process_device_event_stateless( } } DeviceStatus::Activated => { - let mgr = InterfaceMgrStateless::new(client); + let mgr = InterfaceMgrStateless::new(client, enable_flex_algo); mgr.process_device_interfaces(pubkey, device); match devices.entry(*pubkey) { @@ -396,6 +408,7 @@ mod tests { &device, &mut segment_ids, &mut ip_block_allocator, + false, ); assert!(devices.contains_key(&device_pubkey)); @@ -447,6 +460,7 @@ mod tests { &device, &mut segment_ids, &mut ip_block_allocator, + false, ); device.status = DeviceStatus::Deleting; @@ -538,6 +552,7 @@ mod tests { &device, &mut segment_ids, &mut ip_block_allocator, + false, ); assert!(!devices.contains_key(&device_pubkey)); @@ -669,6 +684,7 @@ mod tests { &device, &mut segment_ids, &mut ip_block_allocator, + false, ); assert!(devices.contains_key(&pubkey)); @@ -697,6 +713,7 @@ mod tests { &device, &mut segment_ids, &mut ip_block_allocator, + false, ); assert!(devices.contains_key(&pubkey)); @@ -769,7 +786,13 @@ mod tests { ) .returning(|_, _| Ok(Signature::new_unique())); - super::process_device_event_stateless(&client, &device_pubkey, &mut devices, &device); + super::process_device_event_stateless( + &client, + &device_pubkey, + &mut devices, + &device, + false, + ); assert!(devices.contains_key(&device_pubkey)); @@ -896,7 +919,13 @@ mod tests { ) .returning(|_, _| Ok(Signature::new_unique())); - super::process_device_event_stateless(&client, &device_pubkey, &mut devices, &device); + super::process_device_event_stateless( + &client, + &device_pubkey, + &mut devices, + &device, + false, + ); assert!(!devices.contains_key(&device_pubkey)); diff --git a/activator/src/process/iface_mgr.rs b/activator/src/process/iface_mgr.rs index 02d0f1733c..5888b2411e 100644 --- a/activator/src/process/iface_mgr.rs +++ b/activator/src/process/iface_mgr.rs @@ -1,4 +1,7 @@ -use crate::{idallocator::IDAllocator, ipblockallocator::IPBlockAllocator}; +use crate::{ + idallocator::IDAllocator, ipblockallocator::IPBlockAllocator, + process::topology::backfill_all_topologies_for_device, +}; use doublezero_program_common::types::NetworkV4; use doublezero_sdk::{ commands::device::interface::{ @@ -15,11 +18,15 @@ use solana_sdk::pubkey::Pubkey; /// Does not use local allocators - all allocation is handled by the smart contract. pub struct InterfaceMgrStateless<'a> { client: &'a dyn DoubleZeroClient, + enable_flex_algo: bool, } impl<'a> InterfaceMgrStateless<'a> { - pub fn new(client: &'a dyn DoubleZeroClient) -> Self { - Self { client } + pub fn new(client: &'a dyn DoubleZeroClient, enable_flex_algo: bool) -> Self { + Self { + client, + enable_flex_algo, + } } /// Process all interfaces for a device based on their current state @@ -67,13 +74,19 @@ impl<'a> InterfaceMgrStateless<'a> { device: &Device, iface: &CurrentInterfaceVersion, ) { - self.activate( + let activated = self.activate( device_pubkey, &device.code, &iface.name, &NetworkV4::default(), 0, ); + + // Gap 2: after activating a Vpnv4 loopback, backfill all existing topologies + // so this device receives FlexAlgoNodeSegments for every topology. + if activated && self.enable_flex_algo && iface.loopback_type == LoopbackType::Vpnv4 { + backfill_all_topologies_for_device(self.client, device_pubkey); + } } /// Handle interface deletion (stateless mode - no local deallocation) @@ -94,7 +107,7 @@ impl<'a> InterfaceMgrStateless<'a> { name: &str, ip_net: &NetworkV4, node_segment_idx: u16, - ) { + ) -> bool { let cmd = ActivateDeviceInterfaceCommand { pubkey: *pubkey, name: name.to_string(), @@ -103,8 +116,15 @@ impl<'a> InterfaceMgrStateless<'a> { use_onchain_allocation: true, }; - if let Err(e) = cmd.execute(self.client) { - error!("Failed to activate interface {name} on {context}: {e}"); + match cmd.execute(self.client) { + Ok(signature) => { + info!("Activated interface {name} on {context}: {signature}"); + true + } + Err(e) => { + error!("Failed to activate interface {name} on {context}: {e}"); + false + } } } @@ -147,6 +167,7 @@ pub struct InterfaceMgr<'a> { // Optional because it's not required for process_link_event segment_routing_ids: Option<&'a mut IDAllocator>, link_ips: &'a mut IPBlockAllocator, + enable_flex_algo: bool, } impl<'a> InterfaceMgr<'a> { @@ -154,11 +175,13 @@ impl<'a> InterfaceMgr<'a> { client: &'a dyn DoubleZeroClient, segment_routing_ids: Option<&'a mut IDAllocator>, link_ips: &'a mut IPBlockAllocator, + enable_flex_algo: bool, ) -> Self { Self { client, segment_routing_ids, link_ips, + enable_flex_algo, } } @@ -247,13 +270,19 @@ impl<'a> InterfaceMgr<'a> { } // Activate with allocated resources - self.activate( + let activated = self.activate( device_pubkey, &device.code, &iface.name, &iface.ip_net, iface.node_segment_idx, ); + + // Gap 2: after activating a Vpnv4 loopback, backfill all existing topologies + // so this device receives FlexAlgoNodeSegments for every topology. + if activated && self.enable_flex_algo && iface.loopback_type == LoopbackType::Vpnv4 { + backfill_all_topologies_for_device(self.client, device_pubkey); + } } /// Handle interface deletion and resource cleanup @@ -292,7 +321,7 @@ impl<'a> InterfaceMgr<'a> { name: &str, ip_net: &NetworkV4, node_segment_idx: u16, - ) { + ) -> bool { let cmd = ActivateDeviceInterfaceCommand { pubkey: *pubkey, name: name.to_string(), @@ -301,8 +330,15 @@ impl<'a> InterfaceMgr<'a> { use_onchain_allocation: false, }; - if let Err(e) = cmd.execute(self.client) { - error!("Failed to activate interface {name} on {context}: {e}"); + match cmd.execute(self.client) { + Ok(signature) => { + info!("Activated interface {name} on {context}: {signature}"); + true + } + Err(e) => { + error!("Failed to activate interface {name} on {context}: {e}"); + false + } } } diff --git a/activator/src/process/mod.rs b/activator/src/process/mod.rs index 634eea5f84..79bec5a62b 100644 --- a/activator/src/process/mod.rs +++ b/activator/src/process/mod.rs @@ -5,4 +5,5 @@ pub mod iface_mgr; pub mod link; pub mod location; pub mod multicastgroup; +pub mod topology; pub mod user; diff --git a/activator/src/process/topology.rs b/activator/src/process/topology.rs new file mode 100644 index 0000000000..acdf60dd82 --- /dev/null +++ b/activator/src/process/topology.rs @@ -0,0 +1,107 @@ +use doublezero_sdk::{ + commands::{ + device::list::ListDeviceCommand, + topology::{backfill::BackfillTopologyCommand, list::ListTopologyCommand}, + }, + DeviceStatus, DoubleZeroClient, InterfaceStatus, InterfaceType, LoopbackType, +}; +use log::{error, info}; +use solana_sdk::pubkey::Pubkey; + +/// Handle a topology account event by backfilling all devices that have activated +/// Vpnv4 loopbacks but do not yet have a FlexAlgoNodeSegment for this topology. +/// +/// Called when a new topology is created or updated. The BackfillTopology instruction +/// is idempotent: it skips devices that already have a segment for this topology. +pub fn process_topology_event(client: &dyn DoubleZeroClient, topology_name: &str) { + info!("Event:Topology name={topology_name} — backfilling devices"); + + let devices = match ListDeviceCommand.execute(client) { + Ok(d) => d, + Err(e) => { + error!("Failed to list devices for topology backfill: {e}"); + return; + } + }; + + let device_pubkeys: Vec = devices + .iter() + .filter(|(_, d)| { + matches!( + d.status, + DeviceStatus::Activated + | DeviceStatus::DeviceProvisioning + | DeviceStatus::LinkProvisioning + | DeviceStatus::Drained + ) + }) + .filter(|(_, d)| { + d.interfaces.iter().any(|iface| { + let iface = iface.into_current_version(); + iface.interface_type == InterfaceType::Loopback + && iface.loopback_type == LoopbackType::Vpnv4 + && iface.status == InterfaceStatus::Activated + }) + }) + .map(|(pk, _)| *pk) + .collect(); + + if device_pubkeys.is_empty() { + info!("No eligible devices found for topology backfill (topology={topology_name})"); + return; + } + + info!( + "Backfilling {} device(s) for topology={topology_name}", + device_pubkeys.len() + ); + + let cmd = BackfillTopologyCommand { + name: topology_name.to_string(), + device_pubkeys, + }; + + match cmd.execute(client) { + Ok(sig) => info!("BackfillTopology({topology_name}) succeeded: {sig}"), + Err(e) => error!("BackfillTopology({topology_name}) failed: {e}"), + } +} + +/// Backfill all known topologies for a single device, called after a Vpnv4 loopback +/// is activated on that device. The BackfillTopology instruction is idempotent. +pub fn backfill_all_topologies_for_device(client: &dyn DoubleZeroClient, device_pubkey: &Pubkey) { + let topologies = match ListTopologyCommand.execute(client) { + Ok(t) => t, + Err(e) => { + error!("Failed to list topologies for device backfill: {e}"); + return; + } + }; + + if topologies.is_empty() { + return; + } + + for topology in topologies.values() { + info!( + "Backfilling topology={} for device={}", + topology.name, device_pubkey + ); + + let cmd = BackfillTopologyCommand { + name: topology.name.clone(), + device_pubkeys: vec![*device_pubkey], + }; + + match cmd.execute(client) { + Ok(sig) => info!( + "BackfillTopology({}) for device {} succeeded: {sig}", + topology.name, device_pubkey + ), + Err(e) => error!( + "BackfillTopology({}) for device {} failed: {e}", + topology.name, device_pubkey + ), + } + } +} diff --git a/activator/src/processor.rs b/activator/src/processor.rs index e6ec3395b1..864b3d00c5 100644 --- a/activator/src/processor.rs +++ b/activator/src/processor.rs @@ -8,6 +8,7 @@ use crate::{ link::{process_link_event, process_link_event_stateless}, location::process_location_event, multicastgroup::{process_multicastgroup_event, process_multicastgroup_event_stateless}, + topology::process_topology_event, user::{process_user_event, process_user_event_stateless}, }, states::devicestate::{DeviceState, DeviceStateStateless}, @@ -54,6 +55,7 @@ pub struct Processor { locations: LocationMap, exchanges: ExchangeMap, multicastgroups: MulticastGroupMap, + enable_flex_algo: bool, } /// Stateless processor for onchain allocation mode. @@ -63,6 +65,7 @@ pub struct ProcessorStateless { client: Arc, devices: DeviceMapStateless, multicastgroups: MulticastGroupMap, + enable_flex_algo: bool, } /// Reserve segment routing IDs and loopback IPs for devices that have active allocations. @@ -184,6 +187,7 @@ impl Processor { pub fn new( rx: mpsc::Receiver<(Box, Box)>, client: Arc, + enable_flex_algo: bool, ) -> eyre::Result { let builder = ExponentialBuilder::new() .with_max_times(5) @@ -269,6 +273,7 @@ impl Processor { locations, exchanges, multicastgroups: HashMap::new(), + enable_flex_algo, }) } @@ -294,6 +299,7 @@ impl Processor { device, &mut self.segment_routing_ids, &mut self.link_ips, + self.enable_flex_algo, ); } AccountData::Link(link) => { @@ -346,6 +352,11 @@ impl Processor { error!("Error processing access pass event: {e}"); }); } + AccountData::Topology(topology) => { + if self.enable_flex_algo { + process_topology_event(self.client.as_ref(), &topology.name); + } + } _ => {} }; metrics::counter!("doublezero_activator_event_handled").increment(1); @@ -356,6 +367,7 @@ impl ProcessorStateless { pub fn new( rx: mpsc::Receiver<(Box, Box)>, client: Arc, + enable_flex_algo: bool, ) -> eyre::Result { let builder = ExponentialBuilder::new() .with_max_times(5) @@ -393,6 +405,7 @@ impl ProcessorStateless { client, devices: device_map, multicastgroups: HashMap::new(), + enable_flex_algo, }) } @@ -416,6 +429,7 @@ impl ProcessorStateless { pubkey, &mut self.devices, device, + self.enable_flex_algo, ); } AccountData::Link(link) => { @@ -445,6 +459,11 @@ impl ProcessorStateless { error!("Error processing access pass event: {e}"); }); } + AccountData::Topology(topology) => { + if self.enable_flex_algo { + process_topology_event(self.client.as_ref(), &topology.name); + } + } _ => {} }; metrics::counter!("doublezero_activator_event_handled").increment(1); @@ -497,7 +516,7 @@ mod tests { config.device_tunnel_block = NetworkV4::default(); let client = mock_client_with_config(config); let (_tx, rx) = mpsc::channel(1); - let result = Processor::new(rx, client); + let result = Processor::new(rx, client, false); let err = result.err().expect("expected error").to_string(); assert!(err.contains("device_tunnel_block"), "error was: {err}"); } @@ -508,7 +527,7 @@ mod tests { config.user_tunnel_block = NetworkV4::default(); let client = mock_client_with_config(config); let (_tx, rx) = mpsc::channel(1); - let result = Processor::new(rx, client); + let result = Processor::new(rx, client, false); let err = result.err().expect("expected error").to_string(); assert!(err.contains("user_tunnel_block"), "error was: {err}"); } @@ -519,7 +538,7 @@ mod tests { config.multicastgroup_block = NetworkV4::default(); let client = mock_client_with_config(config); let (_tx, rx) = mpsc::channel(1); - let result = Processor::new(rx, client); + let result = Processor::new(rx, client, false); let err = result.err().expect("expected error").to_string(); assert!(err.contains("multicastgroup_block"), "error was: {err}"); } @@ -530,7 +549,7 @@ mod tests { config.multicast_publisher_block = NetworkV4::default(); let client = mock_client_with_config(config); let (_tx, rx) = mpsc::channel(1); - let result = Processor::new(rx, client); + let result = Processor::new(rx, client, false); let err = result.err().expect("expected error").to_string(); assert!( err.contains("multicast_publisher_block"), diff --git a/sdk/borsh-incremental/typescript/borsh-incremental/index.ts b/sdk/borsh-incremental/typescript/borsh-incremental/index.ts index 2fbaf64948..8e38712bc0 100644 --- a/sdk/borsh-incremental/typescript/borsh-incremental/index.ts +++ b/sdk/borsh-incremental/typescript/borsh-incremental/index.ts @@ -185,6 +185,8 @@ export class IncrementalReader { tryReadString(def: string = ""): string { if (this.remaining < 4) return def; + const len = this.data.getUint32(this._offset, true); + if (this._offset + 4 + len > this.raw.byteLength) return def; return this.readString(); } diff --git a/sdk/serviceability/python/serviceability/state.py b/sdk/serviceability/python/serviceability/state.py index 1a61b961d9..c91edf8895 100644 --- a/sdk/serviceability/python/serviceability/state.py +++ b/sdk/serviceability/python/serviceability/state.py @@ -42,6 +42,7 @@ class AccountTypeEnum(IntEnum): ACCESS_PASS = 11 TENANT = 13 PERMISSION = 15 + TOPOLOGY = 16 # --------------------------------------------------------------------------- @@ -403,6 +404,12 @@ def __str__(self) -> str: CURRENT_INTERFACE_VERSION = 2 +@dataclass +class FlexAlgoNodeSegment: + topology: Pubkey = Pubkey.default() + node_segment_idx: int = 0 + + @dataclass class Interface: version: int = 0 @@ -420,9 +427,10 @@ class Interface: ip_net: bytes = b"\x00" * 5 node_segment_idx: int = 0 user_tunnel_endpoint: bool = False + flex_algo_node_segments: list[FlexAlgoNodeSegment] = field(default_factory=list) @classmethod - def from_reader(cls, r: IncrementalReader) -> Interface: + def from_reader(cls, r: DefensiveReader) -> Interface: iface = cls() iface.version = r.read_u8() if iface.version > CURRENT_INTERFACE_VERSION - 1: @@ -451,6 +459,12 @@ def from_reader(cls, r: IncrementalReader) -> Interface: iface.ip_net = r.read_network_v4() iface.node_segment_idx = r.read_u16() iface.user_tunnel_endpoint = r.read_bool() + count = r.read_u32() + for _ in range(count): + seg = FlexAlgoNodeSegment() + seg.topology = _read_pubkey(r) + seg.node_segment_idx = r.read_u16() + iface.flex_algo_node_segments.append(seg) return iface @@ -679,6 +693,8 @@ class Link: delay_override_ns: int = 0 link_health: LinkHealth = LinkHealth.UNKNOWN link_desired_status: LinkDesiredStatus = LinkDesiredStatus.PENDING + link_topologies: list[Pubkey] = field(default_factory=list) + link_flags: int = 0 @classmethod def from_bytes(cls, data: bytes) -> Link: @@ -705,6 +721,8 @@ def from_bytes(cls, data: bytes) -> Link: lk.delay_override_ns = r.read_u64() lk.link_health = LinkHealth(r.read_u8()) lk.link_desired_status = LinkDesiredStatus(r.read_u8()) + lk.link_topologies = _read_pubkey_vec(r) + lk.link_flags = r.read_u8() return lk @@ -868,6 +886,7 @@ class Tenant: billing_discriminant: int = 0 billing_rate: int = 0 billing_last_deduction_dz_epoch: int = 0 + include_topologies: list[Pubkey] = field(default_factory=list) @classmethod def from_bytes(cls, data: bytes) -> Tenant: @@ -887,6 +906,7 @@ def from_bytes(cls, data: bytes) -> Tenant: t.billing_discriminant = r.read_u8() t.billing_rate = r.read_u64() t.billing_last_deduction_dz_epoch = r.read_u64() + t.include_topologies = _read_pubkey_vec(r) return t @@ -995,3 +1015,42 @@ def from_bytes(cls, data: bytes) -> Permission: hi = r.read_u64() p.permissions = lo | (hi << 64) return p + + +# --------------------------------------------------------------------------- +# TopologyInfo +# --------------------------------------------------------------------------- + + +class TopologyConstraint(IntEnum): + INCLUDE_ANY = 0 + EXCLUDE = 1 + + def __str__(self) -> str: + _names = {0: "include-any", 1: "exclude"} + return _names.get(self.value, "unknown") + + +@dataclass +class TopologyInfo: + account_type: int = 0 + owner: Pubkey = Pubkey.default() + bump_seed: int = 0 + name: str = "" + admin_group_bit: int = 0 + flex_algo_number: int = 0 + constraint: TopologyConstraint = TopologyConstraint.INCLUDE_ANY + pub_key: Pubkey = Pubkey.default() # set from account address after deserialization + + @classmethod + def from_bytes(cls, data: bytes) -> TopologyInfo: + r = DefensiveReader(data) + t = cls() + t.account_type = r.read_u8() + t.owner = _read_pubkey(r) + t.bump_seed = r.read_u8() + t.name = r.read_string() + t.admin_group_bit = r.read_u8() + t.flex_algo_number = r.read_u8() + t.constraint = TopologyConstraint(r.read_u8()) + return t diff --git a/sdk/serviceability/testdata/fixtures/device.bin b/sdk/serviceability/testdata/fixtures/device.bin index 8554f47fb1..5e322d10ae 100644 Binary files a/sdk/serviceability/testdata/fixtures/device.bin and b/sdk/serviceability/testdata/fixtures/device.bin differ diff --git a/sdk/serviceability/testdata/fixtures/generate-fixtures/src/main.rs b/sdk/serviceability/testdata/fixtures/generate-fixtures/src/main.rs index 78b1293eda..6c5cc8b52d 100644 --- a/sdk/serviceability/testdata/fixtures/generate-fixtures/src/main.rs +++ b/sdk/serviceability/testdata/fixtures/generate-fixtures/src/main.rs @@ -323,6 +323,7 @@ fn generate_device(dir: &Path) { ip_net: "172.16.0.1/30".parse().unwrap(), node_segment_idx: 200, user_tunnel_endpoint: true, + flex_algo_node_segments: vec![], }), ], reference_count: 12, @@ -433,6 +434,8 @@ fn generate_link(dir: &Path) { delay_override_ns: 0, link_health: LinkHealth::ReadyForService, desired_status: LinkDesiredStatus::Activated, + link_topologies: vec![], + link_flags: 0, }; let data = borsh::to_vec(&val).unwrap(); @@ -759,6 +762,7 @@ fn generate_tenant(dir: &Path) { metro_routing: true, route_liveness: false, billing: TenantBillingConfig::default(), + include_topologies: vec![], }; let data = borsh::to_vec(&val).unwrap(); diff --git a/sdk/serviceability/testdata/fixtures/link.bin b/sdk/serviceability/testdata/fixtures/link.bin index e988d5d648..460511378d 100644 Binary files a/sdk/serviceability/testdata/fixtures/link.bin and b/sdk/serviceability/testdata/fixtures/link.bin differ diff --git a/sdk/serviceability/testdata/fixtures/tenant.bin b/sdk/serviceability/testdata/fixtures/tenant.bin index bfc41f6d8c..064816e9b8 100644 Binary files a/sdk/serviceability/testdata/fixtures/tenant.bin and b/sdk/serviceability/testdata/fixtures/tenant.bin differ diff --git a/sdk/serviceability/testdata/fixtures/user.json b/sdk/serviceability/testdata/fixtures/user.json index 426aa11c38..2093b5a039 100644 --- a/sdk/serviceability/testdata/fixtures/user.json +++ b/sdk/serviceability/testdata/fixtures/user.json @@ -118,4 +118,4 @@ "typ": "u64" } ] -} +} \ No newline at end of file diff --git a/sdk/serviceability/typescript/serviceability/state.ts b/sdk/serviceability/typescript/serviceability/state.ts index d8ac7ffba1..4f3147748e 100644 --- a/sdk/serviceability/typescript/serviceability/state.ts +++ b/sdk/serviceability/typescript/serviceability/state.ts @@ -470,6 +470,11 @@ export function deserializeExchange(data: Uint8Array): Exchange { // Interface (versioned, embedded in Device) // --------------------------------------------------------------------------- +export interface FlexAlgoNodeSegment { + topology: PublicKey; + nodeSegmentIdx: number; +} + export interface DeviceInterface { version: number; status: number; @@ -486,6 +491,7 @@ export interface DeviceInterface { ipNet: Uint8Array; nodeSegmentIdx: number; userTunnelEndpoint: boolean; + flexAlgoNodeSegments?: FlexAlgoNodeSegment[]; } const CURRENT_INTERFACE_VERSION = 2; @@ -507,6 +513,7 @@ function deserializeInterface(r: DefensiveReader): DeviceInterface { ipNet: new Uint8Array(5), nodeSegmentIdx: 0, userTunnelEndpoint: false, + flexAlgoNodeSegments: [], }; iface.version = r.readU8(); @@ -540,6 +547,19 @@ function deserializeInterface(r: DefensiveReader): DeviceInterface { iface.ipNet = r.readNetworkV4(); iface.nodeSegmentIdx = r.readU16(); iface.userTunnelEndpoint = r.readBool(); + const segCount = r.readU32(); + const flexAlgoNodeSegments: FlexAlgoNodeSegment[] = []; + for (let i = 0; i < segCount; i++) { + // Break early if there isn't enough data for a full segment. On pre-RFC-18 + // mainnet accounts, segCount reads garbage bytes from the next field, so + // without this guard the loop runs hundreds of thousands of times. + if (r.remaining < 34) break; // 32 (pubkey) + 2 (u16) + flexAlgoNodeSegments.push({ + topology: readPubkey(r), + nodeSegmentIdx: r.readU16(), + }); + } + iface.flexAlgoNodeSegments = flexAlgoNodeSegments; } return iface; diff --git a/sdk/serviceability/typescript/serviceability/tests/compat.test.ts b/sdk/serviceability/typescript/serviceability/tests/compat.test.ts index b041bb5c59..98d9ebd013 100644 --- a/sdk/serviceability/typescript/serviceability/tests/compat.test.ts +++ b/sdk/serviceability/typescript/serviceability/tests/compat.test.ts @@ -13,8 +13,9 @@ import { describe, expect, test, setDefaultTimeout } from "bun:test"; // Compat tests hit public RPC endpoints which may be slow or rate-limited. -setDefaultTimeout(30_000); -import { Connection, PublicKey } from "@solana/web3.js"; +// The getProgramData test fetches all mainnet accounts and can take 60-90s during busy periods. +setDefaultTimeout(120_000); +import { PublicKey } from "@solana/web3.js"; import { PROGRAM_IDS, LEDGER_RPC_URLS } from "../config.js"; import { newConnection } from "../rpc.js"; import {