Skip to content
2 changes: 1 addition & 1 deletion src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<DB: Blockstore> ChainIndex<DB> {
SizeTrackingLruCache::new_with_metrics(
"tipset_by_height".into(),
// 20480 * 900 = 18432000 which is sufficient for mainnet
20480.try_into().expect("infallible"),
nonzero!(20480_usize),
)
});

Expand Down
13 changes: 10 additions & 3 deletions src/interpreter/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ type ForestExecutorV4<DB> = DefaultExecutor_v4<ForestKernelV4<DB>>;

pub type ApplyResult = anyhow::Result<(ApplyRet, Duration)>;

pub type ApplyBlockResult =
anyhow::Result<(Vec<Receipt>, Vec<Vec<StampedEvent>>, Vec<Option<Cid>>), anyhow::Error>;
pub type ApplyBlockResult = anyhow::Result<(
Vec<Receipt>,
Vec<Option<Vec<StampedEvent>>>,
Vec<Option<Cid>>,
)>;

/// Comes from <https://github.com/filecoin-project/lotus/blob/v1.23.2/chain/vm/fvm.go#L473>
pub const IMPLICIT_MESSAGE_GAS_LIMIT: i64 = i64::MAX / 2;
Expand Down Expand Up @@ -387,7 +390,11 @@ where
receipts.push(msg_receipt.clone());

events_roots.push(ret.msg_receipt().events_root());
events.push(ret.events());
if ret.msg_receipt().events_root().is_some() {
events.push(Some(ret.events()));
} else {
events.push(None);
}

// Add processed Cid to set of processed messages
processed.insert(cid);
Expand Down
17 changes: 5 additions & 12 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,8 @@ impl Block {
let ExecutedTipset {
state_root,
executed_messages,
} = ctx
.state_manager
.load_executed_tipset_without_events(&tipset)
.await?;
..
} = ctx.state_manager.load_executed_tipset(&tipset).await?;
let has_transactions = !executed_messages.is_empty();
let state_tree = ctx.state_manager.get_state_tree(&state_root)?;

Expand Down Expand Up @@ -1419,10 +1417,8 @@ async fn get_block_receipts<DB: Blockstore + Send + Sync + 'static>(
let ExecutedTipset {
state_root,
executed_messages,
} = ctx
.state_manager
.load_executed_tipset_without_events(&ts_ref)
.await?;
..
} = ctx.state_manager.load_executed_tipset(&ts_ref).await?;

// Load the state tree
let state_tree = ctx.state_manager.get_state_tree(&state_root)?;
Expand Down Expand Up @@ -1933,10 +1929,7 @@ async fn eth_fee_history<B: Blockstore + Send + Sync + 'static>(
let base_fee = &ts.block_headers().first().parent_base_fee;
let ExecutedTipset {
executed_messages, ..
} = ctx
.state_manager
.load_executed_tipset_without_events(&ts)
.await?;
} = ctx.state_manager.load_executed_tipset(&ts).await?;
let mut tx_gas_rewards = Vec::with_capacity(executed_messages.len());
for ExecutedMessage {
message, receipt, ..
Expand Down
128 changes: 83 additions & 45 deletions src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ use crate::shim::{
use crate::state_manager::cache::TipsetStateCache;
use crate::state_manager::chain_rand::draw_randomness;
use crate::state_migration::run_state_migrations;
use crate::utils::get_size::GetSize;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::{GetSize, vec_heap_size_helper};
use ahash::{HashMap, HashMapExt};
use anyhow::{Context as _, bail, ensure};
use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
Expand All @@ -78,6 +79,7 @@ use rayon::prelude::ParallelBridge;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::ops::RangeInclusive;
use std::sync::LazyLock;
use std::time::Duration;
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::{RwLock, broadcast::error::RecvError};
Expand All @@ -89,30 +91,55 @@ pub const EVENTS_AMT_BITWIDTH: u32 = 5;
/// Intermediary for retrieving state objects and updating actor states.
type CidPair = (Cid, Cid);

fn executed_tipset_cache() -> &'static SizeTrackingLruCache<TipsetKey, ExecutedTipset> {
// A tipset key should always map to a deterministic state output, so it's safe to cache the entire executed tipset with the same key.
static CACHE: LazyLock<SizeTrackingLruCache<TipsetKey, ExecutedTipset>> = LazyLock::new(|| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are storing the `ExecutedTipset` directly — do you think the clone performed on every cache read could become an issue?

// 100-200MiB on mainet with capacity 1024
SizeTrackingLruCache::new_with_metrics("executed_tipset".into(), nonzero!(1024usize))
});
&CACHE
}

