Skip to content

feat: Allow development of custom stores#67

Open
gonzalezzfelipe wants to merge 16 commits intotxpipe:mainfrom
gonzalezzfelipe:feat/allow-custom-stores
Open

feat: Allow development of custom stores#67
gonzalezzfelipe wants to merge 16 commits intotxpipe:mainfrom
gonzalezzfelipe:feat/allow-custom-stores

Conversation

@gonzalezzfelipe
Copy link
Contributor

@gonzalezzfelipe gonzalezzfelipe commented May 26, 2025

Summary by CodeRabbit

  • New Features

    • Store module is now public and supports pluggable backends (including a Redb-backed variant).
    • Added async-capable atomic update workflow for coordinated multi-worker updates.
  • Refactor

    • Converted core store and runtime interactions to async/await for improved concurrency.
    • Standardized error reporting for store operations to consistent string messages.
  • Tests

    • Updated end-to-end and runtime initialization paths to use the public Redb-backed store variant.

@gonzalezzfelipe gonzalezzfelipe marked this pull request as draft May 27, 2025 01:56
@gonzalezzfelipe gonzalezzfelipe marked this pull request as ready for review May 27, 2025 18:32
@coderabbitai
Copy link

coderabbitai bot commented Oct 13, 2025

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Expose a public, async, trait-based store abstraction (StoreTrait, AtomicUpdateTrait) with Redb and Custom backends; convert store APIs and runtime call sites to async/await; change store error variant to use String; add LogSeq/WorkerId/LogEntry public types and atomic update flow.

Changes

Cohort / File(s) Summary
Store core
balius-runtime/src/store/mod.rs
Add public Store enum, StoreTrait, AtomicUpdate enum, AtomicUpdateTrait, LogSeq, WorkerId, and LogEntry; implement async delegation to Redb or locked Custom backend; add handle_reset API.
Redb backend
balius-runtime/src/store/redb.rs
Convert Redb store implementation to async: AtomicUpdate::new, async update_worker_cursor/commit, and async StoreTrait methods (find_chain_point, write_ahead, get_worker_cursor, start_atomic_update); re-export shared types.
Public surface & errors
balius-runtime/src/lib.rs
Make pub mod store and re-export AtomicUpdateTrait, Store, StoreTrait; change public Error::Store variant to Store(String) and update From<redb::...> conversions to produce String.
Runtime integrations
balius-runtime/src/lib.rs, balius-runtime/src/drivers/chainsync.rs
Adapt runtime call sites to async store API (await find_chain_point, handle_reset); import StoreTrait where needed.
Call site updates (store construction)
balius-runtime/tests/e2e.rs, balius/src/bin/command/test.rs, baliusd/src/main.rs
Change store construction to wrap Redb store as Store::Redb(RedbStore::open(...)) and import balius_runtime::store::redb::Store as RedbStore.

Sequence Diagram(s)

sequenceDiagram
    participant Runtime
    participant Store as StoreTrait
    participant Atomic as AtomicUpdate
    participant Redb as RedbStore/DB

    Runtime->>Store: write_ahead(undo_blocks, next_block)
    Store-->>Runtime: log_seq
    Runtime->>Store: start_atomic_update(log_seq)
    Store-->>Atomic: AtomicUpdate (wrap Redb transactional state)
    Runtime->>Atomic: update_worker_cursor(worker_id)
    Atomic->>Redb: write worker cursor in tx
    Atomic-->>Runtime: ok
    Runtime->>Atomic: commit()
    Atomic->>Redb: commit transaction
    Atomic-->>Runtime: commit result
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • scarmuega

Poem

🐰
I hopped into code, nose all a-twitch,
Pulled threads of sync into async-rich stitch;
Redb and Custom now share one neat door,
Cursors hop forward, commits ask for more —
A burrow of stores, async dreams galore! 🎋

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 7.69% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title 'feat: Allow development of custom stores' directly aligns with the main objective of introducing a pluggable store abstraction that enables custom store backends alongside the existing Redb implementation.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (8)
baliusd/src/main.rs (1)

65-71: Consider making cache size configurable.

Currently passing None uses the default 50MB. If config has a cache/tuning field, plumb it through to RedbStore::open to control memory.

balius-runtime/src/lib.rs (1)

45-46: Avoid flattening store errors to String; preserve error context.

