From 9a8e7d544e037bdc78bae3f7839e061f3421bbf9 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Mon, 4 May 2026 18:03:40 +0200 Subject: [PATCH 1/2] fix: support EventStreamEmitter inside #[ProjectionFlush] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Calling EventStreamEmitter::emit() / linkTo() from a #[ProjectionFlush] method threw "Header projection.name does not exists" because the flush dispatch in EcotoneProjectorExecutor skipped the header context that project() sets up (projection.name, projection.live, streamBasedSourced) and never wrapped the dispatch in MessageHeadersPropagatorInterceptor. flush() now mirrors project()'s header context — projection.name stays enterprise-gated via withProjectionName() — and wraps the dispatch in storeHeaders() so EventStreamEmitter's PropagateHeaders gateway can merge those headers into outbound emit/linkTo calls. ProjectorExecutor gains an isRebuilding flag so projection.live is set to false during rebuild, suppressing flush-time emits via the existing live-filter. --- .../Projecting/EcotoneProjectorExecutor.php | 26 ++- .../EventStoreChannelAdapterProjection.php | 2 +- .../Projecting/InMemory/InMemoryProjector.php | 2 +- .../src/Projecting/ProjectingManager.php | 2 +- .../src/Projecting/ProjectorExecutor.php | 2 +- .../EmittingEventsProjectionTest.php | 205 ++++++++++++++++++ 6 files changed, 230 insertions(+), 9 deletions(-) diff --git a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php index a1be9c145..10eb22f1f 100644 --- a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php @@ -80,13 +80,29 @@ public function delete(): void } } - public function flush(mixed $userState = null): void + public function flush(mixed $userState = null, bool $isRebuilding = false): void { - if ($this->flushChannel) { - $this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([ - ProjectingHeaders::PROJECTION_STATE => $userState, - ]), $this->flushChannel); + if (! $this->flushChannel) { + return; } + + $headers = $this->withProjectionName([ + ProjectingHeaders::PROJECTION_STATE => $userState, + ProjectingHeaders::PROJECTION_LIVE => $this->isLive && ! $isRebuilding, + MessageHeaders::STREAM_BASED_SOURCED => true, + ]); + + $requestMessage = MessageBuilder::withPayload([]) + ->setMultipleHeaders($headers) + ->build(); + + $flushChannel = $this->flushChannel; + $this->messageHeadersPropagatorInterceptor->storeHeaders( + function () use ($headers, $flushChannel): void { + $this->messagingEntrypoint->sendWithHeaders([], $headers, $flushChannel); + }, + $requestMessage + ); } public function reset(?string $partitionKey = null): void diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php index 6bd3e2472..d736423d6 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php @@ -73,7 +73,7 @@ public function delete(): void // No deletion needed } - public function flush(mixed $userState = null): void + public function flush(mixed $userState = null, bool $isRebuilding = false): void { // No flushing needed } diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php index c14abaaed..eab99aba8 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php @@ -40,7 +40,7 @@ public function delete(): void $this->projectedEvents = []; } - public function flush(mixed $userState = null): void + public function flush(mixed $userState = null, bool $isRebuilding = false): void { } diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index fed41d73d..cddf75c1e 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -102,7 +102,7 @@ public function executePartitionBatch(?string $partitionKeyValue = null, bool $c $batchProcessedEvents++; } if ($batchProcessedEvents > 0) { - $this->projectorExecutor->flush($userState); + $this->projectorExecutor->flush($userState, $shouldReset); } $totalProcessedEvents += $batchProcessedEvents; diff --git a/packages/Ecotone/src/Projecting/ProjectorExecutor.php b/packages/Ecotone/src/Projecting/ProjectorExecutor.php index 7918f6fb8..ebc419f25 100644 --- a/packages/Ecotone/src/Projecting/ProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/ProjectorExecutor.php @@ -18,6 +18,6 @@ interface ProjectorExecutor public function project(Event $event, mixed $userState = null, bool $isRebuilding = false): mixed; public function init(): void; public function delete(): void; - public function flush(mixed $userState = null): void; + public function flush(mixed $userState = null, bool $isRebuilding = false): void; public function reset(?string $partitionKey = null): void; } diff --git a/packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php index 1060b8955..16d0d66c0 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php @@ -7,21 +7,26 @@ namespace Test\Ecotone\EventSourcing\Projecting\Partitioned; +use Ecotone\EventSourcing\Attribute\FromAggregateStream; use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionDelete; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; use Ecotone\EventSourcing\Attribute\ProjectionReset; +use Ecotone\EventSourcing\Attribute\ProjectionState; use Ecotone\EventSourcing\EventStore; use Ecotone\EventSourcing\EventStreamEmitter; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Attribute\Parameter\Reference; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Support\LicensingException; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Attribute\QueryHandler; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionDeployment; +use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; +use Ecotone\Projecting\ProjectionRegistry; use Ecotone\Test\LicenceTesting; use Enqueue\Dbal\DbalConnectionFactory; @@ -359,4 +364,204 @@ public function delete(#[Reference] EventStore $eventStore): void } }; } + + public function test_partitioned_projection_flush_emits_events_using_event_stream_emitter(): void + { + $projection = $this->createFlushEmittingProjection(); + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class], + containerOrAvailableServices: [ + $projection, + new TicketEventConverter(), + new TicketListUpdatedConverter(), + DbalConnectionFactory::class => $this->getConnectionFactory(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone + ->sendCommand(new RegisterTicket('1', 'Johnny', 'alert')) + ->sendCommand(new RegisterTicket('2', 'Jane', 'info')); + + $eventStore = $ecotone->getGateway(EventStore::class); + $emittedEvents = $eventStore->load('projection_flush_emitting_projection'); + + self::assertCount(2, $emittedEvents); + } + + public function test_rebuild_should_not_emit_events_from_flush_method(): void + { + $projection = $this->createFlushEmittingProjection(); + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class], + containerOrAvailableServices: [ + $projection, + new TicketEventConverter(), + new TicketListUpdatedConverter(), + DbalConnectionFactory::class => $this->getConnectionFactory(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone + ->sendCommand(new RegisterTicket('1', 'Johnny', 'alert')) + ->sendCommand(new RegisterTicket('2', 'Jane', 'info')); + + $eventStore = $ecotone->getGateway(EventStore::class); + self::assertCount(2, $eventStore->load('projection_flush_emitting_projection')); + + $ecotone->resetProjection('flush_emitting_projection'); + self::assertEmpty($projection->getTickets()); + $emittedCountBeforeRebuild = $eventStore->hasStream('projection_flush_emitting_projection') + ? count($eventStore->load('projection_flush_emitting_projection')) + : 0; + + $ecotone->getGateway(ProjectionRegistry::class)->get('flush_emitting_projection')->prepareRebuild(); + + self::assertNotEmpty($projection->getTickets()); + $emittedCountAfterRebuild = $eventStore->hasStream('projection_flush_emitting_projection') + ? count($eventStore->load('projection_flush_emitting_projection')) + : 0; + self::assertSame($emittedCountBeforeRebuild, $emittedCountAfterRebuild); + } + + public function test_global_flush_with_projection_state_requires_enterprise_licence(): void + { + $projection = new #[ProjectionV2('global_flush_state_projection'), \Ecotone\EventSourcing\Attribute\FromStream(Ticket::class)] class () { + #[EventHandler(endpointId: 'globalFlushStateProjection.addTicket')] + public function addTicket(TicketWasRegistered $event, #[ProjectionState] array $ticket = []): array + { + $ticket['ticketId'] = $event->getTicketId(); + return $ticket; + } + + #[ProjectionFlush] + public function flush(#[ProjectionState] array $ticket, EventStreamEmitter $emitter): void + { + if (! isset($ticket['ticketId'])) { + return; + } + $emitter->emit([new TicketListUpdated($ticket['ticketId'])]); + } + }; + + $this->expectException(LicensingException::class); + $this->expectExceptionMessage('Using #[ProjectionState] in #[ProjectionFlush] methods requires Ecotone Enterprise licence.'); + + EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class], + containerOrAvailableServices: [ + $projection, + new TicketEventConverter(), + new TicketListUpdatedConverter(), + DbalConnectionFactory::class => $this->getConnectionFactory(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true, + ); + } + + public function test_partitioned_flush_emitter_pattern_requires_enterprise_licence(): void + { + $projection = $this->createFlushEmittingProjection(); + + $this->expectException(LicensingException::class); + $this->expectExceptionMessageMatches('/Enterprise licence/'); + + EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class], + containerOrAvailableServices: [ + $projection, + new TicketEventConverter(), + new TicketListUpdatedConverter(), + DbalConnectionFactory::class => $this->getConnectionFactory(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true, + ); + } + + private function createFlushEmittingProjection(): object + { + return new #[ProjectionV2('flush_emitting_projection'), Partitioned, FromAggregateStream(Ticket::class)] class () { + public array $tickets = []; + + #[EventHandler(endpointId: 'flushEmittingProjection.addTicket')] + public function addTicket(TicketWasRegistered $event, #[ProjectionState] array $ticket = []): array + { + $ticket['ticketId'] = $event->getTicketId(); + $ticket['status'] = 'open'; + $this->tickets[$event->getTicketId()] = $ticket; + return $ticket; + } + + #[EventHandler(endpointId: 'flushEmittingProjection.closeTicket')] + public function closeTicket(TicketWasClosed $event, #[ProjectionState] array $ticket): array + { + $ticket['status'] = 'closed'; + $this->tickets[$event->getTicketId()] = $ticket; + return $ticket; + } + + #[ProjectionFlush] + public function flush(#[ProjectionState] array $ticket, EventStreamEmitter $emitter): void + { + if (! isset($ticket['ticketId'])) { + return; + } + + $emitter->emit([new TicketListUpdated($ticket['ticketId'])]); + } + + #[QueryHandler('getFlushEmittingProjectionTickets')] + public function getTickets(): array + { + return $this->tickets; + } + + #[ProjectionReset] + public function reset(#[Reference] EventStore $eventStore): void + { + $this->tickets = []; + if ($eventStore->hasStream('projection_flush_emitting_projection')) { + $eventStore->delete('projection_flush_emitting_projection'); + } + } + + #[ProjectionDelete] + public function delete(#[Reference] EventStore $eventStore): void + { + $this->tickets = []; + if ($eventStore->hasStream('projection_flush_emitting_projection')) { + $eventStore->delete('projection_flush_emitting_projection'); + } + } + }; + } } From f655226954f5275308a7f8336e3695e23f9ef77d Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Mon, 4 May 2026 18:08:14 +0200 Subject: [PATCH 2/2] refactor: make ProjectorExecutor::flush parameters required MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop the default values on $userState and $isRebuilding so callers must explicitly pass both — the rebuild flag in particular should always come from the surrounding flow rather than silently defaulting to live mode. --- packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php | 2 +- .../EventStoreAdapter/EventStoreChannelAdapterProjection.php | 2 +- packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php | 2 +- packages/Ecotone/src/Projecting/ProjectorExecutor.php | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php index 10eb22f1f..14ff7ceb5 100644 --- a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php @@ -80,7 +80,7 @@ public function delete(): void } } - public function flush(mixed $userState = null, bool $isRebuilding = false): void + public function flush(mixed $userState, bool $isRebuilding): void { if (! $this->flushChannel) { return; diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php index d736423d6..5fef7ab3a 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php @@ -73,7 +73,7 @@ public function delete(): void // No deletion needed } - public function flush(mixed $userState = null, bool $isRebuilding = false): void + public function flush(mixed $userState, bool $isRebuilding): void { // No flushing needed } diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php index eab99aba8..8aa7f8945 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php @@ -40,7 +40,7 @@ public function delete(): void $this->projectedEvents = []; } - public function flush(mixed $userState = null, bool $isRebuilding = false): void + public function flush(mixed $userState, bool $isRebuilding): void { } diff --git a/packages/Ecotone/src/Projecting/ProjectorExecutor.php b/packages/Ecotone/src/Projecting/ProjectorExecutor.php index ebc419f25..96d64a1d7 100644 --- a/packages/Ecotone/src/Projecting/ProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/ProjectorExecutor.php @@ -18,6 +18,6 @@ interface ProjectorExecutor public function project(Event $event, mixed $userState = null, bool $isRebuilding = false): mixed; public function init(): void; public function delete(): void; - public function flush(mixed $userState = null, bool $isRebuilding = false): void; + public function flush(mixed $userState, bool $isRebuilding): void; public function reset(?string $partitionKey = null): void; }