From 00a9db0ddd9f97b4cb023de25117b43a443fe62a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 29 Jan 2026 13:25:49 -0800 Subject: [PATCH 1/4] server/json-rpc: Replace jsonrpsee with axum Migrate the JSON-RPC admin server from jsonrpsee to axum/tower, which are already used elsewhere in the codebase (server/graphman). This removes the jsonrpsee dependency while maintaining full protocol compatibility. Changes: - Add jsonrpc.rs: JSON-RPC 2.0 types (request, response, error, ID) - Add error.rs: Error code mapping from SubgraphRegistrarError - Add handlers.rs: Request dispatch and method handlers - Add server.rs: Axum-based HTTP server with graceful shutdown - Update lib.rs: Module structure with public exports - Update Cargo.toml: Replace jsonrpsee with axum, serde_json, thiserror --- Cargo.lock | 192 +++--------------- server/json-rpc/Cargo.toml | 6 +- server/json-rpc/src/error.rs | 47 +++++ server/json-rpc/src/handlers.rs | 347 ++++++++++++++++++++++++++++++++ server/json-rpc/src/jsonrpc.rs | 198 ++++++++++++++++++ server/json-rpc/src/lib.rs | 341 +------------------------------ server/json-rpc/src/server.rs | 95 +++++++++ 7 files changed, 727 insertions(+), 499 deletions(-) create mode 100644 server/json-rpc/src/error.rs create mode 100644 server/json-rpc/src/handlers.rs create mode 100644 server/json-rpc/src/jsonrpc.rs create mode 100644 server/json-rpc/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index a0c04723ed1..ff4842b829e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1618,9 +1618,9 @@ dependencies = [ "form_urlencoded", "futures-util", "http 1.4.0", - "http-body 1.0.0", + "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "itoa", "matchit", @@ -1651,7 +1651,7 @@ dependencies = [ "bytes", "futures-core", "http 1.4.0", - "http-body 1.0.0", + "http-body", "http-body-util", "mime", "pin-project-lite", @@ -1706,15 +1706,6 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d809780667f4410e7c41b07f52439b94d2bdf8528eeedc287fa38d3b7f95d82" -[[package]] -name = "beef" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1" -dependencies = [ - "serde", -] - [[package]] name = "bigdecimal" version = "0.1.2" @@ -3682,7 +3673,7 @@ dependencies = [ "http 0.2.12", "http 1.4.0", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "indoc", "itertools 0.14.0", @@ -3939,9 +3930,13 @@ dependencies = [ name = "graph-server-json-rpc" version = "0.36.0" dependencies = [ + "axum", "graph", - "jsonrpsee", "serde", + "serde_json", + "slog", + "thiserror 2.0.18", + "tokio", ] [[package]] @@ -4107,25 +4102,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "h2" -version = "0.3.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.12", - "indexmap 2.11.4", - "slab", - "tokio", - "tokio-util 0.7.18", - "tracing", -] - [[package]] name = "h2" version = "0.4.5" @@ -4335,17 +4311,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" -dependencies = [ - "bytes", - "http 0.2.12", - "pin-project-lite", -] - [[package]] name = "http-body" version = "1.0.0" @@ -4365,7 +4330,7 @@ dependencies = [ "bytes", "futures-core", "http 1.4.0", - "http-body 1.0.0", + "http-body", "pin-project-lite", ] @@ -4387,30 +4352,6 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" -[[package]] -name = "hyper" -version = "0.14.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "socket2 0.5.7", - "tokio", - "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tracing", - "want", -] - [[package]] name = "hyper" version = "1.8.1" @@ -4421,9 +4362,9 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2 0.4.5", + "h2", "http 1.4.0", - "http-body 1.0.0", + "http-body", "httparse", "httpdate", "itoa", @@ -4442,7 +4383,7 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.4.0", - "hyper 1.8.1", + "hyper", "hyper-util", "rustls", "rustls-native-certs 0.7.1", @@ -4459,7 +4400,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper 1.8.1", + "hyper", "hyper-util", "pin-project-lite", "tokio", @@ -4474,7 +4415,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "native-tls", "tokio", @@ -4494,8 +4435,8 @@ dependencies = [ "futures-core", "futures-util", "http 1.4.0", - "http-body 1.0.0", - "hyper 1.8.1", + "http-body", + "hyper", "ipnet", "libc", "percent-encoding", @@ -4968,77 +4909,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "jsonrpsee" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bd0d559d5e679b1ab2f869b486a11182923863b1b3ee8b421763cdd707b783a" -dependencies = [ - "jsonrpsee-core", - "jsonrpsee-http-server", - "jsonrpsee-types", -] - -[[package]] -name = "jsonrpsee-core" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3dc3e9cf2ba50b7b1d7d76a667619f82846caa39e8e8daa8a4962d74acaddca" -dependencies = [ - "anyhow", - "arrayvec 0.7.4", - "async-trait", - "beef", - "futures-channel", - "futures-util", - "globset", - "http 0.2.12", - "hyper 0.14.29", - "jsonrpsee-types", - "lazy_static", - "parking_lot", - "rand 0.8.5", - "rustc-hash 1.1.0", - "serde", - "serde_json", - "thiserror 1.0.61", - "tokio", - "tracing", - "unicase", -] - -[[package]] -name = "jsonrpsee-http-server" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03802f0373a38c2420c70b5144742d800b509e2937edc4afb116434f07120117" -dependencies = [ - "futures-channel", - "futures-util", - "hyper 0.14.29", - "jsonrpsee-core", - "jsonrpsee-types", - "serde", - "serde_json", - "tokio", - "tracing", - "tracing-futures", -] - -[[package]] -name = "jsonrpsee-types" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e290bba767401b646812f608c099b922d8142603c9e73a50fb192d3ac86f4a0d" -dependencies = [ - "anyhow", - "beef", - "serde", - "serde_json", - "thiserror 1.0.61", - "tracing", -] - [[package]] name = "k256" version = "0.13.4" @@ -5616,7 +5486,7 @@ dependencies = [ "http 1.4.0", "http-body-util", "humantime", - "hyper 1.8.1", + "hyper", "itertools 0.14.0", "parking_lot", "percent-encoding", @@ -6688,11 +6558,11 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.4.5", + "h2", "http 1.4.0", - "http-body 1.0.0", + "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-rustls", "hyper-tls", "hyper-util", @@ -8234,11 +8104,11 @@ dependencies = [ "base64 0.22.1", "bytes", "flate2", - "h2 0.4.5", + "h2", "http 1.4.0", - "http-body 1.0.0", + "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-timeout", "hyper-util", "percent-encoding", @@ -8343,7 +8213,7 @@ dependencies = [ "bytes", "futures-util", "http 1.4.0", - "http-body 1.0.0", + "http-body", "iri-string", "pin-project-lite", "tower 0.5.2", @@ -8417,16 +8287,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "try-lock" version = "0.2.5" @@ -9644,7 +9504,7 @@ dependencies = [ "futures 0.3.31", "http 1.4.0", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "log", "once_cell", diff --git a/server/json-rpc/Cargo.toml b/server/json-rpc/Cargo.toml index 3b727976811..65af5d0a280 100644 --- a/server/json-rpc/Cargo.toml +++ b/server/json-rpc/Cargo.toml @@ -4,6 +4,10 @@ version.workspace = true edition.workspace = true [dependencies] +axum = { workspace = true } graph = { path = "../../graph" } -jsonrpsee = { version = "0.15.1", features = ["http-server"] } serde = { workspace = true } +serde_json = { workspace = true } +slog = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } diff --git a/server/json-rpc/src/error.rs b/server/json-rpc/src/error.rs new file mode 100644 index 00000000000..cc21dc3cd99 --- /dev/null +++ b/server/json-rpc/src/error.rs @@ -0,0 +1,47 @@ +//! Error handling for the JSON-RPC admin server. +//! +//! This module defines error codes and mapping from `SubgraphRegistrarError` +//! to JSON-RPC error responses. + +use graph::prelude::SubgraphRegistrarError; +use slog::{error, Logger}; + +use crate::jsonrpc::JsonRpcError; + +/// Application-specific error codes for subgraph operations. +/// +/// These codes are preserved from the original jsonrpsee implementation +/// to maintain backward compatibility with existing clients. +pub mod error_codes { + pub const DEPLOY_ERROR: i64 = 0; + pub const REMOVE_ERROR: i64 = 1; + pub const CREATE_ERROR: i64 = 2; + pub const REASSIGN_ERROR: i64 = 3; + pub const PAUSE_ERROR: i64 = 4; + pub const RESUME_ERROR: i64 = 5; +} + +/// Convert a `SubgraphRegistrarError` to a `JsonRpcError`. +/// +/// Logs the error and returns an appropriate JSON-RPC error response. +/// For `Unknown` errors, the message is masked as "internal error" to avoid +/// leaking sensitive information. +pub fn registrar_error_to_jsonrpc( + logger: &Logger, + operation: &str, + e: SubgraphRegistrarError, + code: i64, + params: impl std::fmt::Debug, +) -> JsonRpcError { + error!(logger, "{} failed", operation; + "error" => format!("{:?}", e), + "params" => format!("{:?}", params)); + + let message = if let SubgraphRegistrarError::Unknown(_) = e { + "internal error".to_owned() + } else { + e.to_string() + }; + + JsonRpcError::new(code, message) +} diff --git a/server/json-rpc/src/handlers.rs b/server/json-rpc/src/handlers.rs new file mode 100644 index 00000000000..c0958cef675 --- /dev/null +++ b/server/json-rpc/src/handlers.rs @@ -0,0 +1,347 @@ +//! JSON-RPC request handlers for subgraph operations. +//! +//! This module implements the request dispatch and individual handlers +//! for each JSON-RPC method. + +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use axum::extract::{ConnectInfo, State}; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::IntoResponse; +use axum::Json; +use graph::prelude::{ + DeploymentHash, NodeId, SubgraphName, SubgraphRegistrar, Value as GraphValue, ENV_VARS, +}; +use serde::Deserialize; +use serde_json::{self, Value as JsonValue}; +use slog::{info, Logger}; + +use crate::error::{error_codes, registrar_error_to_jsonrpc}; +use crate::jsonrpc::{JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse}; + +/// Shared application state for the JSON-RPC server. +pub struct AppState { + pub registrar: Arc, + pub http_port: u16, + pub node_id: NodeId, + pub logger: Logger, +} + +/// Main JSON-RPC request handler. +/// +/// Processes incoming JSON-RPC requests, dispatches to the appropriate method handler, +/// and returns the response. +pub async fn jsonrpc_handler( + State(state): State>>, + ConnectInfo(remote_addr): ConnectInfo, + headers: HeaderMap, + body: String, +) -> impl IntoResponse { + // Log the incoming request with proxy headers + info!( + &state.logger, + "JSON-RPC request"; + "remote_addr" => %remote_addr, + "x_forwarded_for" => headers.get("x-forwarded-for").and_then(|v| v.to_str().ok()), + "x_real_ip" => headers.get("x-real-ip").and_then(|v| v.to_str().ok()), + "x_forwarded_proto" => headers.get("x-forwarded-proto").and_then(|v| v.to_str().ok()) + ); + + // Parse the JSON-RPC request + let request: JsonRpcRequest = match serde_json::from_str(&body) { + Ok(req) => req, + Err(_) => { + return (StatusCode::OK, Json(JsonRpcResponse::parse_error())); + } + }; + + // Validate JSON-RPC version + if !request.is_valid_version() { + return (StatusCode::OK, Json(JsonRpcResponse::invalid_request())); + } + + let id = request.id.clone().unwrap_or(JsonRpcId::Null); + + // Log the method call + info!( + &state.logger, + "JSON-RPC call"; + "method" => &request.method, + "params" => ?request.params + ); + + // Dispatch to the appropriate handler + let response = match request.method.as_str() { + "subgraph_create" => handle_create(&state, &request, id.clone()).await, + "subgraph_deploy" => handle_deploy(&state, &request, id.clone()).await, + "subgraph_remove" => handle_remove(&state, &request, id.clone()).await, + "subgraph_reassign" => handle_reassign(&state, &request, id.clone()).await, + "subgraph_pause" => handle_pause(&state, &request, id.clone()).await, + "subgraph_resume" => handle_resume(&state, &request, id.clone()).await, + _ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()), + }; + + (StatusCode::OK, Json(response)) +} + +/// Parse parameters from a JSON-RPC request. +#[allow(clippy::result_large_err)] +fn parse_params Deserialize<'de>>( + request: &JsonRpcRequest, + id: JsonRpcId, +) -> Result { + let params = request.params.clone().unwrap_or(JsonValue::Null); + serde_json::from_value(params).map_err(|e| { + JsonRpcResponse::error( + id, + JsonRpcError::invalid_params(format!("Invalid params: {}", e)), + ) + }) +} + +/// Handler for `subgraph_create`. +async fn handle_create( + state: &AppState, + request: &JsonRpcRequest, + id: JsonRpcId, +) -> JsonRpcResponse { + let params: SubgraphCreateParams = match parse_params(request, id.clone()) { + Ok(p) => p, + Err(resp) => return resp, + }; + + info!(&state.logger, "Received subgraph_create request"; "params" => format!("{:?}", params)); + + match state.registrar.create_subgraph(params.name.clone()).await { + Ok(result) => { + let value = serde_json::to_value(result).expect("invalid subgraph creation result"); + JsonRpcResponse::success(id, value) + } + Err(e) => { + let error = registrar_error_to_jsonrpc( + &state.logger, + "subgraph_create", + e, + error_codes::CREATE_ERROR, + ¶ms, + ); + JsonRpcResponse::error(id, error) + } + } +} + +/// Handler for `subgraph_deploy`. +async fn handle_deploy( + state: &AppState, + request: &JsonRpcRequest, + id: JsonRpcId, +) -> JsonRpcResponse { + let params: SubgraphDeployParams = match parse_params(request, id.clone()) { + Ok(p) => p, + Err(resp) => return resp, + }; + + info!(&state.logger, "Received subgraph_deploy request"; "params" => format!("{:?}", params)); + + let node_id = params.node_id.clone().unwrap_or(state.node_id.clone()); + let routes = subgraph_routes(¶ms.name, state.http_port); + + match state + .registrar + .create_subgraph_version( + params.name.clone(), + params.ipfs_hash.clone(), + node_id, + params.debug_fork.clone(), + // Here it doesn't make sense to receive another + // startBlock, we'll use the one from the manifest. + None, + None, + params.history_blocks, + false, + ) + .await + { + Ok(_) => JsonRpcResponse::success(id, routes), + Err(e) => { + let error = registrar_error_to_jsonrpc( + &state.logger, + "subgraph_deploy", + e, + error_codes::DEPLOY_ERROR, + ¶ms, + ); + JsonRpcResponse::error(id, error) + } + } +} + +/// Handler for `subgraph_remove`. +async fn handle_remove( + state: &AppState, + request: &JsonRpcRequest, + id: JsonRpcId, +) -> JsonRpcResponse { + let params: SubgraphRemoveParams = match parse_params(request, id.clone()) { + Ok(p) => p, + Err(resp) => return resp, + }; + + info!(&state.logger, "Received subgraph_remove request"; "params" => format!("{:?}", params)); + + match state.registrar.remove_subgraph(params.name.clone()).await { + Ok(_) => JsonRpcResponse::success(id, serde_json::to_value(GraphValue::Null).unwrap()), + Err(e) => { + let error = registrar_error_to_jsonrpc( + &state.logger, + "subgraph_remove", + e, + error_codes::REMOVE_ERROR, + ¶ms, + ); + JsonRpcResponse::error(id, error) + } + } +} + +/// Handler for `subgraph_reassign`. +async fn handle_reassign( + state: &AppState, + request: &JsonRpcRequest, + id: JsonRpcId, +) -> JsonRpcResponse { + let params: SubgraphReassignParams = match parse_params(request, id.clone()) { + Ok(p) => p, + Err(resp) => return resp, + }; + + info!(&state.logger, "Received subgraph_reassignment request"; "params" => format!("{:?}", params)); + + match state + .registrar + .reassign_subgraph(¶ms.ipfs_hash, ¶ms.node_id) + .await + { + Ok(_) => JsonRpcResponse::success(id, serde_json::to_value(GraphValue::Null).unwrap()), + Err(e) => { + let error = registrar_error_to_jsonrpc( + &state.logger, + "subgraph_reassign", + e, + error_codes::REASSIGN_ERROR, + ¶ms, + ); + JsonRpcResponse::error(id, error) + } + } +} + +/// Handler for `subgraph_pause`. +async fn handle_pause( + state: &AppState, + request: &JsonRpcRequest, + id: JsonRpcId, +) -> JsonRpcResponse { + let params: SubgraphPauseParams = match parse_params(request, id.clone()) { + Ok(p) => p, + Err(resp) => return resp, + }; + + info!(&state.logger, "Received subgraph_pause request"; "params" => format!("{:?}", params)); + + match state.registrar.pause_subgraph(¶ms.deployment).await { + Ok(_) => JsonRpcResponse::success(id, serde_json::to_value(GraphValue::Null).unwrap()), + Err(e) => { + let error = registrar_error_to_jsonrpc( + &state.logger, + "subgraph_pause", + e, + error_codes::PAUSE_ERROR, + ¶ms, + ); + JsonRpcResponse::error(id, error) + } + } +} + +/// Handler for `subgraph_resume`. +async fn handle_resume( + state: &AppState, + request: &JsonRpcRequest, + id: JsonRpcId, +) -> JsonRpcResponse { + let params: SubgraphPauseParams = match parse_params(request, id.clone()) { + Ok(p) => p, + Err(resp) => return resp, + }; + + info!(&state.logger, "Received subgraph_resume request"; "params" => format!("{:?}", params)); + + match state.registrar.resume_subgraph(¶ms.deployment).await { + Ok(_) => JsonRpcResponse::success(id, serde_json::to_value(GraphValue::Null).unwrap()), + Err(e) => { + let error = registrar_error_to_jsonrpc( + &state.logger, + "subgraph_resume", + e, + error_codes::RESUME_ERROR, + ¶ms, + ); + JsonRpcResponse::error(id, error) + } + } +} + +/// Build the subgraph routes response for deploy. +fn subgraph_routes(name: &SubgraphName, http_port: u16) -> JsonValue { + let http_base_url = ENV_VARS + .external_http_base_url + .clone() + .unwrap_or_else(|| format!(":{}", http_port)); + + let mut map = BTreeMap::new(); + map.insert( + "playground", + format!("{}/subgraphs/name/{}/graphql", http_base_url, name), + ); + map.insert( + "queries", + format!("{}/subgraphs/name/{}", http_base_url, name), + ); + + serde_json::to_value(map).expect("invalid subgraph routes") +} + +// Parameter structs for each method + +#[derive(Debug, Deserialize)] +pub struct SubgraphCreateParams { + pub name: SubgraphName, +} + +#[derive(Debug, Deserialize)] +pub struct SubgraphDeployParams { + pub name: SubgraphName, + pub ipfs_hash: DeploymentHash, + pub node_id: Option, + pub debug_fork: Option, + pub history_blocks: Option, +} + +#[derive(Debug, Deserialize)] +pub struct SubgraphRemoveParams { + pub name: SubgraphName, +} + +#[derive(Debug, Deserialize)] +pub struct SubgraphReassignParams { + pub ipfs_hash: DeploymentHash, + pub node_id: NodeId, +} + +#[derive(Debug, Deserialize)] +pub struct SubgraphPauseParams { + pub deployment: DeploymentHash, +} diff --git a/server/json-rpc/src/jsonrpc.rs b/server/json-rpc/src/jsonrpc.rs new file mode 100644 index 00000000000..3d2603db151 --- /dev/null +++ b/server/json-rpc/src/jsonrpc.rs @@ -0,0 +1,198 @@ +//! JSON-RPC 2.0 types for the admin server. +//! +//! This module implements the JSON-RPC 2.0 protocol types according to the specification: +//! https://www.jsonrpc.org/specification + +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; + +/// JSON-RPC 2.0 request ID. +/// +/// The ID can be a string, number, or null (for notifications). +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum JsonRpcId { + String(String), + Number(i64), + #[default] + Null, +} + +/// JSON-RPC 2.0 request object. +#[derive(Debug, Clone, Deserialize)] +pub struct JsonRpcRequest { + /// JSON-RPC version. Must be "2.0". + pub jsonrpc: String, + + /// Method name to invoke. + pub method: String, + + /// Method parameters (optional). + #[serde(default)] + pub params: Option, + + /// Request ID (optional for notifications). + #[serde(default)] + pub id: Option, +} + +impl JsonRpcRequest { + /// Returns true if this request has a valid JSON-RPC version. + pub fn is_valid_version(&self) -> bool { + self.jsonrpc == "2.0" + } +} + +/// JSON-RPC 2.0 error object. +#[derive(Debug, Clone, Serialize)] +pub struct JsonRpcError { + /// Error code. + pub code: i64, + + /// Human-readable error message. + pub message: String, + + /// Additional error data (optional). + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +impl JsonRpcError { + /// Create a new error with the given code and message. + pub fn new(code: i64, message: impl Into) -> Self { + Self { + code, + message: message.into(), + data: None, + } + } + + /// Parse error (-32700): Invalid JSON was received. + pub fn parse_error() -> Self { + Self::new(-32700, "Parse error") + } + + /// Invalid request (-32600): The JSON sent is not a valid Request object. + pub fn invalid_request() -> Self { + Self::new(-32600, "Invalid Request") + } + + /// Method not found (-32601): The method does not exist / is not available. + pub fn method_not_found() -> Self { + Self::new(-32601, "Method not found") + } + + /// Invalid params (-32602): Invalid method parameter(s). + pub fn invalid_params(message: impl Into) -> Self { + Self::new(-32602, message) + } +} + +/// JSON-RPC 2.0 response object. +#[derive(Debug, Clone, Serialize)] +pub struct JsonRpcResponse { + /// JSON-RPC version. Always "2.0". + pub jsonrpc: &'static str, + + /// Result on success (mutually exclusive with error). + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, + + /// Error on failure (mutually exclusive with result). + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + + /// Request ID (same as the request, or null for notifications). + pub id: JsonRpcId, +} + +impl JsonRpcResponse { + /// Create a successful response. + pub fn success(id: JsonRpcId, result: JsonValue) -> Self { + Self { + jsonrpc: "2.0", + result: Some(result), + error: None, + id, + } + } + + /// Create an error response. + pub fn error(id: JsonRpcId, error: JsonRpcError) -> Self { + Self { + jsonrpc: "2.0", + result: None, + error: Some(error), + id, + } + } + + /// Create an error response for a parse error (when we don't have an ID). + pub fn parse_error() -> Self { + Self::error(JsonRpcId::Null, JsonRpcError::parse_error()) + } + + /// Create an error response for an invalid request (when we don't have an ID). + pub fn invalid_request() -> Self { + Self::error(JsonRpcId::Null, JsonRpcError::invalid_request()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserialize_request_with_string_id() { + let json = r#"{"jsonrpc":"2.0","method":"test","id":"abc"}"#; + let req: JsonRpcRequest = serde_json::from_str(json).unwrap(); + assert_eq!(req.id, Some(JsonRpcId::String("abc".to_string()))); + } + + #[test] + fn deserialize_request_with_number_id() { + let json = r#"{"jsonrpc":"2.0","method":"test","id":123}"#; + let req: JsonRpcRequest = serde_json::from_str(json).unwrap(); + assert_eq!(req.id, Some(JsonRpcId::Number(123))); + } + + #[test] + fn deserialize_request_with_null_id() { + // When id is explicitly null, serde treats it as None due to Option + // This is acceptable per JSON-RPC 2.0 spec as null id means notification + let json = r#"{"jsonrpc":"2.0","method":"test","id":null}"#; + let req: JsonRpcRequest = serde_json::from_str(json).unwrap(); + assert_eq!(req.id, None); + } + + #[test] + fn deserialize_request_without_id() { + let json = r#"{"jsonrpc":"2.0","method":"test"}"#; + let req: JsonRpcRequest = serde_json::from_str(json).unwrap(); + assert_eq!(req.id, None); + } + + #[test] + fn serialize_success_response() { + let resp = JsonRpcResponse::success(JsonRpcId::Number(1), serde_json::json!({"ok": true})); + let json = serde_json::to_string(&resp).unwrap(); + assert!(json.contains(r#""jsonrpc":"2.0""#)); + assert!(json.contains(r#""result":{"ok":true}"#)); + assert!(json.contains(r#""id":1"#)); + assert!(!json.contains("error")); + } + + #[test] + fn serialize_error_response() { + let resp = JsonRpcResponse::error( + JsonRpcId::String("req-1".to_string()), + JsonRpcError::new(-32601, "Method not found"), + ); + let json = serde_json::to_string(&resp).unwrap(); + assert!(json.contains(r#""jsonrpc":"2.0""#)); + assert!(json.contains(r#""error""#)); + assert!(json.contains(r#""code":-32601"#)); + assert!(json.contains(r#""id":"req-1""#)); + assert!(!json.contains("result")); + } +} diff --git a/server/json-rpc/src/lib.rs b/server/json-rpc/src/lib.rs index 1bd6769f3b8..66a0e85e67c 100644 --- a/server/json-rpc/src/lib.rs +++ b/server/json-rpc/src/lib.rs @@ -1,334 +1,11 @@ -use graph::prelude::{Value as GraphValue, *}; -use jsonrpsee::core::middleware::{Headers, HttpMiddleware, MethodKind, Params}; -use jsonrpsee::core::Error as JsonRpcError; -use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; -use jsonrpsee::types::error::CallError; -use jsonrpsee::types::ErrorObject; -use jsonrpsee::RpcModule; -use serde_json::{self, Value as JsonValue}; +//! JSON-RPC admin server for subgraph management. +//! +//! This crate provides a JSON-RPC 2.0 server for managing subgraphs, +//! supporting operations like create, deploy, remove, reassign, pause, and resume. -use std::collections::BTreeMap; -use std::net::{Ipv4Addr, SocketAddr}; +mod error; +mod handlers; +mod jsonrpc; +mod server; -/// Middleware for logging JSON-RPC requests. -/// -/// Logs incoming HTTP requests with remote address and proxy headers, -/// and logs each JSON-RPC method call with its parameters. -#[derive(Clone)] -struct RpcLogger { - logger: Logger, -} - -impl HttpMiddleware for RpcLogger { - type Instant = (); - - fn on_request(&self, remote_addr: SocketAddr, headers: &Headers) -> Self::Instant { - info!( - &self.logger, - "JSON-RPC request"; - "remote_addr" => %remote_addr, - "x_forwarded_for" => headers.get("x-forwarded-for").and_then(|v| v.to_str().ok()), - "x_real_ip" => headers.get("x-real-ip").and_then(|v| v.to_str().ok()), - "x_forwarded_proto" => headers.get("x-forwarded-proto").and_then(|v| v.to_str().ok()) - ); - } - - fn on_call(&self, method_name: &str, params: Params, _kind: MethodKind) { - info!( - &self.logger, - "JSON-RPC call"; - "method" => method_name, - "params" => ?params - ); - } - - fn on_result(&self, _method_name: &str, _success: bool, _started_at: Self::Instant) {} - - fn on_response(&self, _result: &str, _started_at: Self::Instant) {} -} - -type JsonRpcResult = Result; - -pub struct JsonRpcServer { - // TODO: in the future we might want to have some sort of async drop to stop - // the server. For now, we're just letting it run it forever. - _handle: HttpServerHandle, -} - -impl JsonRpcServer { - pub async fn serve( - port: u16, - http_port: u16, - registrar: Arc, - node_id: NodeId, - logger: Logger, - ) -> JsonRpcResult - where - R: SubgraphRegistrar, - { - let logger = logger.new(o!("component" => "JsonRpcServer")); - - info!( - logger, - "Starting JSON-RPC admin server at: http://localhost:{}", port - ); - - let state = ServerState { - registrar, - http_port, - node_id, - logger, - }; - - let socket_addr: SocketAddr = (Ipv4Addr::new(0, 0, 0, 0), port).into(); - let http_server = HttpServerBuilder::default() - .set_middleware(RpcLogger { - logger: state.logger.clone(), - }) - .build(socket_addr) - .await?; - - let mut rpc_module = RpcModule::new(state); - rpc_module - .register_async_method("subgraph_create", |params, state| async move { - state.create_handler(params.parse()?).await - }) - .unwrap(); - rpc_module - .register_async_method("subgraph_deploy", |params, state| async move { - state.deploy_handler(params.parse()?).await - }) - .unwrap(); - rpc_module - .register_async_method("subgraph_remove", |params, state| async move { - state.remove_handler(params.parse()?).await - }) - .unwrap(); - rpc_module - .register_async_method("subgraph_reassign", |params, state| async move { - state.reassign_handler(params.parse()?).await - }) - .unwrap(); - rpc_module - .register_async_method("subgraph_pause", |params, state| async move { - state.pause_handler(params.parse()?).await - }) - .unwrap(); - rpc_module - .register_async_method("subgraph_resume", |params, state| async move { - state.resume_handler(params.parse()?).await - }) - .unwrap(); - - let _handle = http_server.start(rpc_module)?; - Ok(Self { _handle }) - } -} - -struct ServerState { - registrar: Arc, - http_port: u16, - node_id: NodeId, - logger: Logger, -} - -impl ServerState { - const DEPLOY_ERROR: i64 = 0; - const REMOVE_ERROR: i64 = 1; - const CREATE_ERROR: i64 = 2; - const REASSIGN_ERROR: i64 = 3; - const PAUSE_ERROR: i64 = 4; - const RESUME_ERROR: i64 = 5; - - /// Handler for the `subgraph_create` endpoint. - async fn create_handler(&self, params: SubgraphCreateParams) -> JsonRpcResult { - info!(&self.logger, "Received subgraph_create request"; "params" => format!("{:?}", params)); - - match self.registrar.create_subgraph(params.name.clone()).await { - Ok(result) => { - Ok(serde_json::to_value(result).expect("invalid subgraph creation result")) - } - Err(e) => Err(json_rpc_error( - &self.logger, - "subgraph_create", - e, - Self::CREATE_ERROR, - params, - )), - } - } - - /// Handler for the `subgraph_deploy` endpoint. - async fn deploy_handler(&self, params: SubgraphDeployParams) -> JsonRpcResult { - info!(&self.logger, "Received subgraph_deploy request"; "params" => format!("{:?}", params)); - - let node_id = params.node_id.clone().unwrap_or(self.node_id.clone()); - let routes = subgraph_routes(¶ms.name, self.http_port); - match self - .registrar - .create_subgraph_version( - params.name.clone(), - params.ipfs_hash.clone(), - node_id, - params.debug_fork.clone(), - // Here it doesn't make sense to receive another - // startBlock, we'll use the one from the manifest. - None, - None, - params.history_blocks, - false, - ) - .await - { - Ok(_) => Ok(routes), - Err(e) => Err(json_rpc_error( - &self.logger, - "subgraph_deploy", - e, - Self::DEPLOY_ERROR, - params, - )), - } - } - - /// Handler for the `subgraph_remove` endpoint. - async fn remove_handler(&self, params: SubgraphRemoveParams) -> JsonRpcResult { - info!(&self.logger, "Received subgraph_remove request"; "params" => format!("{:?}", params)); - - match self.registrar.remove_subgraph(params.name.clone()).await { - Ok(_) => Ok(Value::Null), - Err(e) => Err(json_rpc_error( - &self.logger, - "subgraph_remove", - e, - Self::REMOVE_ERROR, - params, - )), - } - } - - /// Handler for the `subgraph_assign` endpoint. - async fn reassign_handler(&self, params: SubgraphReassignParams) -> JsonRpcResult { - info!(&self.logger, "Received subgraph_reassignment request"; "params" => format!("{:?}", params)); - - match self - .registrar - .reassign_subgraph(¶ms.ipfs_hash, ¶ms.node_id) - .await - { - Ok(_) => Ok(Value::Null), - Err(e) => Err(json_rpc_error( - &self.logger, - "subgraph_reassign", - e, - Self::REASSIGN_ERROR, - params, - )), - } - } - - /// Handler for the `subgraph_pause` endpoint. - async fn pause_handler(&self, params: SubgraphPauseParams) -> JsonRpcResult { - info!(&self.logger, "Received subgraph_pause request"; "params" => format!("{:?}", params)); - - match self.registrar.pause_subgraph(¶ms.deployment).await { - Ok(_) => Ok(Value::Null), - Err(e) => Err(json_rpc_error( - &self.logger, - "subgraph_pause", - e, - Self::PAUSE_ERROR, - params, - )), - } - } - - /// Handler for the `subgraph_resume` endpoint. - async fn resume_handler(&self, params: SubgraphPauseParams) -> JsonRpcResult { - info!(&self.logger, "Received subgraph_resume request"; "params" => format!("{:?}", params)); - - match self.registrar.resume_subgraph(¶ms.deployment).await { - Ok(_) => Ok(Value::Null), - Err(e) => Err(json_rpc_error( - &self.logger, - "subgraph_resume", - e, - Self::RESUME_ERROR, - params, - )), - } - } -} - -fn json_rpc_error( - logger: &Logger, - operation: &str, - e: SubgraphRegistrarError, - code: i64, - params: impl std::fmt::Debug, -) -> JsonRpcError { - error!(logger, "{} failed", operation; - "error" => format!("{:?}", e), - "params" => format!("{:?}", params)); - - let message = if let SubgraphRegistrarError::Unknown(_) = e { - "internal error".to_owned() - } else { - e.to_string() - }; - - JsonRpcError::Call(CallError::Custom(ErrorObject::owned( - code as _, - message, - None::, - ))) -} - -fn subgraph_routes(name: &SubgraphName, http_port: u16) -> JsonValue { - let http_base_url = ENV_VARS - .external_http_base_url - .clone() - .unwrap_or_else(|| format!(":{}", http_port)); - - let mut map = BTreeMap::new(); - map.insert( - "playground", - format!("{}/subgraphs/name/{}/graphql", http_base_url, name), - ); - map.insert( - "queries", - format!("{}/subgraphs/name/{}", http_base_url, name), - ); - - serde_json::to_value(map).expect("invalid subgraph routes") -} - -#[derive(Debug, Deserialize)] -struct SubgraphCreateParams { - name: SubgraphName, -} - -#[derive(Debug, Deserialize)] -struct SubgraphDeployParams { - name: SubgraphName, - ipfs_hash: DeploymentHash, - node_id: Option, - debug_fork: Option, - history_blocks: Option, -} - -#[derive(Debug, Deserialize)] -struct SubgraphRemoveParams { - name: SubgraphName, -} - -#[derive(Debug, Deserialize)] -struct SubgraphReassignParams { - ipfs_hash: DeploymentHash, - node_id: NodeId, -} - -#[derive(Debug, Deserialize)] -struct SubgraphPauseParams { - deployment: DeploymentHash, -} +pub use server::{JsonRpcServer, JsonRpcServerError}; diff --git a/server/json-rpc/src/server.rs b/server/json-rpc/src/server.rs new file mode 100644 index 00000000000..0495d186b1a --- /dev/null +++ b/server/json-rpc/src/server.rs @@ -0,0 +1,95 @@ +//! Axum-based JSON-RPC server implementation. +//! +//! This module provides the `JsonRpcServer` that serves JSON-RPC requests +//! over HTTP using axum. + +use std::net::{Ipv4Addr, SocketAddr}; +use std::sync::Arc; + +use axum::routing::post; +use axum::Router; +use graph::prelude::{NodeId, SubgraphRegistrar}; +use slog::{info, Logger}; +use thiserror::Error; +use tokio::sync::Notify; + +use crate::handlers::{jsonrpc_handler, AppState}; + +/// Errors that can occur when starting the JSON-RPC server. +#[derive(Debug, Error)] +pub enum JsonRpcServerError { + #[error("Failed to bind to address: {0}")] + Bind(#[from] std::io::Error), +} + +/// Handle to a running JSON-RPC server. +/// +/// Dropping this handle does not stop the server. Use `stop()` for graceful shutdown. +pub struct JsonRpcServer { + notify: Arc, +} + +impl JsonRpcServer { + /// Start the JSON-RPC server. + /// + /// # Arguments + /// + /// * `port` - The port to listen on for JSON-RPC requests + /// * `http_port` - The HTTP port used for subgraph route URLs in deploy responses + /// * `registrar` - The subgraph registrar for handling operations + /// * `node_id` - Default node ID for deployments + /// * `logger` - Logger for request/response logging + pub async fn serve( + port: u16, + http_port: u16, + registrar: Arc, + node_id: NodeId, + logger: Logger, + ) -> Result + where + R: SubgraphRegistrar, + { + let logger = logger.new(graph::prelude::o!("component" => "JsonRpcServer")); + + info!( + logger, + "Starting JSON-RPC admin server at: http://localhost:{}", port + ); + + let state = Arc::new(AppState { + registrar, + http_port, + node_id, + logger, + }); + + let app = Router::new() + .route("/", post(jsonrpc_handler::)) + .with_state(state); + + let addr: SocketAddr = (Ipv4Addr::new(0, 0, 0, 0), port).into(); + let listener = tokio::net::TcpListener::bind(addr).await?; + + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + + graph::spawn(async move { + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .with_graceful_shutdown(async move { + notify_clone.notified().await; + }) + .await + .unwrap_or_else(|err| panic!("JSON-RPC server failed: {err}")); + }); + + Ok(Self { notify }) + } + + /// Stop the server gracefully. + pub fn stop(self) { + self.notify.notify_one(); + } +} From 813f8a18b1dc17fcd1b16a78de8cd61554b1ea8a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 29 Jan 2026 13:42:26 -0800 Subject: [PATCH 2/4] server/json-rpc: Log connection info with JSON-RPC call --- server/json-rpc/src/handlers.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/server/json-rpc/src/handlers.rs b/server/json-rpc/src/handlers.rs index c0958cef675..0c2f66da86b 100644 --- a/server/json-rpc/src/handlers.rs +++ b/server/json-rpc/src/handlers.rs @@ -39,15 +39,12 @@ pub async fn jsonrpc_handler( headers: HeaderMap, body: String, ) -> impl IntoResponse { - // Log the incoming request with proxy headers - info!( - &state.logger, - "JSON-RPC request"; - "remote_addr" => %remote_addr, - "x_forwarded_for" => headers.get("x-forwarded-for").and_then(|v| v.to_str().ok()), - "x_real_ip" => headers.get("x-real-ip").and_then(|v| v.to_str().ok()), - "x_forwarded_proto" => headers.get("x-forwarded-proto").and_then(|v| v.to_str().ok()) - ); + fn header<'a>(headers: &'a HeaderMap, key: &str) -> &'a str { + headers + .get(key) + .and_then(|v| v.to_str().ok()) + .unwrap_or("unset") + } // Parse the JSON-RPC request let request: JsonRpcRequest = match serde_json::from_str(&body) { @@ -69,7 +66,11 @@ pub async fn jsonrpc_handler( &state.logger, "JSON-RPC call"; "method" => &request.method, - "params" => ?request.params + "params" => ?request.params, + "remote_addr" => %remote_addr, + "x_forwarded_for" => header(&headers, "x-forwarded-for"), + "x_real_ip" => header(&headers, "x-real-ip"), + "x_forwarded_proto" => header(&headers, "x-forwarded-proto") ); // Dispatch to the appropriate handler From 6f920042ee9a59c7b6ed25659f531ea99568cf79 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 29 Jan 2026 14:04:50 -0800 Subject: [PATCH 3/4] server/json-rpc: Simplify handlers with to_response helper Consolidate the repetitive success/error handling pattern into a single `to_response` helper function. Inline the error module into handlers.rs and remove redundant per-handler logging (the dispatch function already logs method, params, and connection info). --- server/json-rpc/src/error.rs | 47 -------- server/json-rpc/src/handlers.rs | 208 +++++++++++++++++--------------- server/json-rpc/src/lib.rs | 1 - 3 files changed, 114 insertions(+), 142 deletions(-) delete mode 100644 server/json-rpc/src/error.rs diff --git a/server/json-rpc/src/error.rs b/server/json-rpc/src/error.rs deleted file mode 100644 index cc21dc3cd99..00000000000 --- a/server/json-rpc/src/error.rs +++ /dev/null @@ -1,47 +0,0 @@ -//! Error handling for the JSON-RPC admin server. -//! -//! This module defines error codes and mapping from `SubgraphRegistrarError` -//! to JSON-RPC error responses. - -use graph::prelude::SubgraphRegistrarError; -use slog::{error, Logger}; - -use crate::jsonrpc::JsonRpcError; - -/// Application-specific error codes for subgraph operations. -/// -/// These codes are preserved from the original jsonrpsee implementation -/// to maintain backward compatibility with existing clients. -pub mod error_codes { - pub const DEPLOY_ERROR: i64 = 0; - pub const REMOVE_ERROR: i64 = 1; - pub const CREATE_ERROR: i64 = 2; - pub const REASSIGN_ERROR: i64 = 3; - pub const PAUSE_ERROR: i64 = 4; - pub const RESUME_ERROR: i64 = 5; -} - -/// Convert a `SubgraphRegistrarError` to a `JsonRpcError`. -/// -/// Logs the error and returns an appropriate JSON-RPC error response. -/// For `Unknown` errors, the message is masked as "internal error" to avoid -/// leaking sensitive information. -pub fn registrar_error_to_jsonrpc( - logger: &Logger, - operation: &str, - e: SubgraphRegistrarError, - code: i64, - params: impl std::fmt::Debug, -) -> JsonRpcError { - error!(logger, "{} failed", operation; - "error" => format!("{:?}", e), - "params" => format!("{:?}", params)); - - let message = if let SubgraphRegistrarError::Unknown(_) = e { - "internal error".to_owned() - } else { - e.to_string() - }; - - JsonRpcError::new(code, message) -} diff --git a/server/json-rpc/src/handlers.rs b/server/json-rpc/src/handlers.rs index 0c2f66da86b..a3582e94032 100644 --- a/server/json-rpc/src/handlers.rs +++ b/server/json-rpc/src/handlers.rs @@ -4,6 +4,7 @@ //! for each JSON-RPC method. use std::collections::BTreeMap; +use std::fmt::Debug; use std::net::SocketAddr; use std::sync::Arc; @@ -12,15 +13,24 @@ use axum::http::{HeaderMap, StatusCode}; use axum::response::IntoResponse; use axum::Json; use graph::prelude::{ - DeploymentHash, NodeId, SubgraphName, SubgraphRegistrar, Value as GraphValue, ENV_VARS, + DeploymentHash, NodeId, SubgraphName, SubgraphRegistrar, SubgraphRegistrarError, ENV_VARS, }; use serde::Deserialize; use serde_json::{self, Value as JsonValue}; -use slog::{info, Logger}; +use slog::{error, info, Logger}; -use crate::error::{error_codes, registrar_error_to_jsonrpc}; use crate::jsonrpc::{JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse}; +/// Application-specific error codes for subgraph operations. +mod error_codes { + pub const DEPLOY_ERROR: i64 = 0; + pub const REMOVE_ERROR: i64 = 1; + pub const CREATE_ERROR: i64 = 2; + pub const REASSIGN_ERROR: i64 = 3; + pub const PAUSE_ERROR: i64 = 4; + pub const RESUME_ERROR: i64 = 5; +} + /// Shared application state for the JSON-RPC server. pub struct AppState { pub registrar: Arc, @@ -102,6 +112,33 @@ fn parse_params Deserialize<'de>>( }) } +/// Convert a registrar result to a JSON-RPC response. +fn to_response( + logger: &Logger, + method: &str, + error_code: i64, + params: &P, + result: Result, + id: JsonRpcId, +) -> JsonRpcResponse { + match result { + Ok(value) => JsonRpcResponse::success(id, value), + Err(e) => { + error!(logger, "{} failed", method; + "error" => format!("{:?}", e), + "params" => format!("{:?}", params)); + + let message = if let SubgraphRegistrarError::Unknown(_) = e { + "internal error".to_owned() + } else { + e.to_string() + }; + + JsonRpcResponse::error(id, JsonRpcError::new(error_code, message)) + } + } +} + /// Handler for `subgraph_create`. async fn handle_create( state: &AppState, @@ -113,24 +150,20 @@ async fn handle_create( Err(resp) => return resp, }; - info!(&state.logger, "Received subgraph_create request"; "params" => format!("{:?}", params)); + let result = state + .registrar + .create_subgraph(params.name.clone()) + .await + .map(|r| serde_json::to_value(r).expect("invalid result")); - match state.registrar.create_subgraph(params.name.clone()).await { - Ok(result) => { - let value = serde_json::to_value(result).expect("invalid subgraph creation result"); - JsonRpcResponse::success(id, value) - } - Err(e) => { - let error = registrar_error_to_jsonrpc( - &state.logger, - "subgraph_create", - e, - error_codes::CREATE_ERROR, - ¶ms, - ); - JsonRpcResponse::error(id, error) - } - } + to_response( + &state.logger, + "subgraph_create", + error_codes::CREATE_ERROR, + ¶ms, + result, + id, + ) } /// Handler for `subgraph_deploy`. @@ -144,12 +177,10 @@ async fn handle_deploy( Err(resp) => return resp, }; - info!(&state.logger, "Received subgraph_deploy request"; "params" => format!("{:?}", params)); - let node_id = params.node_id.clone().unwrap_or(state.node_id.clone()); let routes = subgraph_routes(¶ms.name, state.http_port); - match state + let result = state .registrar .create_subgraph_version( params.name.clone(), @@ -164,19 +195,16 @@ async fn handle_deploy( false, ) .await - { - Ok(_) => JsonRpcResponse::success(id, routes), - Err(e) => { - let error = registrar_error_to_jsonrpc( - &state.logger, - "subgraph_deploy", - e, - error_codes::DEPLOY_ERROR, - ¶ms, - ); - JsonRpcResponse::error(id, error) - } - } + .map(|_| routes); + + to_response( + &state.logger, + "subgraph_deploy", + error_codes::DEPLOY_ERROR, + ¶ms, + result, + id, + ) } /// Handler for `subgraph_remove`. @@ -190,21 +218,20 @@ async fn handle_remove( Err(resp) => return resp, }; - info!(&state.logger, "Received subgraph_remove request"; "params" => format!("{:?}", params)); + let result = state + .registrar + .remove_subgraph(params.name.clone()) + .await + .map(|_| JsonValue::Null); - match state.registrar.remove_subgraph(params.name.clone()).await { - Ok(_) => JsonRpcResponse::success(id, serde_json::to_value(GraphValue::Null).unwrap()), - Err(e) => { - let error = registrar_error_to_jsonrpc( - &state.logger, - "subgraph_remove", - e, - error_codes::REMOVE_ERROR, - ¶ms, - ); - JsonRpcResponse::error(id, error) - } - } + to_response( + &state.logger, + "subgraph_remove", + error_codes::REMOVE_ERROR, + ¶ms, + result, + id, + ) } /// Handler for `subgraph_reassign`. @@ -218,25 +245,20 @@ async fn handle_reassign( Err(resp) => return resp, }; - info!(&state.logger, "Received subgraph_reassignment request"; "params" => format!("{:?}", params)); - - match state + let result = state .registrar .reassign_subgraph(¶ms.ipfs_hash, ¶ms.node_id) .await - { - Ok(_) => JsonRpcResponse::success(id, serde_json::to_value(GraphValue::Null).unwrap()), - Err(e) => { - let error = registrar_error_to_jsonrpc( - &state.logger, - "subgraph_reassign", - e, - error_codes::REASSIGN_ERROR, - ¶ms, - ); - JsonRpcResponse::error(id, error) - } - } + .map(|_| JsonValue::Null); + + to_response( + &state.logger, + "subgraph_reassign", + error_codes::REASSIGN_ERROR, + ¶ms, + result, + id, + ) } /// Handler for `subgraph_pause`. @@ -250,21 +272,20 @@ async fn handle_pause( Err(resp) => return resp, }; - info!(&state.logger, "Received subgraph_pause request"; "params" => format!("{:?}", params)); + let result = state + .registrar + .pause_subgraph(¶ms.deployment) + .await + .map(|_| JsonValue::Null); - match state.registrar.pause_subgraph(¶ms.deployment).await { - Ok(_) => JsonRpcResponse::success(id, serde_json::to_value(GraphValue::Null).unwrap()), - Err(e) => { - let error = registrar_error_to_jsonrpc( - &state.logger, - "subgraph_pause", - e, - error_codes::PAUSE_ERROR, - ¶ms, - ); - JsonRpcResponse::error(id, error) - } - } + to_response( + &state.logger, + "subgraph_pause", + error_codes::PAUSE_ERROR, + ¶ms, + result, + id, + ) } /// Handler for `subgraph_resume`. @@ -278,21 +299,20 @@ async fn handle_resume( Err(resp) => return resp, }; - info!(&state.logger, "Received subgraph_resume request"; "params" => format!("{:?}", params)); + let result = state + .registrar + .resume_subgraph(¶ms.deployment) + .await + .map(|_| JsonValue::Null); - match state.registrar.resume_subgraph(¶ms.deployment).await { - Ok(_) => JsonRpcResponse::success(id, serde_json::to_value(GraphValue::Null).unwrap()), - Err(e) => { - let error = registrar_error_to_jsonrpc( - &state.logger, - "subgraph_resume", - e, - error_codes::RESUME_ERROR, - ¶ms, - ); - JsonRpcResponse::error(id, error) - } - } + to_response( + &state.logger, + "subgraph_resume", + error_codes::RESUME_ERROR, + ¶ms, + result, + id, + ) } /// Build the subgraph routes response for deploy. diff --git a/server/json-rpc/src/lib.rs b/server/json-rpc/src/lib.rs index 66a0e85e67c..401a457c57e 100644 --- a/server/json-rpc/src/lib.rs +++ b/server/json-rpc/src/lib.rs @@ -3,7 +3,6 @@ //! This crate provides a JSON-RPC 2.0 server for managing subgraphs, //! supporting operations like create, deploy, remove, reassign, pause, and resume. -mod error; mod handlers; mod jsonrpc; mod server; From d5a499eda43501bec825cf2c485389e845a310ad Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 29 Jan 2026 14:14:33 -0800 Subject: [PATCH 4/4] server/json-rpc: Inline params structs into handlers as much as possible --- server/json-rpc/src/handlers.rs | 67 ++++++++++++++++----------------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/server/json-rpc/src/handlers.rs b/server/json-rpc/src/handlers.rs index a3582e94032..23482a5f2bc 100644 --- a/server/json-rpc/src/handlers.rs +++ b/server/json-rpc/src/handlers.rs @@ -145,6 +145,11 @@ async fn handle_create( request: &JsonRpcRequest, id: JsonRpcId, ) -> JsonRpcResponse { + #[derive(Debug, Deserialize)] + pub struct SubgraphCreateParams { + pub name: SubgraphName, + } + let params: SubgraphCreateParams = match parse_params(request, id.clone()) { Ok(p) => p, Err(resp) => return resp, @@ -172,13 +177,21 @@ async fn handle_deploy( request: &JsonRpcRequest, id: JsonRpcId, ) -> JsonRpcResponse { + #[derive(Debug, Deserialize)] + pub struct SubgraphDeployParams { + pub name: SubgraphName, + pub ipfs_hash: DeploymentHash, + pub node_id: Option, + pub debug_fork: Option, + pub history_blocks: Option, + } + let params: SubgraphDeployParams = match parse_params(request, id.clone()) { Ok(p) => p, Err(resp) => return resp, }; let node_id = params.node_id.clone().unwrap_or(state.node_id.clone()); - let routes = subgraph_routes(¶ms.name, state.http_port); let result = state .registrar @@ -195,7 +208,7 @@ async fn handle_deploy( false, ) .await - .map(|_| routes); + .map(|_| subgraph_routes(¶ms.name, state.http_port)); to_response( &state.logger, @@ -213,6 +226,11 @@ async fn handle_remove( request: &JsonRpcRequest, id: JsonRpcId, ) -> JsonRpcResponse { + #[derive(Debug, Deserialize)] + pub struct SubgraphRemoveParams { + pub name: SubgraphName, + } + let params: SubgraphRemoveParams = match parse_params(request, id.clone()) { Ok(p) => p, Err(resp) => return resp, @@ -240,6 +258,12 @@ async fn handle_reassign( request: &JsonRpcRequest, id: JsonRpcId, ) -> JsonRpcResponse { + #[derive(Debug, Deserialize)] + pub struct SubgraphReassignParams { + pub ipfs_hash: DeploymentHash, + pub node_id: NodeId, + } + let params: SubgraphReassignParams = match parse_params(request, id.clone()) { Ok(p) => p, Err(resp) => return resp, @@ -261,6 +285,13 @@ async fn handle_reassign( ) } +// Parameter structs for pause and resume + +#[derive(Debug, Deserialize)] +pub struct SubgraphPauseParams { + pub deployment: DeploymentHash, +} + /// Handler for `subgraph_pause`. async fn handle_pause( state: &AppState, @@ -334,35 +365,3 @@ fn subgraph_routes(name: &SubgraphName, http_port: u16) -> JsonValue { serde_json::to_value(map).expect("invalid subgraph routes") } - -// Parameter structs for each method - -#[derive(Debug, Deserialize)] -pub struct SubgraphCreateParams { - pub name: SubgraphName, -} - -#[derive(Debug, Deserialize)] -pub struct SubgraphDeployParams { - pub name: SubgraphName, - pub ipfs_hash: DeploymentHash, - pub node_id: Option, - pub debug_fork: Option, - pub history_blocks: Option, -} - -#[derive(Debug, Deserialize)] -pub struct SubgraphRemoveParams { - pub name: SubgraphName, -} - -#[derive(Debug, Deserialize)] -pub struct SubgraphReassignParams { - pub ipfs_hash: DeploymentHash, - pub node_id: NodeId, -} - -#[derive(Debug, Deserialize)] -pub struct SubgraphPauseParams { - pub deployment: DeploymentHash, -}