Converting redb errors to String drops causes and hinders diagnostics. Prefer a structured variant, e.g., Store(#[source] anyhow::Error) or Store(#[source] Box<dyn std::error::Error + Send + Sync>), and keep From impls to wrap the source.

Also applies to: 87-121

balius-runtime/src/store/redb.rs (1)

66-86: Fix typo in error messages ("commited" → "committed").

Small polish to error strings returned after transaction consumption.

Apply this diff:

-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));
         };
@@
-        let Some(wx) = self.wx.take() else {
-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+        let Some(wx) = self.wx.take() else {
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));
         };
balius-runtime/src/store/mod.rs (5)

26-30: Avoid locking AtomicUpdate behind an async Mutex; use Box and drop the lock across await.

AtomicUpdate is an ephemeral transaction handle and does not need shared ownership. Locking a tokio::Mutex across awaits adds contention and complexity without benefits here.

Switch Custom to Box<dyn AtomicUpdateTrait + Send + Sync> and call directly.

Apply:

 #[allow(clippy::large_enum_variant)]
 pub enum AtomicUpdate {
     Redb(redb::AtomicUpdate),
-    Custom(Arc<Mutex<dyn AtomicUpdateTrait + Send + Sync>>),
+    Custom(Box<dyn AtomicUpdateTrait + Send + Sync>),
 }

 #[async_trait::async_trait]
 impl AtomicUpdateTrait for AtomicUpdate {
     async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
         match self {
             AtomicUpdate::Redb(au) => au.update_worker_cursor(id).await,
-            AtomicUpdate::Custom(au) => au.lock().await.update_worker_cursor(id).await,
+            AtomicUpdate::Custom(au) => au.update_worker_cursor(id).await,
         }
     }
     async fn commit(&mut self) -> Result<(), super::Error> {
         match self {
             AtomicUpdate::Redb(au) => au.commit().await,
-            AtomicUpdate::Custom(au) => au.lock().await.commit().await,
+            AtomicUpdate::Custom(au) => au.commit().await,
         }
     }
 }

Also applies to: 32-46


69-72: Async locking around Custom store holds the mutex across awaits; consider redesign to avoid this hot‑path lock.

The outer tokio::Mutex guard is held while awaiting find_chain_point/write_ahead/get_worker_cursor/start_atomic_update. This serializes all operations on Custom stores and risks priority inversion.

Recommended:

  • Change StoreTrait receivers to &self and move synchronization inside backends (interior mutability).
  • Store Custom as Arc<dyn StoreTrait + Send + Sync> (no Mutex), or Box<dyn StoreTrait + Send + Sync> if Clone is not required.

This removes the lock across await and lets each backend choose optimal concurrency.

Also applies to: 81-87, 91-94, 98-100


22-24: Unify Error type path for readability.

Mixing crate::Error (imported as Error) and super::Error is inconsistent. Prefer one, e.g., Error everywhere in this module.

Minimal change:

-    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error>;
-    async fn commit(&mut self) -> Result<(), super::Error>;
+    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), Error>;
+    async fn commit(&mut self) -> Result<(), Error>;

…and likewise adjust the impl signatures and StoreTrait methods returning super::Error to Error.

Also applies to: 34-34, 40-40, 63-64, 90-90, 96-96


12-18: Derive common traits on LogEntry for diagnostics and tests.

Deriving Clone, Debug, and PartialEq is handy for logging/tracing and assertions.

-#[derive(Message)]
+#[derive(Clone, PartialEq, ::prost::Message, Debug)]
 pub struct LogEntry {

22-22: Use the WorkerId alias in APIs.

Be consistent and accept &WorkerId instead of &str to document intent and avoid accidental type mixups at call sites.

-    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), Error>;
+    async fn update_worker_cursor(&mut self, id: &WorkerId) -> Result<(), Error>;
...
-    async fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, Error>;
+    async fn get_worker_cursor(&self, id: &WorkerId) -> Result<Option<LogSeq>, Error>;

Also applies to: 62-63, 90-90

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 86b459d and 6507f71.

📒 Files selected for processing (6)
  • balius-runtime/src/lib.rs (6 hunks)
  • balius-runtime/src/store/mod.rs (1 hunks)
  • balius-runtime/src/store/redb.rs (5 hunks)
  • balius-runtime/tests/e2e.rs (2 hunks)
  • balius/src/bin/command/test.rs (2 hunks)
  • baliusd/src/main.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
