From 961a0ce8438015c7b352231eaeaa28a82b742134 Mon Sep 17 00:00:00 2001 From: KSlashh <48985735+KSlashh@users.noreply.github.com> Date: Mon, 2 Mar 2026 21:55:25 +0800 Subject: [PATCH 1/2] fix: blocking tasks --- .../src/btc_chain/esplora_bitcoin_adaptor.rs | 55 ++++++++++++++----- crates/client/src/goat_chain/goat_adaptor.rs | 23 +++++++- crates/client/src/lib.rs | 1 + crates/client/src/timeout_config.rs | 20 +++++++ node/src/env.rs | 10 ++++ node/src/scheduled_tasks/mod.rs | 50 ++++++++++++++--- 6 files changed, 134 insertions(+), 25 deletions(-) create mode 100644 crates/client/src/timeout_config.rs diff --git a/crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs b/crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs index c0dd6df5..693d194b 100644 --- a/crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs +++ b/crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs @@ -1,7 +1,12 @@ use crate::btc_chain::bitcoin_adaptor::BitcoinAdaptor; +use crate::timeout_config::get_btc_request_timeout_secs; +use anyhow::anyhow; use bitcoin::block::Header; use bitcoin::{Address as BtcAddress, Block, Network, Transaction, Txid}; use esplora_client::{AsyncClient, Builder, MerkleProof, Tx, Utxo}; +use std::future::Future; +use std::time::Duration; +use tracing::warn; const TEST_URL: &str = "https://mempool.space/testnet/api"; const MAIN_URL: &str = "https://mempool.space/api"; @@ -16,15 +21,39 @@ pub fn get_esplora_url(network: Network) -> &'static str { pub struct EsploraBitcoinAdaptor { esplora: AsyncClient, network: Network, + request_timeout: Duration, } impl EsploraBitcoinAdaptor { pub fn new(network: Network, esplora_url: Option<&str>) -> Self { + let timeout_secs = get_btc_request_timeout_secs(); EsploraBitcoinAdaptor { esplora: Builder::new(esplora_url.unwrap_or(get_esplora_url(network))) .build_async() .expect("Could not build esplora client"), network, + request_timeout: Duration::from_secs(timeout_secs), + } + } + + async fn with_timeout(&self, request_name: &'static str, future: F) -> anyhow::Result + where + F: Future>, + { + match tokio::time::timeout(self.request_timeout, future).await { + Ok(Ok(value)) => Ok(value), + Ok(Err(err)) => Err(err.into()), + Err(_) => { + warn!( + request_name, + timeout_secs = self.request_timeout.as_secs(), + "esplora request timeout" + ); + Err(anyhow!( + "esplora request timeout: {request_name}, timeout_secs={} ", + self.request_timeout.as_secs() + )) + } } } } @@ -36,35 +65,35 @@ impl BitcoinAdaptor for EsploraBitcoinAdaptor { } async fn get_tx_status(&self, txid: &Txid) -> anyhow::Result { - Ok(self.esplora.get_tx_status(txid).await?) + self.with_timeout("get_tx_status", self.esplora.get_tx_status(txid)).await } async fn get_tx(&self, txid: &Txid) -> anyhow::Result> { - Ok(self.esplora.get_tx(txid).await?) + self.with_timeout("get_tx", self.esplora.get_tx(txid)).await } async fn get_tx_info(&self, txid: &Txid) -> anyhow::Result> { - Ok(self.esplora.get_tx_info(txid).await?) + self.with_timeout("get_tx_info", self.esplora.get_tx_info(txid)).await } async fn get_address_utxo(&self, address: BtcAddress) -> anyhow::Result> { - Ok(self.esplora.get_address_utxo(address).await?) + self.with_timeout("get_address_utxo", self.esplora.get_address_utxo(address)).await } async fn get_height(&self) -> anyhow::Result { - Ok(self.esplora.get_height().await?) + self.with_timeout("get_height", self.esplora.get_height()).await } async fn get_fee_estimates(&self) -> anyhow::Result> { - Ok(self.esplora.get_fee_estimates().await?) + self.with_timeout("get_fee_estimates", self.esplora.get_fee_estimates()).await } async fn broadcast(&self, tx: &Transaction) -> anyhow::Result<()> { - Ok(self.esplora.broadcast(tx).await?) + self.with_timeout("broadcast", self.esplora.broadcast(tx)).await } async fn broadcast_package(&self, txns: &[Transaction]) -> anyhow::Result<()> { - Ok(self.esplora.broadcast_package(txns).await?) + self.with_timeout("broadcast_package", self.esplora.broadcast_package(txns)).await } async fn get_output_status( @@ -72,25 +101,25 @@ impl BitcoinAdaptor for EsploraBitcoinAdaptor { txid: &Txid, vout: u64, ) -> anyhow::Result> { - Ok(self.esplora.get_output_status(txid, vout).await?) + self.with_timeout("get_output_status", self.esplora.get_output_status(txid, vout)).await } async fn get_block_hash(&self, block_height: u32) -> anyhow::Result { - Ok(self.esplora.get_block_hash(block_height).await?) + self.with_timeout("get_block_hash", self.esplora.get_block_hash(block_height)).await } async fn get_block_by_hash( &self, block_hash: &bitcoin::BlockHash, ) -> anyhow::Result> { - Ok(self.esplora.get_block_by_hash(block_hash).await?) + self.with_timeout("get_block_by_hash", self.esplora.get_block_by_hash(block_hash)).await } async fn get_merkle_proof(&self, tx_id: &Txid) -> anyhow::Result> { - Ok(self.esplora.get_merkle_proof(tx_id).await?) + self.with_timeout("get_merkle_proof", self.esplora.get_merkle_proof(tx_id)).await } async fn get_header_by_hash(&self, block_hash: &bitcoin::BlockHash) -> anyhow::Result
{ - Ok(self.esplora.get_header_by_hash(block_hash).await?) + self.with_timeout("get_header_by_hash", self.esplora.get_header_by_hash(block_hash)).await } } diff --git a/crates/client/src/goat_chain/goat_adaptor.rs b/crates/client/src/goat_chain/goat_adaptor.rs index e6998b2d..ec9c314a 100644 --- a/crates/client/src/goat_chain/goat_adaptor.rs +++ b/crates/client/src/goat_chain/goat_adaptor.rs @@ -8,6 +8,7 @@ use crate::goat_chain::goat_adaptor::IGateway::IGatewayInstance; use crate::goat_chain::goat_adaptor::IPegBtc::IPegBtcInstance; use crate::goat_chain::goat_adaptor::ISequencerSetPublisher::ISequencerSetPublisherInstance; use crate::goat_chain::goat_adaptor::IStakeManagement::IStakeManagementInstance; +use crate::timeout_config::get_goat_rpc_timeout_secs; use alloy::eips::BlockNumberOrTag; use alloy::providers::Identity; use alloy::providers::ext::DebugApi; @@ -30,6 +31,20 @@ use std::time::Duration; use tokio::time; use uuid::Uuid; +fn build_goat_rpc_client() -> reqwest::Client { + let timeout = Duration::from_secs(get_goat_rpc_timeout_secs()); + match reqwest::Client::builder().timeout(timeout).build() { + Ok(client) => client, + Err(err) => { + tracing::warn!( + timeout_secs = timeout.as_secs(), + "failed to build GOAT RPC reqwest client with timeout: {err}, fallback to default client" + ); + reqwest::Client::new() + } + } +} + sol!( #[derive(Debug)] #[allow(missing_docs)] @@ -261,10 +276,11 @@ pub struct GoatInitConfig { impl GoatInitConfig { pub async fn new(rpc_url: Url) -> anyhow::Result { + let provider = + ProviderBuilder::new().connect_reqwest(build_goat_rpc_client(), rpc_url.clone()); Ok(GoatInitConfig { private_key: None, - chain_id: ProviderBuilder::new().connect_http(rpc_url.clone()).get_chain_id().await? - as u32, + chain_id: provider.get_chain_id().await? as u32, rpc_url, gateway_address: None, sequencer_set_publisher_address: None, @@ -1384,7 +1400,8 @@ impl GoatAdaptor { } else { PrivateKeySigner::random() }; - let provider = ProviderBuilder::new().connect_http(config.rpc_url); + let provider = + ProviderBuilder::new().connect_reqwest(build_goat_rpc_client(), config.rpc_url); Self { chain_id, signer: EthereumWallet::new(signer), diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 909bbea5..23e97107 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -2,6 +2,7 @@ pub mod btc_chain; pub mod goat_chain; pub mod graphs; pub mod http_client; +pub mod timeout_config; mod utils; pub use goat_chain::Utxo; diff --git a/crates/client/src/timeout_config.rs b/crates/client/src/timeout_config.rs new file mode 100644 index 00000000..6190daf2 --- /dev/null +++ b/crates/client/src/timeout_config.rs @@ -0,0 +1,20 @@ +pub const ENV_BTC_REQUEST_TIMEOUT_SECS: &str = "BTC_REQUEST_TIMEOUT_SECS"; +pub const DEFAULT_BTC_REQUEST_TIMEOUT_SECS: u64 = 15; +pub const ENV_GOAT_RPC_TIMEOUT_SECS: &str = "GOAT_RPC_TIMEOUT_SECS"; +pub const DEFAULT_GOAT_RPC_TIMEOUT_SECS: u64 = 20; + +pub fn get_btc_request_timeout_secs() -> u64 { + std::env::var(ENV_BTC_REQUEST_TIMEOUT_SECS) + .ok() + .and_then(|value| value.parse::().ok()) + .map(|timeout| timeout.clamp(1, 120)) + .unwrap_or(DEFAULT_BTC_REQUEST_TIMEOUT_SECS) +} + +pub fn get_goat_rpc_timeout_secs() -> u64 { + std::env::var(ENV_GOAT_RPC_TIMEOUT_SECS) + .ok() + .and_then(|value| value.parse::().ok()) + .map(|timeout| timeout.clamp(1, 120)) + .unwrap_or(DEFAULT_GOAT_RPC_TIMEOUT_SECS) +} diff --git a/node/src/env.rs b/node/src/env.rs index 4e460a9a..260adc1d 100644 --- a/node/src/env.rs +++ b/node/src/env.rs @@ -129,6 +129,8 @@ pub const REGULAR_TASK_INTERVAL_SECOND: u64 = 20; pub const SEQUENCER_SET_MONITOR_INTERVAL_SECS: u64 = 5; pub const ENV_INSTANCE_MAINTENANCE_BATCH_SIZE: &str = "INSTANCE_MAINTENANCE_BATCH_SIZE"; pub const DEFAULT_INSTANCE_MAINTENANCE_BATCH_SIZE: u32 = 50; +pub const ENV_MAINTENANCE_RUN_TIMEOUT_SECS: &str = "MAINTENANCE_RUN_TIMEOUT_SECS"; +pub const DEFAULT_MAINTENANCE_RUN_TIMEOUT_SECS: u64 = 60; pub fn get_network() -> Network { let network = std::env::var(ENV_BITCOIN_NETWORK).unwrap_or("testnet4".to_string()); @@ -541,6 +543,14 @@ pub fn get_instance_maintenance_batch_size() -> u32 { .unwrap_or(DEFAULT_INSTANCE_MAINTENANCE_BATCH_SIZE) } +pub fn get_maintenance_run_timeout_secs() -> u64 { + std::env::var(ENV_MAINTENANCE_RUN_TIMEOUT_SECS) + .ok() + .and_then(|value| value.parse::().ok()) + .map(|timeout| timeout.clamp(1, 300)) + .unwrap_or(DEFAULT_MAINTENANCE_RUN_TIMEOUT_SECS) +} + pub fn should_always_challenge() -> bool { match std::env::var(ENV_ALWAYS_CHALLENGE) { Ok(val) => val.to_lowercase() == "true", diff --git a/node/src/scheduled_tasks/mod.rs b/node/src/scheduled_tasks/mod.rs index 3bf51978..6c79a6d6 100644 --- a/node/src/scheduled_tasks/mod.rs +++ b/node/src/scheduled_tasks/mod.rs @@ -6,7 +6,7 @@ mod sequencer_set_hash_monitor_task; mod spv_maintenance_tasks; use crate::action::GOATMessageContent; -use crate::env::{is_enable_update_spv_contract, is_relayer}; +use crate::env::{get_maintenance_run_timeout_secs, is_enable_update_spv_contract, is_relayer}; use crate::scheduled_tasks::graph_maintenance_tasks::{ detect_init_withdraw_call, detect_kickoff, detect_take1_or_challenge, process_graph_challenge, }; @@ -22,11 +22,11 @@ use client::goat_chain::GOATClient; pub use event_watch_task::{is_processing_gateway_history_events, run_watch_event_task}; pub use sequencer_set_hash_monitor_task::run_sequencer_set_hash_monitor_task; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use store::localdb::{LocalDB, StorageProcessor}; use store::{Graph, MessageType}; use tokio_util::sync::CancellationToken; -use tracing::{error, warn}; +use tracing::{error, info, warn}; async fn fetch_on_turn_graph_by_status<'a>( storage_processor: &mut StorageProcessor<'a>, @@ -50,61 +50,75 @@ async fn run( local_db: &LocalDB, btc_client: Arc, goat_client: Arc, + stage: Arc>, ) -> anyhow::Result<()> { let btc_client = btc_client.as_ref(); let goat_client = goat_client.as_ref(); + *stage.lock().await = "node_available_pbtc_update_monitor"; if (actor == Actor::Operator || is_relayer()) && let Err(err) = node_available_pbtc_update_monitor(local_db, goat_client).await { warn!("node_available_pbtc_update_monitor, err {:?}", err) } + *stage.lock().await = "spv_header_hash_update"; if is_enable_update_spv_contract() && let Err(err) = spv_header_hash_update(btc_client, goat_client).await { warn!("spv_header_hash_update, err {:?}", err) } + *stage.lock().await = "is_processing_gateway_history_events"; if is_processing_gateway_history_events(local_db, goat_client).await? { warn!("Still in history events processing"); return Ok(()); } + *stage.lock().await = "instance_answers_monitor"; if let Err(err) = instance_answers_monitor(local_db, btc_client, goat_client).await { warn!("instance_answers_monitor, err {:?}", err) } + *stage.lock().await = "instance_window_expiration_monitor"; if let Err(err) = instance_window_expiration_monitor(local_db, goat_client).await { warn!("instance_window_expiration_monitor, err {:?}", err) } + *stage.lock().await = "instance_expiration_monitor"; if let Err(err) = instance_expiration_monitor(local_db, btc_client).await { warn!("instance_expiration_monitor, err {:?}", err) } + *stage.lock().await = "instance_btc_tx_monitor"; if let Err(err) = instance_btc_tx_monitor(local_db, btc_client).await { warn!("instance_btc_tx_monitor, err {:?}", err) } + *stage.lock().await = "instance_bridge_out_monitor"; if let Err(err) = instance_bridge_out_monitor(local_db).await { warn!("instance_bridge_out_monitor, err {:?}", err) } + *stage.lock().await = "detect_init_withdraw_call"; if let Err(err) = detect_init_withdraw_call(local_db).await { warn!("detect_init_withdraw_call, err {:?}", err) } + *stage.lock().await = "detect_kickoff"; if let Err(err) = detect_kickoff(local_db, btc_client).await { warn!("detect_kickoff, err {:?}", err) } + *stage.lock().await = "detect_take1_or_challenge"; if let Err(err) = detect_take1_or_challenge(local_db, btc_client).await { warn!("detect_take1_or_challenge, err {:?}", err) } + *stage.lock().await = "process_graph_challenge"; if let Err(err) = process_graph_challenge(local_db, btc_client).await { warn!("process_grpah_challenge, err {:?}", err) } + *stage.lock().await = "completed"; Ok(()) } @@ -116,19 +130,37 @@ pub async fn run_maintenance_tasks( interval: u64, cancellation_token: CancellationToken, ) -> anyhow::Result { + let mut tick: u64 = 0; + let maintenance_run_timeout = Duration::from_secs(get_maintenance_run_timeout_secs()); loop { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(interval)) => { + tick += 1; + let tick_start = Instant::now(); + info!(tick, interval_secs = interval, "maintenance task tick start"); + let stage = Arc::new(tokio::sync::Mutex::new("starting")); // Execute the normal monitoring logic - match run(actor.clone(),&local_db,btc_client.clone(),goat_client.clone()).await - { - Ok(_) => {} - Err(err) => {error!("run_scheduled_tasks, err {:?}", err)} + match tokio::time::timeout( + maintenance_run_timeout, + run(actor.clone(),&local_db,btc_client.clone(),goat_client.clone(), stage.clone()), + ).await { + Ok(Ok(_)) => {} + Ok(Err(err)) => {error!("run_scheduled_tasks, err {:?}", err)} + Err(_) => { + let timeout_stage = *stage.lock().await; + error!( + tick, + timeout_secs = maintenance_run_timeout.as_secs(), + stage = timeout_stage, + "maintenance run timeout" + ) + } } + info!(tick, elapsed_ms = tick_start.elapsed().as_millis() as u64, "maintenance task tick end"); } _ = cancellation_token.cancelled() => { - tracing::info!("Watch event task received shutdown signal"); - return Ok("watch_shutdown".to_string()); + tracing::info!("maintenance task received shutdown signal"); + return Ok("maintenance_shutdown".to_string()); } } } From 0959eabb8ec83271d229ae26cd30198b8ec4ca7e Mon Sep 17 00:00:00 2001 From: KSlashh <48985735+KSlashh@users.noreply.github.com> Date: Mon, 2 Mar 2026 22:19:07 +0800 Subject: [PATCH 2/2] simplify maintenance task --- node/src/scheduled_tasks/mod.rs | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/node/src/scheduled_tasks/mod.rs b/node/src/scheduled_tasks/mod.rs index 6c79a6d6..217361ec 100644 --- a/node/src/scheduled_tasks/mod.rs +++ b/node/src/scheduled_tasks/mod.rs @@ -50,75 +50,61 @@ async fn run( local_db: &LocalDB, btc_client: Arc, goat_client: Arc, - stage: Arc>, ) -> anyhow::Result<()> { let btc_client = btc_client.as_ref(); let goat_client = goat_client.as_ref(); - *stage.lock().await = "node_available_pbtc_update_monitor"; if (actor == Actor::Operator || is_relayer()) && let Err(err) = node_available_pbtc_update_monitor(local_db, goat_client).await { warn!("node_available_pbtc_update_monitor, err {:?}", err) } - *stage.lock().await = "spv_header_hash_update"; if is_enable_update_spv_contract() && let Err(err) = spv_header_hash_update(btc_client, goat_client).await { warn!("spv_header_hash_update, err {:?}", err) } - *stage.lock().await = "is_processing_gateway_history_events"; if is_processing_gateway_history_events(local_db, goat_client).await? { warn!("Still in history events processing"); return Ok(()); } - *stage.lock().await = "instance_answers_monitor"; if let Err(err) = instance_answers_monitor(local_db, btc_client, goat_client).await { warn!("instance_answers_monitor, err {:?}", err) } - *stage.lock().await = "instance_window_expiration_monitor"; if let Err(err) = instance_window_expiration_monitor(local_db, goat_client).await { warn!("instance_window_expiration_monitor, err {:?}", err) } - *stage.lock().await = "instance_expiration_monitor"; if let Err(err) = instance_expiration_monitor(local_db, btc_client).await { warn!("instance_expiration_monitor, err {:?}", err) } - *stage.lock().await = "instance_btc_tx_monitor"; if let Err(err) = instance_btc_tx_monitor(local_db, btc_client).await { warn!("instance_btc_tx_monitor, err {:?}", err) } - *stage.lock().await = "instance_bridge_out_monitor"; if let Err(err) = instance_bridge_out_monitor(local_db).await { warn!("instance_bridge_out_monitor, err {:?}", err) } - *stage.lock().await = "detect_init_withdraw_call"; if let Err(err) = detect_init_withdraw_call(local_db).await { warn!("detect_init_withdraw_call, err {:?}", err) } - *stage.lock().await = "detect_kickoff"; if let Err(err) = detect_kickoff(local_db, btc_client).await { warn!("detect_kickoff, err {:?}", err) } - *stage.lock().await = "detect_take1_or_challenge"; if let Err(err) = detect_take1_or_challenge(local_db, btc_client).await { warn!("detect_take1_or_challenge, err {:?}", err) } - *stage.lock().await = "process_graph_challenge"; if let Err(err) = process_graph_challenge(local_db, btc_client).await { warn!("process_grpah_challenge, err {:?}", err) } - *stage.lock().await = "completed"; Ok(()) } @@ -138,20 +124,17 @@ pub async fn run_maintenance_tasks( tick += 1; let tick_start = Instant::now(); info!(tick, interval_secs = interval, "maintenance task tick start"); - let stage = Arc::new(tokio::sync::Mutex::new("starting")); // Execute the normal monitoring logic match tokio::time::timeout( maintenance_run_timeout, - run(actor.clone(),&local_db,btc_client.clone(),goat_client.clone(), stage.clone()), + run(actor.clone(),&local_db,btc_client.clone(),goat_client.clone()), ).await { Ok(Ok(_)) => {} Ok(Err(err)) => {error!("run_scheduled_tasks, err {:?}", err)} Err(_) => { - let timeout_stage = *stage.lock().await; error!( tick, timeout_secs = maintenance_run_timeout.as_secs(), - stage = timeout_stage, "maintenance run timeout" ) }