From fae0c4db7e6a273ab32d1bcb4892ee0585f6ef84 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 07:30:00 +0000 Subject: [PATCH 01/26] Probe replay before persisting exception logs --- src/ActivityStub.php | 19 ++++ src/ChildWorkflowStub.php | 19 ++++ src/Exception.php | 69 ++++++++++++++- src/Traits/Awaits.php | 6 ++ src/Traits/SideEffects.php | 5 ++ src/Traits/Timers.php | 6 ++ src/Traits/Versions.php | 5 ++ src/Workflow.php | 18 +++- src/WorkflowStub.php | 38 +++++++- tests/Feature/ExceptionLoggingReplayTest.php | 88 +++++++++++++++++++ .../Fixtures/TestProbeBackToBackWorkflow.php | 28 ++++++ ...tProbeChildFailureCompensationActivity.php | 15 ++++ ...estProbeChildFailureParentStepActivity.php | 15 ++++ .../TestProbeChildFailureParentWorkflow.php | 35 ++++++++ .../TestProbeChildFailureWorkflow.php | 16 ++++ .../TestProbeParallelChildWorkflow.php | 27 ++++++ tests/Fixtures/TestProbeRetryActivity.php | 23 +++++ tests/Fixtures/TestProbeRetryWorkflow.php | 41 +++++++++ tests/Unit/ExceptionTest.php | 46 ++++++++-- 19 files changed, 508 insertions(+), 11 deletions(-) create mode 100644 tests/Feature/ExceptionLoggingReplayTest.php create mode 100644 tests/Fixtures/TestProbeBackToBackWorkflow.php create mode 100644 tests/Fixtures/TestProbeChildFailureCompensationActivity.php create mode 100644 tests/Fixtures/TestProbeChildFailureParentStepActivity.php create mode 100644 tests/Fixtures/TestProbeChildFailureParentWorkflow.php create mode 100644 tests/Fixtures/TestProbeChildFailureWorkflow.php create mode 100644 tests/Fixtures/TestProbeParallelChildWorkflow.php create mode 100644 tests/Fixtures/TestProbeRetryActivity.php create mode 100644 tests/Fixtures/TestProbeRetryWorkflow.php diff --git a/src/ActivityStub.php b/src/ActivityStub.php index ed8f2ca2..7ef2caa6 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -51,6 +51,18 @@ public static function make($activity, ...$arguments): PromiseInterface } if ($log) { + if ( + WorkflowStub::isProbing() + && WorkflowStub::probeIndex() === $context->index + && ( + WorkflowStub::probeClass() === null + || WorkflowStub::probeClass() === $activity + ) + && $log->class === Exception::class + ) { + WorkflowStub::markProbeMatched(); + } + ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); @@ -74,6 +86,13 @@ public static function make($activity, ...$arguments): PromiseInterface return resolve($result); } + if (WorkflowStub::isProbing()) { + ++$context->index; + WorkflowStub::setContext($context); + $deferred = new Deferred(); + return $deferred->promise(); + } + $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 896237a3..54f40d91 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -45,6 +45,18 @@ public static function make($workflow, ...$arguments): PromiseInterface } if ($log) { + if ( + WorkflowStub::isProbing() + && WorkflowStub::probeIndex() === $context->index + && ( + WorkflowStub::probeClass() === null + || WorkflowStub::probeClass() === $workflow + ) + && $log->class === Exception::class + ) { + WorkflowStub::markProbeMatched(); + } + ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); @@ -67,6 +79,13 @@ public static function make($workflow, ...$arguments): PromiseInterface return resolve($result); } + if (WorkflowStub::isProbing()) { + ++$context->index; + WorkflowStub::setContext($context); + $deferred = new Deferred(); + return $deferred->promise(); + } + if (! $context->replaying) { $storedChildWorkflow = $context->storedWorkflow->children() ->wherePivot('parent_index', $context->index) diff --git a/src/Exception.php b/src/Exception.php index de177e51..74ca4bec 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -10,9 +10,11 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Throwable; use Workflow\Exceptions\TransitionNotFound; use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; +use Workflow\Serializers\Serializer; final class Exception implements ShouldBeEncrypted, ShouldQueue { @@ -35,7 +37,8 @@ public function __construct( public StoredWorkflow $storedWorkflow, public $exception, $connection = null, - $queue = null + $queue = null, + public ?string $sourceClass = null ) { $connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default'); $queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config( @@ -53,7 +56,7 @@ public function handle() try { if ($this->storedWorkflow->hasLogByIndex($this->index)) { $workflow->resume(); - } elseif (! $this->storedWorkflow->logs()->where('class', self::class)->exists()) { + } elseif ($this->shouldPersistAfterProbeReplay()) { $workflow->next($this->index, $this->now, self::class, $this->exception); } } catch (TransitionNotFound) { @@ -74,4 +77,66 @@ public function middleware() ), ]; } + + private function shouldPersistAfterProbeReplay(): bool + { + $workflowClass = $this->storedWorkflow->class; + + if (! is_string($workflowClass) || $workflowClass === '') { + return true; + } + + $previousContext = WorkflowStub::getContext(); + $connection = $this->storedWorkflow->getConnection(); + $shouldPersist = false; + + $connection->beginTransaction(); + + try { + $tentativeWorkflow = $this->createTentativeWorkflowState(); + $workflow = new $workflowClass($tentativeWorkflow, ...$tentativeWorkflow->workflowArguments()); + $workflow->replaying = true; + + WorkflowStub::setContext([ + 'storedWorkflow' => $tentativeWorkflow, + 'index' => 0, + 'now' => $this->now, + 'replaying' => true, + 'probing' => true, + 'probeIndex' => $this->index, + 'probeClass' => $this->sourceClass, + 'probeMatched' => false, + ]); + + try { + $workflow->handle(); + } catch (Throwable) { + // The replay path may still throw; we only care whether it matched this tentative log. + } + + $shouldPersist = WorkflowStub::probeMatched(); + } finally { + WorkflowStub::setContext($previousContext); + + if ($connection->transactionLevel() > 0) { + $connection->rollBack(); + } + } + + return $shouldPersist; + } + + private function createTentativeWorkflowState(): StoredWorkflow + { + $this->storedWorkflow->createLog([ + 'index' => $this->index, + 'now' => $this->now, + 'class' => self::class, + 'result' => Serializer::serialize($this->exception), + ]); + + $storedWorkflowClass = $this->storedWorkflow::class; + + return $storedWorkflowClass::query()->findOrFail($this->storedWorkflow->id); + } } diff --git a/src/Traits/Awaits.php b/src/Traits/Awaits.php index 3bd5ae17..7df13ada 100644 --- a/src/Traits/Awaits.php +++ b/src/Traits/Awaits.php @@ -22,6 +22,12 @@ public static function await($condition): PromiseInterface return resolve(Serializer::unserialize($log->result)); } + if (self::isProbing()) { + ++self::$context->index; + $deferred = new Deferred(); + return $deferred->promise(); + } + $result = $condition(); if ($result === true) { diff --git a/src/Traits/SideEffects.php b/src/Traits/SideEffects.php index e4a4db0c..c271385e 100644 --- a/src/Traits/SideEffects.php +++ b/src/Traits/SideEffects.php @@ -20,6 +20,11 @@ public static function sideEffect($callable): PromiseInterface return resolve(Serializer::unserialize($log->result)); } + if (self::isProbing()) { + ++self::$context->index; + return (new \React\Promise\Deferred())->promise(); + } + $result = $callable(); if (! self::$context->replaying) { diff --git a/src/Traits/Timers.php b/src/Traits/Timers.php index 538f9502..f2fc6a32 100644 --- a/src/Traits/Timers.php +++ b/src/Traits/Timers.php @@ -41,6 +41,12 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa $when = self::$context->now->copy() ->addSeconds($seconds); + if (self::isProbing()) { + ++self::$context->index; + $deferred = new Deferred(); + return $deferred->promise(); + } + if (! self::$context->replaying) { $timer = self::$context->storedWorkflow->createTimer([ 'index' => self::$context->index, diff --git a/src/Traits/Versions.php b/src/Traits/Versions.php index d8fd011f..3dc2307c 100644 --- a/src/Traits/Versions.php +++ b/src/Traits/Versions.php @@ -33,6 +33,11 @@ public static function getVersion( return resolve($version); } + if (self::isProbing()) { + ++self::$context->index; + return (new \React\Promise\Deferred())->promise(); + } + $version = $maxSupported; if (! self::$context->replaying) { diff --git a/src/Workflow.php b/src/Workflow.php index 20b80b6e..70e5957d 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -209,7 +209,7 @@ public function handle(): void $this->now = $log ? $log->now : Carbon::now(); } - WorkflowStub::setContext([ + $this->setContext([ 'storedWorkflow' => $this->storedWorkflow, 'index' => $this->index, 'now' => $this->now, @@ -229,7 +229,7 @@ public function handle(): void $this->now = $log ? $log->now : Carbon::now(); - WorkflowStub::setContext([ + $this->setContext([ 'storedWorkflow' => $this->storedWorkflow, 'index' => $this->index, 'now' => $this->now, @@ -309,4 +309,18 @@ public function handle(): void } } } + + private function setContext(array $context): void + { + $existingContext = WorkflowStub::getContext(); + + if (property_exists($existingContext, 'probing') && $existingContext->probing) { + $context['probing'] = true; + $context['probeIndex'] = $existingContext->probeIndex ?? null; + $context['probeClass'] = $existingContext->probeClass ?? null; + $context['probeMatched'] = $existingContext->probeMatched ?? false; + } + + WorkflowStub::setContext($context); + } } diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 5aabead1..095e294b 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -157,6 +157,10 @@ public static function fromStoredWorkflow(StoredWorkflow $storedWorkflow): stati public static function getContext(): \stdClass { + if (self::$context === null) { + self::$context = new \stdClass(); + } + return self::$context; } @@ -170,6 +174,35 @@ public static function now() return self::getContext()->now; } + public static function isProbing(): bool + { + return (bool) (self::getContext()->probing ?? false); + } + + public static function probeIndex(): ?int + { + return self::getContext()->probeIndex ?? null; + } + + public static function probeClass(): ?string + { + return self::getContext()->probeClass ?? null; + } + + public static function markProbeMatched(): void + { + if (! self::isProbing()) { + return; + } + + self::$context->probeMatched = true; + } + + public static function probeMatched(): bool + { + return (bool) (self::getContext()->probeMatched ?? false); + } + public function id() { return $this->storedWorkflow->id; @@ -287,7 +320,7 @@ public function fail($exception): void ->format('Y-m-d\TH:i:s.u\Z')); $this->storedWorkflow->parents() - ->each(static function ($parentWorkflow) use ($exception) { + ->each(function ($parentWorkflow) use ($exception) { if ( $parentWorkflow->pivot->parent_index === StoredWorkflow::CONTINUE_PARENT_INDEX || $parentWorkflow->pivot->parent_index === StoredWorkflow::ACTIVE_WORKFLOW_INDEX @@ -324,7 +357,8 @@ public function fail($exception): void $parentWorkflow, $throwable, $parentWf->connection(), - $parentWf->queue() + $parentWf->queue(), + $this->storedWorkflow->class ); }); } diff --git a/tests/Feature/ExceptionLoggingReplayTest.php b/tests/Feature/ExceptionLoggingReplayTest.php new file mode 100644 index 00000000..4c2dcc35 --- /dev/null +++ b/tests/Feature/ExceptionLoggingReplayTest.php @@ -0,0 +1,88 @@ +start(); + + sleep(1); + $workflow->requestRetry(); + + sleep(1); + $workflow->requestRetry(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('success', $workflow->output()); + $this->assertSame([ + Exception::class, + Signal::class, + Exception::class, + Signal::class, + TestProbeRetryActivity::class, + ], $classes); + } + + public function testBackToBackCaughtExceptionsEachPersist(): void + { + $workflow = WorkflowStub::make(TestProbeBackToBackWorkflow::class); + + $workflow->start(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('caught second: second failure', $workflow->output()); + $this->assertSame([Exception::class, Exception::class], $classes); + } + + public function testParallelChildFailuresStillDeduplicateToOneParentException(): void + { + $workflow = WorkflowStub::make(TestProbeChildFailureParentWorkflow::class); + + $workflow->start(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('caught: child failed: child-1', $workflow->output()); + $this->assertSame([ + TestProbeChildFailureParentStepActivity::class, + Exception::class, + TestProbeChildFailureCompensationActivity::class, + ], $classes); + $this->assertSame(1, $workflow->logs()->where('class', Exception::class)->count()); + } +} diff --git a/tests/Fixtures/TestProbeBackToBackWorkflow.php b/tests/Fixtures/TestProbeBackToBackWorkflow.php new file mode 100644 index 00000000..abc0fc6a --- /dev/null +++ b/tests/Fixtures/TestProbeBackToBackWorkflow.php @@ -0,0 +1,28 @@ +getMessage(); + } + + return 'unexpected-success'; + } +} diff --git a/tests/Fixtures/TestProbeChildFailureCompensationActivity.php b/tests/Fixtures/TestProbeChildFailureCompensationActivity.php new file mode 100644 index 00000000..d2ea9568 --- /dev/null +++ b/tests/Fixtures/TestProbeChildFailureCompensationActivity.php @@ -0,0 +1,15 @@ +addCompensation(fn () => activity(TestProbeChildFailureCompensationActivity::class)); + + yield all([ + child(TestProbeChildFailureWorkflow::class, 'child-1'), + child(TestProbeChildFailureWorkflow::class, 'child-2'), + child(TestProbeChildFailureWorkflow::class, 'child-3'), + ]); + + return 'unexpected-success'; + } catch (Throwable $throwable) { + yield from $this->compensate(); + + return 'caught: ' . $throwable->getMessage(); + } + } +} diff --git a/tests/Fixtures/TestProbeChildFailureWorkflow.php b/tests/Fixtures/TestProbeChildFailureWorkflow.php new file mode 100644 index 00000000..ca36619b --- /dev/null +++ b/tests/Fixtures/TestProbeChildFailureWorkflow.php @@ -0,0 +1,16 @@ + throw new RuntimeException('first failure'), + 2 => throw new InvalidArgumentException('second failure'), + default => 'success', + }; + } +} diff --git a/tests/Fixtures/TestProbeRetryWorkflow.php b/tests/Fixtures/TestProbeRetryWorkflow.php new file mode 100644 index 00000000..05e07813 --- /dev/null +++ b/tests/Fixtures/TestProbeRetryWorkflow.php @@ -0,0 +1,41 @@ +inbox->receive('retry'); + } + + public function execute() + { + $attempt = 0; + + while (true) { + try { + ++$attempt; + + return yield activity(TestProbeRetryActivity::class, $attempt); + } catch (Throwable $throwable) { + if ($attempt >= 3) { + throw $throwable; + } + + yield await(fn (): bool => $this->inbox->hasUnread()); + + $this->inbox->nextUnread(); + } + } + } +} diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 1f6cf3ca..d01f7aa6 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -4,6 +4,10 @@ namespace Tests\Unit; +use Tests\Fixtures\TestProbeBackToBackWorkflow; +use Tests\Fixtures\TestProbeChildFailureWorkflow; +use Tests\Fixtures\TestProbeParallelChildWorkflow; +use Tests\Fixtures\TestProbeRetryActivity; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; @@ -44,9 +48,9 @@ public function testExceptionWorkflowRunning(): void $this->assertSame(WorkflowRunningStatus::class, $workflow->status()); } - public function testSkipsWriteWhenSiblingExceptionLogExists(): void + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void { - $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); $storedWorkflow->update([ 'arguments' => Serializer::serialize([]), @@ -61,19 +65,51 @@ public function testSkipsWriteWhenSiblingExceptionLogExists(): void 'class' => Exception::class, 'result' => Serializer::serialize([ 'class' => \Exception::class, - 'message' => 'first child failed', + 'message' => 'child failed: child-1', 'code' => 0, ]), ]); $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ 'class' => \Exception::class, - 'message' => 'second child failed', + 'message' => 'child failed: child-2', 'code' => 0, - ]); + ], sourceClass: TestProbeChildFailureWorkflow::class); $exception->handle(); $this->assertFalse($storedWorkflow->hasLogByIndex(1)); $this->assertSame(1, $storedWorkflow->logs()->count()); } + + public function testPersistsWriteWhenProbeReachesCandidateException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeBackToBackWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => \RuntimeException::class, + 'message' => 'first failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => \InvalidArgumentException::class, + 'message' => 'second failure', + 'code' => 0, + ], sourceClass: TestProbeRetryActivity::class); + $exception->handle(); + + $this->assertTrue($storedWorkflow->fresh()->hasLogByIndex(1)); + } } From 5f61620518cd326df9e75bf352b78957ce9b7b28 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 12:59:29 +0000 Subject: [PATCH 02/26] Fix ECS issues in probe fixtures --- tests/Fixtures/TestProbeBackToBackWorkflow.php | 2 +- tests/Fixtures/TestProbeChildFailureParentWorkflow.php | 4 ++-- tests/Fixtures/TestProbeParallelChildWorkflow.php | 2 +- tests/Fixtures/TestProbeRetryWorkflow.php | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/Fixtures/TestProbeBackToBackWorkflow.php b/tests/Fixtures/TestProbeBackToBackWorkflow.php index abc0fc6a..06396d64 100644 --- a/tests/Fixtures/TestProbeBackToBackWorkflow.php +++ b/tests/Fixtures/TestProbeBackToBackWorkflow.php @@ -5,8 +5,8 @@ namespace Tests\Fixtures; use Throwable; -use Workflow\Workflow; use function Workflow\activity; +use Workflow\Workflow; final class TestProbeBackToBackWorkflow extends Workflow { diff --git a/tests/Fixtures/TestProbeChildFailureParentWorkflow.php b/tests/Fixtures/TestProbeChildFailureParentWorkflow.php index 945749a8..a2a81677 100644 --- a/tests/Fixtures/TestProbeChildFailureParentWorkflow.php +++ b/tests/Fixtures/TestProbeChildFailureParentWorkflow.php @@ -5,10 +5,10 @@ namespace Tests\Fixtures; use Throwable; -use Workflow\Workflow; use function Workflow\activity; use function Workflow\all; use function Workflow\child; +use Workflow\Workflow; final class TestProbeChildFailureParentWorkflow extends Workflow { @@ -17,7 +17,7 @@ public function execute() try { yield activity(TestProbeChildFailureParentStepActivity::class); - $this->addCompensation(fn () => activity(TestProbeChildFailureCompensationActivity::class)); + $this->addCompensation(static fn () => activity(TestProbeChildFailureCompensationActivity::class)); yield all([ child(TestProbeChildFailureWorkflow::class, 'child-1'), diff --git a/tests/Fixtures/TestProbeParallelChildWorkflow.php b/tests/Fixtures/TestProbeParallelChildWorkflow.php index 827958cd..bc3ff582 100644 --- a/tests/Fixtures/TestProbeParallelChildWorkflow.php +++ b/tests/Fixtures/TestProbeParallelChildWorkflow.php @@ -5,9 +5,9 @@ namespace Tests\Fixtures; use Throwable; -use Workflow\Workflow; use function Workflow\all; use function Workflow\child; +use Workflow\Workflow; final class TestProbeParallelChildWorkflow extends Workflow { diff --git a/tests/Fixtures/TestProbeRetryWorkflow.php b/tests/Fixtures/TestProbeRetryWorkflow.php index 05e07813..ec80b0cb 100644 --- a/tests/Fixtures/TestProbeRetryWorkflow.php +++ b/tests/Fixtures/TestProbeRetryWorkflow.php @@ -5,10 +5,10 @@ namespace Tests\Fixtures; use Throwable; -use Workflow\SignalMethod; -use Workflow\Workflow; use function Workflow\activity; use function Workflow\await; +use Workflow\SignalMethod; +use Workflow\Workflow; final class TestProbeRetryWorkflow extends Workflow { From bfb3a650869e5a0b4adcff2c04bce5fcdeb67cb6 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 13:04:53 +0000 Subject: [PATCH 03/26] Import Deferred in probe code --- src/Traits/SideEffects.php | 3 ++- src/Traits/Versions.php | 3 ++- tests/Unit/ExceptionTest.php | 15 +++++++++------ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/Traits/SideEffects.php b/src/Traits/SideEffects.php index c271385e..72dd5d79 100644 --- a/src/Traits/SideEffects.php +++ b/src/Traits/SideEffects.php @@ -5,6 +5,7 @@ namespace Workflow\Traits; use Illuminate\Database\QueryException; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use function React\Promise\resolve; use Workflow\Serializers\Serializer; @@ -22,7 +23,7 @@ public static function sideEffect($callable): PromiseInterface if (self::isProbing()) { ++self::$context->index; - return (new \React\Promise\Deferred())->promise(); + return (new Deferred())->promise(); } $result = $callable(); diff --git a/src/Traits/Versions.php b/src/Traits/Versions.php index 3dc2307c..7d841fa1 100644 --- a/src/Traits/Versions.php +++ b/src/Traits/Versions.php @@ -5,6 +5,7 @@ namespace Workflow\Traits; use Illuminate\Database\QueryException; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use function React\Promise\resolve; use Workflow\Exceptions\VersionNotSupportedException; @@ -35,7 +36,7 @@ public static function getVersion( if (self::isProbing()) { ++self::$context->index; - return (new \React\Promise\Deferred())->promise(); + return (new Deferred())->promise(); } $version = $maxSupported; diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index d01f7aa6..326577fa 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -4,6 +4,9 @@ namespace Tests\Unit; +use Exception as BaseException; +use InvalidArgumentException; +use RuntimeException; use Tests\Fixtures\TestProbeBackToBackWorkflow; use Tests\Fixtures\TestProbeChildFailureWorkflow; use Tests\Fixtures\TestProbeParallelChildWorkflow; @@ -21,7 +24,7 @@ final class ExceptionTest extends TestCase { public function testMiddleware(): void { - $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new \Exception( + $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new BaseException( 'Test exception' )); @@ -42,7 +45,7 @@ public function testExceptionWorkflowRunning(): void 'status' => WorkflowRunningStatus::$name, ]); - $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new \Exception('Test exception')); + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new BaseException('Test exception')); $exception->handle(); $this->assertSame(WorkflowRunningStatus::class, $workflow->status()); @@ -64,14 +67,14 @@ public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void ->toDateTimeString(), 'class' => Exception::class, 'result' => Serializer::serialize([ - 'class' => \Exception::class, + 'class' => BaseException::class, 'message' => 'child failed: child-1', 'code' => 0, ]), ]); $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ - 'class' => \Exception::class, + 'class' => BaseException::class, 'message' => 'child failed: child-2', 'code' => 0, ], sourceClass: TestProbeChildFailureWorkflow::class); @@ -97,14 +100,14 @@ public function testPersistsWriteWhenProbeReachesCandidateException(): void ->toDateTimeString(), 'class' => Exception::class, 'result' => Serializer::serialize([ - 'class' => \RuntimeException::class, + 'class' => RuntimeException::class, 'message' => 'first failure', 'code' => 0, ]), ]); $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ - 'class' => \InvalidArgumentException::class, + 'class' => InvalidArgumentException::class, 'message' => 'second failure', 'code' => 0, ], sourceClass: TestProbeRetryActivity::class); From b36bd34d1ec49a68952266c9bed92a91a5bce78a Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 13:14:16 +0000 Subject: [PATCH 04/26] Inline Deferred promise returns --- src/ActivityStub.php | 6 ++---- src/ChildWorkflowStub.php | 6 ++---- src/Traits/Awaits.php | 6 ++---- src/Traits/Timers.php | 9 +++------ 4 files changed, 9 insertions(+), 18 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 7ef2caa6..b9cbe53e 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -89,15 +89,13 @@ public static function make($activity, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 54f40d91..7ea7d90b 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -82,8 +82,7 @@ public static function make($workflow, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } if (! $context->replaying) { @@ -113,7 +112,6 @@ public static function make($workflow, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/Traits/Awaits.php b/src/Traits/Awaits.php index 7df13ada..0ba8fae1 100644 --- a/src/Traits/Awaits.php +++ b/src/Traits/Awaits.php @@ -24,8 +24,7 @@ public static function await($condition): PromiseInterface if (self::isProbing()) { ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } $result = $condition(); @@ -55,7 +54,6 @@ public static function await($condition): PromiseInterface } ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/Traits/Timers.php b/src/Traits/Timers.php index f2fc6a32..0be2981a 100644 --- a/src/Traits/Timers.php +++ b/src/Traits/Timers.php @@ -43,8 +43,7 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa if (self::isProbing()) { ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } if (! self::$context->replaying) { @@ -54,8 +53,7 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa ]); } else { ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } @@ -101,7 +99,6 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa } ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } From 7393ea500bd7b3d2481bc562b92b3eed1ed5bff8 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 13:15:07 +0000 Subject: [PATCH 05/26] Remove redundant context resets --- src/ActivityStub.php | 3 --- src/ChildWorkflowStub.php | 3 --- 2 files changed, 6 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index b9cbe53e..8256c4b6 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -64,7 +64,6 @@ public static function make($activity, ...$arguments): PromiseInterface } ++$context->index; - WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); if ( is_array($result) && @@ -88,14 +87,12 @@ public static function make($activity, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; - WorkflowStub::setContext($context); return (new Deferred())->promise(); } $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; - WorkflowStub::setContext($context); return (new Deferred())->promise(); } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 7ea7d90b..58279d89 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -58,7 +58,6 @@ public static function make($workflow, ...$arguments): PromiseInterface } ++$context->index; - WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); if ( is_array($result) @@ -81,7 +80,6 @@ public static function make($workflow, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; - WorkflowStub::setContext($context); return (new Deferred())->promise(); } @@ -111,7 +109,6 @@ public static function make($workflow, ...$arguments): PromiseInterface } ++$context->index; - WorkflowStub::setContext($context); return (new Deferred())->promise(); } } From f3a51750c5977516682df1b6fcbba298c6071e9a Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 13:50:06 +0000 Subject: [PATCH 06/26] Restore parent context in workflow stubs --- src/ActivityStub.php | 3 +++ src/ChildWorkflowStub.php | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 8256c4b6..b9cbe53e 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -64,6 +64,7 @@ public static function make($activity, ...$arguments): PromiseInterface } ++$context->index; + WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); if ( is_array($result) && @@ -87,12 +88,14 @@ public static function make($activity, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; + WorkflowStub::setContext($context); return (new Deferred())->promise(); } $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; + WorkflowStub::setContext($context); return (new Deferred())->promise(); } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 58279d89..7ea7d90b 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -58,6 +58,7 @@ public static function make($workflow, ...$arguments): PromiseInterface } ++$context->index; + WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); if ( is_array($result) @@ -80,6 +81,7 @@ public static function make($workflow, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; + WorkflowStub::setContext($context); return (new Deferred())->promise(); } @@ -109,6 +111,7 @@ public static function make($workflow, ...$arguments): PromiseInterface } ++$context->index; + WorkflowStub::setContext($context); return (new Deferred())->promise(); } } From 1ea7c85624853cd0f5a125512c7aa31187ea823f Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 14:08:24 +0000 Subject: [PATCH 07/26] Tag activity exceptions for probe replay --- src/Activity.php | 3 ++- tests/Unit/ExceptionTest.php | 44 ++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/src/Activity.php b/src/Activity.php index a26508a7..44e116b9 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void $this->storedWorkflow, $throwable, $workflow->connection(), - $workflow->queue() + $workflow->queue(), + $this::class ); } diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 326577fa..abc334fa 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -7,10 +7,13 @@ use Exception as BaseException; use InvalidArgumentException; use RuntimeException; +use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestProbeBackToBackWorkflow; use Tests\Fixtures\TestProbeChildFailureWorkflow; use Tests\Fixtures\TestProbeParallelChildWorkflow; use Tests\Fixtures\TestProbeRetryActivity; +use Tests\Fixtures\TestSagaActivity; +use Tests\Fixtures\TestSagaParallelActivityWorkflow; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; @@ -115,4 +118,45 @@ public function testPersistsWriteWhenProbeReachesCandidateException(): void $this->assertTrue($storedWorkflow->fresh()->hasLogByIndex(1)); } + + public function testSkipsWriteWhenProbeReachesDifferentActivityClassAtSameIndex(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestSagaParallelActivityWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('step complete'), + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => RuntimeException::class, + 'message' => 'parallel failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(2, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => RuntimeException::class, + 'message' => 'another parallel failure', + 'code' => 0, + ], sourceClass: TestSagaActivity::class); + $exception->handle(); + + $this->assertFalse($storedWorkflow->fresh()->hasLogByIndex(2)); + } } From d8d4219d251e43eb6d5eade09b99ae818b9c0a45 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 14:57:02 +0000 Subject: [PATCH 08/26] Skip stale foreign exception logs during replay --- src/ActivityStub.php | 11 +++++++ src/ChildWorkflowStub.php | 11 +++++++ src/Exception.php | 44 +++++++++++++++++++++------- tests/Unit/ActivityStubTest.php | 39 ++++++++++++++++++++++++ tests/Unit/ChildWorkflowStubTest.php | 40 +++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 10 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index b9cbe53e..56caaa67 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -66,6 +66,9 @@ public static function make($activity, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); + if ($log->class === Exception::class && self::isForeignExceptionResult($result, $activity)) { + return self::make($activity, ...$arguments); + } if ( is_array($result) && array_key_exists('class', $result) && @@ -98,4 +101,12 @@ public static function make($activity, ...$arguments): PromiseInterface WorkflowStub::setContext($context); return (new Deferred())->promise(); } + + private static function isForeignExceptionResult(mixed $result, string $activity): bool + { + return is_array($result) + && isset($result['sourceClass']) + && is_string($result['sourceClass']) + && $result['sourceClass'] !== $activity; + } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 7ea7d90b..03a23ff6 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -60,6 +60,9 @@ public static function make($workflow, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); + if ($log->class === Exception::class && self::isForeignExceptionResult($result, $workflow)) { + return self::make($workflow, ...$arguments); + } if ( is_array($result) && array_key_exists('class', $result) @@ -114,4 +117,12 @@ public static function make($workflow, ...$arguments): PromiseInterface WorkflowStub::setContext($context); return (new Deferred())->promise(); } + + private static function isForeignExceptionResult(mixed $result, string $workflow): bool + { + return is_array($result) + && isset($result['sourceClass']) + && is_string($result['sourceClass']) + && $result['sourceClass'] !== $workflow; + } } diff --git a/src/Exception.php b/src/Exception.php index 74ca4bec..2c6ec0d9 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -10,6 +10,7 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Illuminate\Support\Facades\Cache; use Throwable; use Workflow\Exceptions\TransitionNotFound; use Workflow\Middleware\WithoutOverlappingMiddleware; @@ -51,18 +52,30 @@ public function __construct( public function handle() { - $workflow = $this->storedWorkflow->toWorkflow(); + $lock = Cache::lock('laravel-workflow-exception:' . $this->storedWorkflow->id, 15); + + if (! $lock->get()) { + $this->release(); + + return; + } try { - if ($this->storedWorkflow->hasLogByIndex($this->index)) { - $workflow->resume(); - } elseif ($this->shouldPersistAfterProbeReplay()) { - $workflow->next($this->index, $this->now, self::class, $this->exception); - } - } catch (TransitionNotFound) { - if ($workflow->running()) { - $this->release(); + $workflow = $this->storedWorkflow->toWorkflow(); + + try { + if ($this->storedWorkflow->hasLogByIndex($this->index)) { + $workflow->resume(); + } elseif ($this->shouldPersistAfterProbeReplay()) { + $workflow->next($this->index, $this->now, self::class, $this->exceptionPayload()); + } + } catch (TransitionNotFound) { + if ($workflow->running()) { + $this->release(); + } } + } finally { + $lock->release(); } } @@ -132,11 +145,22 @@ private function createTentativeWorkflowState(): StoredWorkflow 'index' => $this->index, 'now' => $this->now, 'class' => self::class, - 'result' => Serializer::serialize($this->exception), + 'result' => Serializer::serialize($this->exceptionPayload()), ]); $storedWorkflowClass = $this->storedWorkflow::class; return $storedWorkflowClass::query()->findOrFail($this->storedWorkflow->id); } + + private function exceptionPayload() + { + if (! is_array($this->exception) || $this->sourceClass === null) { + return $this->exception; + } + + return array_merge($this->exception, [ + 'sourceClass' => $this->sourceClass, + ]); + } } diff --git a/tests/Unit/ActivityStubTest.php b/tests/Unit/ActivityStubTest.php index af6cee9e..6bd00905 100644 --- a/tests/Unit/ActivityStubTest.php +++ b/tests/Unit/ActivityStubTest.php @@ -7,9 +7,11 @@ use Exception; use RuntimeException; use Tests\Fixtures\TestActivity; +use Tests\Fixtures\TestOtherActivity; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\ActivityStub; +use Workflow\Exception as WorkflowException; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowPendingStatus; @@ -123,6 +125,43 @@ public function testLoadsStoredExceptionWithNonStandardConstructor(): void }); } + public function testSkipsStoredExceptionForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('test'), + ]); + + ActivityStub::make(TestActivity::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + public function testAll(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index f9723562..ff7f9e01 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -4,11 +4,14 @@ namespace Tests\Unit; +use Exception; use Mockery; use Tests\Fixtures\TestChildWorkflow; +use Tests\Fixtures\TestExceptionWorkflow; use Tests\Fixtures\TestParentWorkflow; use Tests\TestCase; use Workflow\ChildWorkflowStub; +use Workflow\Exception as WorkflowException; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowPendingStatus; @@ -91,6 +94,43 @@ public function testLoadsChildWorkflow(): void $this->assertNull($result); } + public function testSkipsStoredExceptionForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => TestChildWorkflow::class, + 'result' => Serializer::serialize('test'), + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + public function testDoesNotResumeRunningStartedChildWorkflow(): void { $childWorkflow = Mockery::mock(); From 3456f71bdbff3914ad7ca0ba09ea7d753c0d7ab8 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 16:43:30 +0000 Subject: [PATCH 09/26] Add unit coverage for probe replay branches --- tests/Unit/ChildWorkflowStubTest.php | 71 ++++++++++++++++++ tests/Unit/ExceptionTest.php | 102 ++++++++++++++++++++++++++ tests/Unit/Traits/AwaitsTest.php | 30 ++++++++ tests/Unit/Traits/SideEffectsTest.php | 30 ++++++++ tests/Unit/Traits/TimersTest.php | 32 ++++++++ tests/Unit/Traits/VersionsTest.php | 24 ++++++ tests/Unit/WorkflowStubTest.php | 38 ++++++++++ 7 files changed, 327 insertions(+) diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index ff7f9e01..2ab1a9fe 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -131,6 +131,77 @@ public function testSkipsStoredExceptionForDifferentSourceClass(): void $this->assertSame(2, WorkflowStub::getContext()->index); } + public function testMarksProbeMatchedForMatchingStoredException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'matching child failure', + 'code' => 0, + ]), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + try { + ChildWorkflowStub::make(TestChildWorkflow::class); + $this->fail('Expected child exception to be thrown.'); + } catch (Exception $exception) { + $this->assertSame('matching child failure', $exception->getMessage()); + } + + $this->assertTrue(WorkflowStub::probeMatched()); + } + + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredChildWorkflow(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testDoesNotResumeRunningStartedChildWorkflow(): void { $childWorkflow = Mockery::mock(); diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index abc334fa..a2fa3296 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -5,7 +5,11 @@ namespace Tests\Unit; use Exception as BaseException; +use Illuminate\Contracts\Queue\Job as JobContract; +use Illuminate\Support\Facades\Cache; use InvalidArgumentException; +use Mockery; +use ReflectionMethod; use RuntimeException; use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestProbeBackToBackWorkflow; @@ -54,6 +58,98 @@ public function testExceptionWorkflowRunning(): void $this->assertSame(WorkflowRunningStatus::class, $workflow->status()); } + public function testHandleResumesWorkflowWhenLogAlreadyExists(): void + { + $lock = Mockery::mock(); + $lock->shouldReceive('get') + ->once() + ->andReturn(true); + $lock->shouldReceive('release') + ->once(); + + Cache::shouldReceive('lock') + ->once() + ->with('laravel-workflow-exception:123', 15) + ->andReturn($lock); + + $workflow = Mockery::mock(); + $workflow->shouldReceive('resume') + ->once(); + + $storedWorkflow = Mockery::mock(StoredWorkflow::class) + ->makePartial(); + $storedWorkflow->id = 123; + $storedWorkflow->shouldReceive('effectiveConnection') + ->andReturn(null); + $storedWorkflow->shouldReceive('effectiveQueue') + ->andReturn(null); + $storedWorkflow->shouldReceive('toWorkflow') + ->once() + ->andReturn($workflow); + $storedWorkflow->shouldReceive('hasLogByIndex') + ->once() + ->with(0) + ->andReturn(true); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new BaseException('existing log')); + $exception->handle(); + + $this->assertSame(123, $storedWorkflow->id); + + Mockery::close(); + } + + public function testHandleReleasesWhenExceptionLockUnavailable(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + + $lock = Mockery::mock(); + $lock->shouldReceive('get') + ->once() + ->andReturn(false); + + Cache::shouldReceive('lock') + ->once() + ->with('laravel-workflow-exception:' . $storedWorkflow->id, 15) + ->andReturn($lock); + + $job = Mockery::mock(JobContract::class); + $job->shouldReceive('release') + ->once() + ->with(0); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'locked', + 'code' => 0, + ]); + $exception->setJob($job); + $exception->handle(); + + $this->assertFalse($storedWorkflow->hasLogByIndex(0)); + + Mockery::close(); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassIsInvalid(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = ''; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'invalid workflow class', + 'code' => 0, + ]); + + $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method->setAccessible(true); + + $this->assertTrue($method->invoke($exception)); + } + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); @@ -116,7 +212,13 @@ public function testPersistsWriteWhenProbeReachesCandidateException(): void ], sourceClass: TestProbeRetryActivity::class); $exception->handle(); + $log = $storedWorkflow->fresh() + ->logs() + ->firstWhere('index', 1); + + $this->assertNotNull($log); $this->assertTrue($storedWorkflow->fresh()->hasLogByIndex(1)); + $this->assertSame(TestProbeRetryActivity::class, Serializer::unserialize($log->result)['sourceClass']); } public function testSkipsWriteWhenProbeReachesDifferentActivityClassAtSameIndex(): void diff --git a/tests/Unit/Traits/AwaitsTest.php b/tests/Unit/Traits/AwaitsTest.php index 12b2a8bd..1759143e 100644 --- a/tests/Unit/Traits/AwaitsTest.php +++ b/tests/Unit/Traits/AwaitsTest.php @@ -102,6 +102,36 @@ public function testResolvesConflictingResult(): void $this->assertFalse(Serializer::unserialize($workflow->logs()->firstWhere('index', 0)->result)); } + public function testDefersWhenProbing(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $conditionEvaluated = false; + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::await(static function () use (&$conditionEvaluated): bool { + $conditionEvaluated = true; + + return true; + }) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertFalse($conditionEvaluated); + $this->assertNull($result); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/Traits/SideEffectsTest.php b/tests/Unit/Traits/SideEffectsTest.php index 9e121000..dfbe0a3a 100644 --- a/tests/Unit/Traits/SideEffectsTest.php +++ b/tests/Unit/Traits/SideEffectsTest.php @@ -88,6 +88,36 @@ public function testResolvesConflictingResult(): void $this->assertSame('test', Serializer::unserialize($workflow->logs()->firstWhere('index', 0)->result)); } + public function testDefersWhenProbing(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $callableEvaluated = false; + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::sideEffect(static function () use (&$callableEvaluated): string { + $callableEvaluated = true; + + return 'test'; + }) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertFalse($callableEvaluated); + $this->assertNull($result); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/Traits/TimersTest.php b/tests/Unit/Traits/TimersTest.php index 0e5ead92..cce36994 100644 --- a/tests/Unit/Traits/TimersTest.php +++ b/tests/Unit/Traits/TimersTest.php @@ -242,6 +242,38 @@ public function testTimerReturnsUnresolvedPromiseWhenReplayingAndNoTimer(): void ]); } + public function testTimerReturnsUnresolvedPromiseWhenProbingAndNoTimer(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::timer('1 minute') + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertDatabaseMissing('workflow_timers', [ + 'stored_workflow_id' => $workflow->id(), + 'index' => 0, + ]); + } + public function testTimerCapsDelayForSqsDriver(): void { Bus::fake(); diff --git a/tests/Unit/Traits/VersionsTest.php b/tests/Unit/Traits/VersionsTest.php index 1fe909f9..d5425b49 100644 --- a/tests/Unit/Traits/VersionsTest.php +++ b/tests/Unit/Traits/VersionsTest.php @@ -191,6 +191,30 @@ public function testResolvesConflictingResultThrowsWhenVersionNotSupported(): vo Mockery::close(); } + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredVersion(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::getVersion('test-change', WorkflowStub::DEFAULT_VERSION, 1) + ->then(static function ($value) use (&$result): void { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/WorkflowStubTest.php b/tests/Unit/WorkflowStubTest.php index cf6b7e60..4188ddfd 100644 --- a/tests/Unit/WorkflowStubTest.php +++ b/tests/Unit/WorkflowStubTest.php @@ -8,6 +8,8 @@ use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Queue; +use ReflectionProperty; +use stdClass; use Tests\Fixtures\TestAwaitWorkflow; use Tests\Fixtures\TestBadConnectionWorkflow; use Tests\Fixtures\TestChatBotWorkflow; @@ -231,6 +233,42 @@ public function testConnection(): void $this->assertSame('default', WorkflowStub::queue()); } + public function testProbeHelpers(): void + { + $contextProperty = new ReflectionProperty(WorkflowStub::class, 'context'); + $contextProperty->setAccessible(true); + $previousContext = $contextProperty->getValue(); + $contextProperty->setValue(null); + + try { + $this->assertInstanceOf(stdClass::class, WorkflowStub::getContext()); + $this->assertFalse(WorkflowStub::isProbing()); + $this->assertNull(WorkflowStub::probeIndex()); + $this->assertNull(WorkflowStub::probeClass()); + $this->assertFalse(WorkflowStub::probeMatched()); + + WorkflowStub::markProbeMatched(); + + $this->assertFalse(WorkflowStub::probeMatched()); + + WorkflowStub::setContext([ + 'probing' => true, + 'probeIndex' => 7, + 'probeClass' => TestWorkflow::class, + 'probeMatched' => false, + ]); + + WorkflowStub::markProbeMatched(); + + $this->assertTrue(WorkflowStub::isProbing()); + $this->assertSame(7, WorkflowStub::probeIndex()); + $this->assertSame(TestWorkflow::class, WorkflowStub::probeClass()); + $this->assertTrue(WorkflowStub::probeMatched()); + } finally { + $contextProperty->setValue($previousContext); + } + } + public function testHandlesDuplicateLogInsertionProperly(): void { Queue::fake(); From d0ca14dc268d35fc744fc78a51a15c457f93f8c0 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 18:10:35 +0000 Subject: [PATCH 10/26] Stream feature worker output in CI --- .github/workflows/php.yml | 2 ++ tests/TestCase.php | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 2934dfbb..44d66c74 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -82,6 +82,7 @@ jobs: DB_CONNECTION: mysql DB_PORT: ${{ job.services.mysql.ports[3306] }} QUEUE_CONNECTION: redis + WORKFLOW_TEST_STREAM_WORKER_OUTPUT: 1 - name: Run test suite (PostgreSQL) run: vendor/bin/phpunit --testdox --debug --testsuite feature @@ -89,6 +90,7 @@ jobs: DB_CONNECTION: pgsql DB_PORT: ${{ job.services.postgres.ports[5432] }} QUEUE_CONNECTION: redis + WORKFLOW_TEST_STREAM_WORKER_OUTPUT: 1 - name: Upload laravel.log if tests fail if: failure() diff --git a/tests/TestCase.php b/tests/TestCase.php index de3b4bf6..46e31678 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -43,8 +43,15 @@ public static function setUpBeforeClass(): void for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); - self::$workers[$i]->disableOutput(); - self::$workers[$i]->start(); + if (! self::shouldStreamWorkerOutput()) { + self::$workers[$i]->disableOutput(); + self::$workers[$i]->start(); + continue; + } + + self::$workers[$i]->start(static function (string $type, string $output) use ($i): void { + fwrite(STDERR, '[worker-' . $i . '][' . $type . '] ' . $output); + }); } } @@ -94,4 +101,11 @@ protected function getPackageProviders($app) { return [\Workflow\Providers\WorkflowServiceProvider::class]; } + + private static function shouldStreamWorkerOutput(): bool + { + $value = getenv('WORKFLOW_TEST_STREAM_WORKER_OUTPUT') ?: ($_ENV['WORKFLOW_TEST_STREAM_WORKER_OUTPUT'] ?? null); + + return in_array($value, ['1', 'true', 'yes', 'on'], true); + } } From 39e68393f24ad5505a334b9ae62ea3d4a7d05fa5 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 18:29:34 +0000 Subject: [PATCH 11/26] Capture CI worker output to files --- .github/workflows/php.yml | 8 +++++--- tests/TestCase.php | 19 +++++++++++++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 44d66c74..780f6a15 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -82,7 +82,7 @@ jobs: DB_CONNECTION: mysql DB_PORT: ${{ job.services.mysql.ports[3306] }} QUEUE_CONNECTION: redis - WORKFLOW_TEST_STREAM_WORKER_OUTPUT: 1 + WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT: 1 - name: Run test suite (PostgreSQL) run: vendor/bin/phpunit --testdox --debug --testsuite feature @@ -90,14 +90,16 @@ jobs: DB_CONNECTION: pgsql DB_PORT: ${{ job.services.postgres.ports[5432] }} QUEUE_CONNECTION: redis - WORKFLOW_TEST_STREAM_WORKER_OUTPUT: 1 + WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT: 1 - name: Upload laravel.log if tests fail if: failure() uses: actions/upload-artifact@v4 with: name: laravel-log - path: vendor/orchestra/testbench-core/laravel/storage/logs/laravel.log + path: | + vendor/orchestra/testbench-core/laravel/storage/logs/laravel.log + vendor/orchestra/testbench-core/laravel/storage/logs/workflow-test-worker-*.log - name: Code Coverage run: | diff --git a/tests/TestCase.php b/tests/TestCase.php index 46e31678..9393ce39 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -43,14 +43,16 @@ public static function setUpBeforeClass(): void for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); - if (! self::shouldStreamWorkerOutput()) { + if (! self::shouldCaptureWorkerOutput()) { self::$workers[$i]->disableOutput(); self::$workers[$i]->start(); continue; } + file_put_contents(self::workerLogPath($i), ''); + self::$workers[$i]->start(static function (string $type, string $output) use ($i): void { - fwrite(STDERR, '[worker-' . $i . '][' . $type . '] ' . $output); + file_put_contents(self::workerLogPath($i), $output, FILE_APPEND | LOCK_EX); }); } } @@ -102,10 +104,19 @@ protected function getPackageProviders($app) return [\Workflow\Providers\WorkflowServiceProvider::class]; } - private static function shouldStreamWorkerOutput(): bool + private static function shouldCaptureWorkerOutput(): bool { - $value = getenv('WORKFLOW_TEST_STREAM_WORKER_OUTPUT') ?: ($_ENV['WORKFLOW_TEST_STREAM_WORKER_OUTPUT'] ?? null); + $value = getenv( + 'WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT' + ) ?: ($_ENV['WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT'] ?? null); return in_array($value, ['1', 'true', 'yes', 'on'], true); } + + private static function workerLogPath(int $worker): string + { + return dirname( + __DIR__ + ) . '/vendor/orchestra/testbench-core/laravel/storage/logs/workflow-test-worker-' . $worker . '.log'; + } } From 0ad07ad4c0c0dc0600cda3c3e910b4f657288797 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 18:40:49 +0000 Subject: [PATCH 12/26] Revert CI worker capture instrumentation --- .github/workflows/php.yml | 6 +----- tests/TestCase.php | 29 ++--------------------------- 2 files changed, 3 insertions(+), 32 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 780f6a15..2934dfbb 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -82,7 +82,6 @@ jobs: DB_CONNECTION: mysql DB_PORT: ${{ job.services.mysql.ports[3306] }} QUEUE_CONNECTION: redis - WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT: 1 - name: Run test suite (PostgreSQL) run: vendor/bin/phpunit --testdox --debug --testsuite feature @@ -90,16 +89,13 @@ jobs: DB_CONNECTION: pgsql DB_PORT: ${{ job.services.postgres.ports[5432] }} QUEUE_CONNECTION: redis - WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT: 1 - name: Upload laravel.log if tests fail if: failure() uses: actions/upload-artifact@v4 with: name: laravel-log - path: | - vendor/orchestra/testbench-core/laravel/storage/logs/laravel.log - vendor/orchestra/testbench-core/laravel/storage/logs/workflow-test-worker-*.log + path: vendor/orchestra/testbench-core/laravel/storage/logs/laravel.log - name: Code Coverage run: | diff --git a/tests/TestCase.php b/tests/TestCase.php index 9393ce39..de3b4bf6 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -43,17 +43,8 @@ public static function setUpBeforeClass(): void for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); - if (! self::shouldCaptureWorkerOutput()) { - self::$workers[$i]->disableOutput(); - self::$workers[$i]->start(); - continue; - } - - file_put_contents(self::workerLogPath($i), ''); - - self::$workers[$i]->start(static function (string $type, string $output) use ($i): void { - file_put_contents(self::workerLogPath($i), $output, FILE_APPEND | LOCK_EX); - }); + self::$workers[$i]->disableOutput(); + self::$workers[$i]->start(); } } @@ -103,20 +94,4 @@ protected function getPackageProviders($app) { return [\Workflow\Providers\WorkflowServiceProvider::class]; } - - private static function shouldCaptureWorkerOutput(): bool - { - $value = getenv( - 'WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT' - ) ?: ($_ENV['WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT'] ?? null); - - return in_array($value, ['1', 'true', 'yes', 'on'], true); - } - - private static function workerLogPath(int $worker): string - { - return dirname( - __DIR__ - ) . '/vendor/orchestra/testbench-core/laravel/storage/logs/workflow-test-worker-' . $worker . '.log'; - } } From ee0b47ac1a5bb711f657651dc25736b3f7acedd8 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 19:06:02 +0000 Subject: [PATCH 13/26] Use synthetic logs for probe replay --- src/Exception.php | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/src/Exception.php b/src/Exception.php index 2c6ec0d9..d7ca59a9 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -15,6 +15,7 @@ use Workflow\Exceptions\TransitionNotFound; use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; +use Workflow\Models\StoredWorkflowLog; use Workflow\Serializers\Serializer; final class Exception implements ShouldBeEncrypted, ShouldQueue @@ -100,11 +101,8 @@ private function shouldPersistAfterProbeReplay(): bool } $previousContext = WorkflowStub::getContext(); - $connection = $this->storedWorkflow->getConnection(); $shouldPersist = false; - $connection->beginTransaction(); - try { $tentativeWorkflow = $this->createTentativeWorkflowState(); $workflow = new $workflowClass($tentativeWorkflow, ...$tentativeWorkflow->workflowArguments()); @@ -130,10 +128,6 @@ private function shouldPersistAfterProbeReplay(): bool $shouldPersist = WorkflowStub::probeMatched(); } finally { WorkflowStub::setContext($previousContext); - - if ($connection->transactionLevel() > 0) { - $connection->rollBack(); - } } return $shouldPersist; @@ -141,16 +135,32 @@ private function shouldPersistAfterProbeReplay(): bool private function createTentativeWorkflowState(): StoredWorkflow { - $this->storedWorkflow->createLog([ - 'index' => $this->index, - 'now' => $this->now, - 'class' => self::class, - 'result' => Serializer::serialize($this->exceptionPayload()), - ]); - $storedWorkflowClass = $this->storedWorkflow::class; - return $storedWorkflowClass::query()->findOrFail($this->storedWorkflow->id); + /** @var StoredWorkflow $tentativeWorkflow */ + $tentativeWorkflow = $storedWorkflowClass::query() + ->findOrFail($this->storedWorkflow->id); + + $tentativeWorkflow->loadMissing(['logs', 'signals']); + + /** @var StoredWorkflowLog $tentativeLog */ + $tentativeLog = $tentativeWorkflow->logs() + ->make([ + 'index' => $this->index, + 'now' => $this->now, + 'class' => self::class, + 'result' => Serializer::serialize($this->exceptionPayload()), + ]); + + $tentativeWorkflow->setRelation( + 'logs', + $tentativeWorkflow->getRelation('logs') + ->push($tentativeLog) + ->sortBy(static fn ($log): string => sprintf('%020d:%020d', $log->index, $log->id ?? PHP_INT_MAX)) + ->values() + ); + + return $tentativeWorkflow; } private function exceptionPayload() From ef06874de4f09ad0c9c22a307f46e42e5837354c Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 21:47:36 +0000 Subject: [PATCH 14/26] Flush Redis after feature workers stop --- tests/TestCase.php | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/tests/TestCase.php b/tests/TestCase.php index de3b4bf6..37f28192 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -29,17 +29,7 @@ public static function setUpBeforeClass(): void } } - $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); - $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); - if ($redisHost && class_exists(\Redis::class)) { - try { - $redis = new \Redis(); - $redis->connect($redisHost, (int) $redisPort); - $redis->flushDB(); - } catch (\Throwable $e) { - // Ignore if no redis - } - } + self::flushRedis(); for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); @@ -53,6 +43,10 @@ public static function tearDownAfterClass(): void foreach (self::$workers as $worker) { $worker->stop(); } + + self::$workers = []; + + self::flushRedis(); } protected function setUp(): void @@ -67,17 +61,7 @@ protected function setUp(): void Cache::flush(); - $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); - $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); - if ($redisHost && class_exists(\Redis::class)) { - try { - $redis = new \Redis(); - $redis->connect($redisHost, (int) $redisPort); - $redis->flushDB(); - } catch (\Throwable $e) { - // Ignore if no redis - } - } + self::flushRedis(); } protected function defineDatabaseMigrations() @@ -94,4 +78,19 @@ protected function getPackageProviders($app) { return [\Workflow\Providers\WorkflowServiceProvider::class]; } + + private static function flushRedis(): void + { + $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); + $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); + if ($redisHost && class_exists(\Redis::class)) { + try { + $redis = new \Redis(); + $redis->connect($redisHost, (int) $redisPort); + $redis->flushDB(); + } catch (\Throwable $e) { + // Ignore if no redis + } + } + } } From 1bea82fa55554bf05821758f4811aa30dacacd27 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 22:11:58 +0000 Subject: [PATCH 15/26] Remove overlap middleware from exception jobs --- src/Exception.php | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/Exception.php b/src/Exception.php index d7ca59a9..fca28d08 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -13,7 +13,6 @@ use Illuminate\Support\Facades\Cache; use Throwable; use Workflow\Exceptions\TransitionNotFound; -use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; use Workflow\Models\StoredWorkflowLog; use Workflow\Serializers\Serializer; @@ -82,14 +81,7 @@ public function handle() public function middleware() { - return [ - new WithoutOverlappingMiddleware( - $this->storedWorkflow->id, - WithoutOverlappingMiddleware::ACTIVITY, - 0, - 15 - ), - ]; + return []; } private function shouldPersistAfterProbeReplay(): bool From da8240c690feeb3b7ac446bfbf32619bac659e2b Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sat, 4 Apr 2026 19:08:42 +0000 Subject: [PATCH 16/26] Update exception middleware expectation --- tests/Unit/ExceptionTest.php | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index a2fa3296..b624eea2 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -21,7 +21,6 @@ use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; -use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowRunningStatus; @@ -35,12 +34,7 @@ public function testMiddleware(): void 'Test exception' )); - $middleware = collect($exception->middleware()) - ->values(); - - $this->assertCount(1, $middleware); - $this->assertSame(WithoutOverlappingMiddleware::class, get_class($middleware[0])); - $this->assertSame(15, $middleware[0]->expiresAfter); + $this->assertSame([], $exception->middleware()); } public function testExceptionWorkflowRunning(): void From 8887e29fee419e232df9d21cf95c6c34fe1f3fa0 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sat, 4 Apr 2026 20:02:14 +0000 Subject: [PATCH 17/26] Restore workflow overlap guard for exception jobs --- src/Exception.php | 10 +++++++++- tests/Unit/ExceptionTest.php | 9 ++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/Exception.php b/src/Exception.php index fca28d08..3e27e350 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -13,6 +13,7 @@ use Illuminate\Support\Facades\Cache; use Throwable; use Workflow\Exceptions\TransitionNotFound; +use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; use Workflow\Models\StoredWorkflowLog; use Workflow\Serializers\Serializer; @@ -81,7 +82,14 @@ public function handle() public function middleware() { - return []; + return [ + new WithoutOverlappingMiddleware( + $this->storedWorkflow->id, + WithoutOverlappingMiddleware::WORKFLOW, + 0, + 15 + ), + ]; } private function shouldPersistAfterProbeReplay(): bool diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index b624eea2..f1786455 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -21,6 +21,7 @@ use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; +use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowRunningStatus; @@ -34,7 +35,13 @@ public function testMiddleware(): void 'Test exception' )); - $this->assertSame([], $exception->middleware()); + $middleware = collect($exception->middleware()) + ->values(); + + $this->assertCount(1, $middleware); + $this->assertSame(WithoutOverlappingMiddleware::class, $middleware[0]::class); + $this->assertSame(WithoutOverlappingMiddleware::WORKFLOW, $middleware[0]->type); + $this->assertSame(15, $middleware[0]->expiresAfter); } public function testExceptionWorkflowRunning(): void From 26beade1d5ea89333b28a3c09c0d9f34ea556aab Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sat, 4 Apr 2026 20:28:48 +0000 Subject: [PATCH 18/26] Guard probe replay against invalid workflow classes --- src/Exception.php | 13 ++++++++++++ tests/Unit/ExceptionTest.php | 39 +++++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/src/Exception.php b/src/Exception.php index 3e27e350..897a84f8 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -11,6 +11,7 @@ use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; use Illuminate\Support\Facades\Cache; +use ReflectionClass; use Throwable; use Workflow\Exceptions\TransitionNotFound; use Workflow\Middleware\WithoutOverlappingMiddleware; @@ -100,6 +101,18 @@ private function shouldPersistAfterProbeReplay(): bool return true; } + try { + if (! class_exists($workflowClass) || ! is_subclass_of($workflowClass, Workflow::class)) { + return true; + } + + if (! (new ReflectionClass($workflowClass))->isInstantiable()) { + return true; + } + } catch (Throwable) { + return true; + } + $previousContext = WorkflowStub::getContext(); $shouldPersist = false; diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index f1786455..e6bc9bbd 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -11,6 +11,7 @@ use Mockery; use ReflectionMethod; use RuntimeException; +use stdClass; use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestProbeBackToBackWorkflow; use Tests\Fixtures\TestProbeChildFailureWorkflow; @@ -143,7 +144,43 @@ public function testProbeReplayShortCircuitsWhenWorkflowClassIsInvalid(): void 'class' => BaseException::class, 'message' => 'invalid workflow class', 'code' => 0, - ]); + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method->setAccessible(true); + + $this->assertTrue($method->invoke($exception)); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassDoesNotExist(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = 'Tests\\Fixtures\\MissingWorkflowClass'; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'missing workflow class', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method->setAccessible(true); + + $this->assertTrue($method->invoke($exception)); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotAWorkflow(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = stdClass::class; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'non workflow class', + 'code' => 0, + ], connection: 'redis', queue: 'default'); $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); $method->setAccessible(true); From a3b85b8e6cc6ad25c8c4d632ee3e4b4c15f52d68 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sat, 4 Apr 2026 21:18:47 +0000 Subject: [PATCH 19/26] Tighten probe matching and exception retries --- src/ActivityStub.php | 13 +++++-- src/ChildWorkflowStub.php | 13 +++++-- src/Exception.php | 4 +- tests/Unit/ActivityStubTest.php | 43 ++++++++++++++++++++++ tests/Unit/ChildWorkflowStubTest.php | 43 ++++++++++++++++++++++ tests/Unit/ExceptionTest.php | 55 +++++++++++++++++++++++++++- 6 files changed, 161 insertions(+), 10 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 56caaa67..422b30dd 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -51,6 +51,15 @@ public static function make($activity, ...$arguments): PromiseInterface } if ($log) { + $result = Serializer::unserialize($log->result); + + if ($log->class === Exception::class && self::isForeignExceptionResult($result, $activity)) { + ++$context->index; + WorkflowStub::setContext($context); + + return self::make($activity, ...$arguments); + } + if ( WorkflowStub::isProbing() && WorkflowStub::probeIndex() === $context->index @@ -65,10 +74,6 @@ public static function make($activity, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); - $result = Serializer::unserialize($log->result); - if ($log->class === Exception::class && self::isForeignExceptionResult($result, $activity)) { - return self::make($activity, ...$arguments); - } if ( is_array($result) && array_key_exists('class', $result) && diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 03a23ff6..720a8d53 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -45,6 +45,15 @@ public static function make($workflow, ...$arguments): PromiseInterface } if ($log) { + $result = Serializer::unserialize($log->result); + + if ($log->class === Exception::class && self::isForeignExceptionResult($result, $workflow)) { + ++$context->index; + WorkflowStub::setContext($context); + + return self::make($workflow, ...$arguments); + } + if ( WorkflowStub::isProbing() && WorkflowStub::probeIndex() === $context->index @@ -59,10 +68,6 @@ public static function make($workflow, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); - $result = Serializer::unserialize($log->result); - if ($log->class === Exception::class && self::isForeignExceptionResult($result, $workflow)) { - return self::make($workflow, ...$arguments); - } if ( is_array($result) && array_key_exists('class', $result) diff --git a/src/Exception.php b/src/Exception.php index 897a84f8..4fe1b51b 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -26,6 +26,8 @@ final class Exception implements ShouldBeEncrypted, ShouldQueue use Queueable; use SerializesModels; + private const LOCK_RETRY_DELAY = 1; + public ?string $key = null; public $tries = PHP_INT_MAX; @@ -57,7 +59,7 @@ public function handle() $lock = Cache::lock('laravel-workflow-exception:' . $this->storedWorkflow->id, 15); if (! $lock->get()) { - $this->release(); + $this->release(self::LOCK_RETRY_DELAY); return; } diff --git a/tests/Unit/ActivityStubTest.php b/tests/Unit/ActivityStubTest.php index 6bd00905..1e1d596a 100644 --- a/tests/Unit/ActivityStubTest.php +++ b/tests/Unit/ActivityStubTest.php @@ -162,6 +162,49 @@ public function testSkipsStoredExceptionForDifferentSourceClass(): void $this->assertSame(2, WorkflowStub::getContext()->index); } + public function testDoesNotMarkProbeMatchedForForeignStoredException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestActivity::class, + 'probeMatched' => false, + ]); + + ActivityStub::make(TestActivity::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertFalse(WorkflowStub::probeMatched()); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + public function testAll(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index 2ab1a9fe..e990f2eb 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -172,6 +172,49 @@ public function testMarksProbeMatchedForMatchingStoredException(): void $this->assertTrue(WorkflowStub::probeMatched()); } + public function testDoesNotMarkProbeMatchedForForeignStoredException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child failure', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertFalse(WorkflowStub::probeMatched()); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredChildWorkflow(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index e6bc9bbd..592b4258 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -26,6 +26,7 @@ use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowRunningStatus; +use Workflow\Workflow; use Workflow\WorkflowStub; final class ExceptionTest extends TestCase @@ -119,7 +120,7 @@ public function testHandleReleasesWhenExceptionLockUnavailable(): void $job = Mockery::mock(JobContract::class); $job->shouldReceive('release') ->once() - ->with(0); + ->with(1); $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ 'class' => BaseException::class, @@ -188,6 +189,54 @@ public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotAWorkflow(): v $this->assertTrue($method->invoke($exception)); } + public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotInstantiable(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = ExceptionTestAbstractWorkflow::class; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'abstract workflow class', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method->setAccessible(true); + + $this->assertTrue($method->invoke($exception)); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassAutoloadThrows(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = 'Tests\\Fixtures\\ThrowingAutoloadWorkflow'; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'autoload failure', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $autoload = static function (string $class): void { + if ($class === 'Tests\\Fixtures\\ThrowingAutoloadWorkflow') { + throw new RuntimeException('autoload exploded'); + } + }; + + spl_autoload_register($autoload); + + try { + $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method->setAccessible(true); + + $this->assertTrue($method->invoke($exception)); + } finally { + spl_autoload_unregister($autoload); + } + } + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); @@ -300,3 +349,7 @@ public function testSkipsWriteWhenProbeReachesDifferentActivityClassAtSameIndex( $this->assertFalse($storedWorkflow->fresh()->hasLogByIndex(2)); } } + +abstract class ExceptionTestAbstractWorkflow extends Workflow +{ +} From 98398c83fb3898f3eab3ffa47ea4eb59c37ab30f Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sat, 4 Apr 2026 21:41:15 +0000 Subject: [PATCH 20/26] Retry probe matches behind pending work --- src/ActivityStub.php | 1 + src/ChildWorkflowStub.php | 1 + src/Exception.php | 37 ++++++++++++++++------ src/Traits/Awaits.php | 1 + src/Traits/SideEffects.php | 1 + src/Traits/Timers.php | 5 +++ src/Traits/Versions.php | 1 + src/WorkflowStub.php | 14 +++++++++ tests/Unit/ActivityStubTest.php | 1 + tests/Unit/ChildWorkflowStubTest.php | 1 + tests/Unit/ExceptionTest.php | 45 +++++++++++++++++++++------ tests/Unit/Traits/AwaitsTest.php | 1 + tests/Unit/Traits/SideEffectsTest.php | 1 + tests/Unit/Traits/TimersTest.php | 36 +++++++++++++++++++++ tests/Unit/Traits/VersionsTest.php | 1 + tests/Unit/WorkflowStubTest.php | 6 ++++ 16 files changed, 133 insertions(+), 20 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 422b30dd..4be34562 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -95,6 +95,7 @@ public static function make($activity, ...$arguments): PromiseInterface } if (WorkflowStub::isProbing()) { + WorkflowStub::markProbePendingBeforeMatch(); ++$context->index; WorkflowStub::setContext($context); return (new Deferred())->promise(); diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 720a8d53..6333520c 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -88,6 +88,7 @@ public static function make($workflow, ...$arguments): PromiseInterface } if (WorkflowStub::isProbing()) { + WorkflowStub::markProbePendingBeforeMatch(); ++$context->index; WorkflowStub::setContext($context); return (new Deferred())->promise(); diff --git a/src/Exception.php b/src/Exception.php index 4fe1b51b..5b3a63e4 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -28,6 +28,12 @@ final class Exception implements ShouldBeEncrypted, ShouldQueue private const LOCK_RETRY_DELAY = 1; + private const PROBE_SKIP = 'skip'; + + private const PROBE_PERSIST = 'persist'; + + private const PROBE_RETRY = 'retry'; + public ?string $key = null; public $tries = PHP_INT_MAX; @@ -70,8 +76,14 @@ public function handle() try { if ($this->storedWorkflow->hasLogByIndex($this->index)) { $workflow->resume(); - } elseif ($this->shouldPersistAfterProbeReplay()) { - $workflow->next($this->index, $this->now, self::class, $this->exceptionPayload()); + } else { + $probeDecision = $this->probeReplayDecision(); + + if ($probeDecision === self::PROBE_PERSIST) { + $workflow->next($this->index, $this->now, self::class, $this->exceptionPayload()); + } elseif ($probeDecision === self::PROBE_RETRY) { + $this->release(self::LOCK_RETRY_DELAY); + } } } catch (TransitionNotFound) { if ($workflow->running()) { @@ -95,28 +107,28 @@ public function middleware() ]; } - private function shouldPersistAfterProbeReplay(): bool + private function probeReplayDecision(): string { $workflowClass = $this->storedWorkflow->class; if (! is_string($workflowClass) || $workflowClass === '') { - return true; + return self::PROBE_PERSIST; } try { if (! class_exists($workflowClass) || ! is_subclass_of($workflowClass, Workflow::class)) { - return true; + return self::PROBE_PERSIST; } if (! (new ReflectionClass($workflowClass))->isInstantiable()) { - return true; + return self::PROBE_PERSIST; } } catch (Throwable) { - return true; + return self::PROBE_PERSIST; } $previousContext = WorkflowStub::getContext(); - $shouldPersist = false; + $probeDecision = self::PROBE_SKIP; try { $tentativeWorkflow = $this->createTentativeWorkflowState(); @@ -132,6 +144,7 @@ private function shouldPersistAfterProbeReplay(): bool 'probeIndex' => $this->index, 'probeClass' => $this->sourceClass, 'probeMatched' => false, + 'probePendingBeforeMatch' => false, ]); try { @@ -140,12 +153,16 @@ private function shouldPersistAfterProbeReplay(): bool // The replay path may still throw; we only care whether it matched this tentative log. } - $shouldPersist = WorkflowStub::probeMatched(); + if (WorkflowStub::probeMatched()) { + $probeDecision = WorkflowStub::probePendingBeforeMatch() + ? self::PROBE_RETRY + : self::PROBE_PERSIST; + } } finally { WorkflowStub::setContext($previousContext); } - return $shouldPersist; + return $probeDecision; } private function createTentativeWorkflowState(): StoredWorkflow diff --git a/src/Traits/Awaits.php b/src/Traits/Awaits.php index 0ba8fae1..c3fc138a 100644 --- a/src/Traits/Awaits.php +++ b/src/Traits/Awaits.php @@ -23,6 +23,7 @@ public static function await($condition): PromiseInterface } if (self::isProbing()) { + self::markProbePendingBeforeMatch(); ++self::$context->index; return (new Deferred())->promise(); } diff --git a/src/Traits/SideEffects.php b/src/Traits/SideEffects.php index 72dd5d79..763f7a4e 100644 --- a/src/Traits/SideEffects.php +++ b/src/Traits/SideEffects.php @@ -22,6 +22,7 @@ public static function sideEffect($callable): PromiseInterface } if (self::isProbing()) { + self::markProbePendingBeforeMatch(); ++self::$context->index; return (new Deferred())->promise(); } diff --git a/src/Traits/Timers.php b/src/Traits/Timers.php index 0be2981a..35c133bd 100644 --- a/src/Traits/Timers.php +++ b/src/Traits/Timers.php @@ -42,6 +42,7 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa ->addSeconds($seconds); if (self::isProbing()) { + self::markProbePendingBeforeMatch(); ++self::$context->index; return (new Deferred())->promise(); } @@ -98,6 +99,10 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa )->delay($delay); } + if (self::isProbing()) { + self::markProbePendingBeforeMatch(); + } + ++self::$context->index; return (new Deferred())->promise(); } diff --git a/src/Traits/Versions.php b/src/Traits/Versions.php index 7d841fa1..c9655a7e 100644 --- a/src/Traits/Versions.php +++ b/src/Traits/Versions.php @@ -35,6 +35,7 @@ public static function getVersion( } if (self::isProbing()) { + self::markProbePendingBeforeMatch(); ++self::$context->index; return (new Deferred())->promise(); } diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 095e294b..8aeacd56 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -198,11 +198,25 @@ public static function markProbeMatched(): void self::$context->probeMatched = true; } + public static function markProbePendingBeforeMatch(): void + { + if (! self::isProbing() || self::probeMatched()) { + return; + } + + self::$context->probePendingBeforeMatch = true; + } + public static function probeMatched(): bool { return (bool) (self::getContext()->probeMatched ?? false); } + public static function probePendingBeforeMatch(): bool + { + return (bool) (self::getContext()->probePendingBeforeMatch ?? false); + } + public function id() { return $this->storedWorkflow->id; diff --git a/tests/Unit/ActivityStubTest.php b/tests/Unit/ActivityStubTest.php index 1e1d596a..20e54ff5 100644 --- a/tests/Unit/ActivityStubTest.php +++ b/tests/Unit/ActivityStubTest.php @@ -201,6 +201,7 @@ public function testDoesNotMarkProbeMatchedForForeignStoredException(): void }); $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); $this->assertFalse(WorkflowStub::probeMatched()); $this->assertSame(2, WorkflowStub::getContext()->index); } diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index e990f2eb..f18ff203 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -242,6 +242,7 @@ public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredChildWorkflo }); $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); $this->assertSame(1, WorkflowStub::getContext()->index); } diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 592b4258..f32cd23a 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -147,10 +147,10 @@ public function testProbeReplayShortCircuitsWhenWorkflowClassIsInvalid(): void 'code' => 0, ], connection: 'redis', queue: 'default'); - $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); $method->setAccessible(true); - $this->assertTrue($method->invoke($exception)); + $this->assertSame('persist', $method->invoke($exception)); } public function testProbeReplayShortCircuitsWhenWorkflowClassDoesNotExist(): void @@ -165,10 +165,10 @@ public function testProbeReplayShortCircuitsWhenWorkflowClassDoesNotExist(): voi 'code' => 0, ], connection: 'redis', queue: 'default'); - $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); $method->setAccessible(true); - $this->assertTrue($method->invoke($exception)); + $this->assertSame('persist', $method->invoke($exception)); } public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotAWorkflow(): void @@ -183,10 +183,10 @@ public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotAWorkflow(): v 'code' => 0, ], connection: 'redis', queue: 'default'); - $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); $method->setAccessible(true); - $this->assertTrue($method->invoke($exception)); + $this->assertSame('persist', $method->invoke($exception)); } public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotInstantiable(): void @@ -201,10 +201,10 @@ public function testProbeReplayShortCircuitsWhenWorkflowClassIsNotInstantiable() 'code' => 0, ], connection: 'redis', queue: 'default'); - $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); $method->setAccessible(true); - $this->assertTrue($method->invoke($exception)); + $this->assertSame('persist', $method->invoke($exception)); } public function testProbeReplayShortCircuitsWhenWorkflowClassAutoloadThrows(): void @@ -228,10 +228,10 @@ public function testProbeReplayShortCircuitsWhenWorkflowClassAutoloadThrows(): v spl_autoload_register($autoload); try { - $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); $method->setAccessible(true); - $this->assertTrue($method->invoke($exception)); + $this->assertSame('persist', $method->invoke($exception)); } finally { spl_autoload_unregister($autoload); } @@ -270,6 +270,31 @@ public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void $this->assertSame(1, $storedWorkflow->logs()->count()); } + public function testRetriesWriteWhenProbeMatchesAfterEarlierPendingWork(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $job = Mockery::mock(JobContract::class); + $job->shouldReceive('release') + ->once() + ->with(1); + + $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'child failed: child-2', + 'code' => 0, + ], sourceClass: TestProbeChildFailureWorkflow::class); + $exception->setJob($job); + $exception->handle(); + + $this->assertFalse($storedWorkflow->fresh()->hasLogByIndex(1)); + } + public function testPersistsWriteWhenProbeReachesCandidateException(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeBackToBackWorkflow::class)->id()); diff --git a/tests/Unit/Traits/AwaitsTest.php b/tests/Unit/Traits/AwaitsTest.php index 1759143e..400d90af 100644 --- a/tests/Unit/Traits/AwaitsTest.php +++ b/tests/Unit/Traits/AwaitsTest.php @@ -128,6 +128,7 @@ public function testDefersWhenProbing(): void $this->assertFalse($conditionEvaluated); $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); $this->assertSame(0, $workflow->logs()->count()); $this->assertSame(1, WorkflowStub::getContext()->index); } diff --git a/tests/Unit/Traits/SideEffectsTest.php b/tests/Unit/Traits/SideEffectsTest.php index dfbe0a3a..db11844c 100644 --- a/tests/Unit/Traits/SideEffectsTest.php +++ b/tests/Unit/Traits/SideEffectsTest.php @@ -114,6 +114,7 @@ public function testDefersWhenProbing(): void $this->assertFalse($callableEvaluated); $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); $this->assertSame(0, $workflow->logs()->count()); $this->assertSame(1, WorkflowStub::getContext()->index); } diff --git a/tests/Unit/Traits/TimersTest.php b/tests/Unit/Traits/TimersTest.php index cce36994..947914a6 100644 --- a/tests/Unit/Traits/TimersTest.php +++ b/tests/Unit/Traits/TimersTest.php @@ -267,6 +267,7 @@ public function testTimerReturnsUnresolvedPromiseWhenProbingAndNoTimer(): void $this->assertNull($result); $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); $this->assertSame(0, $workflow->logs()->count()); $this->assertDatabaseMissing('workflow_timers', [ 'stored_workflow_id' => $workflow->id(), @@ -274,6 +275,41 @@ public function testTimerReturnsUnresolvedPromiseWhenProbingAndNoTimer(): void ]); } + public function testTimerMarksProbePendingBeforeMatchWhenStoredTimerHasNotElapsed(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->timers() + ->create([ + 'index' => 0, + 'stop_at' => now() + ->addMinute(), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::timer('1 minute') + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + } + public function testTimerCapsDelayForSqsDriver(): void { Bus::fake(); diff --git a/tests/Unit/Traits/VersionsTest.php b/tests/Unit/Traits/VersionsTest.php index d5425b49..f3174456 100644 --- a/tests/Unit/Traits/VersionsTest.php +++ b/tests/Unit/Traits/VersionsTest.php @@ -211,6 +211,7 @@ public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredVersion(): v }); $this->assertNull($result); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); $this->assertSame(1, WorkflowStub::getContext()->index); $this->assertSame(0, $workflow->logs()->count()); } diff --git a/tests/Unit/WorkflowStubTest.php b/tests/Unit/WorkflowStubTest.php index 4188ddfd..03290159 100644 --- a/tests/Unit/WorkflowStubTest.php +++ b/tests/Unit/WorkflowStubTest.php @@ -246,24 +246,30 @@ public function testProbeHelpers(): void $this->assertNull(WorkflowStub::probeIndex()); $this->assertNull(WorkflowStub::probeClass()); $this->assertFalse(WorkflowStub::probeMatched()); + $this->assertFalse(WorkflowStub::probePendingBeforeMatch()); WorkflowStub::markProbeMatched(); + WorkflowStub::markProbePendingBeforeMatch(); $this->assertFalse(WorkflowStub::probeMatched()); + $this->assertFalse(WorkflowStub::probePendingBeforeMatch()); WorkflowStub::setContext([ 'probing' => true, 'probeIndex' => 7, 'probeClass' => TestWorkflow::class, 'probeMatched' => false, + 'probePendingBeforeMatch' => false, ]); + WorkflowStub::markProbePendingBeforeMatch(); WorkflowStub::markProbeMatched(); $this->assertTrue(WorkflowStub::isProbing()); $this->assertSame(7, WorkflowStub::probeIndex()); $this->assertSame(TestWorkflow::class, WorkflowStub::probeClass()); $this->assertTrue(WorkflowStub::probeMatched()); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); } finally { $contextProperty->setValue($previousContext); } From 9bcd713407e78489e6f84dc230740cfa43cf64a2 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sun, 5 Apr 2026 00:27:27 +0000 Subject: [PATCH 21/26] Preserve probe pending state across replay --- src/Workflow.php | 1 + tests/Unit/WorkflowTest.php | 38 +++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/src/Workflow.php b/src/Workflow.php index a45cd17f..eebefb91 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -323,6 +323,7 @@ private function setContext(array $context): void $context['probeIndex'] = $existingContext->probeIndex ?? null; $context['probeClass'] = $existingContext->probeClass ?? null; $context['probeMatched'] = $existingContext->probeMatched ?? false; + $context['probePendingBeforeMatch'] = $existingContext->probePendingBeforeMatch ?? false; } WorkflowStub::setContext($context); diff --git a/tests/Unit/WorkflowTest.php b/tests/Unit/WorkflowTest.php index 96b2e247..144e23de 100644 --- a/tests/Unit/WorkflowTest.php +++ b/tests/Unit/WorkflowTest.php @@ -9,6 +9,7 @@ use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Event; use Mockery; +use ReflectionMethod; use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestChildWorkflow; use Tests\Fixtures\TestContinueAsNewWorkflow; @@ -179,6 +180,43 @@ public function testExceptionAlreadyLogged(): void $this->assertSame(1, $storedWorkflow->logs()->count()); } + public function testSetContextPreservesProbePendingBeforeMatch(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->arguments = Serializer::serialize([]); + $storedWorkflow->save(); + + $workflow = new Workflow($storedWorkflow); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 3, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 7, + 'probeClass' => TestActivity::class, + 'probeMatched' => true, + 'probePendingBeforeMatch' => true, + ]); + + $method = new ReflectionMethod(Workflow::class, 'setContext'); + $method->setAccessible(true); + $method->invoke($workflow, [ + 'storedWorkflow' => $storedWorkflow, + 'index' => 4, + 'now' => now(), + 'replaying' => true, + ]); + + $this->assertTrue(WorkflowStub::isProbing()); + $this->assertSame(7, WorkflowStub::probeIndex()); + $this->assertSame(TestActivity::class, WorkflowStub::probeClass()); + $this->assertTrue(WorkflowStub::probeMatched()); + $this->assertTrue(WorkflowStub::probePendingBeforeMatch()); + } + public function testParent(): void { Carbon::setTestNow('2022-01-01'); From f576cc433f7505f5c9b9209906988a4af8c1a905 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sun, 5 Apr 2026 01:53:44 +0000 Subject: [PATCH 22/26] Flatten foreign exception skips in stubs --- src/ActivityStub.php | 47 ++++++++++++++------------ src/ChildWorkflowStub.php | 47 ++++++++++++++------------ tests/Unit/ActivityStubTest.php | 49 ++++++++++++++++++++++++++++ tests/Unit/ChildWorkflowStubTest.php | 49 ++++++++++++++++++++++++++++ 4 files changed, 150 insertions(+), 42 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 4be34562..93a56115 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -29,37 +29,42 @@ public static function make($activity, ...$arguments): PromiseInterface { $context = WorkflowStub::getContext(); - $log = $context->storedWorkflow->findLogByIndex($context->index); + while (true) { + $log = $context->storedWorkflow->findLogByIndex($context->index); - if (WorkflowStub::faked()) { - $mocks = WorkflowStub::mocks(); + if (WorkflowStub::faked()) { + $mocks = WorkflowStub::mocks(); - if (! $log && array_key_exists($activity, $mocks)) { - $result = $mocks[$activity]; + if (! $log && array_key_exists($activity, $mocks)) { + $result = $mocks[$activity]; - $log = $context->storedWorkflow->createLog([ - 'index' => $context->index, - 'now' => $context->now, - 'class' => $activity, - 'result' => Serializer::serialize( - is_callable($result) ? $result($context, ...$arguments) : $result - ), - ]); + $log = $context->storedWorkflow->createLog([ + 'index' => $context->index, + 'now' => $context->now, + 'class' => $activity, + 'result' => Serializer::serialize( + is_callable($result) ? $result($context, ...$arguments) : $result + ), + ]); - WorkflowStub::recordDispatched($activity, $arguments); + WorkflowStub::recordDispatched($activity, $arguments); + } + } + + if (! $log || ! ($log->class === Exception::class && self::isForeignExceptionResult( + Serializer::unserialize($log->result), + $activity + ))) { + break; } + + ++$context->index; + WorkflowStub::setContext($context); } if ($log) { $result = Serializer::unserialize($log->result); - if ($log->class === Exception::class && self::isForeignExceptionResult($result, $activity)) { - ++$context->index; - WorkflowStub::setContext($context); - - return self::make($activity, ...$arguments); - } - if ( WorkflowStub::isProbing() && WorkflowStub::probeIndex() === $context->index diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 6333520c..25b3f97d 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -23,37 +23,42 @@ public static function make($workflow, ...$arguments): PromiseInterface { $context = WorkflowStub::getContext(); - $log = $context->storedWorkflow->findLogByIndex($context->index); + while (true) { + $log = $context->storedWorkflow->findLogByIndex($context->index); - if (WorkflowStub::faked()) { - $mocks = WorkflowStub::mocks(); + if (WorkflowStub::faked()) { + $mocks = WorkflowStub::mocks(); - if (! $log && array_key_exists($workflow, $mocks)) { - $result = $mocks[$workflow]; + if (! $log && array_key_exists($workflow, $mocks)) { + $result = $mocks[$workflow]; - $log = $context->storedWorkflow->createLog([ - 'index' => $context->index, - 'now' => $context->now, - 'class' => $workflow, - 'result' => Serializer::serialize( - is_callable($result) ? $result($context, ...$arguments) : $result - ), - ]); + $log = $context->storedWorkflow->createLog([ + 'index' => $context->index, + 'now' => $context->now, + 'class' => $workflow, + 'result' => Serializer::serialize( + is_callable($result) ? $result($context, ...$arguments) : $result + ), + ]); - WorkflowStub::recordDispatched($workflow, $arguments); + WorkflowStub::recordDispatched($workflow, $arguments); + } + } + + if (! $log || ! ($log->class === Exception::class && self::isForeignExceptionResult( + Serializer::unserialize($log->result), + $workflow + ))) { + break; } + + ++$context->index; + WorkflowStub::setContext($context); } if ($log) { $result = Serializer::unserialize($log->result); - if ($log->class === Exception::class && self::isForeignExceptionResult($result, $workflow)) { - ++$context->index; - WorkflowStub::setContext($context); - - return self::make($workflow, ...$arguments); - } - if ( WorkflowStub::isProbing() && WorkflowStub::probeIndex() === $context->index diff --git a/tests/Unit/ActivityStubTest.php b/tests/Unit/ActivityStubTest.php index 20e54ff5..ec970de2 100644 --- a/tests/Unit/ActivityStubTest.php +++ b/tests/Unit/ActivityStubTest.php @@ -162,6 +162,55 @@ public function testSkipsStoredExceptionForDifferentSourceClass(): void $this->assertSame(2, WorkflowStub::getContext()->index); } + public function testSkipsMultipleStoredExceptionsForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign-1', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign-2', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 2, + 'now' => WorkflowStub::now(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('test'), + ]); + + ActivityStub::make(TestActivity::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(3, WorkflowStub::getContext()->index); + } + public function testDoesNotMarkProbeMatchedForForeignStoredException(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index f18ff203..9de6e8c1 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -131,6 +131,55 @@ public function testSkipsStoredExceptionForDifferentSourceClass(): void $this->assertSame(2, WorkflowStub::getContext()->index); } + public function testSkipsMultipleStoredExceptionsForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child 1', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child 2', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 2, + 'now' => WorkflowStub::now(), + 'class' => TestChildWorkflow::class, + 'result' => Serializer::serialize('test'), + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(3, WorkflowStub::getContext()->index); + } + public function testMarksProbeMatchedForMatchingStoredException(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); From e5082f9ebabcf2948a9b6654ddc4c278ce0ba844 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sun, 5 Apr 2026 02:17:52 +0000 Subject: [PATCH 23/26] Normalize probe replay now context --- src/Exception.php | 3 +- tests/Fixtures/TestProbeNowSignalWorkflow.php | 30 +++++++++++++++++++ tests/Unit/ExceptionTest.php | 30 +++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 tests/Fixtures/TestProbeNowSignalWorkflow.php diff --git a/src/Exception.php b/src/Exception.php index 5b3a63e4..f8459737 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -10,6 +10,7 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Cache; use ReflectionClass; use Throwable; @@ -138,7 +139,7 @@ private function probeReplayDecision(): string WorkflowStub::setContext([ 'storedWorkflow' => $tentativeWorkflow, 'index' => 0, - 'now' => $this->now, + 'now' => Carbon::parse($this->now), 'replaying' => true, 'probing' => true, 'probeIndex' => $this->index, diff --git a/tests/Fixtures/TestProbeNowSignalWorkflow.php b/tests/Fixtures/TestProbeNowSignalWorkflow.php new file mode 100644 index 00000000..8a3f65ed --- /dev/null +++ b/tests/Fixtures/TestProbeNowSignalWorkflow.php @@ -0,0 +1,30 @@ +id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + $storedWorkflow->signals() + ->create([ + 'method' => 'recordNowType', + 'arguments' => Serializer::serialize([]), + ]); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'probe now type', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + $method->invoke($exception); + + $this->assertTrue(TestProbeNowSignalWorkflow::$signalSawCarbonNow); + } + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); From 297d06a873415ed41df70fe23fa4cd31cc30598a Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sun, 5 Apr 2026 02:44:37 +0000 Subject: [PATCH 24/26] Harden exception retry backoff and probe parsing --- src/Exception.php | 10 ++++++++-- tests/Unit/ExceptionTest.php | 22 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/Exception.php b/src/Exception.php index f8459737..8d38215e 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -102,7 +102,7 @@ public function middleware() new WithoutOverlappingMiddleware( $this->storedWorkflow->id, WithoutOverlappingMiddleware::WORKFLOW, - 0, + self::LOCK_RETRY_DELAY, 15 ), ]; @@ -132,6 +132,12 @@ private function probeReplayDecision(): string $probeDecision = self::PROBE_SKIP; try { + try { + $probeNow = Carbon::parse($this->now); + } catch (Throwable) { + return self::PROBE_PERSIST; + } + $tentativeWorkflow = $this->createTentativeWorkflowState(); $workflow = new $workflowClass($tentativeWorkflow, ...$tentativeWorkflow->workflowArguments()); $workflow->replaying = true; @@ -139,7 +145,7 @@ private function probeReplayDecision(): string WorkflowStub::setContext([ 'storedWorkflow' => $tentativeWorkflow, 'index' => 0, - 'now' => Carbon::parse($this->now), + 'now' => $probeNow, 'replaying' => true, 'probing' => true, 'probeIndex' => $this->index, diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 9a8cf431..9df287de 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -44,6 +44,7 @@ public function testMiddleware(): void $this->assertCount(1, $middleware); $this->assertSame(WithoutOverlappingMiddleware::class, $middleware[0]::class); $this->assertSame(WithoutOverlappingMiddleware::WORKFLOW, $middleware[0]->type); + $this->assertSame(1, $middleware[0]->releaseAfter); $this->assertSame(15, $middleware[0]->expiresAfter); } @@ -267,6 +268,27 @@ public function testProbeReplayUsesCarbonNowBeforeWorkflowHandleResetsContext(): $this->assertTrue(TestProbeNowSignalWorkflow::$signalSawCarbonNow); } + public function testProbeReplayPersistsWhenNowCannotBeParsed(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $exception = new Exception(0, 'not-a-timestamp', $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'bad now', + 'code' => 0, + ], connection: 'redis', queue: 'default'); + + $method = new ReflectionMethod(Exception::class, 'probeReplayDecision'); + $method->setAccessible(true); + + $this->assertSame('persist', $method->invoke($exception)); + } + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); From 26d287e328988f4d156bf2dd2e3ffcbff3db0e3b Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sun, 5 Apr 2026 03:08:49 +0000 Subject: [PATCH 25/26] Trim probe replay overhead --- src/ActivityStub.php | 19 ++++++++++++++----- src/ChildWorkflowStub.php | 19 ++++++++++++++----- src/Exception.php | 1 - 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 93a56115..5c041527 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -28,9 +28,11 @@ public static function async(callable $callback): PromiseInterface public static function make($activity, ...$arguments): PromiseInterface { $context = WorkflowStub::getContext(); + $result = null; while (true) { $log = $context->storedWorkflow->findLogByIndex($context->index); + $result = null; if (WorkflowStub::faked()) { $mocks = WorkflowStub::mocks(); @@ -51,10 +53,17 @@ public static function make($activity, ...$arguments): PromiseInterface } } - if (! $log || ! ($log->class === Exception::class && self::isForeignExceptionResult( - Serializer::unserialize($log->result), - $activity - ))) { + if (! $log) { + break; + } + + if ($log->class !== Exception::class) { + break; + } + + $result = Serializer::unserialize($log->result); + + if (! self::isForeignExceptionResult($result, $activity)) { break; } @@ -63,7 +72,7 @@ public static function make($activity, ...$arguments): PromiseInterface } if ($log) { - $result = Serializer::unserialize($log->result); + $result ??= Serializer::unserialize($log->result); if ( WorkflowStub::isProbing() diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 25b3f97d..8fa5cc6b 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -22,9 +22,11 @@ public static function all(iterable $promises): PromiseInterface public static function make($workflow, ...$arguments): PromiseInterface { $context = WorkflowStub::getContext(); + $result = null; while (true) { $log = $context->storedWorkflow->findLogByIndex($context->index); + $result = null; if (WorkflowStub::faked()) { $mocks = WorkflowStub::mocks(); @@ -45,10 +47,17 @@ public static function make($workflow, ...$arguments): PromiseInterface } } - if (! $log || ! ($log->class === Exception::class && self::isForeignExceptionResult( - Serializer::unserialize($log->result), - $workflow - ))) { + if (! $log) { + break; + } + + if ($log->class !== Exception::class) { + break; + } + + $result = Serializer::unserialize($log->result); + + if (! self::isForeignExceptionResult($result, $workflow)) { break; } @@ -57,7 +66,7 @@ public static function make($workflow, ...$arguments): PromiseInterface } if ($log) { - $result = Serializer::unserialize($log->result); + $result ??= Serializer::unserialize($log->result); if ( WorkflowStub::isProbing() diff --git a/src/Exception.php b/src/Exception.php index 8d38215e..e5e2bea4 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -195,7 +195,6 @@ private function createTentativeWorkflowState(): StoredWorkflow 'logs', $tentativeWorkflow->getRelation('logs') ->push($tentativeLog) - ->sortBy(static fn ($log): string => sprintf('%020d:%020d', $log->index, $log->id ?? PHP_INT_MAX)) ->values() ); From 8625db804b053142dfc4c0fce1a7c975ad565bb9 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Sun, 5 Apr 2026 03:32:15 +0000 Subject: [PATCH 26/26] Fix fake callable replay results --- src/ActivityStub.php | 4 ++-- src/ChildWorkflowStub.php | 4 ++-- tests/Unit/WorkflowFakerTest.php | 19 +++++++++++++++++++ 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 5c041527..2bc9b259 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -38,14 +38,14 @@ public static function make($activity, ...$arguments): PromiseInterface $mocks = WorkflowStub::mocks(); if (! $log && array_key_exists($activity, $mocks)) { - $result = $mocks[$activity]; + $mockedResult = $mocks[$activity]; $log = $context->storedWorkflow->createLog([ 'index' => $context->index, 'now' => $context->now, 'class' => $activity, 'result' => Serializer::serialize( - is_callable($result) ? $result($context, ...$arguments) : $result + is_callable($mockedResult) ? $mockedResult($context, ...$arguments) : $mockedResult ), ]); diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 8fa5cc6b..b2f2d880 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -32,14 +32,14 @@ public static function make($workflow, ...$arguments): PromiseInterface $mocks = WorkflowStub::mocks(); if (! $log && array_key_exists($workflow, $mocks)) { - $result = $mocks[$workflow]; + $mockedResult = $mocks[$workflow]; $log = $context->storedWorkflow->createLog([ 'index' => $context->index, 'now' => $context->now, 'class' => $workflow, 'result' => Serializer::serialize( - is_callable($result) ? $result($context, ...$arguments) : $result + is_callable($mockedResult) ? $mockedResult($context, ...$arguments) : $mockedResult ), ]); diff --git a/tests/Unit/WorkflowFakerTest.php b/tests/Unit/WorkflowFakerTest.php index bad1e792..03fe51ba 100644 --- a/tests/Unit/WorkflowFakerTest.php +++ b/tests/Unit/WorkflowFakerTest.php @@ -71,6 +71,25 @@ public function testParentWorkflow(): void $this->assertSame($workflow->output(), 'workflow_activity_other_activity'); } + public function testParentWorkflowWithCallableChildMock(): void + { + WorkflowStub::fake(); + + WorkflowStub::mock(TestActivity::class, 'activity'); + + WorkflowStub::mock(TestChildWorkflow::class, static function ($context) { + return 'other_activity'; + }); + + $workflow = WorkflowStub::make(TestParentWorkflow::class); + $workflow->start(); + + WorkflowStub::assertDispatchedTimes(TestActivity::class, 1); + WorkflowStub::assertDispatchedTimes(TestChildWorkflow::class, 1); + + $this->assertSame($workflow->output(), 'workflow_activity_other_activity'); + } + public function testConcurrentWorkflow(): void { WorkflowStub::fake();