balius-runtime/tests/e2e.rs (2)
balius-runtime/src/store/redb.rs (1)
  • open (104-120)
balius-runtime/src/lib.rs (1)
  • builder (494-496)
balius/src/bin/command/test.rs (1)
balius-runtime/src/store/redb.rs (1)
  • open (104-120)
balius-runtime/src/store/mod.rs (1)
balius-runtime/src/store/redb.rs (6)
  • update_worker_cursor (66-77)
  • commit (79-87)
  • find_chain_point (183-188)
  • write_ahead (190-210)
  • get_worker_cursor (213-224)
  • start_atomic_update (226-232)
baliusd/src/main.rs (2)
balius-runtime/src/store/redb.rs (2)
  • in_memory (97-102)
  • open (104-120)
balius-runtime/src/lib.rs (1)
  • builder (494-496)
balius-runtime/src/store/redb.rs (1)
balius-runtime/src/store/mod.rs (12)
  • update_worker_cursor (22-22)
  • update_worker_cursor (34-39)
  • commit (23-23)
  • commit (40-45)
  • find_chain_point (56-56)
  • find_chain_point (68-73)
  • write_ahead (57-61)
  • write_ahead (74-89)
  • get_worker_cursor (62-62)
  • get_worker_cursor (90-95)
  • start_atomic_update (63-63)
  • start_atomic_update (96-101)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (10)
baliusd/src/main.rs (2)

3-5: Import changes look correct.

New RedbStore alias and Store enum integration are consistent with the new API.


97-97: Runtime builder wrapping is correct.

Passing Store::Redb(store) matches the new pluggable store design.

balius/src/bin/command/test.rs (1)

93-97: LGTM.

Store initialization via Store::Redb(...) and error handling with IntoDiagnostic are correct and align with the new API.

balius-runtime/src/lib.rs (2)

31-35: Re-exports look good.

Publicly exposing store traits and types is consistent with the new pluggable store design.


618-632: Async store update flow looks correct.

write_ahead → start_atomic_update → per-worker cursor updates → commit matches WAL + atomic cursor update semantics.

balius-runtime/src/store/redb.rs (4)

51-62: AtomicUpdate constructor change is sound.

Option + new() cleanly prevents double-commit and allows move semantics.


190-210: write_ahead implementation looks correct.

Monotonic seq, WAL insert, and commit are in the right order.


212-224: get_worker_cursor read path is fine.

Gracefully handles missing table; returns Option as expected.


226-232: start_atomic_update is correct.

Begins a write transaction and wraps it in AtomicUpdate.

balius-runtime/src/store/mod.rs (1)

48-52: Confirm Clone semantics for Store.

Cloning Store duplicates handles to underlying backends. Ensure redb::Store’s Clone is cheap and safe (shared state correctness), and that cloning Custom stores is desired behavior.

Do we require Store to be Clone? If not, dropping Clone could simplify Custom to Box<dyn StoreTrait + Send + Sync> and remove the outer Mutex.

Comment on lines +183 to 188
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
let entry = self.get_entry(seq)?;
let block = Block::from_bytes(&entry.unwrap().next_block);

Ok(Some(block.chain_point()))
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Avoid panic: handle missing WAL entry gracefully in find_chain_point.

entry.unwrap() will panic if the sequence isn’t found. Return Ok(None) instead.

Apply this diff:

-    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
-        let entry = self.get_entry(seq)?;
-        let block = Block::from_bytes(&entry.unwrap().next_block);
-
-        Ok(Some(block.chain_point()))
-    }
+    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
+        match self.get_entry(seq)? {
+            Some(entry) => {
+                let block = Block::from_bytes(&entry.next_block);
+                Ok(Some(block.chain_point()))
+            }
+            None => Ok(None),
+        }
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
let entry = self.get_entry(seq)?;
let block = Block::from_bytes(&entry.unwrap().next_block);
Ok(Some(block.chain_point()))
}
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
match self.get_entry(seq)? {
Some(entry) => {
let block = Block::from_bytes(&entry.next_block);
Ok(Some(block.chain_point()))
}
None => Ok(None),
}
}
🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 183 to 188, replace the
entry.unwrap() panic by handling a missing WAL entry: call self.get_entry(seq)?
and if it returns None return Ok(None); otherwise take the entry, construct the
Block from entry.next_block and return Ok(Some(block.chain_point())). Ensure you
do not call unwrap() so the function returns Ok(None) for missing sequences and
still propagates actual errors via the ? operator.

