From 5d3b8cba44c264b36e4233fefc3755cf20317544 Mon Sep 17 00:00:00 2001 From: Cody James Littley Date: Wed, 18 Feb 2026 12:52:21 -0600 Subject: [PATCH 01/16] Tweak threading model for writes, not yet complete --- sei-db/wal/changelog.go | 3 + sei-db/wal/wal.go | 294 +++++++++++++++++++--------------------- sei-db/wal/wal_test.go | 32 ++--- 3 files changed, 158 insertions(+), 171 deletions(-) diff --git a/sei-db/wal/changelog.go b/sei-db/wal/changelog.go index b9a44e6e58..b3ea68a3e9 100644 --- a/sei-db/wal/changelog.go +++ b/sei-db/wal/changelog.go @@ -1,6 +1,8 @@ package wal import ( + "context" + "github.com/sei-protocol/sei-chain/sei-db/common/logger" "github.com/sei-protocol/sei-chain/sei-db/proto" ) @@ -12,6 +14,7 @@ type ChangelogWAL = GenericWAL[proto.ChangelogEntry] // This is a convenience wrapper that handles serialization automatically. func NewChangelogWAL(logger logger.Logger, dir string, config Config) (ChangelogWAL, error) { return NewWAL( + context.Background(), func(e proto.ChangelogEntry) ([]byte, error) { return e.Marshal() }, func(data []byte) (proto.ChangelogEntry, error) { var e proto.ChangelogEntry diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index a1cd79ff84..4f2ed93a90 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -1,6 +1,7 @@ package wal import ( + "context" "errors" "fmt" "os" @@ -15,18 +16,23 @@ import ( // WAL is a generic write-ahead log implementation. type WAL[T any] struct { - dir string - log *wal.Log - config Config - logger logger.Logger - marshal MarshalFn[T] - unmarshal UnmarshalFn[T] - writeChannel chan T - mtx sync.RWMutex // guards WAL state: lazy init/close of writeChannel, isClosed checks - asyncWriteErrCh chan error // buffered=1; async writer reports first error non-blocking - isClosed bool - closeCh chan struct{} // signals shutdown to background goroutines - wg sync.WaitGroup // tracks background goroutines (pruning) + ctx context.Context + cancel context.CancelFunc + + dir string + log *wal.Log + config Config + logger logger.Logger + marshal MarshalFn[T] + unmarshal UnmarshalFn[T] + writeChannel chan T + mtx sync.RWMutex // guards WAL state: lazy init/close of writeChannel, isClosed checks + isClosed bool + // Once closed, any errors encountered during closing are written to this channel. If none were encountered, nil is written. + closeChan chan error + wg sync.WaitGroup // tracks background goroutines (pruning) + + writeChan chan *writeRequest[T] } type Config struct { @@ -49,6 +55,7 @@ type Config struct { // logger, dir, config, // ) func NewWAL[T any]( + ctx context.Context, marshal MarshalFn[T], unmarshal UnmarshalFn[T], logger logger.Logger, @@ -62,89 +69,84 @@ func NewWAL[T any]( if err != nil { return nil, err } + + ctx, cancel := context.WithCancel(ctx) + w := &WAL[T]{ - dir: dir, - log: log, - config: config, - logger: logger, - marshal: marshal, - unmarshal: unmarshal, - closeCh: make(chan struct{}), - asyncWriteErrCh: make(chan error, 1), + ctx: ctx, + cancel: cancel, + dir: dir, + log: log, + config: config, + logger: logger, + marshal: marshal, + unmarshal: unmarshal, + closeChan: make(chan error, 1), + writeChan: make(chan *writeRequest[T], config.WriteBufferSize), } - // Start the auto pruning goroutine - if config.KeepRecent > 0 && config.PruneInterval > 0 { - w.startPruning(config.KeepRecent, config.PruneInterval) - } + go w.mainLoop() + return w, nil } +// A request to write to the WAL. 
+type writeRequest[T any] struct { + // The data to write + entry T + // Errors are returned over this channel, nil is written if completed with no error + errChan chan error +} + // Write will append a new entry to the end of the log. // Whether the writes is in blocking or async manner depends on the buffer size. // For async writes, this also checks for any previous async write errors. func (walLog *WAL[T]) Write(entry T) error { - // Never hold walLog.mtx while doing a potentially-blocking send. Close() may run concurrently. - walLog.mtx.Lock() - defer walLog.mtx.Unlock() - if walLog.isClosed { - return errors.New("wal is closed") - } - if err := walLog.getAsyncWriteErrLocked(); err != nil { - return fmt.Errorf("async WAL write failed previously: %w", err) + + req := &writeRequest[T]{ + entry: entry, + errChan: make(chan error, 1), } - writeBufferSize := walLog.config.WriteBufferSize - if writeBufferSize > 0 { - if walLog.writeChannel == nil { - walLog.writeChannel = make(chan T, writeBufferSize) - walLog.startAsyncWriteGoroutine() - walLog.logger.Info(fmt.Sprintf("WAL async write is enabled with buffer size %d", writeBufferSize)) + + select { + case _, ok := <-walLog.ctx.Done(): + if !ok { + return fmt.Errorf("WAL is closed, cannot write") } - walLog.writeChannel <- entry - } else { - // synchronous write - bz, err := walLog.marshal(entry) - if err != nil { - return err + case walLog.writeChan <- req: + // request submitted sucessfully + } + + select { + case _, ok := <-walLog.ctx.Done(): + if !ok { + return fmt.Errorf("WAL was closed after write was submitted but before write was finalized, write may or may not be durable") } - lastOffset, err := walLog.log.LastIndex() + case err := <-req.errChan: if err != nil { - return err - } - if err := walLog.log.Write(lastOffset+1, bz); err != nil { - return err + return fmt.Errorf("failed to write data: %v", err) } } + return nil } -// startWriteGoroutine will start a goroutine to write entries to the log. -// This should only be called on initialization if async write is enabled -func (walLog *WAL[T]) startAsyncWriteGoroutine() { - walLog.wg.Add(1) - ch := walLog.writeChannel - go func() { - defer walLog.wg.Done() - for entry := range ch { - bz, err := walLog.marshal(entry) - if err != nil { - walLog.recordAsyncWriteErr(err) - return - } - nextOffset, err := walLog.NextOffset() - if err != nil { - walLog.recordAsyncWriteErr(err) - return - } - err = walLog.log.Write(nextOffset, bz) - if err != nil { - walLog.recordAsyncWriteErr(err) - return - } - - } - }() +// This method is called asyncronously in response to a call to Write. +func (walLog *WAL[T]) handleWrite(req *writeRequest[T]) { + bz, err := walLog.marshal(req.entry) + if err != nil { + req.errChan <- fmt.Errorf("marsalling error: %v", err) + return + } + lastOffset, err := walLog.log.LastIndex() + if err != nil { + req.errChan <- fmt.Errorf("error fetching last index: %v", err) + return + } + if err := walLog.log.Write(lastOffset+1, bz); err != nil { + req.errChan <- fmt.Errorf("failed to write: %v", err) + } } // TruncateAfter will remove all entries that are after the provided `index`. 
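The hunk above replaces the old lock-and-lazy-channel bookkeeping with a request/response pattern: Write wraps the entry and a per-call, buffered error channel in a writeRequest, hands it to the background loop over writeChan, and then blocks on that channel (or on ctx.Done()) for the result. A minimal, self-contained sketch of the same pattern is below; the worker/request names and fields are hypothetical stand-ins for illustration, not the sei-db types.

package main

import (
	"context"
	"fmt"
)

// request carries one unit of work plus a private reply channel.
type request struct {
	payload string
	errChan chan error // buffered (1) so the worker never blocks on the reply
}

// worker owns all mutable state; only the run goroutine touches entries,
// so no mutex is needed.
type worker struct {
	ctx      context.Context
	requests chan *request
	entries  []string
}

// run is the single goroutine that serializes every mutation.
func (w *worker) run() {
	for {
		select {
		case <-w.ctx.Done():
			return
		case req := <-w.requests:
			w.entries = append(w.entries, req.payload)
			req.errChan <- nil
		}
	}
}

// write submits a request and waits for its own reply, mirroring the
// blocking behavior of the Write method above.
func (w *worker) write(payload string) error {
	req := &request{payload: payload, errChan: make(chan error, 1)}
	select {
	case <-w.ctx.Done():
		return fmt.Errorf("worker is closed, cannot write")
	case w.requests <- req:
	}
	select {
	case <-w.ctx.Done():
		return fmt.Errorf("worker closed before the write was acknowledged")
	case err := <-req.errChan:
		return err
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	w := &worker{ctx: ctx, requests: make(chan *request, 8)}
	go w.run()
	for i := 1; i <= 3; i++ {
		if err := w.write(fmt.Sprintf("entry-%d", i)); err != nil {
			panic(err)
		}
	}
	fmt.Println("all writes acknowledged")
}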
@@ -169,14 +171,6 @@ func (walLog *WAL[T]) LastOffset() (index uint64, err error) { return walLog.log.LastIndex() } -func (walLog *WAL[T]) NextOffset() (index uint64, err error) { - lastOffset, err := walLog.log.LastIndex() - if err != nil { - return 0, err - } - return lastOffset + 1, nil -} - // ReadAt will read the log entry at the provided index func (walLog *WAL[T]) ReadAt(index uint64) (T, error) { var zero T @@ -210,88 +204,48 @@ func (walLog *WAL[T]) Replay(start uint64, end uint64, processFn func(index uint return nil } -func (walLog *WAL[T]) startPruning(keepRecent uint64, pruneInterval time.Duration) { - walLog.wg.Add(1) - go func() { - defer walLog.wg.Done() - ticker := time.NewTicker(pruneInterval) - defer ticker.Stop() - for { - select { - case <-walLog.closeCh: - return - case <-ticker.C: - lastIndex, err := walLog.log.LastIndex() - if err != nil { - walLog.logger.Error("failed to get last index for pruning", "err", err) - continue - } - firstIndex, err := walLog.log.FirstIndex() - if err != nil { - walLog.logger.Error("failed to get first index for pruning", "err", err) - continue - } - if lastIndex > keepRecent && (lastIndex-keepRecent) > firstIndex { - prunePos := lastIndex - keepRecent - if err := walLog.TruncateBefore(prunePos); err != nil { - walLog.logger.Error(fmt.Sprintf("failed to prune changelog till index %d", prunePos), "err", err) - } - } - } - } - }() -} - -func (walLog *WAL[T]) Close() error { - walLog.mtx.Lock() - defer walLog.mtx.Unlock() - // Close should only be executed once. - if walLog.isClosed { - return nil - } - // Signal background goroutines to stop. - close(walLog.closeCh) - if walLog.writeChannel != nil { - close(walLog.writeChannel) - walLog.writeChannel = nil +func (walLog *WAL[T]) prune() { + keepRecent := walLog.config.KeepRecent + if keepRecent <= 0 || walLog.config.PruneInterval <= 0 { + // pruning is disabled + return } - // Wait for all background goroutines (pruning + async write) to finish. - walLog.wg.Wait() - walLog.isClosed = true - return walLog.log.Close() -} -// recordAsyncWriteErr records the first async write error (non-blocking). -func (walLog *WAL[T]) recordAsyncWriteErr(err error) { - if err == nil { + lastIndex, err := walLog.log.LastIndex() + if err != nil { + walLog.logger.Error("failed to get last index for pruning", "err", err) return } - select { - case walLog.asyncWriteErrCh <- err: - default: - // already recorded + firstIndex, err := walLog.log.FirstIndex() + if err != nil { + walLog.logger.Error("failed to get first index for pruning", "err", err) + return } -} -// getAsyncWriteErrLocked returns the async write error if present. -// To keep the error "sticky" without an extra cached field, we implement -// a "peek" by reading once and then non-blocking re-inserting the same -// error back into the buffered channel. -// Caller must hold walLog.mtx (read lock is sufficient). -func (walLog *WAL[T]) getAsyncWriteErrLocked() error { - select { - case err := <-walLog.asyncWriteErrCh: - // Put it back so subsequent callers still observe it. - select { - case walLog.asyncWriteErrCh <- err: - default: + if lastIndex > keepRecent && (lastIndex-keepRecent) > firstIndex { + prunePos := lastIndex - keepRecent + if err := walLog.TruncateBefore(prunePos); err != nil { + walLog.logger.Error(fmt.Sprintf("failed to prune changelog till index %d", prunePos), "err", err) } - return err - default: - return nil } } +// Shut down the WAL. This method is idempotent. 
+func (walLog *WAL[T]) Close() error { + walLog.cancel() + + err := <-walLog.closeChan + + // "reload" error into channel to make Close() idempotent + walLog.closeChan <- err + + if err != nil { + return fmt.Errorf("error encountered while shutting down %v", err) + } + + return nil +} + // open opens the replay log, try to truncate the corrupted tail if there's any func open(dir string, opts *wal.Options) (*wal.Log, error) { if opts == nil { @@ -325,3 +279,33 @@ func open(dir string, opts *wal.Options) (*wal.Log, error) { } return rlog, err } + +// The main loop doing work in the background. +func (walLog *WAL[T]) mainLoop() { + + pruneInterval := walLog.config.PruneInterval + if pruneInterval < time.Second { + pruneInterval = time.Second + } + pruneTicker := time.NewTicker(walLog.config.PruneInterval) + defer pruneTicker.Stop() + + running := true + for running { + select { + case <-walLog.ctx.Done(): + running = false + case req := <-walLog.writeChan: + walLog.handleWrite(req) + case <-pruneTicker.C: + walLog.prune() + } + } + + err := walLog.log.Close() + if err != nil { + walLog.closeChan <- fmt.Errorf("wal returned error during shutdown: %v", err) + } else { + walLog.closeChan <- nil + } +} diff --git a/sei-db/wal/wal_test.go b/sei-db/wal/wal_test.go index 76eaaf268b..e434f24190 100644 --- a/sei-db/wal/wal_test.go +++ b/sei-db/wal/wal_test.go @@ -114,7 +114,7 @@ func TestRandomRead(t *testing.T) { func prepareTestData(t *testing.T) *WAL[proto.ChangelogEntry] { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) writeTestData(t, changelog) return changelog @@ -144,7 +144,7 @@ func TestSynchronousWrite(t *testing.T) { func TestAsyncWrite(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) require.NoError(t, err) for _, changes := range ChangeSets { cs := []*proto.NamedChangeSet{ @@ -160,7 +160,7 @@ func TestAsyncWrite(t *testing.T) { } err = changelog.Close() require.NoError(t, err) - changelog, err = NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog, err = NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) require.NoError(t, err) lastIndex, err := changelog.LastOffset() require.NoError(t, err) @@ -253,7 +253,7 @@ func TestTruncateBefore(t *testing.T) { func TestCloseSyncMode(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) // Write some data in sync mode @@ -267,7 +267,7 @@ func TestCloseSyncMode(t *testing.T) { require.True(t, changelog.isClosed) // Reopen and verify data persisted - changelog2, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog2, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, changelog2.Close()) }) @@ -305,14 +305,14 @@ func TestReopenAndContinueWrite(t *testing.T) { dir := t.TempDir() // 
Create and write initial data - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) writeTestData(t, changelog) err = changelog.Close() require.NoError(t, err) // Reopen and continue writing - changelog2, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog2, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) // Verify nextOffset is correctly set after reopen @@ -343,7 +343,7 @@ func TestReopenAndContinueWrite(t *testing.T) { func TestEmptyLog(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, changelog.Close()) }) @@ -359,7 +359,7 @@ func TestEmptyLog(t *testing.T) { func TestCheckErrorNoError(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) require.NoError(t, err) // Write some data to initialize async mode @@ -389,7 +389,7 @@ func TestAsyncWriteReopenAndContinue(t *testing.T) { dir := t.TempDir() // Create with async write and write data - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) require.NoError(t, err) for _, changes := range ChangeSets { @@ -403,7 +403,7 @@ func TestAsyncWriteReopenAndContinue(t *testing.T) { require.NoError(t, err) // Reopen with async write and continue - changelog2, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog2, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) require.NoError(t, err) // Write more entries @@ -418,7 +418,7 @@ func TestAsyncWriteReopenAndContinue(t *testing.T) { require.NoError(t, err) // Reopen and verify all 6 entries - changelog3, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog3, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, changelog3.Close()) }) @@ -443,7 +443,7 @@ func TestReplaySingleEntry(t *testing.T) { func TestWriteMultipleChangesets(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, changelog.Close()) }) @@ -469,7 +469,7 @@ func TestWriteMultipleChangesets(t *testing.T) { func TestConcurrentCloseWithInFlightAsyncWrites(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 8}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, 
Config{WriteBufferSize: 8}) require.NoError(t, err) // Intentionally avoid t.Cleanup here: we want Close() to race with in-flight async writes. @@ -535,7 +535,7 @@ func TestConcurrentCloseWithInFlightAsyncWrites(t *testing.T) { func TestConcurrentTruncateBeforeWithAsyncWrites(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{ + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{ WriteBufferSize: 10, KeepRecent: 10, PruneInterval: 1 * time.Millisecond, @@ -606,7 +606,7 @@ func TestConcurrentTruncateBeforeWithAsyncWrites(t *testing.T) { func TestGetLastIndex(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) writeTestData(t, changelog) err = changelog.Close() From ee4318ecfa44552a0be3f83636d01fed025cee5a Mon Sep 17 00:00:00 2001 From: Cody James Littley Date: Wed, 18 Feb 2026 14:40:08 -0600 Subject: [PATCH 02/16] small tweaks --- sei-db/ledger_db/parquet/wal.go | 2 + sei-db/wal/changelog.go | 6 + sei-db/wal/wal.go | 382 +++++++++++++++++++++++++++----- sei-db/wal/wal_test.go | 28 ++- 4 files changed, 348 insertions(+), 70 deletions(-) diff --git a/sei-db/ledger_db/parquet/wal.go b/sei-db/ledger_db/parquet/wal.go index d0a4ccbd07..6c7fb299e8 100644 --- a/sei-db/ledger_db/parquet/wal.go +++ b/sei-db/ledger_db/parquet/wal.go @@ -1,6 +1,7 @@ package parquet import ( + "context" "encoding/binary" "fmt" "os" @@ -109,6 +110,7 @@ func NewWAL(logger dbLogger.Logger, dir string) (dbwal.GenericWAL[WALEntry], err return nil, err } return dbwal.NewWAL( + context.Background(), encodeWALEntry, decodeWALEntry, logger, diff --git a/sei-db/wal/changelog.go b/sei-db/wal/changelog.go index b3ea68a3e9..9a30707e9f 100644 --- a/sei-db/wal/changelog.go +++ b/sei-db/wal/changelog.go @@ -13,6 +13,12 @@ type ChangelogWAL = GenericWAL[proto.ChangelogEntry] // NewChangelogWAL creates a new WAL for ChangelogEntry. // This is a convenience wrapper that handles serialization automatically. func NewChangelogWAL(logger logger.Logger, dir string, config Config) (ChangelogWAL, error) { + + if config.BufferSize > 0 { + // Emulate legacy behavior. Originally, buffer size >0 enabled async writes. + config.AsyncWrites = true + } + return NewWAL( context.Background(), func(e proto.ChangelogEntry) ([]byte, error) { return e.Marshal() }, diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 4f2ed93a90..8f4594f43b 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -19,26 +19,44 @@ type WAL[T any] struct { ctx context.Context cancel context.CancelFunc - dir string - log *wal.Log - config Config - logger logger.Logger - marshal MarshalFn[T] - unmarshal UnmarshalFn[T] - writeChannel chan T - mtx sync.RWMutex // guards WAL state: lazy init/close of writeChannel, isClosed checks - isClosed bool - // Once closed, any errors encountered during closing are written to this channel. If none were encountered, nil is written. 
- closeChan chan error - wg sync.WaitGroup // tracks background goroutines (pruning) - - writeChan chan *writeRequest[T] + dir string + log *wal.Log + config Config + logger logger.Logger + marshal MarshalFn[T] + unmarshal UnmarshalFn[T] + mtx sync.RWMutex // guards WAL state: lazy init/close of writeChannel, isClosed checks + + writeChan chan *writeRequest[T] + truncateChan chan *truncateRequest + getOffsetChan chan *getOffsetRequest + readAtChan chan *readAtRequest[T] + replayChan chan *replayRequest[T] + // closeReqChan is sent on by Close() to request shutdown. Processed last so in-flight work can complete. + closeReqChan chan struct{} + // Once shut down, any errors encountered during closing are written to closeErrChan. If none, nil is written. + closeErrChan chan error } +// Configuration for the WAL. type Config struct { - WriteBufferSize int - KeepRecent uint64 - PruneInterval time.Duration + // The number of recent entries to keep in the log. + KeepRecent uint64 + // The interval at which to prune the log. + PruneInterval time.Duration + // If true, the writes are asynchronous, and will return immediately without waiting for the write to be durable + AsyncWrites bool + // The size of internal buffers. + // + // In order to support legacy configuration, if BufferSize is 0, then the buffer size is set to 1024. + BufferSize int + + // If true, do an fsync after each write. + FsyncEnabled bool + + // If true, make a deep copy of the data for every write. If false, then it is not safe to modify the data after + // reading/writing it. + DeepCopyEnabled bool } // NewWAL creates a new generic write-ahead log that persists entries. @@ -63,26 +81,36 @@ func NewWAL[T any]( config Config, ) (*WAL[T], error) { log, err := open(dir, &wal.Options{ - NoSync: true, - NoCopy: true, + NoSync: !config.FsyncEnabled, + NoCopy: !config.DeepCopyEnabled, }) if err != nil { return nil, err } + bufferSize := config.BufferSize + if config.BufferSize == 0 { + bufferSize = 1024 + } + ctx, cancel := context.WithCancel(ctx) w := &WAL[T]{ - ctx: ctx, - cancel: cancel, - dir: dir, - log: log, - config: config, - logger: logger, - marshal: marshal, - unmarshal: unmarshal, - closeChan: make(chan error, 1), - writeChan: make(chan *writeRequest[T], config.WriteBufferSize), + ctx: ctx, + cancel: cancel, + dir: dir, + log: log, + config: config, + logger: logger, + marshal: marshal, + unmarshal: unmarshal, + closeReqChan: make(chan struct{}), + closeErrChan: make(chan error, 1), + writeChan: make(chan *writeRequest[T], bufferSize), + truncateChan: make(chan *truncateRequest, bufferSize), + getOffsetChan: make(chan *getOffsetRequest, bufferSize), + readAtChan: make(chan *readAtRequest[T], bufferSize), + replayChan: make(chan *replayRequest[T], bufferSize), } go w.mainLoop() @@ -118,10 +146,16 @@ func (walLog *WAL[T]) Write(entry T) error { // request submitted sucessfully } + if walLog.config.AsyncWrites { + // Do not wait for the write to be durable + return nil + } + select { case _, ok := <-walLog.ctx.Done(): if !ok { - return fmt.Errorf("WAL was closed after write was submitted but before write was finalized, write may or may not be durable") + return fmt.Errorf("WAL was closed after write was submitted but before write was finalized, " + + "write may or may not be durable") } case err := <-req.errChan: if err != nil { @@ -147,61 +181,252 @@ func (walLog *WAL[T]) handleWrite(req *writeRequest[T]) { if err := walLog.log.Write(lastOffset+1, bz); err != nil { req.errChan <- fmt.Errorf("failed to write: %v", err) } + + 
req.errChan <- nil +} + +// A request to truncate the log. +type truncateRequest struct { + // If true, truncate before the provided index. Otherwise, truncate after the provided index. + before bool + // The index to truncate at. + index uint64 + // Errors are returned over this channel, nil is written if completed with no error + errChan chan error } // TruncateAfter will remove all entries that are after the provided `index`. // In other words the entry at `index` becomes the last entry in the log. func (walLog *WAL[T]) TruncateAfter(index uint64) error { - return walLog.log.TruncateBack(index) + return walLog.sendTruncate(false, index) } // TruncateBefore will remove all entries that are before the provided `index`. // In other words the entry at `index` becomes the first entry in the log. -// Need to add write lock because this would change the next write offset func (walLog *WAL[T]) TruncateBefore(index uint64) error { - return walLog.log.TruncateFront(index) + return walLog.sendTruncate(true, index) } -func (walLog *WAL[T]) FirstOffset() (index uint64, err error) { - return walLog.log.FirstIndex() +// sendTruncate sends a truncate request to the main loop and waits for completion. +func (walLog *WAL[T]) sendTruncate(before bool, index uint64) error { + req := &truncateRequest{ + before: before, + index: index, + errChan: make(chan error, 1), + } + + select { + case _, ok := <-walLog.ctx.Done(): + if !ok { + return fmt.Errorf("WAL is closed, cannot truncate") + } + case walLog.truncateChan <- req: + // request submitted successfully + } + + select { + case _, ok := <-walLog.ctx.Done(): + if !ok { + return fmt.Errorf("WAL was closed after truncate was submitted but before truncate was finalized") + } + case err := <-req.errChan: + if err != nil { + return fmt.Errorf("failed to truncate: %w", err) + } + } + + return nil } -// LastOffset returns the last written offset/index of the log -func (walLog *WAL[T]) LastOffset() (index uint64, err error) { - return walLog.log.LastIndex() +// handleTruncate runs on the main loop and performs the truncation. +func (walLog *WAL[T]) handleTruncate(req *truncateRequest) { + var err error + if req.before { + err = walLog.log.TruncateFront(req.index) + } else { + err = walLog.log.TruncateBack(req.index) + } + if err != nil { + req.errChan <- fmt.Errorf("failed to truncate: %w", err) + return + } + req.errChan <- nil +} + +// A request to get the first or last offset/index of the log. +type getOffsetRequest struct { + // If true, get the first offset/index. Otherwise, get the last offset/index. + first bool + // The channel to send the response to. + responseChan chan *getOffsetResponse } -// ReadAt will read the log entry at the provided index +// A response to a get offset request. +type getOffsetResponse struct { + // The offset/index of the first or last entry in the log. + index uint64 + // The error, if any, encountered while getting the offset. + err error +} + +func (walLog *WAL[T]) FirstOffset() (uint64, error) { + return walLog.sendGetOffset(true) +} + +// LastOffset returns the last written offset/index of the log. +func (walLog *WAL[T]) LastOffset() (uint64, error) { + return walLog.sendGetOffset(false) +} + +// sendGetOffset sends a get-offset request to the main loop and waits for the response. 
+func (walLog *WAL[T]) sendGetOffset(first bool) (uint64, error) { + req := &getOffsetRequest{ + first: first, + responseChan: make(chan *getOffsetResponse, 1), + } + + select { + case _, ok := <-walLog.ctx.Done(): + if !ok { + return 0, fmt.Errorf("WAL is closed, cannot get offset") + } + case walLog.getOffsetChan <- req: + // request submitted successfully + } + + select { + case <-walLog.ctx.Done(): + return 0, fmt.Errorf("WAL was closed after get offset was submitted but before response") + case resp := <-req.responseChan: + if resp.err != nil { + return 0, resp.err + } + return resp.index, nil + } +} + +// handleGetOffset runs on the main loop and returns the first or last index. +func (walLog *WAL[T]) handleGetOffset(req *getOffsetRequest) { + var index uint64 + var err error + if req.first { + index, err = walLog.log.FirstIndex() + } else { + index, err = walLog.log.LastIndex() + } + req.responseChan <- &getOffsetResponse{index: index, err: err} +} + +// A request to read an entry at a specific index. +type readAtRequest[T any] struct { + index uint64 + responseChan chan *readAtResponse[T] +} + +// A response to a read-at request. +type readAtResponse[T any] struct { + entry T + err error +} + +// ReadAt will read the log entry at the provided index. func (walLog *WAL[T]) ReadAt(index uint64) (T, error) { var zero T - bz, err := walLog.log.Read(index) + req := &readAtRequest[T]{ + index: index, + responseChan: make(chan *readAtResponse[T], 1), + } + + select { + case _, ok := <-walLog.ctx.Done(): + if !ok { + return zero, fmt.Errorf("WAL is closed, cannot read") + } + case walLog.readAtChan <- req: + // request submitted successfully + } + + select { + case <-walLog.ctx.Done(): + return zero, fmt.Errorf("WAL was closed after read was submitted but before response") + case resp := <-req.responseChan: + if resp.err != nil { + return zero, resp.err + } + return resp.entry, nil + } +} + +// handleReadAt runs on the main loop and reads and unmarshals the entry at the index. +func (walLog *WAL[T]) handleReadAt(req *readAtRequest[T]) { + var zero T + bz, err := walLog.log.Read(req.index) if err != nil { - return zero, fmt.Errorf("read log failed, %w", err) + req.responseChan <- &readAtResponse[T]{entry: zero, err: fmt.Errorf("read log failed, %w", err)} + return } entry, err := walLog.unmarshal(bz) if err != nil { - return zero, fmt.Errorf("unmarshal rlog failed, %w", err) + req.responseChan <- &readAtResponse[T]{entry: zero, err: fmt.Errorf("unmarshal rlog failed, %w", err)} + return } - return entry, nil + req.responseChan <- &readAtResponse[T]{entry: entry, err: nil} } -// Replay will read the replay log and process each log entry with the provided function +// A request to replay a range of the log. +type replayRequest[T any] struct { + start uint64 + end uint64 + processFn func(index uint64, entry T) error + errChan chan error +} + +// Replay will read the replay log and process each log entry with the provided function. 
func (walLog *WAL[T]) Replay(start uint64, end uint64, processFn func(index uint64, entry T) error) error { - for i := start; i <= end; i++ { + req := &replayRequest[T]{ + start: start, + end: end, + processFn: processFn, + errChan: make(chan error, 1), + } + + select { + case _, ok := <-walLog.ctx.Done(): + if !ok { + return fmt.Errorf("WAL is closed, cannot replay") + } + case walLog.replayChan <- req: + // request submitted successfully + } + + select { + case <-walLog.ctx.Done(): + return fmt.Errorf("WAL was closed after replay was submitted but before completion") + case err := <-req.errChan: + return err + } +} + +// handleReplay runs on the main loop and replays the range, calling processFn for each entry. +func (walLog *WAL[T]) handleReplay(req *replayRequest[T]) { + for i := req.start; i <= req.end; i++ { bz, err := walLog.log.Read(i) if err != nil { - return fmt.Errorf("read log failed, %w", err) + req.errChan <- fmt.Errorf("read log failed, %w", err) + return } entry, err := walLog.unmarshal(bz) if err != nil { - return fmt.Errorf("unmarshal rlog failed, %w", err) + req.errChan <- fmt.Errorf("unmarshal rlog failed, %w", err) + return } - err = processFn(i, entry) + err = req.processFn(i, entry) if err != nil { - return err + req.errChan <- err + return } } - return nil + req.errChan <- nil } func (walLog *WAL[T]) prune() { @@ -224,20 +449,46 @@ func (walLog *WAL[T]) prune() { if lastIndex > keepRecent && (lastIndex-keepRecent) > firstIndex { prunePos := lastIndex - keepRecent - if err := walLog.TruncateBefore(prunePos); err != nil { + if err := walLog.log.TruncateFront(prunePos); err != nil { walLog.logger.Error(fmt.Sprintf("failed to prune changelog till index %d", prunePos), "err", err) } } } -// Shut down the WAL. This method is idempotent. +// drain processes all pending requests so in-flight work completes before shutdown. +func (walLog *WAL[T]) drain() { + for { + select { + case req := <-walLog.writeChan: + walLog.handleWrite(req) + case req := <-walLog.truncateChan: + walLog.handleTruncate(req) + case req := <-walLog.getOffsetChan: + walLog.handleGetOffset(req) + case req := <-walLog.readAtChan: + walLog.handleReadAt(req) + case req := <-walLog.replayChan: + walLog.handleReplay(req) + default: + return + } + } +} + +// Shut down the WAL. Sends a close request to the main loop so in-flight writes (and other work) +// can complete before teardown. Idempotent. 
func (walLog *WAL[T]) Close() error { - walLog.cancel() + select { + case <-walLog.ctx.Done(): + // already closed + case walLog.closeReqChan <- struct{}{}: + // close request sent + } - err := <-walLog.closeChan + err := <-walLog.closeErrChan // "reload" error into channel to make Close() idempotent - walLog.closeChan <- err + walLog.closeErrChan <- err if err != nil { return fmt.Errorf("error encountered while shutting down %v", err) @@ -287,7 +538,7 @@ func (walLog *WAL[T]) mainLoop() { if pruneInterval < time.Second { pruneInterval = time.Second } - pruneTicker := time.NewTicker(walLog.config.PruneInterval) + pruneTicker := time.NewTicker(pruneInterval) defer pruneTicker.Stop() running := true @@ -297,15 +548,30 @@ func (walLog *WAL[T]) mainLoop() { running = false case req := <-walLog.writeChan: walLog.handleWrite(req) + case req := <-walLog.truncateChan: + walLog.handleTruncate(req) + case req := <-walLog.getOffsetChan: + walLog.handleGetOffset(req) + case req := <-walLog.readAtChan: + walLog.handleReadAt(req) + case req := <-walLog.replayChan: + walLog.handleReplay(req) case <-pruneTicker.C: walLog.prune() + case <-walLog.closeReqChan: + running = false } } + // drain pending work, then tear down + walLog.drain() + err := walLog.log.Close() if err != nil { - walLog.closeChan <- fmt.Errorf("wal returned error during shutdown: %v", err) + walLog.closeErrChan <- fmt.Errorf("wal returned error during shutdown: %v", err) } else { - walLog.closeChan <- nil + walLog.closeErrChan <- nil } + + walLog.cancel() } diff --git a/sei-db/wal/wal_test.go b/sei-db/wal/wal_test.go index e434f24190..7b53e87ad2 100644 --- a/sei-db/wal/wal_test.go +++ b/sei-db/wal/wal_test.go @@ -144,7 +144,8 @@ func TestSynchronousWrite(t *testing.T) { func TestAsyncWrite(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, + Config{BufferSize: 10, AsyncWrites: true}) require.NoError(t, err) for _, changes := range ChangeSets { cs := []*proto.NamedChangeSet{ @@ -160,7 +161,8 @@ func TestAsyncWrite(t *testing.T) { } err = changelog.Close() require.NoError(t, err) - changelog, err = NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog, err = NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, + Config{BufferSize: 10, AsyncWrites: true}) require.NoError(t, err) lastIndex, err := changelog.LastOffset() require.NoError(t, err) @@ -263,9 +265,6 @@ func TestCloseSyncMode(t *testing.T) { err = changelog.Close() require.NoError(t, err) - // Verify isClosed is set - require.True(t, changelog.isClosed) - // Reopen and verify data persisted changelog2, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) require.NoError(t, err) @@ -359,7 +358,8 @@ func TestEmptyLog(t *testing.T) { func TestCheckErrorNoError(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, + Config{BufferSize: 10, AsyncWrites: true}) require.NoError(t, err) // Write some data to initialize async mode @@ -389,7 +389,8 @@ func TestAsyncWriteReopenAndContinue(t *testing.T) { dir := t.TempDir() // Create with async write and 
write data - changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, + Config{BufferSize: 10, AsyncWrites: true}) require.NoError(t, err) for _, changes := range ChangeSets { @@ -403,7 +404,8 @@ func TestAsyncWriteReopenAndContinue(t *testing.T) { require.NoError(t, err) // Reopen with async write and continue - changelog2, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 10}) + changelog2, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, + Config{BufferSize: 10, AsyncWrites: true}) require.NoError(t, err) // Write more entries @@ -469,7 +471,8 @@ func TestWriteMultipleChangesets(t *testing.T) { func TestConcurrentCloseWithInFlightAsyncWrites(t *testing.T) { dir := t.TempDir() - changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{WriteBufferSize: 8}) + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, + Config{BufferSize: 8, AsyncWrites: true}) require.NoError(t, err) // Intentionally avoid t.Cleanup here: we want Close() to race with in-flight async writes. @@ -536,9 +539,10 @@ func TestConcurrentCloseWithInFlightAsyncWrites(t *testing.T) { func TestConcurrentTruncateBeforeWithAsyncWrites(t *testing.T) { dir := t.TempDir() changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{ - WriteBufferSize: 10, - KeepRecent: 10, - PruneInterval: 1 * time.Millisecond, + BufferSize: 10, + KeepRecent: 10, + PruneInterval: 1 * time.Millisecond, + AsyncWrites: true, }) require.NoError(t, err) From 28a91d203d7b29f657c782f75a296e77121ef6c9 Mon Sep 17 00:00:00 2001 From: Cody James Littley Date: Thu, 19 Feb 2026 08:04:19 -0600 Subject: [PATCH 03/16] cleanup --- sei-db/wal/wal.go | 111 +++++++++++++++++++++++++++----- sei-db/wal/wal_bench_test.go | 121 +++++++++++++++++++++++++++++++++++ sei-db/wal/wal_test.go | 54 ++++++++++++++++ 3 files changed, 271 insertions(+), 15 deletions(-) create mode 100644 sei-db/wal/wal_bench_test.go diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 8f4594f43b..62b1f48789 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -27,6 +27,9 @@ type WAL[T any] struct { unmarshal UnmarshalFn[T] mtx sync.RWMutex // guards WAL state: lazy init/close of writeChannel, isClosed checks + // The size of write batches. + writeBatchSize int + writeChan chan *writeRequest[T] truncateChan chan *truncateRequest getOffsetChan chan *getOffsetRequest @@ -42,15 +45,21 @@ type WAL[T any] struct { type Config struct { // The number of recent entries to keep in the log. KeepRecent uint64 + // The interval at which to prune the log. PruneInterval time.Duration + // If true, the writes are asynchronous, and will return immediately without waiting for the write to be durable AsyncWrites bool + // The size of internal buffers. // // In order to support legacy configuration, if BufferSize is 0, then the buffer size is set to 1024. BufferSize int + // The size of write batches. If less than or equal to 1, then no batching is done. + WriteBatchSize int + // If true, do an fsync after each write. 
FsyncEnabled bool @@ -93,24 +102,30 @@ func NewWAL[T any]( bufferSize = 1024 } + writeBatchSize := config.WriteBatchSize + if writeBatchSize <= 1 { + writeBatchSize = 0 + } + ctx, cancel := context.WithCancel(ctx) w := &WAL[T]{ - ctx: ctx, - cancel: cancel, - dir: dir, - log: log, - config: config, - logger: logger, - marshal: marshal, - unmarshal: unmarshal, - closeReqChan: make(chan struct{}), - closeErrChan: make(chan error, 1), - writeChan: make(chan *writeRequest[T], bufferSize), - truncateChan: make(chan *truncateRequest, bufferSize), - getOffsetChan: make(chan *getOffsetRequest, bufferSize), - readAtChan: make(chan *readAtRequest[T], bufferSize), - replayChan: make(chan *replayRequest[T], bufferSize), + ctx: ctx, + cancel: cancel, + dir: dir, + log: log, + config: config, + logger: logger, + marshal: marshal, + unmarshal: unmarshal, + writeBatchSize: writeBatchSize, + closeReqChan: make(chan struct{}), + closeErrChan: make(chan error, 1), + writeChan: make(chan *writeRequest[T], bufferSize), + truncateChan: make(chan *truncateRequest, bufferSize), + getOffsetChan: make(chan *getOffsetRequest, bufferSize), + readAtChan: make(chan *readAtRequest[T], bufferSize), + replayChan: make(chan *replayRequest[T], bufferSize), } go w.mainLoop() @@ -168,6 +183,16 @@ func (walLog *WAL[T]) Write(entry T) error { // This method is called asyncronously in response to a call to Write. func (walLog *WAL[T]) handleWrite(req *writeRequest[T]) { + if walLog.writeBatchSize <= 1 { + walLog.handleUnbatchedWrite(req) + } else { + walLog.handleBatchedWrite(req) + } +} + +// handleUnbatchedWrite is called when no batching is enabled. Processes a single write request. +func (walLog *WAL[T]) handleUnbatchedWrite(req *writeRequest[T]) { + bz, err := walLog.marshal(req.entry) if err != nil { req.errChan <- fmt.Errorf("marsalling error: %v", err) @@ -185,6 +210,62 @@ func (walLog *WAL[T]) handleWrite(req *writeRequest[T]) { req.errChan <- nil } +// handleBatchedWrite is called when batching is enabled. This method may pop pending writes from the writeChan and +// include them in the batch. +func (walLog *WAL[T]) handleBatchedWrite(req *writeRequest[T]) { + + requests := make([]*writeRequest[T], 0) + requests = append(requests, req) + + keepLooking := true + for keepLooking && len(requests) < walLog.writeBatchSize { + select { + case req := <-walLog.writeChan: + requests = append(requests, req) + default: + // No more pending writes immediately available, so process the batch we have so far. + keepLooking = false + } + } + + lastOffset, err := walLog.log.LastIndex() + if err != nil { + err = fmt.Errorf("error fetching last index: %v", err) + for _, req := range requests { + req.errChan <- err + } + return + } + + batch := &wal.Batch{} + + for _, req := range requests { + bz, err := walLog.marshal(req.entry) + if err != nil { + // TODO: this can torpedo the entire batch, need to handle this better + err = fmt.Errorf("marsalling error: %v", err) + for _, req := range requests { + req.errChan <- err + } + return + } + batch.Write(lastOffset+1, bz) + lastOffset++ + } + + if err := walLog.log.WriteBatch(batch); err != nil { + err = fmt.Errorf("failed to write batch: %v", err) + for _, r := range requests { + r.errChan <- err + } + return + } + + for _, r := range requests { + r.errChan <- nil + } +} + // A request to truncate the log. type truncateRequest struct { // If true, truncate before the provided index. Otherwise, truncate after the provided index. 
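handleBatchedWrite above takes the request that woke the loop and then opportunistically drains whatever is already queued on writeChan, up to writeBatchSize, before fetching LastIndex once and issuing a single wal.Batch write. A minimal sketch of just that non-blocking drain loop is below, using hypothetical names rather than the sei-db types.

package main

import "fmt"

// drainBatch takes the first item that woke the worker and keeps pulling
// whatever is already queued, without blocking, until the batch is full.
func drainBatch(pending chan string, first string, maxBatch int) []string {
	batch := []string{first}
	for len(batch) < maxBatch {
		select {
		case item := <-pending:
			batch = append(batch, item)
		default:
			// Nothing else is immediately available; flush what we have.
			return batch
		}
	}
	return batch
}

func main() {
	pending := make(chan string, 8)
	for i := 2; i <= 5; i++ {
		pending <- fmt.Sprintf("entry-%d", i)
	}
	// Prints [entry-1 entry-2 entry-3 entry-4]; entry-5 waits for the next batch.
	fmt.Println(drainBatch(pending, "entry-1", 4))
}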
diff --git a/sei-db/wal/wal_bench_test.go b/sei-db/wal/wal_bench_test.go new file mode 100644 index 0000000000..37245a7802 --- /dev/null +++ b/sei-db/wal/wal_bench_test.go @@ -0,0 +1,121 @@ +package wal + +import ( + "crypto/rand" + "fmt" + "testing" + "time" + + "github.com/tidwall/wal" + + "github.com/sei-protocol/sei-chain/sei-db/common/logger" +) + +func makePayload(size int) []byte { + buf := make([]byte, size) + if _, err := rand.Read(buf); err != nil { + panic(err) + } + return buf +} + +func BenchmarkTidwallWALWrite(b *testing.B) { + entrySizes := []int{64, 128, 1024, 4096, 16384, 65536} + fsyncModes := []struct { + name string + noSync bool + }{ + {"fsync", false}, + {"no-fsync", true}, + } + + for _, es := range entrySizes { + for _, fm := range fsyncModes { + name := fmt.Sprintf("entry=%dB/%s", es, fm.name) + noSync := fm.noSync + payload := makePayload(es) + + b.Run(name, func(b *testing.B) { + dir := b.TempDir() + log, err := wal.Open(dir, &wal.Options{ + NoSync: noSync, + NoCopy: true, + }) + if err != nil { + b.Fatal(err) + } + b.Cleanup(func() { _ = log.Close() }) + + b.ResetTimer() + start := time.Now() + + for i := 0; i < b.N; i++ { + if err := log.Write(uint64(i+1), payload); err != nil { + b.Fatal(err) + } + } + + elapsed := time.Since(start) + totalBytes := float64(b.N) * float64(es) + + b.ReportMetric(totalBytes/elapsed.Seconds(), "bytes/s") + b.ReportMetric(elapsed.Seconds()/float64(b.N)*1e6, "us/write") + }) + } + } +} + +func BenchmarkWALWrapperWrite(b *testing.B) { + entrySizes := []int{64, 128, 1024, 4096, 16384, 65536} + writeModes := []struct { + name string + bufferSize int + }{ + {"buffer-0", 0}, + {"buffer-256", 256}, + } + + marshal := func(entry []byte) ([]byte, error) { return entry, nil } + unmarshal := func(data []byte) ([]byte, error) { return data, nil } + + for _, es := range entrySizes { + for _, wm := range writeModes { + name := fmt.Sprintf("entry=%dB/%s", es, wm.name) + bufSize := wm.bufferSize + payload := makePayload(es) + + b.Run(name, func(b *testing.B) { + dir := b.TempDir() + w, err := NewWAL(b.Context(), marshal, unmarshal, logger.NewNopLogger(), dir, Config{ + BufferSize: bufSize, + FsyncEnabled: true, + AsyncWrites: bufSize > 0, + WriteBatchSize: 128, + }) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + start := time.Now() + + for i := 0; i < b.N; i++ { + if err := w.Write(payload); err != nil { + b.Fatal(err) + } + } + + if err := w.Close(); err != nil { + b.Fatal(err) + } + b.StopTimer() + + elapsed := time.Since(start) + totalBytes := float64(b.N) * float64(es) + + b.ReportMetric(totalBytes/elapsed.Seconds(), "bytes/s") + b.ReportMetric(elapsed.Seconds()/float64(b.N)*1e6, "us/write") + }) + } + } +} diff --git a/sei-db/wal/wal_test.go b/sei-db/wal/wal_test.go index 7b53e87ad2..ff5a0ca5a9 100644 --- a/sei-db/wal/wal_test.go +++ b/sei-db/wal/wal_test.go @@ -443,6 +443,60 @@ func TestReplaySingleEntry(t *testing.T) { require.Equal(t, 1, count) } +// TestBatchWrite exercises the batch write path by writing many entries quickly so they +// are processed in batches, then verifies all entries were written correctly. +func TestBatchWrite(t *testing.T) { + const ( + batchSize = 8 + numWrites = 32 + ) + dir := t.TempDir() + changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, + Config{ + WriteBatchSize: batchSize, + AsyncWrites: true, + BufferSize: 64, + }) + require.NoError(t, err) + + // Pump writes quickly so the main loop batches them (handleBatchedWrite drains up to batchSize). 
+ for i := 0; i < numWrites; i++ { + entry := &proto.ChangelogEntry{} + entry.Changesets = []*proto.NamedChangeSet{{ + Name: fmt.Sprintf("batch-%d", i), + Changeset: iavl.ChangeSet{Pairs: MockKVPairs(fmt.Sprintf("key-%d", i), fmt.Sprintf("val-%d", i))}, + }} + require.NoError(t, changelog.Write(*entry)) + } + + require.NoError(t, changelog.Close()) + + // Reopen and verify all entries + changelog2, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, changelog2.Close()) }) + + first, err := changelog2.FirstOffset() + require.NoError(t, err) + require.Equal(t, uint64(1), first) + last, err := changelog2.LastOffset() + require.NoError(t, err) + require.Equal(t, uint64(numWrites), last) + + var replayed int + err = changelog2.Replay(1, uint64(numWrites), func(index uint64, entry proto.ChangelogEntry) error { + replayed++ + require.Len(t, entry.Changesets, 1) + require.Equal(t, fmt.Sprintf("batch-%d", index-1), entry.Changesets[0].Name) + require.Len(t, entry.Changesets[0].Changeset.Pairs, 1) + require.Equal(t, []byte(fmt.Sprintf("key-%d", index-1)), entry.Changesets[0].Changeset.Pairs[0].Key) + require.Equal(t, []byte(fmt.Sprintf("val-%d", index-1)), entry.Changesets[0].Changeset.Pairs[0].Value) + return nil + }) + require.NoError(t, err) + require.Equal(t, numWrites, replayed) +} + func TestWriteMultipleChangesets(t *testing.T) { dir := t.TempDir() changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{}) From 9724f02d5dc2d665e3589b496479cec302af77d4 Mon Sep 17 00:00:00 2001 From: Cody James Littley Date: Thu, 19 Feb 2026 08:06:03 -0600 Subject: [PATCH 04/16] removed file that will be comitted in another branch --- sei-db/wal/wal_bench_test.go | 121 ----------------------------------- 1 file changed, 121 deletions(-) delete mode 100644 sei-db/wal/wal_bench_test.go diff --git a/sei-db/wal/wal_bench_test.go b/sei-db/wal/wal_bench_test.go deleted file mode 100644 index 37245a7802..0000000000 --- a/sei-db/wal/wal_bench_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package wal - -import ( - "crypto/rand" - "fmt" - "testing" - "time" - - "github.com/tidwall/wal" - - "github.com/sei-protocol/sei-chain/sei-db/common/logger" -) - -func makePayload(size int) []byte { - buf := make([]byte, size) - if _, err := rand.Read(buf); err != nil { - panic(err) - } - return buf -} - -func BenchmarkTidwallWALWrite(b *testing.B) { - entrySizes := []int{64, 128, 1024, 4096, 16384, 65536} - fsyncModes := []struct { - name string - noSync bool - }{ - {"fsync", false}, - {"no-fsync", true}, - } - - for _, es := range entrySizes { - for _, fm := range fsyncModes { - name := fmt.Sprintf("entry=%dB/%s", es, fm.name) - noSync := fm.noSync - payload := makePayload(es) - - b.Run(name, func(b *testing.B) { - dir := b.TempDir() - log, err := wal.Open(dir, &wal.Options{ - NoSync: noSync, - NoCopy: true, - }) - if err != nil { - b.Fatal(err) - } - b.Cleanup(func() { _ = log.Close() }) - - b.ResetTimer() - start := time.Now() - - for i := 0; i < b.N; i++ { - if err := log.Write(uint64(i+1), payload); err != nil { - b.Fatal(err) - } - } - - elapsed := time.Since(start) - totalBytes := float64(b.N) * float64(es) - - b.ReportMetric(totalBytes/elapsed.Seconds(), "bytes/s") - b.ReportMetric(elapsed.Seconds()/float64(b.N)*1e6, "us/write") - }) - } - } -} - -func BenchmarkWALWrapperWrite(b *testing.B) { - entrySizes := []int{64, 128, 1024, 4096, 16384, 65536} - writeModes := []struct { - 
name string - bufferSize int - }{ - {"buffer-0", 0}, - {"buffer-256", 256}, - } - - marshal := func(entry []byte) ([]byte, error) { return entry, nil } - unmarshal := func(data []byte) ([]byte, error) { return data, nil } - - for _, es := range entrySizes { - for _, wm := range writeModes { - name := fmt.Sprintf("entry=%dB/%s", es, wm.name) - bufSize := wm.bufferSize - payload := makePayload(es) - - b.Run(name, func(b *testing.B) { - dir := b.TempDir() - w, err := NewWAL(b.Context(), marshal, unmarshal, logger.NewNopLogger(), dir, Config{ - BufferSize: bufSize, - FsyncEnabled: true, - AsyncWrites: bufSize > 0, - WriteBatchSize: 128, - }) - if err != nil { - b.Fatal(err) - } - - b.ResetTimer() - start := time.Now() - - for i := 0; i < b.N; i++ { - if err := w.Write(payload); err != nil { - b.Fatal(err) - } - } - - if err := w.Close(); err != nil { - b.Fatal(err) - } - b.StopTimer() - - elapsed := time.Since(start) - totalBytes := float64(b.N) * float64(es) - - b.ReportMetric(totalBytes/elapsed.Seconds(), "bytes/s") - b.ReportMetric(elapsed.Seconds()/float64(b.N)*1e6, "us/write") - }) - } - } -} From 4a34e92b228ed9695db51df3de4e78d84bc2dfcc Mon Sep 17 00:00:00 2001 From: Cody James Littley Date: Thu, 19 Feb 2026 08:07:55 -0600 Subject: [PATCH 05/16] cleanup --- sei-db/wal/wal.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 62b1f48789..4f640ef4a7 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -242,7 +242,6 @@ func (walLog *WAL[T]) handleBatchedWrite(req *writeRequest[T]) { for _, req := range requests { bz, err := walLog.marshal(req.entry) if err != nil { - // TODO: this can torpedo the entire batch, need to handle this better err = fmt.Errorf("marsalling error: %v", err) for _, req := range requests { req.errChan <- err From b80662ab65abfbaa1601330df30cd303de185ba1 Mon Sep 17 00:00:00 2001 From: Cody James Littley Date: Thu, 19 Feb 2026 08:17:56 -0600 Subject: [PATCH 06/16] cleanup --- sei-db/state_db/sc/memiavl/db.go | 3 ++- sei-db/wal/changelog.go | 3 ++- sei-db/wal/wal.go | 36 ++++++++++++++++---------------- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/sei-db/state_db/sc/memiavl/db.go b/sei-db/state_db/sc/memiavl/db.go index ae6d25b3f3..b5df7b919d 100644 --- a/sei-db/state_db/sc/memiavl/db.go +++ b/sei-db/state_db/sc/memiavl/db.go @@ -206,7 +206,8 @@ func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (database * // MemIAVL owns changelog lifecycle: always open the WAL here. // Even in read-only mode we may need WAL replay to reconstruct non-snapshot versions. streamHandler, err := wal.NewChangelogWAL(logger, utils.GetChangelogPath(opts.Dir), wal.Config{ - WriteBufferSize: opts.AsyncCommitBuffer, + BufferSize: opts.AsyncCommitBuffer, + AsyncWrites: opts.AsyncCommitBuffer > 0, }) if err != nil { return nil, fmt.Errorf("failed to open changelog WAL: %w", err) diff --git a/sei-db/wal/changelog.go b/sei-db/wal/changelog.go index 9a30707e9f..93620ca1d8 100644 --- a/sei-db/wal/changelog.go +++ b/sei-db/wal/changelog.go @@ -14,8 +14,9 @@ type ChangelogWAL = GenericWAL[proto.ChangelogEntry] // This is a convenience wrapper that handles serialization automatically. func NewChangelogWAL(logger logger.Logger, dir string, config Config) (ChangelogWAL, error) { + // Legacy compatibility: originally, BufferSize > 0 implied async writes. Preserve that for + // callers who set BufferSize without explicitly setting AsyncWrites. if config.BufferSize > 0 { - // Emulate legacy behavior. 
Originally, buffer size >0 enabled async writes. config.AsyncWrites = true } diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 4f640ef4a7..b39a1917dc 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "path/filepath" - "sync" "time" "github.com/tidwall/wal" @@ -25,7 +24,6 @@ type WAL[T any] struct { logger logger.Logger marshal MarshalFn[T] unmarshal UnmarshalFn[T] - mtx sync.RWMutex // guards WAL state: lazy init/close of writeChannel, isClosed checks // The size of write batches. writeBatchSize int @@ -57,7 +55,8 @@ type Config struct { // In order to support legacy configuration, if BufferSize is 0, then the buffer size is set to 1024. BufferSize int - // The size of write batches. If less than or equal to 1, then no batching is done. + // The size of write batches. If less than or equal to 0, a default of 64 is used. + // If 1, no batching is done. WriteBatchSize int // If true, do an fsync after each write. @@ -103,8 +102,8 @@ func NewWAL[T any]( } writeBatchSize := config.WriteBatchSize - if writeBatchSize <= 1 { - writeBatchSize = 0 + if writeBatchSize <= 0 { + writeBatchSize = 64 } ctx, cancel := context.WithCancel(ctx) @@ -158,7 +157,7 @@ func (walLog *WAL[T]) Write(entry T) error { return fmt.Errorf("WAL is closed, cannot write") } case walLog.writeChan <- req: - // request submitted sucessfully + // request submitted successfully } if walLog.config.AsyncWrites { @@ -174,14 +173,14 @@ func (walLog *WAL[T]) Write(entry T) error { } case err := <-req.errChan: if err != nil { - return fmt.Errorf("failed to write data: %v", err) + return fmt.Errorf("failed to write data: %w", err) } } return nil } -// This method is called asyncronously in response to a call to Write. +// This method is called asynchronously in response to a call to Write. 
func (walLog *WAL[T]) handleWrite(req *writeRequest[T]) { if walLog.writeBatchSize <= 1 { walLog.handleUnbatchedWrite(req) @@ -195,16 +194,17 @@ func (walLog *WAL[T]) handleUnbatchedWrite(req *writeRequest[T]) { bz, err := walLog.marshal(req.entry) if err != nil { - req.errChan <- fmt.Errorf("marsalling error: %v", err) + req.errChan <- fmt.Errorf("marshalling error: %w", err) return } lastOffset, err := walLog.log.LastIndex() if err != nil { - req.errChan <- fmt.Errorf("error fetching last index: %v", err) + req.errChan <- fmt.Errorf("error fetching last index: %w", err) return } if err := walLog.log.Write(lastOffset+1, bz); err != nil { - req.errChan <- fmt.Errorf("failed to write: %v", err) + req.errChan <- fmt.Errorf("failed to write: %w", err) + return } req.errChan <- nil @@ -230,7 +230,7 @@ func (walLog *WAL[T]) handleBatchedWrite(req *writeRequest[T]) { lastOffset, err := walLog.log.LastIndex() if err != nil { - err = fmt.Errorf("error fetching last index: %v", err) + err = fmt.Errorf("error fetching last index: %w", err) for _, req := range requests { req.errChan <- err } @@ -242,7 +242,7 @@ func (walLog *WAL[T]) handleBatchedWrite(req *writeRequest[T]) { for _, req := range requests { bz, err := walLog.marshal(req.entry) if err != nil { - err = fmt.Errorf("marsalling error: %v", err) + err = fmt.Errorf("marshalling error: %w", err) for _, req := range requests { req.errChan <- err } @@ -253,7 +253,7 @@ func (walLog *WAL[T]) handleBatchedWrite(req *writeRequest[T]) { } if err := walLog.log.WriteBatch(batch); err != nil { - err = fmt.Errorf("failed to write batch: %v", err) + err = fmt.Errorf("failed to write batch: %w", err) for _, r := range requests { r.errChan <- err } @@ -447,7 +447,7 @@ func (walLog *WAL[T]) handleReadAt(req *readAtRequest[T]) { } entry, err := walLog.unmarshal(bz) if err != nil { - req.responseChan <- &readAtResponse[T]{entry: zero, err: fmt.Errorf("unmarshal rlog failed, %w", err)} + req.responseChan <- &readAtResponse[T]{entry: zero, err: fmt.Errorf("unmarshal log failed, %w", err)} return } req.responseChan <- &readAtResponse[T]{entry: entry, err: nil} @@ -497,7 +497,7 @@ func (walLog *WAL[T]) handleReplay(req *replayRequest[T]) { } entry, err := walLog.unmarshal(bz) if err != nil { - req.errChan <- fmt.Errorf("unmarshal rlog failed, %w", err) + req.errChan <- fmt.Errorf("unmarshal log failed, %w", err) return } err = req.processFn(i, entry) @@ -571,7 +571,7 @@ func (walLog *WAL[T]) Close() error { walLog.closeErrChan <- err if err != nil { - return fmt.Errorf("error encountered while shutting down %v", err) + return fmt.Errorf("error encountered while shutting down: %w", err) } return nil @@ -648,7 +648,7 @@ func (walLog *WAL[T]) mainLoop() { err := walLog.log.Close() if err != nil { - walLog.closeErrChan <- fmt.Errorf("wal returned error during shutdown: %v", err) + walLog.closeErrChan <- fmt.Errorf("wal returned error during shutdown: %w", err) } else { walLog.closeErrChan <- nil } From 6258bbfe2dd18b607488b93309e1cea42ee4aa18 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 09:42:40 -0600 Subject: [PATCH 07/16] fix build issue --- sei-db/state_db/sc/flatkv/store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sei-db/state_db/sc/flatkv/store.go b/sei-db/state_db/sc/flatkv/store.go index 78f4bcade5..be0b14943d 100644 --- a/sei-db/state_db/sc/flatkv/store.go +++ b/sei-db/state_db/sc/flatkv/store.go @@ -209,9 +209,9 @@ func (s *CommitStore) open() (retErr error) { // Open changelog WAL changelogPath := 
filepath.Join(dir, "changelog") changelog, err := wal.NewChangelogWAL(s.log, changelogPath, wal.Config{ - WriteBufferSize: 0, // Synchronous writes for Phase 1 - KeepRecent: 0, // No pruning for Phase 1 - PruneInterval: 0, + AsyncWrites: false, // Synchronous writes for Phase 1 + KeepRecent: 0, // No pruning for Phase 1 + PruneInterval: 0, }) if err != nil { return fmt.Errorf("failed to open changelog: %w", err) From 2af9ae511ed2a912dc21f8d9d0f9d159c1611195 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 09:50:14 -0600 Subject: [PATCH 08/16] fix merge issue --- sei-db/wal/wal_bench_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sei-db/wal/wal_bench_test.go b/sei-db/wal/wal_bench_test.go index b5714b270d..45db20aee2 100644 --- a/sei-db/wal/wal_bench_test.go +++ b/sei-db/wal/wal_bench_test.go @@ -86,8 +86,9 @@ func BenchmarkWALWrapperWrite(b *testing.B) { b.Run(name, func(b *testing.B) { dir := b.TempDir() - w, err := NewWAL(marshal, unmarshal, logger.NewNopLogger(), dir, Config{ - WriteBufferSize: bufSize, + w, err := NewWAL(b.Context(), marshal, unmarshal, logger.NewNopLogger(), dir, Config{ + BufferSize: bufSize, + AsyncWrites: bufSize > 0, }) if err != nil { b.Fatal(err) From d460a146549e4fdcba13d171dc891f526088af30 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 10:26:50 -0600 Subject: [PATCH 09/16] made suggested changes --- sei-db/wal/changelog.go | 7 ----- sei-db/wal/wal.go | 52 +++++++++++++++++++++--------------- sei-db/wal/wal_bench_test.go | 3 +-- sei-db/wal/wal_test.go | 24 ++++++++--------- 4 files changed, 42 insertions(+), 44 deletions(-) diff --git a/sei-db/wal/changelog.go b/sei-db/wal/changelog.go index 93620ca1d8..b3ea68a3e9 100644 --- a/sei-db/wal/changelog.go +++ b/sei-db/wal/changelog.go @@ -13,13 +13,6 @@ type ChangelogWAL = GenericWAL[proto.ChangelogEntry] // NewChangelogWAL creates a new WAL for ChangelogEntry. // This is a convenience wrapper that handles serialization automatically. func NewChangelogWAL(logger logger.Logger, dir string, config Config) (ChangelogWAL, error) { - - // Legacy compatibility: originally, BufferSize > 0 implied async writes. Preserve that for - // callers who set BufferSize without explicitly setting AsyncWrites. - if config.BufferSize > 0 { - config.AsyncWrites = true - } - return NewWAL( context.Background(), func(e proto.ChangelogEntry) ([]byte, error) { return e.Marshal() }, diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index b39a1917dc..841dfc9a0c 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -13,6 +13,12 @@ import ( "github.com/sei-protocol/sei-chain/sei-db/common/logger" ) +// The size of internal channel buffers if the provided buffer size is less than 1. +const defaultBufferSize = 1024 + +// The size of write batches if the provided write batch size is less than 1. +const defaultWriteBatchSize = 64 + // WAL is a generic write-ahead log implementation. type WAL[T any] struct { ctx context.Context @@ -27,16 +33,15 @@ type WAL[T any] struct { // The size of write batches. writeBatchSize int + asyncWrites bool writeChan chan *writeRequest[T] truncateChan chan *truncateRequest getOffsetChan chan *getOffsetRequest readAtChan chan *readAtRequest[T] replayChan chan *replayRequest[T] - // closeReqChan is sent on by Close() to request shutdown. Processed last so in-flight work can complete. - closeReqChan chan struct{} - // Once shut down, any errors encountered during closing are written to closeErrChan. If none, nil is written. 
- closeErrChan chan error + closeReqChan chan struct{} + closeErrChan chan error } // Configuration for the WAL. @@ -47,13 +52,12 @@ type Config struct { // The interval at which to prune the log. PruneInterval time.Duration - // If true, the writes are asynchronous, and will return immediately without waiting for the write to be durable - AsyncWrites bool - - // The size of internal buffers. + // The size of internal buffers. Also controls whether or not the Write method is asynchronous. // - // In order to support legacy configuration, if BufferSize is 0, then the buffer size is set to 1024. - BufferSize int + // If WriteBufferSize is greater than 0, then the Write method is asynchronous, and the size of internal + // buffers is set to the provided value. If WriteBufferSize is less than 1, then the Write method is synchronous, + // and any internal buffers are set to a default size. WriteBufferSize int // The size of write batches. If less than or equal to 0, a default of 64 is used. // If 1, no batching is done. WriteBatchSize int // If true, do an fsync after each write. @@ -96,14 +100,16 @@ func NewWAL[T any]( return nil, err } - bufferSize := config.BufferSize - if config.BufferSize == 0 { - bufferSize = 1024 + bufferSize := config.WriteBufferSize + if config.WriteBufferSize == 0 { + bufferSize = defaultBufferSize } + asyncWrites := config.WriteBufferSize > 0 + writeBatchSize := config.WriteBatchSize if writeBatchSize <= 0 { - writeBatchSize = 64 + writeBatchSize = defaultWriteBatchSize } ctx, cancel := context.WithCancel(ctx) @@ -118,6 +124,7 @@ func NewWAL[T any]( marshal: marshal, unmarshal: unmarshal, writeBatchSize: writeBatchSize, + asyncWrites: asyncWrites, closeReqChan: make(chan struct{}), closeErrChan: make(chan error, 1), writeChan: make(chan *writeRequest[T], bufferSize), @@ -160,7 +167,7 @@ func (walLog *WAL[T]) Write(entry T) error { // request submitted successfully } - if walLog.config.AsyncWrites { + if walLog.asyncWrites { // Do not wait for the write to be durable return nil } @@ -512,7 +519,8 @@ func (walLog *WAL[T]) prune() { keepRecent := walLog.config.KeepRecent if keepRecent <= 0 || walLog.config.PruneInterval <= 0 { - // pruning is disabled + // Pruning is disabled. This is a defensive check, since + // this method should only be called if pruning is enabled. return } @@ -614,12 +622,12 @@ func open(dir string, opts *wal.Options) (*wal.Log, error) { // The main loop doing work in the background.
func (walLog *WAL[T]) mainLoop() { - pruneInterval := walLog.config.PruneInterval - if pruneInterval < time.Second { - pruneInterval = time.Second + var pruneChan <-chan time.Time + if walLog.config.PruneInterval > 0 && walLog.config.KeepRecent > 0 { + pruneTicker := time.NewTicker(walLog.config.PruneInterval) + defer pruneTicker.Stop() + pruneChan = pruneTicker.C } - pruneTicker := time.NewTicker(pruneInterval) - defer pruneTicker.Stop() running := true for running { @@ -636,7 +644,7 @@ func (walLog *WAL[T]) mainLoop() { walLog.handleReadAt(req) case req := <-walLog.replayChan: walLog.handleReplay(req) - case <-pruneTicker.C: + case <-pruneChan: walLog.prune() case <-walLog.closeReqChan: running = false diff --git a/sei-db/wal/wal_bench_test.go b/sei-db/wal/wal_bench_test.go index 45db20aee2..f4e27ed263 100644 --- a/sei-db/wal/wal_bench_test.go +++ b/sei-db/wal/wal_bench_test.go @@ -87,8 +87,7 @@ func BenchmarkWALWrapperWrite(b *testing.B) { b.Run(name, func(b *testing.B) { dir := b.TempDir() w, err := NewWAL(b.Context(), marshal, unmarshal, logger.NewNopLogger(), dir, Config{ - BufferSize: bufSize, - AsyncWrites: bufSize > 0, + WriteBufferSize: bufSize, }) if err != nil { b.Fatal(err) diff --git a/sei-db/wal/wal_test.go b/sei-db/wal/wal_test.go index ff5a0ca5a9..1f07b0acd3 100644 --- a/sei-db/wal/wal_test.go +++ b/sei-db/wal/wal_test.go @@ -145,7 +145,7 @@ func TestSynchronousWrite(t *testing.T) { func TestAsyncWrite(t *testing.T) { dir := t.TempDir() changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, - Config{BufferSize: 10, AsyncWrites: true}) + Config{WriteBufferSize: 10}) require.NoError(t, err) for _, changes := range ChangeSets { cs := []*proto.NamedChangeSet{ @@ -162,7 +162,7 @@ func TestAsyncWrite(t *testing.T) { err = changelog.Close() require.NoError(t, err) changelog, err = NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, - Config{BufferSize: 10, AsyncWrites: true}) + Config{WriteBufferSize: 10}) require.NoError(t, err) lastIndex, err := changelog.LastOffset() require.NoError(t, err) @@ -359,7 +359,7 @@ func TestEmptyLog(t *testing.T) { func TestCheckErrorNoError(t *testing.T) { dir := t.TempDir() changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, - Config{BufferSize: 10, AsyncWrites: true}) + Config{WriteBufferSize: 10}) require.NoError(t, err) // Write some data to initialize async mode @@ -390,7 +390,7 @@ func TestAsyncWriteReopenAndContinue(t *testing.T) { // Create with async write and write data changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, - Config{BufferSize: 10, AsyncWrites: true}) + Config{WriteBufferSize: 10}) require.NoError(t, err) for _, changes := range ChangeSets { @@ -405,7 +405,7 @@ func TestAsyncWriteReopenAndContinue(t *testing.T) { // Reopen with async write and continue changelog2, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, - Config{BufferSize: 10, AsyncWrites: true}) + Config{WriteBufferSize: 10}) require.NoError(t, err) // Write more entries @@ -453,9 +453,8 @@ func TestBatchWrite(t *testing.T) { dir := t.TempDir() changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{ - WriteBatchSize: batchSize, - AsyncWrites: true, - BufferSize: 64, + WriteBatchSize: batchSize, + WriteBufferSize: 64, }) require.NoError(t, err) @@ -526,7 +525,7 @@ func TestWriteMultipleChangesets(t *testing.T) { func 
TestConcurrentCloseWithInFlightAsyncWrites(t *testing.T) { dir := t.TempDir() changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, - Config{BufferSize: 8, AsyncWrites: true}) + Config{WriteBufferSize: 8}) require.NoError(t, err) // Intentionally avoid t.Cleanup here: we want Close() to race with in-flight async writes. @@ -593,10 +592,9 @@ func TestConcurrentCloseWithInFlightAsyncWrites(t *testing.T) { func TestConcurrentTruncateBeforeWithAsyncWrites(t *testing.T) { dir := t.TempDir() changelog, err := NewWAL(t.Context(), marshalEntry, unmarshalEntry, logger.NewNopLogger(), dir, Config{ - BufferSize: 10, - KeepRecent: 10, - PruneInterval: 1 * time.Millisecond, - AsyncWrites: true, + WriteBufferSize: 10, + KeepRecent: 10, + PruneInterval: 1 * time.Millisecond, }) require.NoError(t, err) From 156a282660c2aa4a8ddab6bc5636cffae982e1ab Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 10:51:35 -0600 Subject: [PATCH 10/16] made read methods direct calls --- sei-db/state_db/sc/flatkv/store.go | 6 +- sei-db/state_db/sc/memiavl/db.go | 3 +- sei-db/wal/wal.go | 183 ++++------------------------- sei-db/wal/wal_test.go | 3 +- 4 files changed, 26 insertions(+), 169 deletions(-) diff --git a/sei-db/state_db/sc/flatkv/store.go b/sei-db/state_db/sc/flatkv/store.go index be0b14943d..78f4bcade5 100644 --- a/sei-db/state_db/sc/flatkv/store.go +++ b/sei-db/state_db/sc/flatkv/store.go @@ -209,9 +209,9 @@ func (s *CommitStore) open() (retErr error) { // Open changelog WAL changelogPath := filepath.Join(dir, "changelog") changelog, err := wal.NewChangelogWAL(s.log, changelogPath, wal.Config{ - AsyncWrites: false, // Synchronous writes for Phase 1 - KeepRecent: 0, // No pruning for Phase 1 - PruneInterval: 0, + WriteBufferSize: 0, // Synchronous writes for Phase 1 + KeepRecent: 0, // No pruning for Phase 1 + PruneInterval: 0, }) if err != nil { return fmt.Errorf("failed to open changelog: %w", err) diff --git a/sei-db/state_db/sc/memiavl/db.go b/sei-db/state_db/sc/memiavl/db.go index c3e4f6c085..61ba1a7baf 100644 --- a/sei-db/state_db/sc/memiavl/db.go +++ b/sei-db/state_db/sc/memiavl/db.go @@ -206,8 +206,7 @@ func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (database * // MemIAVL owns changelog lifecycle: always open the WAL here. // Even in read-only mode we may need WAL replay to reconstruct non-snapshot versions. 
streamHandler, err := wal.NewChangelogWAL(logger, utils.GetChangelogPath(opts.Dir), wal.Config{ - BufferSize: opts.AsyncCommitBuffer, - AsyncWrites: opts.AsyncCommitBuffer > 0, + WriteBufferSize: opts.AsyncCommitBuffer, }) if err != nil { return nil, fmt.Errorf("failed to open changelog WAL: %w", err) diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 841dfc9a0c..5bdb3a48b3 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -37,9 +37,6 @@ type WAL[T any] struct { writeChan chan *writeRequest[T] truncateChan chan *truncateRequest - getOffsetChan chan *getOffsetRequest - readAtChan chan *readAtRequest[T] - replayChan chan *replayRequest[T] closeReqChan chan struct{} closeErrChan chan error } @@ -129,9 +126,6 @@ func NewWAL[T any]( closeErrChan: make(chan error, 1), writeChan: make(chan *writeRequest[T], bufferSize), truncateChan: make(chan *truncateRequest, bufferSize), - getOffsetChan: make(chan *getOffsetRequest, bufferSize), - readAtChan: make(chan *readAtRequest[T], bufferSize), - replayChan: make(chan *replayRequest[T], bufferSize), } go w.mainLoop() @@ -340,180 +334,55 @@ func (walLog *WAL[T]) handleTruncate(req *truncateRequest) { req.errChan <- nil } -// A request to get the first or last offset/index of the log. -type getOffsetRequest struct { - // If true, get the first offset/index. Otherwise, get the last offset/index. - first bool - // The channel to send the response to. - responseChan chan *getOffsetResponse -} - -// A response to a get offset request. -type getOffsetResponse struct { - // The offset/index of the first or last entry in the log. - index uint64 - // The error, if any, encountered while getting the offset. - err error -} - func (walLog *WAL[T]) FirstOffset() (uint64, error) { - return walLog.sendGetOffset(true) + val, err := walLog.log.FirstIndex() + if err != nil { + return 0, fmt.Errorf("failed to get first offset: %w", err) + } + return val, nil } // LastOffset returns the last written offset/index of the log. func (walLog *WAL[T]) LastOffset() (uint64, error) { - return walLog.sendGetOffset(false) -} - -// sendGetOffset sends a get-offset request to the main loop and waits for the response. -func (walLog *WAL[T]) sendGetOffset(first bool) (uint64, error) { - req := &getOffsetRequest{ - first: first, - responseChan: make(chan *getOffsetResponse, 1), - } - - select { - case _, ok := <-walLog.ctx.Done(): - if !ok { - return 0, fmt.Errorf("WAL is closed, cannot get offset") - } - case walLog.getOffsetChan <- req: - // request submitted successfully - } - - select { - case <-walLog.ctx.Done(): - return 0, fmt.Errorf("WAL was closed after get offset was submitted but before response") - case resp := <-req.responseChan: - if resp.err != nil { - return 0, resp.err - } - return resp.index, nil - } -} - -// handleGetOffset runs on the main loop and returns the first or last index. -func (walLog *WAL[T]) handleGetOffset(req *getOffsetRequest) { - var index uint64 - var err error - if req.first { - index, err = walLog.log.FirstIndex() - } else { - index, err = walLog.log.LastIndex() + val, err := walLog.log.LastIndex() + if err != nil { + return 0, fmt.Errorf("failed to get last offset: %w", err) } - req.responseChan <- &getOffsetResponse{index: index, err: err} -} - -// A request to read an entry at a specific index. -type readAtRequest[T any] struct { - index uint64 - responseChan chan *readAtResponse[T] -} - -// A response to a read-at request. 
-type readAtResponse[T any] struct { - entry T - err error + return val, nil } // ReadAt will read the log entry at the provided index. func (walLog *WAL[T]) ReadAt(index uint64) (T, error) { var zero T - req := &readAtRequest[T]{ - index: index, - responseChan: make(chan *readAtResponse[T], 1), - } - - select { - case _, ok := <-walLog.ctx.Done(): - if !ok { - return zero, fmt.Errorf("WAL is closed, cannot read") - } - case walLog.readAtChan <- req: - // request submitted successfully - } - - select { - case <-walLog.ctx.Done(): - return zero, fmt.Errorf("WAL was closed after read was submitted but before response") - case resp := <-req.responseChan: - if resp.err != nil { - return zero, resp.err - } - return resp.entry, nil - } -} - -// handleReadAt runs on the main loop and reads and unmarshals the entry at the index. -func (walLog *WAL[T]) handleReadAt(req *readAtRequest[T]) { - var zero T - bz, err := walLog.log.Read(req.index) + bz, err := walLog.log.Read(index) if err != nil { - req.responseChan <- &readAtResponse[T]{entry: zero, err: fmt.Errorf("read log failed, %w", err)} - return + return zero, fmt.Errorf("read log failed, %w", err) } entry, err := walLog.unmarshal(bz) if err != nil { - req.responseChan <- &readAtResponse[T]{entry: zero, err: fmt.Errorf("unmarshal log failed, %w", err)} - return + return zero, fmt.Errorf("unmarshal log failed, %w", err) } - req.responseChan <- &readAtResponse[T]{entry: entry, err: nil} -} - -// A request to replay a range of the log. -type replayRequest[T any] struct { - start uint64 - end uint64 - processFn func(index uint64, entry T) error - errChan chan error + return entry, nil } // Replay will read the replay log and process each log entry with the provided function. func (walLog *WAL[T]) Replay(start uint64, end uint64, processFn func(index uint64, entry T) error) error { - req := &replayRequest[T]{ - start: start, - end: end, - processFn: processFn, - errChan: make(chan error, 1), - } - - select { - case _, ok := <-walLog.ctx.Done(): - if !ok { - return fmt.Errorf("WAL is closed, cannot replay") - } - case walLog.replayChan <- req: - // request submitted successfully - } - - select { - case <-walLog.ctx.Done(): - return fmt.Errorf("WAL was closed after replay was submitted but before completion") - case err := <-req.errChan: - return err - } -} - -// handleReplay runs on the main loop and replays the range, calling processFn for each entry. 
-func (walLog *WAL[T]) handleReplay(req *replayRequest[T]) { - for i := req.start; i <= req.end; i++ { + for i := start; i <= end; i++ { bz, err := walLog.log.Read(i) if err != nil { - req.errChan <- fmt.Errorf("read log failed, %w", err) - return + return fmt.Errorf("read log failed, %w", err) } entry, err := walLog.unmarshal(bz) if err != nil { - req.errChan <- fmt.Errorf("unmarshal log failed, %w", err) - return + return fmt.Errorf("unmarshal log failed, %w", err) + } - err = req.processFn(i, entry) + err = processFn(i, entry) if err != nil { - req.errChan <- err - return + return fmt.Errorf("process log failed, %w", err) } } - req.errChan <- nil + return nil } func (walLog *WAL[T]) prune() { @@ -551,12 +420,6 @@ func (walLog *WAL[T]) drain() { walLog.handleWrite(req) case req := <-walLog.truncateChan: walLog.handleTruncate(req) - case req := <-walLog.getOffsetChan: - walLog.handleGetOffset(req) - case req := <-walLog.readAtChan: - walLog.handleReadAt(req) - case req := <-walLog.replayChan: - walLog.handleReplay(req) default: return } @@ -638,12 +501,6 @@ func (walLog *WAL[T]) mainLoop() { walLog.handleWrite(req) case req := <-walLog.truncateChan: walLog.handleTruncate(req) - case req := <-walLog.getOffsetChan: - walLog.handleGetOffset(req) - case req := <-walLog.readAtChan: - walLog.handleReadAt(req) - case req := <-walLog.replayChan: - walLog.handleReplay(req) case <-pruneChan: walLog.prune() case <-walLog.closeReqChan: diff --git a/sei-db/wal/wal_test.go b/sei-db/wal/wal_test.go index 1f07b0acd3..44aa0ab829 100644 --- a/sei-db/wal/wal_test.go +++ b/sei-db/wal/wal_test.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -297,7 +298,7 @@ func TestReplayWithError(t *testing.T) { return nil }) require.Error(t, err) - require.Equal(t, expectedErr, err) + require.True(t, strings.Contains(err.Error(), expectedErr.Error())) } func TestReopenAndContinueWrite(t *testing.T) { From 7bf1d4f714191d63572697de98e9b94b62a9ec46 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 11:01:05 -0600 Subject: [PATCH 11/16] made suggested change --- sei-db/wal/wal.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 5bdb3a48b3..3a4f5bd27e 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -35,10 +35,10 @@ type WAL[T any] struct { writeBatchSize int asyncWrites bool - writeChan chan *writeRequest[T] - truncateChan chan *truncateRequest - closeReqChan chan struct{} - closeErrChan chan error + writeChan chan *writeRequest[T] + truncateChan chan *truncateRequest + closeReqChan chan struct{} + closeErrChan chan error } // Configuration for the WAL. 
@@ -508,6 +508,8 @@ func (walLog *WAL[T]) mainLoop() { } } + walLog.cancel() + // drain pending work, then tear down walLog.drain() @@ -517,6 +519,4 @@ func (walLog *WAL[T]) mainLoop() { } else { walLog.closeErrChan <- nil } - - walLog.cancel() } From f541e84ef695a0f01762e367685355e56bfde992 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 11:26:52 -0600 Subject: [PATCH 12/16] improve readability --- sei-db/wal/wal.go | 83 +++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 38 deletions(-) diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 3a4f5bd27e..43c89ea270 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -152,13 +152,9 @@ func (walLog *WAL[T]) Write(entry T) error { errChan: make(chan error, 1), } - select { - case _, ok := <-walLog.ctx.Done(): - if !ok { - return fmt.Errorf("WAL is closed, cannot write") - } - case walLog.writeChan <- req: - // request submitted successfully + err := interuptablePush(walLog.ctx, walLog.writeChan, req) + if err != nil { + return fmt.Errorf("failed to push write request: %w", err) } if walLog.asyncWrites { @@ -166,16 +162,12 @@ func (walLog *WAL[T]) Write(entry T) error { return nil } - select { - case _, ok := <-walLog.ctx.Done(): - if !ok { - return fmt.Errorf("WAL was closed after write was submitted but before write was finalized, " + - "write may or may not be durable") - } - case err := <-req.errChan: - if err != nil { - return fmt.Errorf("failed to write data: %w", err) - } + err, pullErr := interuptablePull(walLog.ctx, req.errChan) + if pullErr != nil { + return fmt.Errorf("failed to pull write error: %w", pullErr) + } + if err != nil { + return fmt.Errorf("failed to write data: %w", err) } return nil @@ -296,24 +288,17 @@ func (walLog *WAL[T]) sendTruncate(before bool, index uint64) error { errChan: make(chan error, 1), } - select { - case _, ok := <-walLog.ctx.Done(): - if !ok { - return fmt.Errorf("WAL is closed, cannot truncate") - } - case walLog.truncateChan <- req: - // request submitted successfully + err := interuptablePush(walLog.ctx, walLog.truncateChan, req) + if err != nil { + return fmt.Errorf("failed to push truncate request: %w", err) } - select { - case _, ok := <-walLog.ctx.Done(): - if !ok { - return fmt.Errorf("WAL was closed after truncate was submitted but before truncate was finalized") - } - case err := <-req.errChan: - if err != nil { - return fmt.Errorf("failed to truncate: %w", err) - } + err, pullErr := interuptablePull(walLog.ctx, req.errChan) + if pullErr != nil { + return fmt.Errorf("failed to pull truncate error: %w", pullErr) + } + if err != nil { + return fmt.Errorf("failed to truncate: %w", err) } return nil @@ -429,14 +414,12 @@ func (walLog *WAL[T]) drain() { // Shut down the WAL. Sends a close request to the main loop so in-flight writes (and other work) // can complete before teardown. Idempotent. func (walLog *WAL[T]) Close() error { - select { - case <-walLog.ctx.Done(): + err := interuptablePush(walLog.ctx, walLog.closeReqChan, struct{}{}) + if err != nil { // already closed - case walLog.closeReqChan <- struct{}{}: - // close request sent } - err := <-walLog.closeErrChan + err = <-walLog.closeErrChan // "reload" error into channel to make Close() idempotent walLog.closeErrChan <- err @@ -520,3 +503,27 @@ func (walLog *WAL[T]) mainLoop() { walLog.closeErrChan <- nil } } + +// Push to a channel, returning an error if the context is cancelled before the value is pushed. 
+func interuptablePush[T any](ctx context.Context, ch chan T, value T) error { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled: %w", ctx.Err()) + case ch <- value: + return nil + } +} + +// Pull from a channel, returning an error if the context is cancelled before the value is pulled. +func interuptablePull[T any](ctx context.Context, ch <-chan T) (T, error) { + var zero T + select { + case <-ctx.Done(): + return zero, fmt.Errorf("context cancelled: %w", ctx.Err()) + case value, ok := <-ch: + if !ok { + return zero, fmt.Errorf("channel closed") + } + return value, nil + } +} From fd8151ab48e15ec01b734e1a0d5f3ec42083cc41 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 12:05:06 -0600 Subject: [PATCH 13/16] lint --- sei-db/wal/wal.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 43c89ea270..00cb907e4a 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -414,12 +414,10 @@ func (walLog *WAL[T]) drain() { // Shut down the WAL. Sends a close request to the main loop so in-flight writes (and other work) // can complete before teardown. Idempotent. func (walLog *WAL[T]) Close() error { - err := interuptablePush(walLog.ctx, walLog.closeReqChan, struct{}{}) - if err != nil { - // already closed - } + _ = interuptablePush(walLog.ctx, walLog.closeReqChan, struct{}{}) + // If error is non-nil then this is not the first call to Close(), no problem since Close() is idempotent - err = <-walLog.closeErrChan + err := <-walLog.closeErrChan // "reload" error into channel to make Close() idempotent walLog.closeErrChan <- err From 0dfc75b70e5277b3d6380250faf5e4d1e85fa632 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 14:59:20 -0600 Subject: [PATCH 14/16] made suggested changes --- sei-db/wal/wal.go | 113 +++++++++++++++++++++++++---------------- sei-db/wal/wal_test.go | 25 ++++++++- 2 files changed, 94 insertions(+), 44 deletions(-) diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index 00cb907e4a..b4b50fdbd6 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -41,6 +41,24 @@ type WAL[T any] struct { closeErrChan chan error } +// A request to truncate the log. +type truncateRequest struct { + // If true, truncate before the provided index. Otherwise, truncate after the provided index. + before bool + // The index to truncate at. + index uint64 + // Errors are returned over this channel, nil is written if completed with no error + errChan chan error +} + +// A request to write to the WAL. +type writeRequest[T any] struct { + // The data to write + entry T + // Errors are returned over this channel, nil is written if completed with no error + errChan chan error +} + // Configuration for the WAL. type Config struct { // The number of recent entries to keep in the log. @@ -98,7 +116,7 @@ func NewWAL[T any]( } bufferSize := config.WriteBufferSize - if config.WriteBufferSize == 0 { + if config.WriteBufferSize <= 0 { bufferSize = defaultBufferSize } @@ -134,14 +152,6 @@ func NewWAL[T any]( } -// A request to write to the WAL. -type writeRequest[T any] struct { - // The data to write - entry T - // Errors are returned over this channel, nil is written if completed with no error - errChan chan error -} - // Write will append a new entry to the end of the log. // Whether the writes is in blocking or async manner depends on the buffer size. // For async writes, this also checks for any previous async write errors. 
@@ -207,19 +217,7 @@ func (walLog *WAL[T]) handleUnbatchedWrite(req *writeRequest[T]) { // include them in the batch. func (walLog *WAL[T]) handleBatchedWrite(req *writeRequest[T]) { - requests := make([]*writeRequest[T], 0) - requests = append(requests, req) - - keepLooking := true - for keepLooking && len(requests) < walLog.writeBatchSize { - select { - case req := <-walLog.writeChan: - requests = append(requests, req) - default: - // No more pending writes immediately available, so process the batch we have so far. - keepLooking = false - } - } + requests := walLog.gatherRequestsForBatch(req) lastOffset, err := walLog.log.LastIndex() if err != nil { @@ -230,42 +228,71 @@ func (walLog *WAL[T]) handleBatchedWrite(req *writeRequest[T]) { return } - batch := &wal.Batch{} + binaryRequests := walLog.marshalRequests(requests) - for _, req := range requests { - bz, err := walLog.marshal(req.entry) - if err != nil { - err = fmt.Errorf("marshalling error: %w", err) - for _, req := range requests { - req.errChan <- err - } - return - } - batch.Write(lastOffset+1, bz) + batch := &wal.Batch{} + for _, binaryRequest := range binaryRequests { + batch.Write(lastOffset+1, binaryRequest) lastOffset++ } if err := walLog.log.WriteBatch(batch); err != nil { err = fmt.Errorf("failed to write batch: %w", err) for _, r := range requests { - r.errChan <- err + if r.errChan != nil { + r.errChan <- err + } } return } for _, r := range requests { - r.errChan <- nil + if r.errChan != nil { + r.errChan <- nil + } } } -// A request to truncate the log. -type truncateRequest struct { - // If true, truncate before the provided index. Otherwise, truncate after the provided index. - before bool - // The index to truncate at. - index uint64 - // Errors are returned over this channel, nil is written if completed with no error - errChan chan error +// Gather the requests for a batch. When this method is called, we will already have the first request in the batch. +func (walLog *WAL[T]) gatherRequestsForBatch(initialRequest *writeRequest[T]) []*writeRequest[T] { + requests := make([]*writeRequest[T], 0) + requests = append(requests, initialRequest) + + keepLooking := true + for keepLooking && len(requests) < walLog.writeBatchSize { + select { + case next := <-walLog.writeChan: + requests = append(requests, next) + default: + // No more pending writes immediately available, so process the batch we have so far. + keepLooking = false + } + } + + return requests +} + +// Marshal the requests for a batch. If a request can't be marshalled, an error is immediately sent +// to that request's caller. +// +// The requests slice passed into this method is modified if some requests +// are not marshalled successfully. Any request that is not marshalled successfully has its errChan +// set to nil to avoid sending more than one response to the caller. +func (walLog *WAL[T]) marshalRequests(requests []*writeRequest[T]) [][]byte { + binaryRequests := make([][]byte, 0, len(requests)) + + for _, req := range requests { + bz, err := walLog.marshal(req.entry) + if err != nil { + err = fmt.Errorf("marshalling error: %w", err) + req.errChan <- err + req.errChan = nil // signal that we have already sent a response to the caller + continue + } + binaryRequests = append(binaryRequests, bz) + } + + return binaryRequests } // TruncateAfter will remove all entries that are after the provided `index`. 
diff --git a/sei-db/wal/wal_test.go b/sei-db/wal/wal_test.go index 44aa0ab829..53f3219517 100644 --- a/sei-db/wal/wal_test.go +++ b/sei-db/wal/wal_test.go @@ -140,7 +140,6 @@ func TestSynchronousWrite(t *testing.T) { lastIndex, err := changelog.LastOffset() require.NoError(t, err) require.Equal(t, uint64(3), lastIndex) - } func TestAsyncWrite(t *testing.T) { @@ -679,3 +678,27 @@ func TestLogPath(t *testing.T) { path := LogPath("/some/dir") require.Equal(t, "/some/dir/changelog", path) } + + +func TestMultipleCloseCalls(t *testing.T) { + changelog := prepareTestData(t) + entry, err := changelog.ReadAt(2) + require.NoError(t, err) + require.Equal(t, []byte("hello1"), entry.Changesets[0].Changeset.Pairs[0].Key) + require.Equal(t, []byte("world1"), entry.Changesets[0].Changeset.Pairs[0].Value) + require.Equal(t, []byte("hello2"), entry.Changesets[0].Changeset.Pairs[1].Key) + require.Equal(t, []byte("world2"), entry.Changesets[0].Changeset.Pairs[1].Value) + entry, err = changelog.ReadAt(1) + require.NoError(t, err) + require.Equal(t, []byte("hello"), entry.Changesets[0].Changeset.Pairs[0].Key) + require.Equal(t, []byte("world"), entry.Changesets[0].Changeset.Pairs[0].Value) + entry, err = changelog.ReadAt(3) + require.NoError(t, err) + require.Equal(t, []byte("hello3"), entry.Changesets[0].Changeset.Pairs[0].Key) + require.Equal(t, []byte("world3"), entry.Changesets[0].Changeset.Pairs[0].Value) + + // Calling close lots of times shouldn't cause any problems. + for i := 0; i < 10; i++ { + require.NoError(t, changelog.Close()) + } +} \ No newline at end of file From 87772b899355ceb6cc8db12d68e4b8daabe77446 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 19 Feb 2026 15:06:55 -0600 Subject: [PATCH 15/16] add new test --- sei-db/wal/wal_test.go | 78 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/sei-db/wal/wal_test.go b/sei-db/wal/wal_test.go index 53f3219517..645de5278b 100644 --- a/sei-db/wal/wal_test.go +++ b/sei-db/wal/wal_test.go @@ -679,6 +679,82 @@ func TestLogPath(t *testing.T) { require.Equal(t, "/some/dir/changelog", path) } +// batchTestEntry is a simple type for testing batch marshal failures. +type batchTestEntry struct { + value string +} + +func TestBatchWriteWithMarshalFailure(t *testing.T) { + dir := t.TempDir() + + // Marshal fails for entries with value "fail" + marshalBatchTest := func(e batchTestEntry) ([]byte, error) { + if e.value == "fail" { + return nil, fmt.Errorf("mock marshal failure") + } + return []byte(e.value), nil + } + unmarshalBatchTest := func(b []byte) (batchTestEntry, error) { + return batchTestEntry{value: string(b)}, nil + } + + // Use sync writes (WriteBufferSize 0) and batching (WriteBatchSize 4) + // so we can observe per-write errors. The channel buffer allows multiple + // goroutines to push before the handler runs, forming a batch. + config := Config{ + WriteBufferSize: 0, // sync writes + WriteBatchSize: 4, // batch up to 4 + } + + w, err := NewWAL(t.Context(), marshalBatchTest, unmarshalBatchTest, logger.NewNopLogger(), dir, config) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, w.Close()) }) + + // Write 4 entries concurrently so they get batched. The second one will fail to marshal. 
+ entries := []batchTestEntry{ + {value: "ok1"}, + {value: "fail"}, + {value: "ok2"}, + {value: "ok3"}, + } + + var wg sync.WaitGroup + errs := make([]error, 4) + for i := range entries { + wg.Add(1) + go func(idx int) { + defer wg.Done() + errs[idx] = w.Write(entries[idx]) + }(i) + } + wg.Wait() + + // The "fail" entry should have errored + require.Error(t, errs[1]) + require.Contains(t, errs[1].Error(), "mock marshal failure") + + // The successful entries should have no error + require.NoError(t, errs[0]) + require.NoError(t, errs[2]) + require.NoError(t, errs[3]) + + // The WAL should contain exactly 3 entries (the successfully marshalled ones; "fail" is skipped) + lastOffset, err := w.LastOffset() + require.NoError(t, err) + require.Equal(t, uint64(3), lastOffset) + + // Goroutines may push in any order, so we collect the written values and verify we have ok1, ok2, ok3 + written := make(map[string]bool) + for i := uint64(1); i <= 3; i++ { + e, err := w.ReadAt(i) + require.NoError(t, err) + written[e.value] = true + } + require.True(t, written["ok1"], "expected ok1 in WAL") + require.True(t, written["ok2"], "expected ok2 in WAL") + require.True(t, written["ok3"], "expected ok3 in WAL") + require.False(t, written["fail"], "fail should not be in WAL") +} func TestMultipleCloseCalls(t *testing.T) { changelog := prepareTestData(t) @@ -701,4 +777,4 @@ func TestMultipleCloseCalls(t *testing.T) { for i := 0; i < 10; i++ { require.NoError(t, changelog.Close()) } -} \ No newline at end of file +} From 7163b643835222d7ad7900068bf745b78e67476d Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Fri, 20 Feb 2026 09:01:34 -0600 Subject: [PATCH 16/16] fix race condition --- sei-db/wal/wal.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sei-db/wal/wal.go b/sei-db/wal/wal.go index b4b50fdbd6..0a8cb48dd8 100644 --- a/sei-db/wal/wal.go +++ b/sei-db/wal/wal.go @@ -157,9 +157,10 @@ func NewWAL[T any]( // For async writes, this also checks for any previous async write errors. func (walLog *WAL[T]) Write(entry T) error { + errChan := make(chan error, 1) req := &writeRequest[T]{ entry: entry, - errChan: make(chan error, 1), + errChan: errChan, } err := interuptablePush(walLog.ctx, walLog.writeChan, req) @@ -172,7 +173,7 @@ func (walLog *WAL[T]) Write(entry T) error { return nil } - err, pullErr := interuptablePull(walLog.ctx, req.errChan) + err, pullErr := interuptablePull(walLog.ctx, errChan) if pullErr != nil { return fmt.Errorf("failed to pull write error: %w", pullErr) }
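For reference, a minimal usage sketch of the API as it stands after this series. The exported names (NewWAL, Config.WriteBufferSize, Config.WriteBatchSize, KeepRecent, PruneInterval, Write, LastOffset, Replay, Close) are taken from the diffs above; the string entry type, the trivial marshal/unmarshal functions, the main wrapper, and the directory path are illustrative assumptions, not part of the patches.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/sei-protocol/sei-chain/sei-db/common/logger"
	"github.com/sei-protocol/sei-chain/sei-db/wal"
)

func main() {
	// Trivial marshal/unmarshal for a string entry type (illustrative only).
	marshal := func(s string) ([]byte, error) { return []byte(s), nil }
	unmarshal := func(b []byte) (string, error) { return string(b), nil }

	w, err := wal.NewWAL(
		context.Background(),
		marshal,
		unmarshal,
		logger.NewNopLogger(),
		"/tmp/example-wal", // placeholder directory
		wal.Config{
			WriteBufferSize: 0,           // <= 0: Write blocks until the entry is durable
			WriteBatchSize:  4,           // batch up to 4 queued writes per WriteBatch call
			KeepRecent:      1000,        // keep only the most recent 1000 entries
			PruneInterval:   time.Minute, // how often the main loop prunes
		},
	)
	if err != nil {
		panic(err)
	}
	defer w.Close() // Close is idempotent; repeated calls return the same result

	// Append one entry; with WriteBufferSize 0 this waits for the write to complete.
	if err := w.Write("hello"); err != nil {
		panic(err)
	}

	// Replay everything written so far (offsets start at 1).
	last, err := w.LastOffset()
	if err != nil {
		panic(err)
	}
	if err := w.Replay(1, last, func(index uint64, entry string) error {
		fmt.Println(index, entry)
		return nil
	}); err != nil {
		panic(err)
	}
}

Setting WriteBufferSize above zero switches Write to the asynchronous path shown in the diffs: the call returns as soon as the request is queued on writeChan, without waiting for the entry to become durable.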