From eb755c7834844cae1094f5a68238b68c9a17b1e9 Mon Sep 17 00:00:00 2001 From: Andrew McConnell Date: Sat, 11 Apr 2026 12:07:53 -0500 Subject: [PATCH] edge: add feed runner, manager, HTTP API, and CLI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the parser and sinks into the doublezerod daemon so operators can actually enable a feed. This is the commit where the edge feature turns on end-to-end. Runner owns one feed: two multicast listener goroutines (marketdata on one UDP port, refdata on another), a parser, a sink, and a small set of atomic counters. Each listener joins the multicast group, reads frames in a loop, hands them to the parser, and forwards any produced records to the sink. A periodic goroutine logs a summary every 30s so operators can see liveness without polling the API. Manager owns many runners keyed by code. Enable(cfg) validates the config (required ports, both must differ, resolver + subscription checks, parser/sink factories), constructs a runner, starts it with a long-lived context owned by the manager, and stores it. Disable(code) and Close() cancel the runner context and release the sink. HTTP endpoints ServeEnable / ServeDisable / ServeStatus are thin wrappers over those methods and expose them on the existing doublezerod Unix socket API. Critically, Enable takes no ctx from the caller — the runner's lifetime is owned by the manager, not by the HTTP request, so the request finishing does not tear down the feed. On the operator side, adds `doublezero edge enable/disable/status` subcommands to the Rust CLI, communicating with doublezerod over its Unix socket. The enable command takes --code, --parser, --format, --output, --marketdata-port, and --refdata-port. Status supports --json for machine-readable output. runtime/run.go wires up the manager: resolves multicast group codes to IPs via the existing serviceability reader and routes subscription checks through the existing user manager, so the edge feature automatically respects onchain allowlists. --- client/doublezero/src/cli/command.rs | 11 +- client/doublezero/src/cli/edge.rs | 21 ++ client/doublezero/src/cli/mod.rs | 1 + client/doublezero/src/command/edge.rs | 239 +++++++++++++++++ client/doublezero/src/command/mod.rs | 1 + client/doublezero/src/main.rs | 7 + client/doublezero/src/servicecontroller.rs | 95 +++++++ client/doublezerod/internal/edge/manager.go | 250 ++++++++++++++++++ .../doublezerod/internal/edge/manager_test.go | 191 +++++++++++++ client/doublezerod/internal/edge/runner.go | 183 +++++++++++++ client/doublezerod/internal/runtime/run.go | 33 +++ 11 files changed, 1028 insertions(+), 4 deletions(-) create mode 100644 client/doublezero/src/cli/edge.rs create mode 100644 client/doublezero/src/command/edge.rs create mode 100644 client/doublezerod/internal/edge/manager.go create mode 100644 client/doublezerod/internal/edge/manager_test.go create mode 100644 client/doublezerod/internal/edge/runner.go diff --git a/client/doublezero/src/cli/command.rs b/client/doublezero/src/cli/command.rs index 8f80374581..39ad971acc 100644 --- a/client/doublezero/src/cli/command.rs +++ b/client/doublezero/src/cli/command.rs @@ -2,10 +2,10 @@ use super::multicast::MulticastCliCommand; use crate::{ cli::{ accesspass::AccessPassCliCommand, config::ConfigCliCommand, - contributor::ContributorCliCommand, device::DeviceCliCommand, exchange::ExchangeCliCommand, - globalconfig::GlobalConfigCliCommand, link::LinkCliCommand, location::LocationCliCommand, - permission::PermissionCliCommand, resource::ResourceCliCommand, tenant::TenantCliCommand, - user::UserCliCommand, + contributor::ContributorCliCommand, device::DeviceCliCommand, edge::EdgeCliCommand, + exchange::ExchangeCliCommand, globalconfig::GlobalConfigCliCommand, link::LinkCliCommand, + location::LocationCliCommand, permission::PermissionCliCommand, + resource::ResourceCliCommand, tenant::TenantCliCommand, user::UserCliCommand, }, command::{ connect::ProvisioningCliCommand, disable::DisableCliCommand, @@ -97,6 +97,9 @@ pub enum Command { /// Manage multicast #[command()] Multicast(MulticastCliCommand), + /// Manage edge feed parsing + #[command()] + Edge(EdgeCliCommand), /// Export all data to files #[command()] Export(ExportCliCommand), diff --git a/client/doublezero/src/cli/edge.rs b/client/doublezero/src/cli/edge.rs new file mode 100644 index 0000000000..abfdda3507 --- /dev/null +++ b/client/doublezero/src/cli/edge.rs @@ -0,0 +1,21 @@ +use crate::command::edge::{EdgeDisableCliCommand, EdgeEnableCliCommand, EdgeStatusCliCommand}; +use clap::{Args, Subcommand}; + +#[derive(Args, Debug)] +pub struct EdgeCliCommand { + #[command(subcommand)] + pub command: EdgeCommands, +} + +#[derive(Debug, Subcommand)] +pub enum EdgeCommands { + /// Enable edge feed parsing for a multicast group + #[command()] + Enable(EdgeEnableCliCommand), + /// Disable edge feed parsing for a multicast group + #[command()] + Disable(EdgeDisableCliCommand), + /// Show status of active edge feed parsers + #[command()] + Status(EdgeStatusCliCommand), +} diff --git a/client/doublezero/src/cli/mod.rs b/client/doublezero/src/cli/mod.rs index a84db116ac..bfeafb84ba 100644 --- a/client/doublezero/src/cli/mod.rs +++ b/client/doublezero/src/cli/mod.rs @@ -3,6 +3,7 @@ pub mod command; pub mod config; pub mod contributor; pub mod device; +pub mod edge; pub mod exchange; pub mod globalconfig; pub mod link; diff --git a/client/doublezero/src/command/edge.rs b/client/doublezero/src/command/edge.rs new file mode 100644 index 0000000000..d32c8d703e --- /dev/null +++ b/client/doublezero/src/command/edge.rs @@ -0,0 +1,239 @@ +use crate::{ + requirements::check_doublezero, + servicecontroller::{ServiceController, ServiceControllerImpl}, +}; +use clap::Args; +use doublezero_cli::doublezerocommand::CliCommand; + +#[derive(Args, Debug)] +pub struct EdgeEnableCliCommand { + /// Multicast group code + #[arg(long)] + pub code: String, + + /// Parser name (e.g. "topofbook") + #[arg(long)] + pub parser: String, + + /// Output format: "json" or "csv" + #[arg(long, default_value = "json")] + pub format: String, + + /// Output path (file path or "unix:///path/to/sock") + #[arg(long)] + pub output: String, + + /// UDP port for marketdata messages (quotes, trades) + #[arg(long)] + pub marketdata_port: u16, + + /// UDP port for refdata messages (instrument definitions) + #[arg(long)] + pub refdata_port: u16, +} + +impl EdgeEnableCliCommand { + pub async fn execute(&self, client: &dyn CliCommand) -> eyre::Result<()> { + let controller = ServiceControllerImpl::new(None); + self.execute_with_service_controller(client, &controller) + .await + } + + pub async fn execute_with_service_controller( + &self, + client: &dyn CliCommand, + controller: &T, + ) -> eyre::Result<()> { + check_doublezero(controller, client, None).await?; + controller + .edge_enable( + &self.code, + &self.parser, + &self.format, + &self.output, + self.marketdata_port, + self.refdata_port, + ) + .await?; + println!( + "Edge feed enabled: code={}, parser={}, format={}, output={}", + self.code, self.parser, self.format, self.output + ); + Ok(()) + } +} + +#[derive(Args, Debug)] +pub struct EdgeDisableCliCommand { + /// Multicast group code + #[arg(long)] + pub code: String, +} + +impl EdgeDisableCliCommand { + pub async fn execute(&self, client: &dyn CliCommand) -> eyre::Result<()> { + let controller = ServiceControllerImpl::new(None); + self.execute_with_service_controller(client, &controller) + .await + } + + pub async fn execute_with_service_controller( + &self, + client: &dyn CliCommand, + controller: &T, + ) -> eyre::Result<()> { + check_doublezero(controller, client, None).await?; + controller.edge_disable(&self.code).await?; + println!("Edge feed disabled: code={}", self.code); + Ok(()) + } +} + +#[derive(Args, Debug)] +pub struct EdgeStatusCliCommand { + /// Output as JSON + #[arg(long, default_value_t = false)] + pub json: bool, +} + +impl EdgeStatusCliCommand { + pub async fn execute(&self, client: &dyn CliCommand) -> eyre::Result<()> { + let controller = ServiceControllerImpl::new(None); + self.execute_with_service_controller(client, &controller) + .await + } + + pub async fn execute_with_service_controller( + &self, + client: &dyn CliCommand, + controller: &T, + ) -> eyre::Result<()> { + check_doublezero(controller, client, None).await?; + let statuses = controller.edge_status().await?; + + if self.json { + println!("{}", serde_json::to_string_pretty(&statuses)?); + return Ok(()); + } + + if statuses.is_empty() { + println!("No active edge feeds"); + return Ok(()); + } + + for s in &statuses { + println!( + " {} parser={} format={} output={} records={} buffered={} running={}", + s.code, s.parser, s.format, s.output, s.records_written, s.buffered, s.running, + ); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::servicecontroller::{EdgeFeedStatus, MockServiceController}; + use doublezero_cli::tests::utils::create_test_client; + use doublezero_config::Environment; + + fn setup_mock() -> MockServiceController { + let mut mock = MockServiceController::new(); + mock.expect_service_controller_check().return_const(true); + mock.expect_service_controller_can_open().return_const(true); + mock.expect_get_env() + .returning_st(|| Ok(Environment::default())); + mock + } + + fn setup_client() -> doublezero_cli::doublezerocommand::MockCliCommand { + let mut client = create_test_client(); + client + .expect_get_environment() + .returning_st(Environment::default); + client + } + + #[tokio::test] + async fn test_edge_enable_success() { + let mut mock = setup_mock(); + mock.expect_edge_enable() + .returning(|_, _, _, _, _, _| Ok(())); + + let client = setup_client(); + let cmd = EdgeEnableCliCommand { + code: "mg01".to_string(), + parser: "topofbook".to_string(), + format: "json".to_string(), + output: "/tmp/test.jsonl".to_string(), + marketdata_port: 7000, + refdata_port: 7001, + }; + let result = cmd.execute_with_service_controller(&client, &mock).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_edge_disable_success() { + let mut mock = setup_mock(); + mock.expect_edge_disable().returning(|_| Ok(())); + + let client = setup_client(); + let cmd = EdgeDisableCliCommand { + code: "mg01".to_string(), + }; + let result = cmd.execute_with_service_controller(&client, &mock).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_edge_status_empty() { + let mut mock = setup_mock(); + mock.expect_edge_status().returning(|| Ok(vec![])); + + let client = setup_client(); + let cmd = EdgeStatusCliCommand { json: false }; + let result = cmd.execute_with_service_controller(&client, &mock).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_edge_status_with_feeds() { + let mut mock = setup_mock(); + mock.expect_edge_status().returning(|| { + Ok(vec![EdgeFeedStatus { + code: "mg01".to_string(), + parser: "topofbook".to_string(), + format: "json".to_string(), + output: "/tmp/out.jsonl".to_string(), + records_written: 42, + buffered: 0, + running: true, + }]) + }); + + let client = setup_client(); + let cmd = EdgeStatusCliCommand { json: true }; + let result = cmd.execute_with_service_controller(&client, &mock).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_edge_enable_daemon_not_running() { + let mut mock = MockServiceController::new(); + mock.expect_service_controller_check().return_const(false); + + let client = setup_client(); + let cmd = EdgeEnableCliCommand { + code: "mg01".to_string(), + parser: "topofbook".to_string(), + format: "json".to_string(), + output: "/tmp/test.jsonl".to_string(), + marketdata_port: 7000, + refdata_port: 7001, + }; + let result = cmd.execute_with_service_controller(&client, &mock).await; + assert!(result.is_err()); + } +} diff --git a/client/doublezero/src/command/mod.rs b/client/doublezero/src/command/mod.rs index b70e02cd38..2f2e941ddb 100644 --- a/client/doublezero/src/command/mod.rs +++ b/client/doublezero/src/command/mod.rs @@ -1,6 +1,7 @@ pub mod connect; pub mod disable; pub mod disconnect; +pub mod edge; pub mod enable; pub mod helpers; pub mod latency; diff --git a/client/doublezero/src/main.rs b/client/doublezero/src/main.rs index 17f5dbc8ed..143dc737ad 100644 --- a/client/doublezero/src/main.rs +++ b/client/doublezero/src/main.rs @@ -12,6 +12,7 @@ use crate::cli::{ command::Command, config::ConfigCommands, device::{DeviceCommands, InterfaceCommands}, + edge::EdgeCommands, exchange::ExchangeCommands, globalconfig::{ AirdropCommands, AuthorityCommands, FeatureFlagsCommands, FoundationAllowlistCommands, @@ -305,6 +306,12 @@ async fn main() -> eyre::Result<()> { }, }, + Command::Edge(command) => match command.command { + EdgeCommands::Enable(args) => args.execute(&client).await, + EdgeCommands::Disable(args) => args.execute(&client).await, + EdgeCommands::Status(args) => args.execute(&client).await, + }, + Command::Resource(command) => match command.command { cli::resource::ResourceCommands::Allocate(args) => args.execute(&client, &mut handle), cli::resource::ResourceCommands::Create(args) => args.execute(&client, &mut handle), diff --git a/client/doublezero/src/servicecontroller.rs b/client/doublezero/src/servicecontroller.rs index a9e12cb967..f848fdfa13 100644 --- a/client/doublezero/src/servicecontroller.rs +++ b/client/doublezero/src/servicecontroller.rs @@ -180,6 +180,17 @@ pub struct V2StatusResponse { pub services: Vec, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EdgeFeedStatus { + pub code: String, + pub parser: String, + pub format: String, + pub output: String, + pub records_written: u64, + pub buffered: i64, + pub running: bool, +} + #[automock] pub trait ServiceController { fn service_controller_check(&self) -> bool; @@ -192,6 +203,17 @@ pub trait ServiceController { async fn enable(&self) -> eyre::Result<()>; async fn disable(&self) -> eyre::Result<()>; async fn routes(&self) -> eyre::Result>; + async fn edge_enable( + &self, + code: &str, + parser: &str, + format: &str, + output: &str, + marketdata_port: u16, + refdata_port: u16, + ) -> eyre::Result<()>; + async fn edge_disable(&self, code: &str) -> eyre::Result<()>; + async fn edge_status(&self) -> eyre::Result>; } pub struct ServiceControllerImpl { @@ -357,6 +379,79 @@ impl ServiceController for ServiceControllerImpl { let response = serde_json::from_slice::>(&data)?; Ok(response) } + + async fn edge_enable( + &self, + code: &str, + parser: &str, + format: &str, + output: &str, + marketdata_port: u16, + refdata_port: u16, + ) -> eyre::Result<()> { + let body = serde_json::json!({ + "code": code, + "parser": parser, + "format": format, + "output": output, + "marketdata_port": marketdata_port, + "refdata_port": refdata_port, + }); + let client: Client> = + Client::builder(TokioExecutor::new()).build(UnixConnector); + let req = Request::builder() + .method(Method::POST) + .uri(Uri::new(&self.socket_path, "/edge/enable")) + .header("content-type", "application/json") + .body(Full::from(Bytes::from(body.to_string())))?; + let res = client + .request(req) + .await + .map_err(|e| eyre!("Unable to connect to doublezero daemon: {e}"))?; + let data = res.into_body().collect().await?.to_bytes(); + if let Ok(err_resp) = serde_json::from_slice::(&data) { + if err_resp.status == "error" { + eyre::bail!(err_resp.description); + } + } + Ok(()) + } + + async fn edge_disable(&self, code: &str) -> eyre::Result<()> { + let body = serde_json::json!({"code": code}); + let client: Client> = + Client::builder(TokioExecutor::new()).build(UnixConnector); + let req = Request::builder() + .method(Method::POST) + .uri(Uri::new(&self.socket_path, "/edge/disable")) + .header("content-type", "application/json") + .body(Full::from(Bytes::from(body.to_string())))?; + let res = client + .request(req) + .await + .map_err(|e| eyre!("Unable to connect to doublezero daemon: {e}"))?; + let data = res.into_body().collect().await?.to_bytes(); + if let Ok(err_resp) = serde_json::from_slice::(&data) { + if err_resp.status == "error" { + eyre::bail!(err_resp.description); + } + } + Ok(()) + } + + async fn edge_status(&self) -> eyre::Result> { + let client = Client::builder(TokioExecutor::new()).build(UnixConnector); + let req = Request::builder() + .method(Method::GET) + .uri(Uri::new(&self.socket_path, "/edge/status")) + .body(Empty::::new())?; + let res = client + .request(req) + .await + .map_err(|e| eyre!("Unable to connect to doublezero daemon: {e}"))?; + let data = res.into_body().collect().await?.to_bytes(); + parse_daemon_response::>(&data, "/edge/status") + } } #[cfg(test)] diff --git a/client/doublezerod/internal/edge/manager.go b/client/doublezerod/internal/edge/manager.go new file mode 100644 index 0000000000..066c6c7ce4 --- /dev/null +++ b/client/doublezerod/internal/edge/manager.go @@ -0,0 +1,250 @@ +package edge + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net" + "net/http" + "sync" +) + +// FeedConfig describes a single active feed. +type FeedConfig struct { + // Code is the multicast group code (e.g. "mg01"). + Code string `json:"code"` + + // ParserName selects the parser from the registry (e.g. "topofbook"). + ParserName string `json:"parser"` + + // Format is the output encoding: "json" or "csv". + Format string `json:"format"` + + // OutputPath is where decoded records are written. + OutputPath string `json:"output"` + + // MarketdataPort is the UDP port for marketdata messages (quotes, trades). + MarketdataPort int `json:"marketdata_port"` + + // RefdataPort is the UDP port for refdata messages (instrument defs). + RefdataPort int `json:"refdata_port"` +} + +// FeedStatus reports the state of an active feed. +type FeedStatus struct { + Code string `json:"code"` + ParserName string `json:"parser"` + Format string `json:"format"` + OutputPath string `json:"output"` + RecordsWritten uint64 `json:"records_written"` + Buffered int `json:"buffered"` + Running bool `json:"running"` +} + +// Manager manages the lifecycle of edge feed runners. +type Manager struct { + mu sync.Mutex + runners map[string]*Runner + + // multicastIP resolves a group code to its multicast IP. + multicastIP func(code string) (net.IP, error) + + // isSubscribed reports whether the given multicast IP is in the + // user's active subscription list. This prevents enabling a feed + // parser for a group the user has not joined. + isSubscribed func(groupIP net.IP) bool +} + +// NewManager creates a new edge feed manager. +// multicastIPResolver maps a group code to its multicast IP. +// subscriptionChecker reports whether a multicast IP is actively subscribed. +func NewManager(multicastIPResolver func(code string) (net.IP, error), subscriptionChecker func(groupIP net.IP) bool) *Manager { + if multicastIPResolver == nil { + multicastIPResolver = func(code string) (net.IP, error) { + return nil, fmt.Errorf("no multicast IP resolver configured") + } + } + if subscriptionChecker == nil { + subscriptionChecker = func(net.IP) bool { return false } + } + return &Manager{ + runners: make(map[string]*Runner), + multicastIP: multicastIPResolver, + isSubscribed: subscriptionChecker, + } +} + +// Enable starts a feed runner for the given configuration. +// +// The runner's lifetime is owned by the Manager — it runs until Disable or +// Close is called. Enable deliberately does not accept a caller-supplied +// context: callers like HTTP handlers would pass the per-request context, +// which is cancelled as soon as the handler returns, silently tearing down +// the runner's sockets. +func (m *Manager) Enable(cfg FeedConfig) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.runners[cfg.Code]; exists { + return fmt.Errorf("feed %q is already enabled", cfg.Code) + } + + if cfg.MarketdataPort <= 0 || cfg.MarketdataPort > 65535 { + return fmt.Errorf("marketdata_port is required and must be 1-65535 (got %d)", cfg.MarketdataPort) + } + if cfg.RefdataPort <= 0 || cfg.RefdataPort > 65535 { + return fmt.Errorf("refdata_port is required and must be 1-65535 (got %d)", cfg.RefdataPort) + } + if cfg.MarketdataPort == cfg.RefdataPort { + return fmt.Errorf("marketdata_port and refdata_port must differ (got %d)", cfg.MarketdataPort) + } + + parser, ok := NewParser(cfg.ParserName) + if !ok { + return fmt.Errorf("unknown parser %q; available: %v", cfg.ParserName, RegisteredParsers()) + } + + sink, err := NewSink(SinkConfig{Format: cfg.Format, Path: cfg.OutputPath}) + if err != nil { + return fmt.Errorf("creating output sink: %w", err) + } + + groupIP, err := m.multicastIP(cfg.Code) + if err != nil { + return fmt.Errorf("resolving multicast IP for %q: %w", cfg.Code, err) + } + + if !m.isSubscribed(groupIP) { + return fmt.Errorf("not subscribed to multicast group %q — run 'doublezero connect multicast subscriber' first", cfg.Code) + } + + runner := NewRunner(RunnerConfig{ + Code: cfg.Code, + GroupIP: groupIP, + MarketdataPort: cfg.MarketdataPort, + RefdataPort: cfg.RefdataPort, + Format: cfg.Format, + OutputPath: cfg.OutputPath, + Parser: parser, + Sink: sink, + }) + + runCtx, cancel := context.WithCancel(context.Background()) + runner.cancel = cancel + + go func() { + if err := runner.Run(runCtx); err != nil && runCtx.Err() == nil { + slog.Error("edge: feed runner exited with error", "code", cfg.Code, "error", err) + } + }() + + m.runners[cfg.Code] = runner + slog.Info("edge: feed enabled", "code", cfg.Code, "parser", cfg.ParserName, "format", cfg.Format, "output", cfg.OutputPath) + return nil +} + +// Disable stops and removes the feed runner for the given group code. +func (m *Manager) Disable(code string) error { + m.mu.Lock() + defer m.mu.Unlock() + + runner, exists := m.runners[code] + if !exists { + return fmt.Errorf("feed %q is not enabled", code) + } + + runner.Stop() + delete(m.runners, code) + slog.Info("edge: feed disabled", "code", code) + return nil +} + +// Status returns the status of all active feeds. +func (m *Manager) Status() []FeedStatus { + m.mu.Lock() + defer m.mu.Unlock() + + statuses := make([]FeedStatus, 0, len(m.runners)) + for _, r := range m.runners { + statuses = append(statuses, r.Status()) + } + return statuses +} + +// Close stops all active feed runners. +func (m *Manager) Close() { + m.mu.Lock() + defer m.mu.Unlock() + + for code, runner := range m.runners { + runner.Stop() + slog.Info("edge: feed stopped", "code", code) + } + m.runners = make(map[string]*Runner) +} + +// ServeEnable handles POST /edge/enable requests. +func (m *Manager) ServeEnable(w http.ResponseWriter, r *http.Request) { + var cfg FeedConfig + if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{ + "status": "error", + "description": fmt.Sprintf("malformed request: %v", err), + }) + return + } + + if cfg.Code == "" || cfg.ParserName == "" || cfg.Format == "" || cfg.OutputPath == "" { + writeJSON(w, http.StatusBadRequest, map[string]string{ + "status": "error", + "description": "code, parser, format, and output are required", + }) + return + } + + if err := m.Enable(cfg); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{ + "status": "error", + "description": err.Error(), + }) + return + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// ServeDisable handles POST /edge/disable requests. +func (m *Manager) ServeDisable(w http.ResponseWriter, r *http.Request) { + var req struct { + Code string `json:"code"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Code == "" { + writeJSON(w, http.StatusBadRequest, map[string]string{ + "status": "error", + "description": "code is required", + }) + return + } + + if err := m.Disable(req.Code); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{ + "status": "error", + "description": err.Error(), + }) + return + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// ServeStatus handles GET /edge/status requests. +func (m *Manager) ServeStatus(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, m.Status()) +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(v) //nolint:errcheck +} diff --git a/client/doublezerod/internal/edge/manager_test.go b/client/doublezerod/internal/edge/manager_test.go new file mode 100644 index 0000000000..1ec49e77cf --- /dev/null +++ b/client/doublezerod/internal/edge/manager_test.go @@ -0,0 +1,191 @@ +package edge + +import ( + "bytes" + "encoding/json" + "net" + "net/http" + "net/http/httptest" + "path/filepath" + "strings" + "testing" +) + +func TestParserRegistry(t *testing.T) { + // topofbook should be registered via init(). + p, ok := NewParser("topofbook") + if !ok { + t.Fatal("topofbook parser not found in registry") + } + if p.Name() != "topofbook" { + t.Errorf("expected parser name topofbook, got %s", p.Name()) + } + + _, ok = NewParser("nonexistent") + if ok { + t.Error("expected nonexistent parser to not be found") + } + + names := RegisteredParsers() + found := false + for _, n := range names { + if n == "topofbook" { + found = true + } + } + if !found { + t.Error("expected topofbook in registered parsers") + } +} + +func TestManager_ServeStatus_Empty(t *testing.T) { + mgr := NewManager(nil, nil) + + req := httptest.NewRequest(http.MethodGet, "/edge/status", nil) + w := httptest.NewRecorder() + mgr.ServeStatus(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + var statuses []FeedStatus + if err := json.NewDecoder(w.Body).Decode(&statuses); err != nil { + t.Fatalf("error decoding response: %v", err) + } + if len(statuses) != 0 { + t.Errorf("expected 0 statuses, got %d", len(statuses)) + } +} + +func TestManager_ServeEnable_MissingFields(t *testing.T) { + mgr := NewManager(nil, nil) + + body := bytes.NewBufferString(`{"code": "mg01"}`) + req := httptest.NewRequest(http.MethodPost, "/edge/enable", body) + w := httptest.NewRecorder() + mgr.ServeEnable(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400, got %d", w.Code) + } +} + +func TestManager_ServeDisable_NotEnabled(t *testing.T) { + mgr := NewManager(nil, nil) + + body := bytes.NewBufferString(`{"code": "mg01"}`) + req := httptest.NewRequest(http.MethodPost, "/edge/disable", body) + w := httptest.NewRecorder() + mgr.ServeDisable(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400, got %d", w.Code) + } +} + +func TestManager_Enable_PortValidation(t *testing.T) { + groupIP := net.IPv4(239, 0, 0, 1) + mgr := NewManager( + func(code string) (net.IP, error) { return groupIP, nil }, + func(ip net.IP) bool { return true }, + ) + + tests := []struct { + name string + cfg FeedConfig + wantErr string + }{ + { + name: "missing marketdata port", + cfg: FeedConfig{ + Code: "mg01", ParserName: "topofbook", Format: "json", + OutputPath: filepath.Join(t.TempDir(), "out.jsonl"), + RefdataPort: 7001, + }, + wantErr: "marketdata_port", + }, + { + name: "missing refdata port", + cfg: FeedConfig{ + Code: "mg01", ParserName: "topofbook", Format: "json", + OutputPath: filepath.Join(t.TempDir(), "out.jsonl"), + MarketdataPort: 7000, + }, + wantErr: "refdata_port", + }, + { + name: "same port for both", + cfg: FeedConfig{ + Code: "mg01", ParserName: "topofbook", Format: "json", + OutputPath: filepath.Join(t.TempDir(), "out.jsonl"), + MarketdataPort: 7000, + RefdataPort: 7000, + }, + wantErr: "must differ", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := mgr.Enable(tt.cfg) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("expected error containing %q, got: %v", tt.wantErr, err) + } + }) + } +} + +func TestManager_Enable_NotSubscribed(t *testing.T) { + groupIP := net.IPv4(239, 0, 0, 1) + + mgr := NewManager( + func(code string) (net.IP, error) { return groupIP, nil }, + func(ip net.IP) bool { return false }, // not subscribed + ) + + err := mgr.Enable(FeedConfig{ + Code: "mg01", + ParserName: "topofbook", + Format: "json", + OutputPath: filepath.Join(t.TempDir(), "out.jsonl"), + MarketdataPort: 7000, + RefdataPort: 7001, + }) + if err == nil { + t.Fatal("expected error when not subscribed, got nil") + } + if !strings.Contains(err.Error(), "not subscribed") { + t.Errorf("expected 'not subscribed' error, got: %v", err) + } +} + +func TestManager_Enable_Subscribed(t *testing.T) { + groupIP := net.IPv4(239, 0, 0, 1) + + mgr := NewManager( + func(code string) (net.IP, error) { return groupIP, nil }, + func(ip net.IP) bool { return ip.Equal(groupIP) }, // subscribed + ) + + err := mgr.Enable(FeedConfig{ + Code: "mg01", + ParserName: "topofbook", + Format: "json", + OutputPath: filepath.Join(t.TempDir(), "out.jsonl"), + MarketdataPort: 7000, + RefdataPort: 7001, + }) + // Enable will fail at ListenMulticastUDP (no real network), but it + // should get past the subscription check. Verify it didn't fail with + // the subscription guard error. + if err != nil && strings.Contains(err.Error(), "not subscribed") { + t.Errorf("should have passed subscription check, got: %v", err) + } + + // Clean up any runner that may have started. + mgr.Close() +} diff --git a/client/doublezerod/internal/edge/runner.go b/client/doublezerod/internal/edge/runner.go new file mode 100644 index 0000000000..02e5a95ed9 --- /dev/null +++ b/client/doublezerod/internal/edge/runner.go @@ -0,0 +1,183 @@ +package edge + +import ( + "context" + "fmt" + "log/slog" + "net" + "sync/atomic" + "time" + + "golang.org/x/net/ipv4" +) + +const ( + // maxDatagramSize is the maximum UDP datagram size we read. + maxDatagramSize = 65535 + + // summaryInterval is how often the runner logs a periodic + // summary of records written, buffered messages, and known instruments. + summaryInterval = 30 * time.Second +) + +// RunnerConfig configures a single feed runner. +type RunnerConfig struct { + Code string + GroupIP net.IP + MarketdataPort int + RefdataPort int + Format string + OutputPath string + Parser Parser + Sink OutputSink +} + +// Runner listens for multicast packets on a group and decodes them. +type Runner struct { + cfg RunnerConfig + recordsWritten atomic.Uint64 + running atomic.Bool + firstFrameLogged atomic.Bool + cancel context.CancelFunc +} + +// NewRunner creates a new feed runner. Call Run to start it. +func NewRunner(cfg RunnerConfig) *Runner { + return &Runner{cfg: cfg} +} + +// Run starts listening for multicast packets on both the marketdata and +// refdata ports. It blocks until ctx is cancelled. +func (r *Runner) Run(ctx context.Context) error { + r.running.Store(true) + defer r.running.Store(false) + + errCh := make(chan error, 2) + + go func() { + errCh <- r.listenPort(ctx, r.cfg.RefdataPort, "refdata") + }() + + go func() { + errCh <- r.listenPort(ctx, r.cfg.MarketdataPort, "marketdata") + }() + + go r.logPeriodicSummary(ctx) + + select { + case <-ctx.Done(): + return nil + case err := <-errCh: + return err + } +} + +// logPeriodicSummary emits a periodic INFO log describing the runner's +// current state so operators can see liveness without polling edge status. +func (r *Runner) logPeriodicSummary(ctx context.Context) { + ticker := time.NewTicker(summaryInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + slog.Info("edge: runner summary", + "code", r.cfg.Code, + "records_written", r.recordsWritten.Load(), + "buffered", r.cfg.Parser.Buffered(), + "instruments_known", r.cfg.Parser.InstrumentCount()) + } + } +} + +// listenPort joins the multicast group on the given port and reads packets. +func (r *Runner) listenPort(ctx context.Context, port int, label string) (retErr error) { + defer func() { + if rv := recover(); rv != nil { + retErr = fmt.Errorf("panic in %s listener for %s: %v", label, r.cfg.Code, rv) + slog.Error("edge: feed runner panic recovered", "code", r.cfg.Code, "port", label, "panic", rv) + } + }() + + addr := &net.UDPAddr{ + IP: r.cfg.GroupIP, + Port: port, + } + + conn, err := net.ListenMulticastUDP("udp4", nil, addr) + if err != nil { + return fmt.Errorf("joining multicast group %s port %d: %w", r.cfg.GroupIP, port, err) + } + defer conn.Close() + + // Allow multiple listeners on the same group (hot + ref may share an IP). + pc := ipv4.NewPacketConn(conn) + if err := pc.SetControlMessage(ipv4.FlagDst, true); err != nil { + slog.Warn("edge: could not set control message flag", "code", r.cfg.Code, "error", err) + } + + slog.Info("edge: listening for multicast", "code", r.cfg.Code, "group", r.cfg.GroupIP, "port", port, "label", label) + + buf := make([]byte, maxDatagramSize) + for { + select { + case <-ctx.Done(): + return nil + default: + } + + n, _, err := conn.ReadFromUDP(buf) + if err != nil { + if ctx.Err() != nil { + return nil + } + slog.Warn("edge: read error", "code", r.cfg.Code, "port", label, "error", err) + continue + } + + records, err := r.cfg.Parser.Parse(buf[:n]) + if err != nil { + slog.Warn("edge: parse error", "code", r.cfg.Code, "port", label, "error", err) + continue + } + + if len(records) > 0 { + if r.firstFrameLogged.CompareAndSwap(false, true) { + slog.Info("edge: parser producing records", + "code", r.cfg.Code, + "port", label, + "first_batch_size", len(records)) + } + if err := r.cfg.Sink.Write(records); err != nil { + slog.Error("edge: sink write error", "code", r.cfg.Code, "error", err) + continue + } + r.recordsWritten.Add(uint64(len(records))) + } + } +} + +// Stop cancels the runner's context and closes the sink. +func (r *Runner) Stop() { + if r.cancel != nil { + r.cancel() + } + if r.cfg.Sink != nil { + r.cfg.Sink.Close() //nolint:errcheck + } +} + +// Status returns the current status of the runner. +func (r *Runner) Status() FeedStatus { + return FeedStatus{ + Code: r.cfg.Code, + ParserName: r.cfg.Parser.Name(), + Format: r.cfg.Format, + OutputPath: r.cfg.OutputPath, + RecordsWritten: r.recordsWritten.Load(), + Buffered: r.cfg.Parser.Buffered(), + Running: r.running.Load(), + } +} diff --git a/client/doublezerod/internal/runtime/run.go b/client/doublezerod/internal/runtime/run.go index 5e89709edc..a7a26ad144 100644 --- a/client/doublezerod/internal/runtime/run.go +++ b/client/doublezerod/internal/runtime/run.go @@ -14,6 +14,7 @@ import ( "github.com/gagliardetto/solana-go/rpc" "github.com/malbeclabs/doublezero/client/doublezerod/internal/api" "github.com/malbeclabs/doublezero/client/doublezerod/internal/bgp" + "github.com/malbeclabs/doublezero/client/doublezerod/internal/edge" "github.com/malbeclabs/doublezero/client/doublezerod/internal/latency" "github.com/malbeclabs/doublezero/client/doublezerod/internal/liveness" "github.com/malbeclabs/doublezero/client/doublezerod/internal/manager" @@ -142,6 +143,35 @@ func Run(ctx context.Context, sockFile string, routeConfigPath string, enableLat errCh <- err }() + // Edge feed manager for decoding multicast feed data. + edgeMgr := edge.NewManager(func(code string) (net.IP, error) { + // Resolve multicast group code to IP via onchain state. + fetchCtx, fetchCancel := context.WithTimeout(ctx, fetchTimeout) + defer fetchCancel() + data, err := cachingFetcher.GetProgramData(fetchCtx) + if err != nil { + return nil, fmt.Errorf("fetching program data: %w", err) + } + for _, mg := range data.MulticastGroups { + if mg.Code == code { + return net.IP(mg.MulticastIp[:]).To4(), nil + } + } + return nil, fmt.Errorf("multicast group %q not found", code) + }, func(groupIP net.IP) bool { + // Check whether the user has an active multicast subscription + // that includes this group IP. + for _, pr := range nlm.GetProvisionedServices() { + for _, subIP := range pr.MulticastSubGroups { + if subIP.Equal(groupIP) { + return true + } + } + } + return false + }) + defer edgeMgr.Close() + mux := http.NewServeMux() mux.HandleFunc("POST /provision", nlm.ServeProvision) mux.HandleFunc("POST /remove", nlm.ServeRemove) @@ -151,6 +181,9 @@ func Run(ctx context.Context, sockFile string, routeConfigPath string, enableLat mux.HandleFunc("GET /v2/status", nlm.ServeV2Status) mux.HandleFunc("GET /routes", api.ServeRoutesHandler(nlr, lm, nlm, networkConfig)) mux.HandleFunc("POST /resolve-route", api.ServeResolveRouteHandler(nlr, networkConfig)) + mux.HandleFunc("POST /edge/enable", edgeMgr.ServeEnable) + mux.HandleFunc("POST /edge/disable", edgeMgr.ServeDisable) + mux.HandleFunc("GET /edge/status", edgeMgr.ServeStatus) if latencyManager != nil { go func() {