diff --git a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php index a1be9c145..14ff7ceb5 100644 --- a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php @@ -80,13 +80,29 @@ public function delete(): void } } - public function flush(mixed $userState = null): void + public function flush(mixed $userState, bool $isRebuilding): void { - if ($this->flushChannel) { - $this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([ - ProjectingHeaders::PROJECTION_STATE => $userState, - ]), $this->flushChannel); + if (! $this->flushChannel) { + return; } + + $headers = $this->withProjectionName([ + ProjectingHeaders::PROJECTION_STATE => $userState, + ProjectingHeaders::PROJECTION_LIVE => $this->isLive && ! $isRebuilding, + MessageHeaders::STREAM_BASED_SOURCED => true, + ]); + + $requestMessage = MessageBuilder::withPayload([]) + ->setMultipleHeaders($headers) + ->build(); + + $flushChannel = $this->flushChannel; + $this->messageHeadersPropagatorInterceptor->storeHeaders( + function () use ($headers, $flushChannel): void { + $this->messagingEntrypoint->sendWithHeaders([], $headers, $flushChannel); + }, + $requestMessage + ); } public function reset(?string $partitionKey = null): void diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php index 6bd3e2472..5fef7ab3a 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php @@ -73,7 +73,7 @@ public function delete(): void // No deletion needed } - public function flush(mixed $userState = null): void + public function flush(mixed $userState, bool $isRebuilding): void { // No flushing needed } diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php index c14abaaed..8aa7f8945 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php @@ -40,7 +40,7 @@ public function delete(): void $this->projectedEvents = []; } - public function flush(mixed $userState = null): void + public function flush(mixed $userState, bool $isRebuilding): void { } diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index fed41d73d..cddf75c1e 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -102,7 +102,7 @@ public function executePartitionBatch(?string $partitionKeyValue = null, bool $c $batchProcessedEvents++; } if ($batchProcessedEvents > 0) { - $this->projectorExecutor->flush($userState); + $this->projectorExecutor->flush($userState, $shouldReset); } $totalProcessedEvents += $batchProcessedEvents; diff --git a/packages/Ecotone/src/Projecting/ProjectorExecutor.php b/packages/Ecotone/src/Projecting/ProjectorExecutor.php index 7918f6fb8..96d64a1d7 100644 --- a/packages/Ecotone/src/Projecting/ProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/ProjectorExecutor.php @@ -18,6 +18,6 @@ interface ProjectorExecutor public function project(Event $event, mixed $userState = null, bool $isRebuilding = false): mixed; public function init(): void; public function delete(): void; - public function flush(mixed $userState = null): void; + public function flush(mixed $userState, bool $isRebuilding): void; public function reset(?string $partitionKey = null): void; } diff --git a/packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php index 1060b8955..16d0d66c0 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php @@ -7,21 +7,26 @@ namespace Test\Ecotone\EventSourcing\Projecting\Partitioned; +use Ecotone\EventSourcing\Attribute\FromAggregateStream; use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionDelete; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; use Ecotone\EventSourcing\Attribute\ProjectionReset; +use Ecotone\EventSourcing\Attribute\ProjectionState; use Ecotone\EventSourcing\EventStore; use Ecotone\EventSourcing\EventStreamEmitter; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Attribute\Parameter\Reference; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Support\LicensingException; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Attribute\QueryHandler; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionDeployment; +use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; +use Ecotone\Projecting\ProjectionRegistry; use Ecotone\Test\LicenceTesting; use Enqueue\Dbal\DbalConnectionFactory; @@ -359,4 +364,204 @@ public function delete(#[Reference] EventStore $eventStore): void } }; } + + public function test_partitioned_projection_flush_emits_events_using_event_stream_emitter(): void + { + $projection = $this->createFlushEmittingProjection(); + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class], + containerOrAvailableServices: [ + $projection, + new TicketEventConverter(), + new TicketListUpdatedConverter(), + DbalConnectionFactory::class => $this->getConnectionFactory(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone + ->sendCommand(new RegisterTicket('1', 'Johnny', 'alert')) + ->sendCommand(new RegisterTicket('2', 'Jane', 'info')); + + $eventStore = $ecotone->getGateway(EventStore::class); + $emittedEvents = $eventStore->load('projection_flush_emitting_projection'); + + self::assertCount(2, $emittedEvents); + } + + public function test_rebuild_should_not_emit_events_from_flush_method(): void + { + $projection = $this->createFlushEmittingProjection(); + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class], + containerOrAvailableServices: [ + $projection, + new TicketEventConverter(), + new TicketListUpdatedConverter(), + DbalConnectionFactory::class => $this->getConnectionFactory(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone + ->sendCommand(new RegisterTicket('1', 'Johnny', 'alert')) + ->sendCommand(new RegisterTicket('2', 'Jane', 'info')); + + $eventStore = $ecotone->getGateway(EventStore::class); + self::assertCount(2, $eventStore->load('projection_flush_emitting_projection')); + + $ecotone->resetProjection('flush_emitting_projection'); + self::assertEmpty($projection->getTickets()); + $emittedCountBeforeRebuild = $eventStore->hasStream('projection_flush_emitting_projection') + ? count($eventStore->load('projection_flush_emitting_projection')) + : 0; + + $ecotone->getGateway(ProjectionRegistry::class)->get('flush_emitting_projection')->prepareRebuild(); + + self::assertNotEmpty($projection->getTickets()); + $emittedCountAfterRebuild = $eventStore->hasStream('projection_flush_emitting_projection') + ? count($eventStore->load('projection_flush_emitting_projection')) + : 0; + self::assertSame($emittedCountBeforeRebuild, $emittedCountAfterRebuild); + } + + public function test_global_flush_with_projection_state_requires_enterprise_licence(): void + { + $projection = new #[ProjectionV2('global_flush_state_projection'), \Ecotone\EventSourcing\Attribute\FromStream(Ticket::class)] class () { + #[EventHandler(endpointId: 'globalFlushStateProjection.addTicket')] + public function addTicket(TicketWasRegistered $event, #[ProjectionState] array $ticket = []): array + { + $ticket['ticketId'] = $event->getTicketId(); + return $ticket; + } + + #[ProjectionFlush] + public function flush(#[ProjectionState] array $ticket, EventStreamEmitter $emitter): void + { + if (! isset($ticket['ticketId'])) { + return; + } + $emitter->emit([new TicketListUpdated($ticket['ticketId'])]); + } + }; + + $this->expectException(LicensingException::class); + $this->expectExceptionMessage('Using #[ProjectionState] in #[ProjectionFlush] methods requires Ecotone Enterprise licence.'); + + EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class], + containerOrAvailableServices: [ + $projection, + new TicketEventConverter(), + new TicketListUpdatedConverter(), + DbalConnectionFactory::class => $this->getConnectionFactory(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true, + ); + } + + public function test_partitioned_flush_emitter_pattern_requires_enterprise_licence(): void + { + $projection = $this->createFlushEmittingProjection(); + + $this->expectException(LicensingException::class); + $this->expectExceptionMessageMatches('/Enterprise licence/'); + + EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class], + containerOrAvailableServices: [ + $projection, + new TicketEventConverter(), + new TicketListUpdatedConverter(), + DbalConnectionFactory::class => $this->getConnectionFactory(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true, + ); + } + + private function createFlushEmittingProjection(): object + { + return new #[ProjectionV2('flush_emitting_projection'), Partitioned, FromAggregateStream(Ticket::class)] class () { + public array $tickets = []; + + #[EventHandler(endpointId: 'flushEmittingProjection.addTicket')] + public function addTicket(TicketWasRegistered $event, #[ProjectionState] array $ticket = []): array + { + $ticket['ticketId'] = $event->getTicketId(); + $ticket['status'] = 'open'; + $this->tickets[$event->getTicketId()] = $ticket; + return $ticket; + } + + #[EventHandler(endpointId: 'flushEmittingProjection.closeTicket')] + public function closeTicket(TicketWasClosed $event, #[ProjectionState] array $ticket): array + { + $ticket['status'] = 'closed'; + $this->tickets[$event->getTicketId()] = $ticket; + return $ticket; + } + + #[ProjectionFlush] + public function flush(#[ProjectionState] array $ticket, EventStreamEmitter $emitter): void + { + if (! isset($ticket['ticketId'])) { + return; + } + + $emitter->emit([new TicketListUpdated($ticket['ticketId'])]); + } + + #[QueryHandler('getFlushEmittingProjectionTickets')] + public function getTickets(): array + { + return $this->tickets; + } + + #[ProjectionReset] + public function reset(#[Reference] EventStore $eventStore): void + { + $this->tickets = []; + if ($eventStore->hasStream('projection_flush_emitting_projection')) { + $eventStore->delete('projection_flush_emitting_projection'); + } + } + + #[ProjectionDelete] + public function delete(#[Reference] EventStore $eventStore): void + { + $this->tickets = []; + if ($eventStore->hasStream('projection_flush_emitting_projection')) { + $eventStore->delete('projection_flush_emitting_projection'); + } + } + }; + } }