/// Result of executing an individual chain message in a tipset.
///
/// Includes the executed message itself, the execution receipt, and
/// optional events emitted by the actor during execution.
#[derive(Debug, Clone)]
pub struct ExecutedMessage {
/// The chain message that was executed.
pub message: ChainMessage,
/// Execution receipt produced by the VM for this message.
pub receipt: Receipt,
/// Events emitted during execution; `None` when the receipt carries no events root.
pub events: Option<Vec<StampedEvent>>,
}

impl GetSize for ExecutedMessage {
fn get_heap_size(&self) -> usize {
self.message.get_heap_size()
+ self.receipt.get_heap_size()
+ self
.events
.as_ref()
.map(vec_heap_size_helper)
.unwrap_or_default()
}
}

/// Aggregated execution result for a tipset.
///
/// `state_root` is the resulting state tree root after message execution
/// and `executed_messages` contains per-message execution details.
#[derive(Debug, Clone)]
pub struct ExecutedTipset {
/// Root of the state tree after executing the tipset's messages.
pub state_root: Cid,
/// Root CID of the receipts AMT; currently unread (hence the allow), kept alongside the state root.
#[allow(dead_code)]
pub receipt_root: Cid,
/// One entry per executed message, paired with its receipt and optional events.
pub executed_messages: Vec<ExecutedMessage>,
}

/// Options controlling how `load_executed_tipset` fetches extra execution data.
///
/// `include_events` toggles whether event logs are loaded from receipts.
pub struct LoadExecutedTipsetOptions {
pub include_events: bool,
impl GetSize for ExecutedTipset {
    fn get_heap_size(&self) -> usize {
        // Both `Cid` roots are stored inline with no heap allocation of
        // their own, so the heap footprint is entirely attributable to
        // `executed_messages`.
        let messages = &self.executed_messages;
        vec_heap_size_helper(messages)
    }
}

#[derive(Debug, Default, Clone, GetSize)]
Expand Down Expand Up @@ -471,48 +498,29 @@ where
.await
}

/// Load an executed tipset, including message receipts and state root,
/// without loading event logs from receipts.
pub async fn load_executed_tipset_without_events(
self: &Arc<Self>,
ts: &Tipset,
) -> anyhow::Result<ExecutedTipset> {
let receipt_ts = self.chain_store().load_child_tipset(ts).ok();
self.load_executed_tipset_inner(
ts,
receipt_ts.as_ref(),
LoadExecutedTipsetOptions {
include_events: false,
},
)
.await
}

/// Load an executed tipset, including message receipts and state root,
/// with event logs loaded when available.
/// Load an executed tipset, including state root, message receipts and events with caching.
pub async fn load_executed_tipset(
self: &Arc<Self>,
ts: &Tipset,
) -> anyhow::Result<ExecutedTipset> {
let cache = executed_tipset_cache();
if let Some(cached) = cache.get_cloned(ts.key()) {
return Ok(cached);
}
let receipt_ts = self.chain_store().load_child_tipset(ts).ok();
self.load_executed_tipset_inner(
ts,
receipt_ts.as_ref(),
LoadExecutedTipsetOptions {
include_events: true,
},
)
.await
let result = self
.load_executed_tipset_inner(ts, receipt_ts.as_ref())
.await?;
cache.push(ts.key().clone(), result.clone());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hanabi1224 Do you think this will be an issue if there are multiple simultaneous requests for the same tipset? They will all have to compute the tipset state to get the data.

I think that's why we previously had the get_or_else pattern, which used mutex locks to prevent this.

Let me know your thoughts on this?

Ok(result)
}

