Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
7838231
feat: add graceful shutdown for machine manager and inspect server
vfusco Feb 28, 2026
8a9c42b
refactor: unify machine synchronization paths with batched input replay
vfusco Feb 28, 2026
6724dfb
fix: query previous snapshot before DB write to prevent TOCTOU race
vfusco Feb 28, 2026
c564871
fix(machine): replace panics with errors and propagate context betwee…
vfusco Feb 28, 2026
264eeba
fix(build): export MACOSX_DEPLOYMENT_TARGET in make env target
vfusco Feb 28, 2026
dcdbabd
refactor(machine): replace Advance 7-value return with AdvanceRespons…
vfusco Feb 28, 2026
885e621
fix(manager): prevent machine instance leak if addMachine fails
vfusco Feb 28, 2026
a625739
fix(advancer): bound memory usage with batched input fetching
vfusco Mar 3, 2026
a031a93
fix(manager): prevent fork leak and inconsistent state on machine clo…
vfusco Mar 3, 2026
7351f33
fix(manager): add timeout to Close to prevent indefinite blocking on …
vfusco Mar 3, 2026
6e50820
fix(advancer): trigger node shutdown on store failure after machine a…
vfusco Mar 3, 2026
95bdb0f
fix(manager): destroy machine runtime on fatal errors to prevent zomb…
vfusco Feb 28, 2026
1c54c59
fix(advancer): cap reports, release memory, and harden error handling
vfusco Mar 4, 2026
b0b6388
test: prevent goroutine leak in pmutex contested lock tests
vfusco Mar 4, 2026
9ec2cd0
fix(manager): reject new machines after Close to prevent post-shutdow…
vfusco Mar 5, 2026
9e843c0
test: improve advancer, manager and machine tests
vfusco Mar 5, 2026
e69b14d
refactor(manager): inject machine factory via functional options and …
vfusco Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ env:
@echo export CARTESI_TEST_DATABASE_CONNECTION="postgres://test_user:password@localhost:5432/test_rollupsdb?sslmode=disable"
@echo export CARTESI_TEST_MACHINE_IMAGES_PATH=\"$(CARTESI_TEST_MACHINE_IMAGES_PATH)\"
@echo export PATH=\"$(CURDIR):$$PATH\"
@$(if $(MACOSX_DEPLOYMENT_TARGET),echo export MACOSX_DEPLOYMENT_TARGET=\"$(MACOSX_DEPLOYMENT_TARGET)\")

# =============================================================================
# Artifacts
Expand Down
2 changes: 1 addition & 1 deletion cmd/cartesi-rollups-cli/root/read/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var Cmd = &cobra.Command{
Short: "Read the node state from the database",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if !cmd.Flags().Changed("jsonrpc") && cmd.Flags().Changed("jsonrpc-api-url") {
if err:= cmd.Flags().Set("jsonrpc", "true"); err != nil {
if err := cmd.Flags().Set("jsonrpc", "true"); err != nil {
return err
}
}
Expand Down
114 changes: 79 additions & 35 deletions internal/advancer/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"errors"
"fmt"
"os"
"path"
"path/filepath"
"strings"

"github.com/cartesi/rollups-node/internal/manager"
Expand Down Expand Up @@ -45,10 +45,16 @@ func getUnprocessedEpochs(ctx context.Context, er AdvancerRepository, address st
return er.ListEpochs(ctx, address, f, repository.Pagination{}, false)
}

// getUnprocessedInputs retrieves inputs that haven't been processed yet
func getUnprocessedInputs(ctx context.Context, repo AdvancerRepository, appAddress string, epochIndex uint64) ([]*Input, uint64, error) {
// getUnprocessedInputs retrieves inputs that haven't been processed yet with pagination support.
func getUnprocessedInputs(
ctx context.Context,
repo AdvancerRepository,
appAddress string,
epochIndex uint64,
batchSize uint64,
) ([]*Input, uint64, error) {
f := repository.InputFilter{Status: Pointer(InputCompletionStatus_None), EpochIndex: &epochIndex}
return repo.ListInputs(ctx, appAddress, f, repository.Pagination{}, false)
return repo.ListInputs(ctx, appAddress, f, repository.Pagination{Limit: batchSize}, false)
}

// Step performs one processing cycle of the advancer
Expand Down Expand Up @@ -78,17 +84,7 @@ func (s *Service) Step(ctx context.Context) error {
}

for _, epoch := range epochs {
// Get unprocessed inputs for this application
s.Logger.Debug("Querying for unprocessed inputs", "application", app.Name, "epoch_index", epoch.Index)
inputs, _, err := getUnprocessedInputs(ctx, s.repository, appAddress, epoch.Index)
if err != nil {
return err
}

// Process the inputs
s.Logger.Debug("Processing inputs", "application", app.Name, "epoch_index", epoch.Index, "count", len(inputs))
err = s.processInputs(ctx, app, inputs)
if err != nil {
if err := s.processEpochInputs(ctx, app, epoch.Index); err != nil {
return err
}

Expand Down Expand Up @@ -117,6 +113,26 @@ func (s *Service) Step(ctx context.Context) error {
return nil
}

// processEpochInputs fetches and processes unprocessed inputs for an epoch in batches.
// Processed inputs change status and drop out of the filter, so each batch fetches from offset 0.
func (s *Service) processEpochInputs(ctx context.Context, app *Application, epochIndex uint64) error {
appAddress := app.IApplicationAddress.String()
for {
inputs, _, err := getUnprocessedInputs(ctx, s.repository, appAddress, epochIndex, s.inputBatchSize)
if err != nil {
return err
}
if len(inputs) == 0 {
return nil
}
s.Logger.Debug("Processing inputs",
"application", app.Name, "epoch_index", epochIndex, "count", len(inputs))
if err := s.processInputs(ctx, app, inputs); err != nil {
return err
}
}
}

func (s *Service) isAllEpochInputsProcessed(app *Application, epoch *Epoch) (bool, error) {
// epoch has no inputs
if epoch.InputIndexLowerBound == epoch.InputIndexUpperBound {
Expand Down Expand Up @@ -159,6 +175,7 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []

// Advance the machine with this input
result, err := machine.Advance(ctx, input.RawData, input.EpochIndex, input.Index, app.IsDaveConsensus())
input.RawData = nil // allow GC to collect payload while batch continues
if err != nil {
// If there's an error, mark the application as inoperable
s.Logger.Error("Error executing advance",
Expand All @@ -179,6 +196,17 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
"error", updateErr)
}

// Eagerly close the machine to release the child process.
// The app is already inoperable, so no further operations will succeed.
// Skip if the runtime was already destroyed inside the manager.
if !errors.Is(err, manager.ErrMachineClosed) {
if closeErr := machine.Close(); closeErr != nil {
s.Logger.Warn("Failed to close machine after advance error",
"application", app.Name,
"error", closeErr)
}
}

return err
}
// log advance result hashes
Expand All @@ -196,10 +224,18 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
// Store the result in the database
err = s.repository.StoreAdvanceResult(ctx, input.EpochApplicationID, result)
if err != nil {
s.Logger.Error("Failed to store advance result",
// Machine state is now ahead of the database. This desync is
// unrecoverable without a restart — regardless of whether the
// failure was a DB error or a context timeout. Shut down the
// node so it can restart cleanly from the last snapshot.
s.Logger.Error(
"FATAL: failed to store advance result after machine state "+
"was already updated — shutting down to prevent permanent desync",
"application", app.Name,
"epoch", input.EpochIndex,
"index", input.Index,
"error", err)
s.Cancel() // triggers graceful shutdown of all services
return err
}

Expand Down Expand Up @@ -281,8 +317,15 @@ func (s *Service) handleEpochAfterInputsProcessed(ctx context.Context, app *Appl
if !exists {
return fmt.Errorf("%w: %d", ErrNoApp, app.ID)
}
outputsProof, err := machine.OutputsProof(ctx, 0)
outputsProof, err := machine.OutputsProof(ctx)
if err != nil {
// If the runtime was destroyed (e.g., child process crashed),
// mark the app inoperable to avoid an infinite retry loop.
if errors.Is(err, manager.ErrMachineClosed) {
reason := err.Error()
_ = s.repository.UpdateApplicationState(ctx, app.ID,
ApplicationState_Inoperable, &reason)
}
return fmt.Errorf("failed to get outputs proof from machine: %w", err)
}
err = s.repository.UpdateEpochOutputsProof(ctx, app.ID, epoch.Index, outputsProof)
Expand Down Expand Up @@ -342,7 +385,7 @@ func (s *Service) createSnapshot(ctx context.Context, app *Application, machine
// Generate a snapshot path with a simpler structure
// Use app name and input index only, avoiding deep directory nesting
snapshotName := fmt.Sprintf("%s_epoch%d_input%d", app.Name, input.EpochIndex, input.Index)
snapshotPath := path.Join(s.snapshotsDir, snapshotName)
snapshotPath := filepath.Join(s.snapshotsDir, snapshotName)

s.Logger.Info("Creating snapshot",
"application", app.Name,
Expand All @@ -351,10 +394,8 @@ func (s *Service) createSnapshot(ctx context.Context, app *Application, machine
"path", snapshotPath)

// Ensure the parent directory exists
if _, err := os.Stat(s.snapshotsDir); os.IsNotExist(err) {
if err := os.MkdirAll(s.snapshotsDir, 0755); err != nil { //nolint: mnd
return fmt.Errorf("failed to create snapshots directory: %w", err)
}
if err := os.MkdirAll(s.snapshotsDir, 0755); err != nil { //nolint: mnd
return fmt.Errorf("failed to create snapshots directory: %w", err)
}

// Create the snapshot
Expand All @@ -363,6 +404,16 @@ func (s *Service) createSnapshot(ctx context.Context, app *Application, machine
return err
}

// Get previous snapshot BEFORE writing the new one so the query does not
// return the snapshot we just created — that would cause self-deletion.
previousSnapshot, err := s.repository.GetLastSnapshot(ctx, app.IApplicationAddress.String())
if err != nil {
s.Logger.Error("Failed to get previous snapshot",
"application", app.Name,
"error", err)
// Continue even if we can't get the previous snapshot
}

// Update the input record with the snapshot URI
input.SnapshotURI = &snapshotPath

Expand All @@ -372,18 +423,8 @@ func (s *Service) createSnapshot(ctx context.Context, app *Application, machine
return fmt.Errorf("failed to update input snapshot URI: %w", err)
}

// Get previous snapshot if it exists
previousSnapshot, err := s.repository.GetLastSnapshot(ctx, app.IApplicationAddress.String())
if err != nil {
s.Logger.Error("Failed to get previous snapshot",
"application", app.Name,
"error", err)
// Continue even if we can't get the previous snapshot
}

// Remove previous snapshot if it exists
if previousSnapshot != nil && previousSnapshot.Index != input.Index && previousSnapshot.SnapshotURI != nil {
// Only remove if it's a different snapshot than the one we just created
if previousSnapshot != nil && previousSnapshot.SnapshotURI != nil {
if err := s.removeSnapshot(*previousSnapshot.SnapshotURI, app.Name); err != nil {
s.Logger.Error("Failed to remove previous snapshot",
"application", app.Name,
Expand All @@ -398,8 +439,11 @@ func (s *Service) createSnapshot(ctx context.Context, app *Application, machine

// removeSnapshot safely removes a previous snapshot
func (s *Service) removeSnapshot(snapshotPath string, appName string) error {
// Safety check: ensure the path contains the application name and is in the snapshots directory
if !strings.HasPrefix(snapshotPath, s.snapshotsDir) || !strings.Contains(snapshotPath, appName) {
// Safety check: canonicalize paths to prevent directory traversal via ".." sequences
cleanPath := filepath.Clean(snapshotPath)
cleanDir := filepath.Clean(s.snapshotsDir)
if !strings.HasPrefix(cleanPath, cleanDir+string(filepath.Separator)) ||
!strings.HasPrefix(filepath.Base(cleanPath), appName+"_") {
return fmt.Errorf("invalid snapshot path: %s", snapshotPath)
}

Expand Down
Loading
Loading