diff --git a/src/chain/store/index.rs b/src/chain/store/index.rs index 65c8d80fa582..2d8febeb00d6 100644 --- a/src/chain/store/index.rs +++ b/src/chain/store/index.rs @@ -130,7 +130,7 @@ impl ChainIndex { SizeTrackingLruCache::new_with_metrics( "tipset_by_height".into(), // 20480 * 900 = 18432000 which is sufficient for mainnet - 20480.try_into().expect("infallible"), + nonzero!(20480_usize), ) }); diff --git a/src/interpreter/vm.rs b/src/interpreter/vm.rs index 6985c695577a..3057d3e2f860 100644 --- a/src/interpreter/vm.rs +++ b/src/interpreter/vm.rs @@ -77,8 +77,11 @@ type ForestExecutorV4 = DefaultExecutor_v4>; pub type ApplyResult = anyhow::Result<(ApplyRet, Duration)>; -pub type ApplyBlockResult = - anyhow::Result<(Vec, Vec>, Vec>), anyhow::Error>; +pub type ApplyBlockResult = anyhow::Result<( + Vec, + Vec>>, + Vec>, +)>; /// Comes from pub const IMPLICIT_MESSAGE_GAS_LIMIT: i64 = i64::MAX / 2; @@ -387,7 +390,11 @@ where receipts.push(msg_receipt.clone()); events_roots.push(ret.msg_receipt().events_root()); - events.push(ret.events()); + if ret.msg_receipt().events_root().is_some() { + events.push(Some(ret.events())); + } else { + events.push(None); + } // Add processed Cid to set of processed messages processed.insert(cid); diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 801486b68ffd..83ba7058270e 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -500,10 +500,8 @@ impl Block { let ExecutedTipset { state_root, executed_messages, - } = ctx - .state_manager - .load_executed_tipset_without_events(&tipset) - .await?; + .. + } = ctx.state_manager.load_executed_tipset(&tipset).await?; let has_transactions = !executed_messages.is_empty(); let state_tree = ctx.state_manager.get_state_tree(&state_root)?; @@ -1419,10 +1417,8 @@ async fn get_block_receipts( let ExecutedTipset { state_root, executed_messages, - } = ctx - .state_manager - .load_executed_tipset_without_events(&ts_ref) - .await?; + .. + } = ctx.state_manager.load_executed_tipset(&ts_ref).await?; // Load the state tree let state_tree = ctx.state_manager.get_state_tree(&state_root)?; @@ -1933,10 +1929,7 @@ async fn eth_fee_history( let base_fee = &ts.block_headers().first().parent_base_fee; let ExecutedTipset { executed_messages, .. - } = ctx - .state_manager - .load_executed_tipset_without_events(&ts) - .await?; + } = ctx.state_manager.load_executed_tipset(&ts).await?; let mut tx_gas_rewards = Vec::with_capacity(executed_messages.len()); for ExecutedMessage { message, receipt, .. diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index a60b9aaa23d4..c62cfcf7d385 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -54,7 +54,8 @@ use crate::shim::{ use crate::state_manager::cache::TipsetStateCache; use crate::state_manager::chain_rand::draw_randomness; use crate::state_migration::run_state_migrations; -use crate::utils::get_size::GetSize; +use crate::utils::cache::SizeTrackingLruCache; +use crate::utils::get_size::{GetSize, vec_heap_size_helper}; use ahash::{HashMap, HashMapExt}; use anyhow::{Context as _, bail, ensure}; use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _}; @@ -78,6 +79,7 @@ use rayon::prelude::ParallelBridge; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::ops::RangeInclusive; +use std::sync::LazyLock; use std::time::Duration; use std::{num::NonZeroUsize, sync::Arc}; use tokio::sync::{RwLock, broadcast::error::RecvError}; @@ -89,30 +91,55 @@ pub const EVENTS_AMT_BITWIDTH: u32 = 5; /// Intermediary for retrieving state objects and updating actor states. type CidPair = (Cid, Cid); +fn executed_tipset_cache() -> &'static SizeTrackingLruCache { + // A tipset key should always map to a deterministic state output, so it's safe to cache the entire executed tipset with the same key. + static CACHE: LazyLock> = LazyLock::new(|| { + // 100-200MiB on mainet with capacity 1024 + SizeTrackingLruCache::new_with_metrics("executed_tipset".into(), nonzero!(1024usize)) + }); + &CACHE +} + /// Result of executing an individual chain message in a tipset. /// /// Includes the executed message itself, the execution receipt, and /// optional events emitted by the actor during execution. +#[derive(Debug, Clone)] pub struct ExecutedMessage { pub message: ChainMessage, pub receipt: Receipt, pub events: Option>, } +impl GetSize for ExecutedMessage { + fn get_heap_size(&self) -> usize { + self.message.get_heap_size() + + self.receipt.get_heap_size() + + self + .events + .as_ref() + .map(vec_heap_size_helper) + .unwrap_or_default() + } +} + /// Aggregated execution result for a tipset. /// /// `state_root` is the resulting state tree root after message execution /// and `executed_messages` contains per-message execution details. +#[derive(Debug, Clone)] pub struct ExecutedTipset { pub state_root: Cid, + #[allow(dead_code)] + pub receipt_root: Cid, pub executed_messages: Vec, } -/// Options controlling how `load_executed_tipset` fetches extra execution data. -/// -/// `include_events` toggles whether event logs are loaded from receipts. -pub struct LoadExecutedTipsetOptions { - pub include_events: bool, +impl GetSize for ExecutedTipset { + fn get_heap_size(&self) -> usize { + // state_root (Cid) and receipt_root (Cid) have no heap allocation, so we only calculate the heap size of executed_messages + vec_heap_size_helper(&self.executed_messages) + } } #[derive(Debug, Default, Clone, GetSize)] @@ -471,38 +498,21 @@ where .await } - /// Load an executed tipset, including message receipts and state root, - /// without loading event logs from receipts. - pub async fn load_executed_tipset_without_events( - self: &Arc, - ts: &Tipset, - ) -> anyhow::Result { - let receipt_ts = self.chain_store().load_child_tipset(ts).ok(); - self.load_executed_tipset_inner( - ts, - receipt_ts.as_ref(), - LoadExecutedTipsetOptions { - include_events: false, - }, - ) - .await - } - - /// Load an executed tipset, including message receipts and state root, - /// with event logs loaded when available. + /// Load an executed tipset, including state root, message receipts and events with caching. pub async fn load_executed_tipset( self: &Arc, ts: &Tipset, ) -> anyhow::Result { + let cache = executed_tipset_cache(); + if let Some(cached) = cache.get_cloned(ts.key()) { + return Ok(cached); + } let receipt_ts = self.chain_store().load_child_tipset(ts).ok(); - self.load_executed_tipset_inner( - ts, - receipt_ts.as_ref(), - LoadExecutedTipsetOptions { - include_events: true, - }, - ) - .await + let result = self + .load_executed_tipset_inner(ts, receipt_ts.as_ref()) + .await?; + cache.push(ts.key().clone(), result.clone()); + Ok(result) } async fn load_executed_tipset_inner( @@ -510,9 +520,7 @@ where msg_ts: &Tipset, // when `msg_ts` is the current head, `receipt_ts` is `None` receipt_ts: Option<&Tipset>, - options: LoadExecutedTipsetOptions, ) -> anyhow::Result { - let LoadExecutedTipsetOptions { include_events } = options; if let Some(receipt_ts) = receipt_ts { anyhow::ensure!( msg_ts.key() == receipt_ts.parents(), @@ -521,12 +529,13 @@ where } let messages = self.chain_store().messages_for_tipset(msg_ts)?; let mut recomputed = false; - let (state_root, receipts) = match receipt_ts.and_then(|ts| { - Receipt::get_receipts(self.cs.blockstore(), *ts.parent_message_receipts()) + let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| { + let receipt_root = *ts.parent_message_receipts(); + Receipt::get_receipts(self.cs.blockstore(), receipt_root) .ok() - .map(|r| (*ts.parent_state(), r)) + .map(|r| (*ts.parent_state(), receipt_root, r)) }) { - Some((state_root, receipts)) => (state_root, receipts), + Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts), None => { let state_output = self .compute_tipset_state(msg_ts.clone(), NO_CALLBACK, VMTrace::NotTraced) @@ -534,6 +543,7 @@ where recomputed = true; ( state_output.state_root, + state_output.receipt_root, Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?, ) } @@ -546,7 +556,7 @@ where ); let mut executed_messages = Vec::with_capacity(messages.len()); for (message, receipt) in messages.into_iter().zip(receipts.into_iter()) { - let events = if include_events && let Some(events_root) = receipt.events_root() { + let events = if let Some(events_root) = receipt.events_root() { Some( match StampedEvent::get_events(self.cs.blockstore(), &events_root) { Ok(events) => events, @@ -574,6 +584,7 @@ where } Ok(ExecutedTipset { state_root, + receipt_root, executed_messages, }) } @@ -2090,22 +2101,24 @@ where vm.apply_block_messages(&block_messages, epoch, callback)?; // step 5: construct receipt root from receipts - let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?; + let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?; // step 6: store events AMTs in the blockstore - for (msg_events, events_root) in events.iter().zip(events_roots.iter()) { - if let Some(event_root) = events_root { + for (events, events_root) in events.iter().zip(events_roots.iter()) { + if let Some(events) = events { + let event_root = + events_root.context("events root should be present when events present")?; // Store the events AMT - the root CID should match the one computed by FVM let derived_event_root = Amt::new_from_iter_with_bit_width( chain_index.db(), EVENTS_AMT_BITWIDTH, - msg_events.iter(), + events.iter(), ) .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?; // Verify the stored root matches the FVM-computed root ensure!( - derived_event_root.eq(event_root), + derived_event_root == event_root, "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}." ); } @@ -2113,6 +2126,31 @@ where let state_root = vm.flush()?; + // Update executed tipset cache + let messages: Vec = block_messages + .into_iter() + .flat_map(|bm| bm.messages) + .collect_vec(); + anyhow::ensure!( + messages.len() == receipts.len() && messages.len() == events.len(), + "length of messages, receipts, and events should match", + ); + let executed_tipset = ExecutedTipset { + state_root, + receipt_root, + executed_messages: messages + .into_iter() + .zip(receipts) + .zip(events) + .map(|((message, receipt), events)| ExecutedMessage { + message, + receipt, + events, + }) + .collect(), + }; + executed_tipset_cache().push(tipset.key().clone(), executed_tipset); + Ok(StateOutput { state_root, receipt_root, diff --git a/src/utils/get_size/mod.rs b/src/utils/get_size/mod.rs index 6c35b3f2714a..b5a696a3a1eb 100644 --- a/src/utils/get_size/mod.rs +++ b/src/utils/get_size/mod.rs @@ -35,6 +35,10 @@ macro_rules! impl_vec_alike_heap_size_helper { }; } +pub fn vec_heap_size_helper(v: &Vec) -> usize { + impl_vec_alike_heap_size_helper!(v, T) +} + pub fn vec_heap_size_with_fn_helper(v: &Vec, get_heap_size: impl Fn(&T) -> usize) -> usize { impl_vec_alike_heap_size_with_fn_helper!(v, T, std::mem::size_of::, get_heap_size) }