diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index b8385866d33..542bdf75b0e 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -15,7 +15,7 @@ use graph::components::versions::VERSIONS; use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap}; use graph::data::subgraph::{status, DeploymentFeatures}; use graph::data::value::Object; -use graph::futures03::TryFutureExt; +use graph::futures03::{future, TryFutureExt}; use graph::prelude::*; use graph_graphql::prelude::{a, ExecutionContext, Resolver}; @@ -352,7 +352,10 @@ where )) } - fn resolve_proof_of_indexing(&self, field: &a::Field) -> Result { + async fn resolve_proof_of_indexing( + &self, + field: &a::Field, + ) -> Result { let deployment_id = field .get_required::("subgraph") .expect("Valid subgraphId required"); @@ -381,7 +384,7 @@ where let poi_fut = self .store .get_proof_of_indexing(&deployment_id, &indexer, block.clone()); - let poi = match graph::futures03::executor::block_on(poi_fut) { + let poi = match poi_fut.await { Ok(Some(poi)) => r::Value::String(format!("0x{}", hex::encode(poi))), Ok(None) => r::Value::Null, Err(e) => { @@ -414,28 +417,29 @@ where return Err(QueryExecutionError::TooExpensive); } - let mut public_poi_results = vec![]; - for request in requests { - let (poi_result, request) = match self - .store - .get_public_proof_of_indexing(&request.deployment, request.block_number, self) - .await - { - Ok(Some(poi)) => (Some(poi), request), - Ok(None) => (None, request), - Err(e) => { - error!( - self.logger, - "Failed to query public proof of indexing"; - "subgraph" => &request.deployment, - "block" => format!("{}", request.block_number), - "error" => format!("{:?}", e) - ); - (None, request) - } - }; + // Process all POI requests in parallel for better throughput + let poi_futures: Vec<_> = requests + .into_iter() + .map(|request| async move { + let poi_result = match self + .store + .get_public_proof_of_indexing(&request.deployment, request.block_number, self) + .await + { + Ok(Some(poi)) => Some(poi), + Ok(None) => None, + Err(e) => { + error!( + self.logger, + "Failed to query public proof of indexing"; + "subgraph" => &request.deployment, + "block" => format!("{}", request.block_number), + "error" => format!("{:?}", e) + ); + None + } + }; - public_poi_results.push( PublicProofOfIndexingResult { deployment: request.deployment, block: match poi_result { @@ -444,9 +448,11 @@ where }, proof_of_indexing: poi_result.map(|(_, poi)| poi), } - .into_value(), - ) - } + .into_value() + }) + .collect(); + + let public_poi_results = future::join_all(poi_futures).await; Ok(r::Value::List(public_poi_results)) } @@ -791,7 +797,7 @@ where field.name.as_str(), scalar_type.name.as_str(), ) { - ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field), + ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field).await, ("Query", "blockData", "JSONObject") => self.resolve_block_data(field).await, ("Query", "blockHashFromNumber", "Bytes") => { self.resolve_block_hash_from_number(field).await