-
Notifications
You must be signed in to change notification settings - Fork 186
perf: add lru cache for load_executed_tipset to speed up hot queries #6761
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6296d16
5fdd7b1
d0b182b
eec3715
62eb46c
a1fb72e
a12b2b2
0a6a7ff
4ec750c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,7 +54,8 @@ use crate::shim::{ | |
| use crate::state_manager::cache::TipsetStateCache; | ||
| use crate::state_manager::chain_rand::draw_randomness; | ||
| use crate::state_migration::run_state_migrations; | ||
| use crate::utils::get_size::GetSize; | ||
| use crate::utils::cache::SizeTrackingLruCache; | ||
| use crate::utils::get_size::{GetSize, vec_heap_size_helper}; | ||
| use ahash::{HashMap, HashMapExt}; | ||
| use anyhow::{Context as _, bail, ensure}; | ||
| use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _}; | ||
|
|
@@ -78,6 +79,7 @@ use rayon::prelude::ParallelBridge; | |
| use schemars::JsonSchema; | ||
| use serde::{Deserialize, Serialize}; | ||
| use std::ops::RangeInclusive; | ||
| use std::sync::LazyLock; | ||
| use std::time::Duration; | ||
| use std::{num::NonZeroUsize, sync::Arc}; | ||
| use tokio::sync::{RwLock, broadcast::error::RecvError}; | ||
|
|
@@ -89,30 +91,55 @@ pub const EVENTS_AMT_BITWIDTH: u32 = 5; | |
| /// Intermediary for retrieving state objects and updating actor states. | ||
| type CidPair = (Cid, Cid); | ||
|
|
||
| fn executed_tipset_cache() -> &'static SizeTrackingLruCache<TipsetKey, ExecutedTipset> { | ||
| // A tipset key should always map to a deterministic state output, so it's safe to cache the entire executed tipset with the same key. | ||
| static CACHE: LazyLock<SizeTrackingLruCache<TipsetKey, ExecutedTipset>> = LazyLock::new(|| { | ||
| // 100-200MiB on mainnet with capacity 1024 | ||
| SizeTrackingLruCache::new_with_metrics("executed_tipset".into(), nonzero!(1024usize)) | ||
| }); | ||
| &CACHE | ||
| } | ||
|
|
||
| /// Result of executing an individual chain message in a tipset. | ||
| /// | ||
| /// Includes the executed message itself, the execution receipt, and | ||
| /// optional events emitted by the actor during execution. | ||
| #[derive(Debug, Clone)] | ||
| pub struct ExecutedMessage { | ||
| pub message: ChainMessage, | ||
| pub receipt: Receipt, | ||
| pub events: Option<Vec<StampedEvent>>, | ||
| } | ||
|
|
||
| impl GetSize for ExecutedMessage { | ||
| fn get_heap_size(&self) -> usize { | ||
| self.message.get_heap_size() | ||
| + self.receipt.get_heap_size() | ||
| + self | ||
| .events | ||
| .as_ref() | ||
| .map(vec_heap_size_helper) | ||
| .unwrap_or_default() | ||
| } | ||
| } | ||
|
|
||
| /// Aggregated execution result for a tipset. | ||
| /// | ||
| /// `state_root` is the resulting state tree root after message execution | ||
| /// and `executed_messages` contains per-message execution details. | ||
| #[derive(Debug, Clone)] | ||
| pub struct ExecutedTipset { | ||
| pub state_root: Cid, | ||
| #[allow(dead_code)] | ||
| pub receipt_root: Cid, | ||
| pub executed_messages: Vec<ExecutedMessage>, | ||
| } | ||
|
|
||
| /// Options controlling how `load_executed_tipset` fetches extra execution data. | ||
| /// | ||
| /// `include_events` toggles whether event logs are loaded from receipts. | ||
| pub struct LoadExecutedTipsetOptions { | ||
| pub include_events: bool, | ||
| impl GetSize for ExecutedTipset { | ||
| fn get_heap_size(&self) -> usize { | ||
| // state_root (Cid) and receipt_root (Cid) have no heap allocation, so we only calculate the heap size of executed_messages | ||
| vec_heap_size_helper(&self.executed_messages) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Default, Clone, GetSize)] | ||
|
|
@@ -471,48 +498,29 @@ where | |
| .await | ||
| } | ||
|
|
||
| /// Load an executed tipset, including message receipts and state root, | ||
| /// without loading event logs from receipts. | ||
| pub async fn load_executed_tipset_without_events( | ||
| self: &Arc<Self>, | ||
| ts: &Tipset, | ||
| ) -> anyhow::Result<ExecutedTipset> { | ||
| let receipt_ts = self.chain_store().load_child_tipset(ts).ok(); | ||
| self.load_executed_tipset_inner( | ||
| ts, | ||
| receipt_ts.as_ref(), | ||
| LoadExecutedTipsetOptions { | ||
| include_events: false, | ||
| }, | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| /// Load an executed tipset, including message receipts and state root, | ||
| /// with event logs loaded when available. | ||
| /// Load an executed tipset, including state root, message receipts and events with caching. | ||
| pub async fn load_executed_tipset( | ||
| self: &Arc<Self>, | ||
| ts: &Tipset, | ||
| ) -> anyhow::Result<ExecutedTipset> { | ||
| let cache = executed_tipset_cache(); | ||
| if let Some(cached) = cache.get_cloned(ts.key()) { | ||
| return Ok(cached); | ||
| } | ||
| let receipt_ts = self.chain_store().load_child_tipset(ts).ok(); | ||
| self.load_executed_tipset_inner( | ||
| ts, | ||
| receipt_ts.as_ref(), | ||
| LoadExecutedTipsetOptions { | ||
| include_events: true, | ||
| }, | ||
| ) | ||
| .await | ||
| let result = self | ||
| .load_executed_tipset_inner(ts, receipt_ts.as_ref()) | ||
| .await?; | ||
| cache.push(ts.key().clone(), result.clone()); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @hanabi1224 Do you think this will be an issue if there are multiple simultaneous requests for the same tipset? I think that's why earlier we had the `TipsetStateCache`. Let me know your thoughts on this? |
||
| Ok(result) | ||
| } | ||
|
|
||
| async fn load_executed_tipset_inner( | ||
| self: &Arc<Self>, | ||
| msg_ts: &Tipset, | ||
| // when `msg_ts` is the current head, `receipt_ts` is `None` | ||
| receipt_ts: Option<&Tipset>, | ||
| options: LoadExecutedTipsetOptions, | ||
| ) -> anyhow::Result<ExecutedTipset> { | ||
| let LoadExecutedTipsetOptions { include_events } = options; | ||
| if let Some(receipt_ts) = receipt_ts { | ||
| anyhow::ensure!( | ||
| msg_ts.key() == receipt_ts.parents(), | ||
|
|
@@ -521,19 +529,21 @@ where | |
| } | ||
| let messages = self.chain_store().messages_for_tipset(msg_ts)?; | ||
| let mut recomputed = false; | ||
| let (state_root, receipts) = match receipt_ts.and_then(|ts| { | ||
| Receipt::get_receipts(self.cs.blockstore(), *ts.parent_message_receipts()) | ||
| let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| { | ||
| let receipt_root = *ts.parent_message_receipts(); | ||
| Receipt::get_receipts(self.cs.blockstore(), receipt_root) | ||
| .ok() | ||
| .map(|r| (*ts.parent_state(), r)) | ||
| .map(|r| (*ts.parent_state(), receipt_root, r)) | ||
| }) { | ||
| Some((state_root, receipts)) => (state_root, receipts), | ||
| Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts), | ||
| None => { | ||
| let state_output = self | ||
| .compute_tipset_state(msg_ts.clone(), NO_CALLBACK, VMTrace::NotTraced) | ||
| .await?; | ||
| recomputed = true; | ||
| ( | ||
| state_output.state_root, | ||
| state_output.receipt_root, | ||
| Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?, | ||
| ) | ||
| } | ||
|
|
@@ -546,7 +556,7 @@ where | |
| ); | ||
| let mut executed_messages = Vec::with_capacity(messages.len()); | ||
| for (message, receipt) in messages.into_iter().zip(receipts.into_iter()) { | ||
| let events = if include_events && let Some(events_root) = receipt.events_root() { | ||
| let events = if let Some(events_root) = receipt.events_root() { | ||
| Some( | ||
| match StampedEvent::get_events(self.cs.blockstore(), &events_root) { | ||
| Ok(events) => events, | ||
|
|
@@ -574,6 +584,7 @@ where | |
| } | ||
| Ok(ExecutedTipset { | ||
| state_root, | ||
| receipt_root, | ||
| executed_messages, | ||
| }) | ||
| } | ||
|
|
@@ -2090,29 +2101,56 @@ where | |
| vm.apply_block_messages(&block_messages, epoch, callback)?; | ||
|
|
||
| // step 5: construct receipt root from receipts | ||
| let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?; | ||
| let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?; | ||
|
|
||
| // step 6: store events AMTs in the blockstore | ||
| for (msg_events, events_root) in events.iter().zip(events_roots.iter()) { | ||
| if let Some(event_root) = events_root { | ||
| for (events, events_root) in events.iter().zip(events_roots.iter()) { | ||
| if let Some(events) = events { | ||
| let event_root = | ||
| events_root.context("events root should be present when events present")?; | ||
| // Store the events AMT - the root CID should match the one computed by FVM | ||
| let derived_event_root = Amt::new_from_iter_with_bit_width( | ||
| chain_index.db(), | ||
| EVENTS_AMT_BITWIDTH, | ||
| msg_events.iter(), | ||
| events.iter(), | ||
| ) | ||
| .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?; | ||
|
|
||
| // Verify the stored root matches the FVM-computed root | ||
| ensure!( | ||
| derived_event_root.eq(event_root), | ||
| derived_event_root == event_root, | ||
| "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}." | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| let state_root = vm.flush()?; | ||
|
|
||
| // Update executed tipset cache | ||
| let messages: Vec<ChainMessage> = block_messages | ||
| .into_iter() | ||
| .flat_map(|bm| bm.messages) | ||
| .collect_vec(); | ||
| anyhow::ensure!( | ||
| messages.len() == receipts.len() && messages.len() == events.len(), | ||
| "length of messages, receipts, and events should match", | ||
| ); | ||
| let executed_tipset = ExecutedTipset { | ||
| state_root, | ||
| receipt_root, | ||
| executed_messages: messages | ||
| .into_iter() | ||
| .zip(receipts) | ||
| .zip(events) | ||
| .map(|((message, receipt), events)| ExecutedMessage { | ||
| message, | ||
| receipt, | ||
| events, | ||
| }) | ||
| .collect(), | ||
| }; | ||
| executed_tipset_cache().push(tipset.key().clone(), executed_tipset); | ||
|
|
||
| Ok(StateOutput { | ||
| state_root, | ||
| receipt_root, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are storing the `ExecutedTipset` directly — do you think it could be an issue because we are cloning when using the cache?