Comment on lines +44 to 47
let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap());

let mut runtime = Runtime::builder(store)
let runtime = Runtime::builder(store)
.with_ledger(ledgers::mock::Ledger.into())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use in-memory store for tests to avoid cross-run contamination.

A fixed file path can introduce flakiness and require cleanup. Prefer in-memory for reliability and speed.

Apply this diff:

-    let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap());
+    let store = Store::Redb(RedbStore::in_memory().unwrap());
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap());
let mut runtime = Runtime::builder(store)
let runtime = Runtime::builder(store)
.with_ledger(ledgers::mock::Ledger.into())
let store = Store::Redb(RedbStore::in_memory().unwrap());
let runtime = Runtime::builder(store)
.with_ledger(ledgers::mock::Ledger.into())
🤖 Prompt for AI Agents
In balius-runtime/tests/e2e.rs around lines 44 to 47, the test opens a
persistent DB at "tests/balius.db" which can cause cross-run contamination;
replace the persistent RedbStore::open(...) call with the in-memory Redb store
constructor (e.g., RedbStore::open_in_memory() or the library's equivalent) so
Store::Redb(...) uses an in-memory instance, remove the file path argument, and
ensure unwrap() is still used or handled the same way so the test remains
deterministic and leaves no on-disk artifacts.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
balius-runtime/src/store/redb.rs (1)

183-188: Avoid panic: handle missing WAL entry gracefully in find_chain_point.

The previous review flagged that entry.unwrap() will panic if the sequence isn't found. This issue remains unresolved.

Apply this diff:

     async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
-        let entry = self.get_entry(seq)?;
-        let block = Block::from_bytes(&entry.unwrap().next_block);
-
-        Ok(Some(block.chain_point()))
+        match self.get_entry(seq)? {
+            Some(entry) => {
+                let block = Block::from_bytes(&entry.next_block);
+                Ok(Some(block.chain_point()))
+            }
+            None => Ok(None),
+        }
     }
🧹 Nitpick comments (1)
balius-runtime/src/lib.rs (1)

45-45: Consider preserving error type information.

Converting all redb errors to String simplifies the API but loses type information, making it harder for library consumers to handle specific error cases programmatically. If you expect users to need programmatic error handling, consider using an enum or preserving error variants.

If you choose to keep String-based errors, this is acceptable for the current use case.

Also applies to: 89-89, 95-95, 101-101, 107-107, 113-113, 119-119

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6507f71 and 53ddfe2.

