Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 42 additions & 13 deletions crates/client/src/btc_chain/esplora_bitcoin_adaptor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use crate::btc_chain::bitcoin_adaptor::BitcoinAdaptor;
use crate::timeout_config::get_btc_request_timeout_secs;
use anyhow::anyhow;
use bitcoin::block::Header;
use bitcoin::{Address as BtcAddress, Block, Network, Transaction, Txid};
use esplora_client::{AsyncClient, Builder, MerkleProof, Tx, Utxo};
use std::future::Future;
use std::time::Duration;
use tracing::warn;

const TEST_URL: &str = "https://mempool.space/testnet/api";
const MAIN_URL: &str = "https://mempool.space/api";
Expand All @@ -16,15 +21,39 @@ pub fn get_esplora_url(network: Network) -> &'static str {
pub struct EsploraBitcoinAdaptor {
esplora: AsyncClient,
network: Network,
request_timeout: Duration,
}

impl EsploraBitcoinAdaptor {
pub fn new(network: Network, esplora_url: Option<&str>) -> Self {
let timeout_secs = get_btc_request_timeout_secs();
EsploraBitcoinAdaptor {
esplora: Builder::new(esplora_url.unwrap_or(get_esplora_url(network)))
.build_async()
.expect("Could not build esplora client"),
network,
request_timeout: Duration::from_secs(timeout_secs),
}
}

async fn with_timeout<T, F>(&self, request_name: &'static str, future: F) -> anyhow::Result<T>
where
F: Future<Output = Result<T, esplora_client::Error>>,
{
match tokio::time::timeout(self.request_timeout, future).await {
Ok(Ok(value)) => Ok(value),
Ok(Err(err)) => Err(err.into()),
Err(_) => {
warn!(
request_name,
timeout_secs = self.request_timeout.as_secs(),
"esplora request timeout"
);
Err(anyhow!(
"esplora request timeout: {request_name}, timeout_secs={} ",
self.request_timeout.as_secs()
))
}
}
}
}
Expand All @@ -36,61 +65,61 @@ impl BitcoinAdaptor for EsploraBitcoinAdaptor {
}

async fn get_tx_status(&self, txid: &Txid) -> anyhow::Result<esplora_client::TxStatus> {
Ok(self.esplora.get_tx_status(txid).await?)
self.with_timeout("get_tx_status", self.esplora.get_tx_status(txid)).await
}

async fn get_tx(&self, txid: &Txid) -> anyhow::Result<Option<Transaction>> {
Ok(self.esplora.get_tx(txid).await?)
self.with_timeout("get_tx", self.esplora.get_tx(txid)).await
}

async fn get_tx_info(&self, txid: &Txid) -> anyhow::Result<Option<Tx>> {
Ok(self.esplora.get_tx_info(txid).await?)
self.with_timeout("get_tx_info", self.esplora.get_tx_info(txid)).await
}

async fn get_address_utxo(&self, address: BtcAddress) -> anyhow::Result<Vec<Utxo>> {
Ok(self.esplora.get_address_utxo(address).await?)
self.with_timeout("get_address_utxo", self.esplora.get_address_utxo(address)).await
}

async fn get_height(&self) -> anyhow::Result<u32> {
Ok(self.esplora.get_height().await?)
self.with_timeout("get_height", self.esplora.get_height()).await
}

async fn get_fee_estimates(&self) -> anyhow::Result<std::collections::HashMap<u16, f64>> {
Ok(self.esplora.get_fee_estimates().await?)
self.with_timeout("get_fee_estimates", self.esplora.get_fee_estimates()).await
}

async fn broadcast(&self, tx: &Transaction) -> anyhow::Result<()> {
Ok(self.esplora.broadcast(tx).await?)
self.with_timeout("broadcast", self.esplora.broadcast(tx)).await
}

async fn broadcast_package(&self, txns: &[Transaction]) -> anyhow::Result<()> {
Ok(self.esplora.broadcast_package(txns).await?)
self.with_timeout("broadcast_package", self.esplora.broadcast_package(txns)).await
}

async fn get_output_status(
&self,
txid: &Txid,
vout: u64,
) -> anyhow::Result<Option<esplora_client::OutputStatus>> {
Ok(self.esplora.get_output_status(txid, vout).await?)
self.with_timeout("get_output_status", self.esplora.get_output_status(txid, vout)).await
}

async fn get_block_hash(&self, block_height: u32) -> anyhow::Result<bitcoin::BlockHash> {
Ok(self.esplora.get_block_hash(block_height).await?)
self.with_timeout("get_block_hash", self.esplora.get_block_hash(block_height)).await
}

async fn get_block_by_hash(
&self,
block_hash: &bitcoin::BlockHash,
) -> anyhow::Result<Option<Block>> {
Ok(self.esplora.get_block_by_hash(block_hash).await?)
self.with_timeout("get_block_by_hash", self.esplora.get_block_by_hash(block_hash)).await
}

async fn get_merkle_proof(&self, tx_id: &Txid) -> anyhow::Result<Option<MerkleProof>> {
Ok(self.esplora.get_merkle_proof(tx_id).await?)
self.with_timeout("get_merkle_proof", self.esplora.get_merkle_proof(tx_id)).await
}

async fn get_header_by_hash(&self, block_hash: &bitcoin::BlockHash) -> anyhow::Result<Header> {
Ok(self.esplora.get_header_by_hash(block_hash).await?)
self.with_timeout("get_header_by_hash", self.esplora.get_header_by_hash(block_hash)).await
}
}
23 changes: 20 additions & 3 deletions crates/client/src/goat_chain/goat_adaptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::goat_chain::goat_adaptor::IGateway::IGatewayInstance;
use crate::goat_chain::goat_adaptor::IPegBtc::IPegBtcInstance;
use crate::goat_chain::goat_adaptor::ISequencerSetPublisher::ISequencerSetPublisherInstance;
use crate::goat_chain::goat_adaptor::IStakeManagement::IStakeManagementInstance;
use crate::timeout_config::get_goat_rpc_timeout_secs;
use alloy::eips::BlockNumberOrTag;
use alloy::providers::Identity;
use alloy::providers::ext::DebugApi;
Expand All @@ -30,6 +31,20 @@ use std::time::Duration;
use tokio::time;
use uuid::Uuid;

fn build_goat_rpc_client() -> reqwest::Client {
let timeout = Duration::from_secs(get_goat_rpc_timeout_secs());
match reqwest::Client::builder().timeout(timeout).build() {
Ok(client) => client,
Err(err) => {
tracing::warn!(
timeout_secs = timeout.as_secs(),
"failed to build GOAT RPC reqwest client with timeout: {err}, fallback to default client"
);
reqwest::Client::new()
}
}
}

sol!(
#[derive(Debug)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -261,10 +276,11 @@ pub struct GoatInitConfig {

impl GoatInitConfig {
pub async fn new(rpc_url: Url) -> anyhow::Result<GoatInitConfig> {
let provider =
ProviderBuilder::new().connect_reqwest(build_goat_rpc_client(), rpc_url.clone());
Ok(GoatInitConfig {
private_key: None,
chain_id: ProviderBuilder::new().connect_http(rpc_url.clone()).get_chain_id().await?
as u32,
chain_id: provider.get_chain_id().await? as u32,
rpc_url,
gateway_address: None,
sequencer_set_publisher_address: None,
Expand Down Expand Up @@ -1384,7 +1400,8 @@ impl GoatAdaptor {
} else {
PrivateKeySigner::random()
};
let provider = ProviderBuilder::new().connect_http(config.rpc_url);
let provider =
ProviderBuilder::new().connect_reqwest(build_goat_rpc_client(), config.rpc_url);
Self {
chain_id,
signer: EthereumWallet::new(signer),
Expand Down
1 change: 1 addition & 0 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod btc_chain;
pub mod goat_chain;
pub mod graphs;
pub mod http_client;
pub mod timeout_config;
mod utils;

pub use goat_chain::Utxo;
20 changes: 20 additions & 0 deletions crates/client/src/timeout_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
pub const ENV_BTC_REQUEST_TIMEOUT_SECS: &str = "BTC_REQUEST_TIMEOUT_SECS";
pub const DEFAULT_BTC_REQUEST_TIMEOUT_SECS: u64 = 15;
pub const ENV_GOAT_RPC_TIMEOUT_SECS: &str = "GOAT_RPC_TIMEOUT_SECS";
pub const DEFAULT_GOAT_RPC_TIMEOUT_SECS: u64 = 20;

pub fn get_btc_request_timeout_secs() -> u64 {
std::env::var(ENV_BTC_REQUEST_TIMEOUT_SECS)
.ok()
.and_then(|value| value.parse::<u64>().ok())
.map(|timeout| timeout.clamp(1, 120))
.unwrap_or(DEFAULT_BTC_REQUEST_TIMEOUT_SECS)
}

pub fn get_goat_rpc_timeout_secs() -> u64 {
std::env::var(ENV_GOAT_RPC_TIMEOUT_SECS)
.ok()
.and_then(|value| value.parse::<u64>().ok())
.map(|timeout| timeout.clamp(1, 120))
.unwrap_or(DEFAULT_GOAT_RPC_TIMEOUT_SECS)
}
10 changes: 10 additions & 0 deletions node/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ pub const REGULAR_TASK_INTERVAL_SECOND: u64 = 20;
pub const SEQUENCER_SET_MONITOR_INTERVAL_SECS: u64 = 5;
pub const ENV_INSTANCE_MAINTENANCE_BATCH_SIZE: &str = "INSTANCE_MAINTENANCE_BATCH_SIZE";
pub const DEFAULT_INSTANCE_MAINTENANCE_BATCH_SIZE: u32 = 50;
pub const ENV_MAINTENANCE_RUN_TIMEOUT_SECS: &str = "MAINTENANCE_RUN_TIMEOUT_SECS";
pub const DEFAULT_MAINTENANCE_RUN_TIMEOUT_SECS: u64 = 60;

pub fn get_network() -> Network {
let network = std::env::var(ENV_BITCOIN_NETWORK).unwrap_or("testnet4".to_string());
Expand Down Expand Up @@ -541,6 +543,14 @@ pub fn get_instance_maintenance_batch_size() -> u32 {
.unwrap_or(DEFAULT_INSTANCE_MAINTENANCE_BATCH_SIZE)
}

pub fn get_maintenance_run_timeout_secs() -> u64 {
std::env::var(ENV_MAINTENANCE_RUN_TIMEOUT_SECS)
.ok()
.and_then(|value| value.parse::<u64>().ok())
.map(|timeout| timeout.clamp(1, 300))
.unwrap_or(DEFAULT_MAINTENANCE_RUN_TIMEOUT_SECS)
}

pub fn should_always_challenge() -> bool {
match std::env::var(ENV_ALWAYS_CHALLENGE) {
Ok(val) => val.to_lowercase() == "true",
Expand Down
33 changes: 24 additions & 9 deletions node/src/scheduled_tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod sequencer_set_hash_monitor_task;
mod spv_maintenance_tasks;

use crate::action::GOATMessageContent;
use crate::env::{is_enable_update_spv_contract, is_relayer};
use crate::env::{get_maintenance_run_timeout_secs, is_enable_update_spv_contract, is_relayer};
use crate::scheduled_tasks::graph_maintenance_tasks::{
detect_init_withdraw_call, detect_kickoff, detect_take1_or_challenge, process_graph_challenge,
};
Expand All @@ -22,11 +22,11 @@ use client::goat_chain::GOATClient;
pub use event_watch_task::{is_processing_gateway_history_events, run_watch_event_task};
pub use sequencer_set_hash_monitor_task::run_sequencer_set_hash_monitor_task;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use store::localdb::{LocalDB, StorageProcessor};
use store::{Graph, MessageType};
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use tracing::{error, info, warn};

async fn fetch_on_turn_graph_by_status<'a>(
storage_processor: &mut StorageProcessor<'a>,
Expand Down Expand Up @@ -116,19 +116,34 @@ pub async fn run_maintenance_tasks(
interval: u64,
cancellation_token: CancellationToken,
) -> anyhow::Result<String> {
let mut tick: u64 = 0;
let maintenance_run_timeout = Duration::from_secs(get_maintenance_run_timeout_secs());
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(interval)) => {
tick += 1;
let tick_start = Instant::now();
info!(tick, interval_secs = interval, "maintenance task tick start");
// Execute the normal monitoring logic
match run(actor.clone(),&local_db,btc_client.clone(),goat_client.clone()).await
{
Ok(_) => {}
Err(err) => {error!("run_scheduled_tasks, err {:?}", err)}
match tokio::time::timeout(
maintenance_run_timeout,
run(actor.clone(),&local_db,btc_client.clone(),goat_client.clone()),
).await {
Ok(Ok(_)) => {}
Ok(Err(err)) => {error!("run_scheduled_tasks, err {:?}", err)}
Err(_) => {
error!(
tick,
timeout_secs = maintenance_run_timeout.as_secs(),
"maintenance run timeout"
)
}
}
info!(tick, elapsed_ms = tick_start.elapsed().as_millis() as u64, "maintenance task tick end");
}
_ = cancellation_token.cancelled() => {
tracing::info!("Watch event task received shutdown signal");
return Ok("watch_shutdown".to_string());
tracing::info!("maintenance task received shutdown signal");
return Ok("maintenance_shutdown".to_string());
}
}
}
Expand Down
Loading