async fn load_executed_tipset_inner(
self: &Arc<Self>,
msg_ts: &Tipset,
// when `msg_ts` is the current head, `receipt_ts` is `None`
receipt_ts: Option<&Tipset>,
options: LoadExecutedTipsetOptions,
) -> anyhow::Result<ExecutedTipset> {
let LoadExecutedTipsetOptions { include_events } = options;
if let Some(receipt_ts) = receipt_ts {
anyhow::ensure!(
msg_ts.key() == receipt_ts.parents(),
Expand All @@ -521,19 +529,21 @@ where
}
let messages = self.chain_store().messages_for_tipset(msg_ts)?;
let mut recomputed = false;
let (state_root, receipts) = match receipt_ts.and_then(|ts| {
Receipt::get_receipts(self.cs.blockstore(), *ts.parent_message_receipts())
let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
let receipt_root = *ts.parent_message_receipts();
Receipt::get_receipts(self.cs.blockstore(), receipt_root)
.ok()
.map(|r| (*ts.parent_state(), r))
.map(|r| (*ts.parent_state(), receipt_root, r))
}) {
Some((state_root, receipts)) => (state_root, receipts),
Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
None => {
let state_output = self
.compute_tipset_state(msg_ts.clone(), NO_CALLBACK, VMTrace::NotTraced)
.await?;
recomputed = true;
(
state_output.state_root,
state_output.receipt_root,
Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
)
}
Expand All @@ -546,7 +556,7 @@ where
);
let mut executed_messages = Vec::with_capacity(messages.len());
for (message, receipt) in messages.into_iter().zip(receipts.into_iter()) {
let events = if include_events && let Some(events_root) = receipt.events_root() {
let events = if let Some(events_root) = receipt.events_root() {
Some(
match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
Ok(events) => events,
Expand Down Expand Up @@ -574,6 +584,7 @@ where
}
Ok(ExecutedTipset {
state_root,
receipt_root,
executed_messages,
})
}
Expand Down Expand Up @@ -2090,29 +2101,56 @@ where
vm.apply_block_messages(&block_messages, epoch, callback)?;

// step 5: construct receipt root from receipts
let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?;
let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;

// step 6: store events AMTs in the blockstore
for (msg_events, events_root) in events.iter().zip(events_roots.iter()) {
if let Some(event_root) = events_root {
for (events, events_root) in events.iter().zip(events_roots.iter()) {
if let Some(events) = events {
let event_root =
events_root.context("events root should be present when events present")?;
// Store the events AMT - the root CID should match the one computed by FVM
let derived_event_root = Amt::new_from_iter_with_bit_width(
chain_index.db(),
EVENTS_AMT_BITWIDTH,
msg_events.iter(),
events.iter(),
)
.map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;

// Verify the stored root matches the FVM-computed root
ensure!(
derived_event_root.eq(event_root),
derived_event_root == event_root,
"Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
);
}
}

let state_root = vm.flush()?;

// Update executed tipset cache
let messages: Vec<ChainMessage> = block_messages
.into_iter()
.flat_map(|bm| bm.messages)
.collect_vec();
anyhow::ensure!(
messages.len() == receipts.len() && messages.len() == events.len(),
"length of messages, receipts, and events should match",
);
let executed_tipset = ExecutedTipset {
state_root,
receipt_root,
executed_messages: messages
.into_iter()
.zip(receipts)
.zip(events)
.map(|((message, receipt), events)| ExecutedMessage {
message,
receipt,
events,
})
.collect(),
};
executed_tipset_cache().push(tipset.key().clone(), executed_tipset);

Ok(StateOutput {
state_root,
receipt_root,
Expand Down
4 changes: 4 additions & 0 deletions src/utils/get_size/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ macro_rules! impl_vec_alike_heap_size_helper {
};
}

/// Returns the heap footprint of `v`: the buffer owned by the `Vec` plus
/// the heap size of each element via [`GetSize`].
///
/// Takes `&Vec<T>` rather than `&[T]` — the underlying macro presumably
/// accounts for the `Vec`'s capacity, which a slice cannot report.
/// NOTE(review): macro body not fully visible here — confirm before
/// loosening the parameter type.
pub fn vec_heap_size_helper<T: GetSize>(v: &Vec<T>) -> usize {
impl_vec_alike_heap_size_helper!(v, T)
}

/// Like `vec_heap_size_helper`, but measures each element with a
/// caller-supplied `get_heap_size` closure, for element types that do not
/// implement [`GetSize`]. Element stride comes from `std::mem::size_of::<T>`.
pub fn vec_heap_size_with_fn_helper<T>(v: &Vec<T>, get_heap_size: impl Fn(&T) -> usize) -> usize {
impl_vec_alike_heap_size_with_fn_helper!(v, T, std::mem::size_of::<T>, get_heap_size)
}
Expand Down
Loading