📒 Files selected for processing (4)
  • balius-runtime/src/drivers/chainsync.rs (2 hunks)
  • balius-runtime/src/lib.rs (6 hunks)
  • balius-runtime/src/store/mod.rs (1 hunks)
  • balius-runtime/src/store/redb.rs (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
balius-runtime/src/store/mod.rs (1)
balius-runtime/src/store/redb.rs (7)
  • update_worker_cursor (66-77)
  • commit (79-87)
  • find_chain_point (183-188)
  • write_ahead (190-210)
  • get_worker_cursor (213-224)
  • start_atomic_update (226-232)
  • handle_reset (234-250)
balius-runtime/src/store/redb.rs (1)
balius-runtime/src/store/mod.rs (14)
  • update_worker_cursor (22-22)
  • update_worker_cursor (34-39)
  • commit (23-23)
  • commit (40-45)
  • find_chain_point (56-56)
  • find_chain_point (69-74)
  • write_ahead (57-61)
  • write_ahead (75-90)
  • get_worker_cursor (62-62)
  • get_worker_cursor (91-96)
  • start_atomic_update (63-63)
  • start_atomic_update (97-102)
  • handle_reset (64-64)
  • handle_reset (104-109)
🔇 Additional comments (6)
balius-runtime/src/drivers/chainsync.rs (1)

56-56: LGTM! Async store operation integrated correctly.

The .await addition correctly handles the async handle_reset method from the new StoreTrait interface.

balius-runtime/src/lib.rs (2)

31-34: LGTM! Public API expanded to support custom stores.

The public store module and trait re-exports enable users to implement custom storage backends while maintaining the existing Redb implementation.


523-523: LGTM! Async store operations integrated correctly.

All store operations now correctly use .await to handle the async trait methods introduced in the pluggable store abstraction.

Also applies to: 569-569, 632-632, 636-636, 640-642, 645-645

balius-runtime/src/store/mod.rs (1)

1-110: LGTM! Well-designed pluggable store abstraction.

The introduction of StoreTrait and AtomicUpdateTrait with Redb and Custom variants provides a clean abstraction for pluggable storage backends. The use of Arc<Mutex<...>> for the Custom variant appropriately handles thread-safety and async access.

Key design decisions:

  • Async traits enable non-blocking storage operations
  • Enum dispatch pattern provides type-safe backend selection
  • Public traits allow external implementations
balius-runtime/src/store/redb.rs (2)

52-62: LGTM! Option pattern prevents double-commit.

Changing wx to Option<WriteTransaction> with take() semantics in the commit method correctly prevents accidental double-commits or operations after commit.


64-88: LGTM! Async trait implementation follows the design pattern.

The StoreTrait and AtomicUpdateTrait implementations correctly use async fn and integrate with the redb backend. The trait methods properly handle errors and maintain consistency with the pluggable store abstraction.

Also applies to: 181-250

async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
let Some(wx) = self.wx.as_mut() else {
return Err(super::Error::Store(
"Transaction already commited".to_string(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix typo: "commited" → "committed".

The error messages contain a spelling error.

Apply this diff:

             return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
+                "Transaction already committed".to_string(),
             ));

Also applies to: 82-82

🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 69 and 82, the error string
"Transaction already commited" contains a typo; update both occurrences to
"Transaction already committed" so the error messages use the correct spelling.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
balius-runtime/src/store/redb.rs (1)

183-188: Avoid panic in find_chain_point: handle missing WAL entry

entry.unwrap() will panic if seq not found; return Ok(None) instead.

-    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
-        let entry = self.get_entry(seq)?;
-        let block = Block::from_bytes(&entry.unwrap().next_block);
-
-        Ok(Some(block.chain_point()))
-    }
+    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
+        match self.get_entry(seq)? {
+            Some(entry) => {
+                let block = Block::from_bytes(&entry.next_block);
+                Ok(Some(block.chain_point()))
+            }
+            None => Ok(None),
+        }
+    }
🧹 Nitpick comments (2)
balius-runtime/src/lib.rs (1)

31-35: Consider preserving error sources instead of String

Switching to String loses the original error chain. Prefer a source-carrying variant that stays backend-agnostic:

-    #[error("store error {0}")]
-    Store(String),
+    #[error("store error")]
+    Store(#[source] Box<dyn std::error::Error + Send + Sync>),

And map conversions as:

-        Self::Store(value.to_string())
+        Self::Store(Box::new(value))

Applies similarly to other redb::* conversions. This keeps rich context while decoupling from specific backends.

Also applies to: 45-46, 87-121

balius-runtime/src/store/redb.rs (1)

64-66: Match trait’s non-Send futures in impls if adopting ?Send

If StoreTrait/AtomicUpdateTrait switch to #[async_trait(?Send)], mirror it here:

-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl AtomicUpdateTrait for AtomicUpdate { ... }

-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl StoreTrait for Store { ... }

Prevents mismatched future bounds during implementation.

Also applies to: 79-88, 181-188

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6507f71 and 53ddfe2.

📒 Files selected for processing (4)
  • balius-runtime/src/drivers/chainsync.rs (2 hunks)
  • balius-runtime/src/lib.rs (6 hunks)
  • balius-runtime/src/store/mod.rs (1 hunks)
  • balius-runtime/src/store/redb.rs (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
balius-runtime/src/store/mod.rs (1)
balius-runtime/src/store/redb.rs (7)
  • update_worker_cursor (66-77)
  • commit (79-87)
  • find_chain_point (183-188)
  • write_ahead (190-210)
  • get_worker_cursor (213-224)
  • start_atomic_update (226-232)
  • handle_reset (234-250)
balius-runtime/src/store/redb.rs (1)
balius-runtime/src/store/mod.rs (14)
  • update_worker_cursor (22-22)
  • update_worker_cursor (34-39)
  • commit (23-23)
  • commit (40-45)
  • find_chain_point (56-56)
  • find_chain_point (69-74)
  • write_ahead (57-61)
  • write_ahead (75-90)
  • get_worker_cursor (62-62)
  • get_worker_cursor (91-96)
  • start_atomic_update (63-63)
  • start_atomic_update (97-102)
  • handle_reset (64-64)
  • handle_reset (104-109)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (1)
balius-runtime/src/drivers/chainsync.rs (1)

9-9: LGTM: chainsync updated to async StoreTrait

Import and await changes align with the new async store API.

Also applies to: 56-57

Comment on lines +20 to +24
#[async_trait::async_trait]
pub trait AtomicUpdateTrait {
async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error>;
async fn commit(&mut self) -> Result<(), super::Error>;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix non-Send future hazard from locking tokio::Mutex across await

Custom dispatch holds a tokio::MutexGuard across .await, but #[async_trait] defaults to Send futures, leading to "future cannot be sent between threads safely". Make the traits/impls non-Send or refactor to avoid awaiting while holding the guard.

Apply this to relax futures:

-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 pub trait AtomicUpdateTrait {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl AtomicUpdateTrait for AtomicUpdate {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 pub trait StoreTrait {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl StoreTrait for Store {

Alternative (larger refactor): avoid outer Arc<Mutex> and require implementers to handle interior mutability, so no guard is held across .await.

Also applies to: 32-46, 54-65, 67-110

Comment on lines +66 to +72
async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
let Some(wx) = self.wx.as_mut() else {
return Err(super::Error::Store(
"Transaction already commited".to_string(),
));
};

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Nit: fix typo in error message ("committed")

-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));
-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));

Also applies to: 80-85

🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 66-72 and 80-85, there is a
typo in the error message "Transaction already commited"; update the string to
the correct spelling "Transaction already committed" in both places where the
error is constructed so the messages read consistently and correctly.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 53ddfe2 and 7f271a7.

📒 Files selected for processing (1)
  • balius-runtime/src/lib.rs (10 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (6)
balius-runtime/src/lib.rs (6)

31-34: LGTM: Store abstraction properly exposed.

The new public store module and trait re-exports enable custom store implementations as intended by this PR.


488-494: LGTM: Improved concurrency model.

The shift from Arc<Mutex<WorkerMap>> to Arc<RwLock<HashMap<String, Mutex<LoadedWorker>>>> enables:

  • Multiple concurrent readers (e.g., parallel handle_request calls)
  • Per-worker granular locking for better scalability
  • Reduced contention in read-heavy workloads

The locking hierarchy is consistently applied throughout (outer RwLock → inner worker Mutex).


512-529: LGTM: Correct async locking pattern.

The method correctly:

  • Uses read lock for concurrent access to the worker map
  • Acquires per-worker locks only when needed
  • Properly awaits the async find_chain_point operation

531-585: LGTM: Proper async initialization.

Worker registration correctly:

  • Awaits async store operations
  • Uses write lock for map mutation
  • Wraps worker in Mutex for per-worker concurrency control

653-678: LGTM: Efficient concurrent request handling.

The method correctly uses read lock on the worker map followed by per-worker locking, enabling concurrent request handling for different workers.


627-651: Verify atomic update rollback on error.

The method iterates through workers and updates their cursors within an atomic transaction. If a worker's apply_chain fails (line 642), the error propagates via ?, but it's unclear whether the store_update transaction is properly rolled back.

Ensure that:

  1. AtomicUpdateTrait has a Drop implementation that rolls back uncommitted changes, or
  2. Explicit error handling is added to call a rollback method before returning

This should be verified against the AtomicUpdateTrait implementation in the store module to confirm transaction safety guarantees.


#[error("store error {0}")]
Store(Box<redb::Error>),
Store(String),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Error handling degradation: String loses type information.

Changing Error::Store from Box<redb::Error> to String is a breaking change that significantly degrades error handling:

  • Callers can no longer match on specific redb error variants
  • Programmatic error handling becomes impossible
  • Error context and type information is lost

Consider preserving structured error information while still supporting custom stores. For example, introduce an enum that can hold both redb errors and a generic string variant for custom store errors:

 #[derive(Error, Debug)]
 pub enum Error {
     // ... other variants
 
-    #[error("store error {0}")]
-    Store(String),
+    #[error("store error {0}")]
+    Store(#[from] StoreError),
+}
+
+#[derive(Error, Debug)]
+pub enum StoreError {
+    #[error("redb error: {0}")]
+    Redb(#[from] redb::Error),
+    
+    #[error("redb database error: {0}")]
+    RedbDatabase(#[from] redb::DatabaseError),
+    
+    #[error("redb transaction error: {0}")]
+    RedbTransaction(#[from] redb::TransactionError),
+    
+    #[error("redb table error: {0}")]
+    RedbTable(#[from] redb::TableError),
+    
+    #[error("redb commit error: {0}")]
+    RedbCommit(#[from] redb::CommitError),
+    
+    #[error("redb storage error: {0}")]
+    RedbStorage(#[from] redb::StorageError),
+    
+    #[error("custom store error: {0}")]
+    Custom(String),
 }

This approach maintains backward compatibility for error inspection while supporting custom store implementations.

Also applies to: 87-121

🤖 Prompt for AI Agents
In balius-runtime/src/lib.rs around lines 45 (and similarly for the related
cases at 87-121), the Error::Store variant was changed from Box<redb::Error> to
String which loses typed error information; revert this by introducing a small
enum (e.g., StoreError { Redb(Box<redb::Error>), Other(String) }) and change
Error::Store to hold that enum instead of plain String; add From implementations
(From<Box<redb::Error>> and From<String>) and update all construction sites to
wrap errors into the appropriate enum variant so callers can still match on
concrete redb errors while custom stores can provide string messages.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
balius-runtime/src/lib.rs (1)

45-45: Error handling degradation: String loses type information.

The change from Box<redb::Error> to String prevents callers from matching on specific error variants or performing programmatic error handling. As noted in the previous review, consider introducing a StoreError enum that can hold both redb errors and custom store errors to preserve type information while supporting the pluggable store architecture.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7f271a7 and 5046d00.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • balius-runtime/Cargo.toml (1 hunks)
  • balius-runtime/src/drivers/chainsync.rs (3 hunks)
  • balius-runtime/src/ledgers/u5c.rs (1 hunks)
  • balius-runtime/src/lib.rs (12 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
balius-runtime/src/drivers/chainsync.rs (1)
balius-runtime/src/lib.rs (2)
  • slot (157-161)
  • slot (265-269)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: build
🔇 Additional comments (8)
balius-runtime/src/ledgers/u5c.rs (1)

128-147: LGTM: API simplification improves readability.

The updated read_params implementation is cleaner and more maintainable. The direct method call and simplified error handling reduce potential error points.

balius-runtime/src/drivers/chainsync.rs (1)

101-109: LGTM: Correct field access for BlockRef.

The change from point.index to point.slot correctly aligns with the BlockRef structure in utxorpc 0.12.0.

balius-runtime/src/lib.rs (5)

489-495: LGTM: Improved concurrency with RwLock pattern.

The change from Arc<Mutex<WorkerMap>> to Arc<RwLock<HashMap<String, Mutex<LoadedWorker>>>> enables better concurrency. Multiple readers can access the worker map simultaneously, and per-worker mutexes ensure thread-safe state updates without blocking unrelated workers.


513-530: LGTM: Correct async implementation with RwLock.

The rewritten chain_cursor method correctly uses read-write lock semantics, allowing concurrent cursor checks while properly locking individual workers during iteration.


572-583: LGTM: Correct async store integration and locking.

Worker registration correctly uses async store operations and follows the new concurrency model by acquiring a write lock for insertion and wrapping the worker in a per-worker Mutex.


628-652: LGTM: Correct atomic update flow with proper locking.

The handle_chain method correctly implements async store operations with proper lock separation: a read lock for the worker map allows concurrent access, while per-worker locks ensure thread-safe state updates during the atomic update.


654-679: LGTM: Efficient request handling with fine-grained locking.

The handle_request method correctly uses a read lock for the worker map and a per-worker lock for processing, allowing concurrent requests to different workers without blocking each other.

balius-runtime/Cargo.toml (1)

26-26: utxorpc 0.12.0 does not exist for the Rust crate ecosystem.

The Rust utxorpc crate only has version 0.11.0 available on crates.io. Version 0.12.0 exists for other ecosystems (PyPI for Python, NuGet for .NET) but not for Rust. Correct the dependency to an existing version (e.g., 0.11.0 or check if a newer Rust release is available).

Likely an incorrect or invalid review comment.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
balius-runtime/src/lib.rs (1)

47-122: Store error should preserve structured redb error information

Switching Error::Store from a typed redb error representation to a plain String plus to_string() in all From<redb::...> impls discards structured error information and makes programmatic handling (matching on specific redb error kinds) impossible. This is a regression in error handling even though it simplifies custom store integration.

To keep the API usable for both redb and custom stores, consider introducing a dedicated StoreError enum and wrapping it in Error::Store instead of using a String. That way:

  • redb‑specific variants remain matchable by callers,
  • custom backends can still surface opaque errors via a generic variant.

For example:

 #[derive(Error, Debug)]
 pub enum Error {
@@
-    #[error("store error {0}")]
-    Store(String),
+    #[error("store error {0}")]
+    Store(StoreError),
@@
 }
+
+#[derive(Error, Debug)]
+pub enum StoreError {
+    #[error("redb error: {0}")]
+    Redb(#[from] redb::Error),
+
+    #[error("redb database error: {0}")]
+    RedbDatabase(#[from] redb::DatabaseError),
+
+    #[error("redb transaction error: {0}")]
+    RedbTransaction(#[from] redb::TransactionError),
+
+    #[error("redb table error: {0}")]
+    RedbTable(#[from] redb::TableError),
+
+    #[error("redb commit error: {0}")]
+    RedbCommit(#[from] redb::CommitError),
+
+    #[error("redb storage error: {0}")]
+    RedbStorage(#[from] redb::StorageError),
+
+    #[error("custom store error: {0}")]
+    Custom(String),
+}
@@
 impl From<redb::Error> for Error {
     fn from(value: redb::Error) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::Redb(value))
     }
 }
@@
 impl From<redb::DatabaseError> for Error {
     fn from(value: redb::DatabaseError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbDatabase(value))
     }
 }
@@
 impl From<redb::TransactionError> for Error {
     fn from(value: redb::TransactionError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbTransaction(value))
     }
 }
@@
 impl From<redb::TableError> for Error {
     fn from(value: redb::TableError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbTable(value))
     }
 }
@@
 impl From<redb::CommitError> for Error {
     fn from(value: redb::CommitError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbCommit(value))
     }
 }
@@
 impl From<redb::StorageError> for Error {
     fn from(value: redb::StorageError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbStorage(value))
     }
 }

Custom store implementations can then use Error::Store(StoreError::Custom(msg)) (or StoreError::Custom(msg).into()) without being tied to redb, while existing callers can still inspect redb failures precisely.

🧹 Nitpick comments (2)
balius-runtime/src/lib.rs (2)

574-575: Async worker cursor lookup is fine; consider defaulting semantics

Using self.store.get_worker_cursor(id).await? to hydrate the initial cursor is consistent with the new async store API. One design question: should the store be responsible for defaulting missing workers to None vs. treating that as an error? If you expect missing workers to be common (e.g., first start), ensure the store implementation returns Ok(None) instead of an error so this path doesn't fail unnecessarily.


646-673: Atomic update flow: document commit/rollback semantics for Store

The new async write‑ahead + atomic update sequence:

  • log_seq = self.store.write_ahead(undo_blocks, next_block).await?;
  • let mut store_update = self.store.start_atomic_update(log_seq).await?;
  • parallel apply_chain across workers (via join_all)
  • per‑worker update_worker_cursor(&x).await?;
  • final store_update.commit().await?;

is a solid pattern for pluggable backends, but it implicitly relies on a few contracts that should be guaranteed by the Store / AtomicUpdateTrait APIs:

  1. Dropping store_update without calling commit() must reliably roll back / no‑op any partial state, since any worker error causes ? to short‑circuit before commit.
  2. write_ahead + start_atomic_update(log_seq) must be idempotent and safe to retry after a crash between these calls or between write_ahead and commit.
  3. update_worker_cursor should not have externally visible side effects until the enclosing commit() succeeds.

If these guarantees are not already documented and enforced in the trait definitions, I recommend clarifying them there so custom store implementors don't accidentally introduce subtle consistency bugs.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5046d00 and cc9998c.

📒 Files selected for processing (1)
  • balius-runtime/src/lib.rs (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: build
🔇 Additional comments (2)
balius-runtime/src/lib.rs (2)

33-37: Public store module and re-exports look coherent

Exposing pub mod store; and re‑exporting AtomicUpdateTrait, Store, and StoreTrait gives consumers a clean entry point for custom backends and keeps the runtime API focused. Naming and visibility look good for the intended pluggable store abstraction.


526-529: Async find_chain_point integration looks correct

The new async call self.store.find_chain_point(seq).await cleanly plugs into the existing Result<Option<ChainPoint>, Error> signature, and the lowest‑cursor selection logic is unchanged. No issues here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants