diff --git a/crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs b/crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs index c0dd6df5..693d194b 100644 --- a/crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs +++ b/crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs @@ -1,7 +1,12 @@ use crate::btc_chain::bitcoin_adaptor::BitcoinAdaptor; +use crate::timeout_config::get_btc_request_timeout_secs; +use anyhow::anyhow; use bitcoin::block::Header; use bitcoin::{Address as BtcAddress, Block, Network, Transaction, Txid}; use esplora_client::{AsyncClient, Builder, MerkleProof, Tx, Utxo}; +use std::future::Future; +use std::time::Duration; +use tracing::warn; const TEST_URL: &str = "https://mempool.space/testnet/api"; const MAIN_URL: &str = "https://mempool.space/api"; @@ -16,15 +21,39 @@ pub fn get_esplora_url(network: Network) -> &'static str { pub struct EsploraBitcoinAdaptor { esplora: AsyncClient, network: Network, + request_timeout: Duration, } impl EsploraBitcoinAdaptor { pub fn new(network: Network, esplora_url: Option<&str>) -> Self { + let timeout_secs = get_btc_request_timeout_secs(); EsploraBitcoinAdaptor { esplora: Builder::new(esplora_url.unwrap_or(get_esplora_url(network))) .build_async() .expect("Could not build esplora client"), network, + request_timeout: Duration::from_secs(timeout_secs), + } + } + + async fn with_timeout(&self, request_name: &'static str, future: F) -> anyhow::Result + where + F: Future>, + { + match tokio::time::timeout(self.request_timeout, future).await { + Ok(Ok(value)) => Ok(value), + Ok(Err(err)) => Err(err.into()), + Err(_) => { + warn!( + request_name, + timeout_secs = self.request_timeout.as_secs(), + "esplora request timeout" + ); + Err(anyhow!( + "esplora request timeout: {request_name}, timeout_secs={} ", + self.request_timeout.as_secs() + )) + } } } } @@ -36,35 +65,35 @@ impl BitcoinAdaptor for EsploraBitcoinAdaptor { } async fn get_tx_status(&self, txid: &Txid) -> anyhow::Result { - Ok(self.esplora.get_tx_status(txid).await?) + self.with_timeout("get_tx_status", self.esplora.get_tx_status(txid)).await } async fn get_tx(&self, txid: &Txid) -> anyhow::Result> { - Ok(self.esplora.get_tx(txid).await?) + self.with_timeout("get_tx", self.esplora.get_tx(txid)).await } async fn get_tx_info(&self, txid: &Txid) -> anyhow::Result> { - Ok(self.esplora.get_tx_info(txid).await?) + self.with_timeout("get_tx_info", self.esplora.get_tx_info(txid)).await } async fn get_address_utxo(&self, address: BtcAddress) -> anyhow::Result> { - Ok(self.esplora.get_address_utxo(address).await?) + self.with_timeout("get_address_utxo", self.esplora.get_address_utxo(address)).await } async fn get_height(&self) -> anyhow::Result { - Ok(self.esplora.get_height().await?) + self.with_timeout("get_height", self.esplora.get_height()).await } async fn get_fee_estimates(&self) -> anyhow::Result> { - Ok(self.esplora.get_fee_estimates().await?) + self.with_timeout("get_fee_estimates", self.esplora.get_fee_estimates()).await } async fn broadcast(&self, tx: &Transaction) -> anyhow::Result<()> { - Ok(self.esplora.broadcast(tx).await?) + self.with_timeout("broadcast", self.esplora.broadcast(tx)).await } async fn broadcast_package(&self, txns: &[Transaction]) -> anyhow::Result<()> { - Ok(self.esplora.broadcast_package(txns).await?) + self.with_timeout("broadcast_package", self.esplora.broadcast_package(txns)).await } async fn get_output_status( @@ -72,25 +101,25 @@ impl BitcoinAdaptor for EsploraBitcoinAdaptor { txid: &Txid, vout: u64, ) -> anyhow::Result> { - Ok(self.esplora.get_output_status(txid, vout).await?) + self.with_timeout("get_output_status", self.esplora.get_output_status(txid, vout)).await } async fn get_block_hash(&self, block_height: u32) -> anyhow::Result { - Ok(self.esplora.get_block_hash(block_height).await?) + self.with_timeout("get_block_hash", self.esplora.get_block_hash(block_height)).await } async fn get_block_by_hash( &self, block_hash: &bitcoin::BlockHash, ) -> anyhow::Result> { - Ok(self.esplora.get_block_by_hash(block_hash).await?) + self.with_timeout("get_block_by_hash", self.esplora.get_block_by_hash(block_hash)).await } async fn get_merkle_proof(&self, tx_id: &Txid) -> anyhow::Result> { - Ok(self.esplora.get_merkle_proof(tx_id).await?) + self.with_timeout("get_merkle_proof", self.esplora.get_merkle_proof(tx_id)).await } async fn get_header_by_hash(&self, block_hash: &bitcoin::BlockHash) -> anyhow::Result
{ - Ok(self.esplora.get_header_by_hash(block_hash).await?) + self.with_timeout("get_header_by_hash", self.esplora.get_header_by_hash(block_hash)).await } } diff --git a/crates/client/src/goat_chain/goat_adaptor.rs b/crates/client/src/goat_chain/goat_adaptor.rs index e6998b2d..ec9c314a 100644 --- a/crates/client/src/goat_chain/goat_adaptor.rs +++ b/crates/client/src/goat_chain/goat_adaptor.rs @@ -8,6 +8,7 @@ use crate::goat_chain::goat_adaptor::IGateway::IGatewayInstance; use crate::goat_chain::goat_adaptor::IPegBtc::IPegBtcInstance; use crate::goat_chain::goat_adaptor::ISequencerSetPublisher::ISequencerSetPublisherInstance; use crate::goat_chain::goat_adaptor::IStakeManagement::IStakeManagementInstance; +use crate::timeout_config::get_goat_rpc_timeout_secs; use alloy::eips::BlockNumberOrTag; use alloy::providers::Identity; use alloy::providers::ext::DebugApi; @@ -30,6 +31,20 @@ use std::time::Duration; use tokio::time; use uuid::Uuid; +fn build_goat_rpc_client() -> reqwest::Client { + let timeout = Duration::from_secs(get_goat_rpc_timeout_secs()); + match reqwest::Client::builder().timeout(timeout).build() { + Ok(client) => client, + Err(err) => { + tracing::warn!( + timeout_secs = timeout.as_secs(), + "failed to build GOAT RPC reqwest client with timeout: {err}, fallback to default client" + ); + reqwest::Client::new() + } + } +} + sol!( #[derive(Debug)] #[allow(missing_docs)] @@ -261,10 +276,11 @@ pub struct GoatInitConfig { impl GoatInitConfig { pub async fn new(rpc_url: Url) -> anyhow::Result { + let provider = + ProviderBuilder::new().connect_reqwest(build_goat_rpc_client(), rpc_url.clone()); Ok(GoatInitConfig { private_key: None, - chain_id: ProviderBuilder::new().connect_http(rpc_url.clone()).get_chain_id().await? - as u32, + chain_id: provider.get_chain_id().await? as u32, rpc_url, gateway_address: None, sequencer_set_publisher_address: None, @@ -1384,7 +1400,8 @@ impl GoatAdaptor { } else { PrivateKeySigner::random() }; - let provider = ProviderBuilder::new().connect_http(config.rpc_url); + let provider = + ProviderBuilder::new().connect_reqwest(build_goat_rpc_client(), config.rpc_url); Self { chain_id, signer: EthereumWallet::new(signer), diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 909bbea5..23e97107 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -2,6 +2,7 @@ pub mod btc_chain; pub mod goat_chain; pub mod graphs; pub mod http_client; +pub mod timeout_config; mod utils; pub use goat_chain::Utxo; diff --git a/crates/client/src/timeout_config.rs b/crates/client/src/timeout_config.rs new file mode 100644 index 00000000..6190daf2 --- /dev/null +++ b/crates/client/src/timeout_config.rs @@ -0,0 +1,20 @@ +pub const ENV_BTC_REQUEST_TIMEOUT_SECS: &str = "BTC_REQUEST_TIMEOUT_SECS"; +pub const DEFAULT_BTC_REQUEST_TIMEOUT_SECS: u64 = 15; +pub const ENV_GOAT_RPC_TIMEOUT_SECS: &str = "GOAT_RPC_TIMEOUT_SECS"; +pub const DEFAULT_GOAT_RPC_TIMEOUT_SECS: u64 = 20; + +pub fn get_btc_request_timeout_secs() -> u64 { + std::env::var(ENV_BTC_REQUEST_TIMEOUT_SECS) + .ok() + .and_then(|value| value.parse::().ok()) + .map(|timeout| timeout.clamp(1, 120)) + .unwrap_or(DEFAULT_BTC_REQUEST_TIMEOUT_SECS) +} + +pub fn get_goat_rpc_timeout_secs() -> u64 { + std::env::var(ENV_GOAT_RPC_TIMEOUT_SECS) + .ok() + .and_then(|value| value.parse::().ok()) + .map(|timeout| timeout.clamp(1, 120)) + .unwrap_or(DEFAULT_GOAT_RPC_TIMEOUT_SECS) +} diff --git a/node/src/env.rs b/node/src/env.rs index 4e460a9a..260adc1d 100644 --- a/node/src/env.rs +++ b/node/src/env.rs @@ -129,6 +129,8 @@ pub const REGULAR_TASK_INTERVAL_SECOND: u64 = 20; pub const SEQUENCER_SET_MONITOR_INTERVAL_SECS: u64 = 5; pub const ENV_INSTANCE_MAINTENANCE_BATCH_SIZE: &str = "INSTANCE_MAINTENANCE_BATCH_SIZE"; pub const DEFAULT_INSTANCE_MAINTENANCE_BATCH_SIZE: u32 = 50; +pub const ENV_MAINTENANCE_RUN_TIMEOUT_SECS: &str = "MAINTENANCE_RUN_TIMEOUT_SECS"; +pub const DEFAULT_MAINTENANCE_RUN_TIMEOUT_SECS: u64 = 60; pub fn get_network() -> Network { let network = std::env::var(ENV_BITCOIN_NETWORK).unwrap_or("testnet4".to_string()); @@ -541,6 +543,14 @@ pub fn get_instance_maintenance_batch_size() -> u32 { .unwrap_or(DEFAULT_INSTANCE_MAINTENANCE_BATCH_SIZE) } +pub fn get_maintenance_run_timeout_secs() -> u64 { + std::env::var(ENV_MAINTENANCE_RUN_TIMEOUT_SECS) + .ok() + .and_then(|value| value.parse::().ok()) + .map(|timeout| timeout.clamp(1, 300)) + .unwrap_or(DEFAULT_MAINTENANCE_RUN_TIMEOUT_SECS) +} + pub fn should_always_challenge() -> bool { match std::env::var(ENV_ALWAYS_CHALLENGE) { Ok(val) => val.to_lowercase() == "true", diff --git a/node/src/scheduled_tasks/mod.rs b/node/src/scheduled_tasks/mod.rs index 3bf51978..217361ec 100644 --- a/node/src/scheduled_tasks/mod.rs +++ b/node/src/scheduled_tasks/mod.rs @@ -6,7 +6,7 @@ mod sequencer_set_hash_monitor_task; mod spv_maintenance_tasks; use crate::action::GOATMessageContent; -use crate::env::{is_enable_update_spv_contract, is_relayer}; +use crate::env::{get_maintenance_run_timeout_secs, is_enable_update_spv_contract, is_relayer}; use crate::scheduled_tasks::graph_maintenance_tasks::{ detect_init_withdraw_call, detect_kickoff, detect_take1_or_challenge, process_graph_challenge, }; @@ -22,11 +22,11 @@ use client::goat_chain::GOATClient; pub use event_watch_task::{is_processing_gateway_history_events, run_watch_event_task}; pub use sequencer_set_hash_monitor_task::run_sequencer_set_hash_monitor_task; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use store::localdb::{LocalDB, StorageProcessor}; use store::{Graph, MessageType}; use tokio_util::sync::CancellationToken; -use tracing::{error, warn}; +use tracing::{error, info, warn}; async fn fetch_on_turn_graph_by_status<'a>( storage_processor: &mut StorageProcessor<'a>, @@ -116,19 +116,34 @@ pub async fn run_maintenance_tasks( interval: u64, cancellation_token: CancellationToken, ) -> anyhow::Result { + let mut tick: u64 = 0; + let maintenance_run_timeout = Duration::from_secs(get_maintenance_run_timeout_secs()); loop { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(interval)) => { + tick += 1; + let tick_start = Instant::now(); + info!(tick, interval_secs = interval, "maintenance task tick start"); // Execute the normal monitoring logic - match run(actor.clone(),&local_db,btc_client.clone(),goat_client.clone()).await - { - Ok(_) => {} - Err(err) => {error!("run_scheduled_tasks, err {:?}", err)} + match tokio::time::timeout( + maintenance_run_timeout, + run(actor.clone(),&local_db,btc_client.clone(),goat_client.clone()), + ).await { + Ok(Ok(_)) => {} + Ok(Err(err)) => {error!("run_scheduled_tasks, err {:?}", err)} + Err(_) => { + error!( + tick, + timeout_secs = maintenance_run_timeout.as_secs(), + "maintenance run timeout" + ) + } } + info!(tick, elapsed_ms = tick_start.elapsed().as_millis() as u64, "maintenance task tick end"); } _ = cancellation_token.cancelled() => { - tracing::info!("Watch event task received shutdown signal"); - return Ok("watch_shutdown".to_string()); + tracing::info!("maintenance task received shutdown signal"); + return Ok("maintenance_shutdown".to_string()); } } }