Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,29 @@ public function delete(): void
}
}

public function flush(mixed $userState = null): void
public function flush(mixed $userState, bool $isRebuilding): void
{
if ($this->flushChannel) {
$this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([
ProjectingHeaders::PROJECTION_STATE => $userState,
]), $this->flushChannel);
if (! $this->flushChannel) {
return;
}

$headers = $this->withProjectionName([
ProjectingHeaders::PROJECTION_STATE => $userState,
ProjectingHeaders::PROJECTION_LIVE => $this->isLive && ! $isRebuilding,
MessageHeaders::STREAM_BASED_SOURCED => true,
]);

$requestMessage = MessageBuilder::withPayload([])
->setMultipleHeaders($headers)
->build();

$flushChannel = $this->flushChannel;
$this->messageHeadersPropagatorInterceptor->storeHeaders(
function () use ($headers, $flushChannel): void {
$this->messagingEntrypoint->sendWithHeaders([], $headers, $flushChannel);
},
$requestMessage
);
}

public function reset(?string $partitionKey = null): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public function delete(): void
// No deletion needed
}

public function flush(mixed $userState = null): void
public function flush(mixed $userState, bool $isRebuilding): void
{
// No flushing needed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public function delete(): void
$this->projectedEvents = [];
}

public function flush(mixed $userState = null): void
public function flush(mixed $userState, bool $isRebuilding): void
{
}

Expand Down
2 changes: 1 addition & 1 deletion packages/Ecotone/src/Projecting/ProjectingManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public function executePartitionBatch(?string $partitionKeyValue = null, bool $c
$batchProcessedEvents++;
}
if ($batchProcessedEvents > 0) {
$this->projectorExecutor->flush($userState);
$this->projectorExecutor->flush($userState, $shouldReset);
}

$totalProcessedEvents += $batchProcessedEvents;
Expand Down
2 changes: 1 addition & 1 deletion packages/Ecotone/src/Projecting/ProjectorExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ interface ProjectorExecutor
public function project(Event $event, mixed $userState = null, bool $isRebuilding = false): mixed;
public function init(): void;
public function delete(): void;
public function flush(mixed $userState = null): void;
public function flush(mixed $userState, bool $isRebuilding): void;
public function reset(?string $partitionKey = null): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,26 @@

namespace Test\Ecotone\EventSourcing\Projecting\Partitioned;

use Ecotone\EventSourcing\Attribute\FromAggregateStream;
use Ecotone\EventSourcing\Attribute\FromStream;
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
use Ecotone\EventSourcing\Attribute\ProjectionReset;
use Ecotone\EventSourcing\Attribute\ProjectionState;
use Ecotone\EventSourcing\EventStore;
use Ecotone\EventSourcing\EventStreamEmitter;
use Ecotone\Lite\EcotoneLite;
use Ecotone\Messaging\Attribute\Parameter\Reference;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Support\LicensingException;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Projecting\Attribute\Partitioned;
use Ecotone\Projecting\Attribute\ProjectionDeployment;
use Ecotone\Projecting\Attribute\ProjectionFlush;
use Ecotone\Projecting\Attribute\ProjectionV2;
use Ecotone\Projecting\ProjectionRegistry;
use Ecotone\Test\LicenceTesting;
use Enqueue\Dbal\DbalConnectionFactory;

Expand Down Expand Up @@ -359,4 +364,204 @@ public function delete(#[Reference] EventStore $eventStore): void
}
};
}

public function test_partitioned_projection_flush_emits_events_using_event_stream_emitter(): void
{
$projection = $this->createFlushEmittingProjection();

$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class],
containerOrAvailableServices: [
$projection,
new TicketEventConverter(),
new TicketListUpdatedConverter(),
DbalConnectionFactory::class => $this->getConnectionFactory(),
],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withNamespaces([
'Test\Ecotone\EventSourcing\Fixture\Ticket',
]),
pathToRootCatalog: __DIR__ . '/../../',
runForProductionEventStore: true,
licenceKey: LicenceTesting::VALID_LICENCE,
);

$ecotone
->sendCommand(new RegisterTicket('1', 'Johnny', 'alert'))
->sendCommand(new RegisterTicket('2', 'Jane', 'info'));

$eventStore = $ecotone->getGateway(EventStore::class);
$emittedEvents = $eventStore->load('projection_flush_emitting_projection');

self::assertCount(2, $emittedEvents);
}

public function test_rebuild_should_not_emit_events_from_flush_method(): void
{
$projection = $this->createFlushEmittingProjection();

$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class],
containerOrAvailableServices: [
$projection,
new TicketEventConverter(),
new TicketListUpdatedConverter(),
DbalConnectionFactory::class => $this->getConnectionFactory(),
],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withNamespaces([
'Test\Ecotone\EventSourcing\Fixture\Ticket',
]),
pathToRootCatalog: __DIR__ . '/../../',
runForProductionEventStore: true,
licenceKey: LicenceTesting::VALID_LICENCE,
);

$ecotone
->sendCommand(new RegisterTicket('1', 'Johnny', 'alert'))
->sendCommand(new RegisterTicket('2', 'Jane', 'info'));

$eventStore = $ecotone->getGateway(EventStore::class);
self::assertCount(2, $eventStore->load('projection_flush_emitting_projection'));

$ecotone->resetProjection('flush_emitting_projection');
self::assertEmpty($projection->getTickets());
$emittedCountBeforeRebuild = $eventStore->hasStream('projection_flush_emitting_projection')
? count($eventStore->load('projection_flush_emitting_projection'))
: 0;

$ecotone->getGateway(ProjectionRegistry::class)->get('flush_emitting_projection')->prepareRebuild();

self::assertNotEmpty($projection->getTickets());
$emittedCountAfterRebuild = $eventStore->hasStream('projection_flush_emitting_projection')
? count($eventStore->load('projection_flush_emitting_projection'))
: 0;
self::assertSame($emittedCountBeforeRebuild, $emittedCountAfterRebuild);
}

public function test_global_flush_with_projection_state_requires_enterprise_licence(): void
{
$projection = new #[ProjectionV2('global_flush_state_projection'), \Ecotone\EventSourcing\Attribute\FromStream(Ticket::class)] class () {
#[EventHandler(endpointId: 'globalFlushStateProjection.addTicket')]
public function addTicket(TicketWasRegistered $event, #[ProjectionState] array $ticket = []): array
{
$ticket['ticketId'] = $event->getTicketId();
return $ticket;
}

#[ProjectionFlush]
public function flush(#[ProjectionState] array $ticket, EventStreamEmitter $emitter): void
{
if (! isset($ticket['ticketId'])) {
return;
}
$emitter->emit([new TicketListUpdated($ticket['ticketId'])]);
}
};

$this->expectException(LicensingException::class);
$this->expectExceptionMessage('Using #[ProjectionState] in #[ProjectionFlush] methods requires Ecotone Enterprise licence.');

EcotoneLite::bootstrapFlowTestingWithEventStore(
classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class],
containerOrAvailableServices: [
$projection,
new TicketEventConverter(),
new TicketListUpdatedConverter(),
DbalConnectionFactory::class => $this->getConnectionFactory(),
],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withNamespaces([
'Test\Ecotone\EventSourcing\Fixture\Ticket',
]),
pathToRootCatalog: __DIR__ . '/../../',
runForProductionEventStore: true,
);
}

public function test_partitioned_flush_emitter_pattern_requires_enterprise_licence(): void
{
$projection = $this->createFlushEmittingProjection();

$this->expectException(LicensingException::class);
$this->expectExceptionMessageMatches('/Enterprise licence/');

EcotoneLite::bootstrapFlowTestingWithEventStore(
classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class],
containerOrAvailableServices: [
$projection,
new TicketEventConverter(),
new TicketListUpdatedConverter(),
DbalConnectionFactory::class => $this->getConnectionFactory(),
],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withNamespaces([
'Test\Ecotone\EventSourcing\Fixture\Ticket',
]),
pathToRootCatalog: __DIR__ . '/../../',
runForProductionEventStore: true,
);
}

private function createFlushEmittingProjection(): object
{
return new #[ProjectionV2('flush_emitting_projection'), Partitioned, FromAggregateStream(Ticket::class)] class () {
public array $tickets = [];

#[EventHandler(endpointId: 'flushEmittingProjection.addTicket')]
public function addTicket(TicketWasRegistered $event, #[ProjectionState] array $ticket = []): array
{
$ticket['ticketId'] = $event->getTicketId();
$ticket['status'] = 'open';
$this->tickets[$event->getTicketId()] = $ticket;
return $ticket;
}

#[EventHandler(endpointId: 'flushEmittingProjection.closeTicket')]
public function closeTicket(TicketWasClosed $event, #[ProjectionState] array $ticket): array
{
$ticket['status'] = 'closed';
$this->tickets[$event->getTicketId()] = $ticket;
return $ticket;
}

#[ProjectionFlush]
public function flush(#[ProjectionState] array $ticket, EventStreamEmitter $emitter): void
{
if (! isset($ticket['ticketId'])) {
return;
}

$emitter->emit([new TicketListUpdated($ticket['ticketId'])]);
}

#[QueryHandler('getFlushEmittingProjectionTickets')]
public function getTickets(): array
{
return $this->tickets;
}

#[ProjectionReset]
public function reset(#[Reference] EventStore $eventStore): void
{
$this->tickets = [];
if ($eventStore->hasStream('projection_flush_emitting_projection')) {
$eventStore->delete('projection_flush_emitting_projection');
}
}

#[ProjectionDelete]
public function delete(#[Reference] EventStore $eventStore): void
{
$this->tickets = [];
if ($eventStore->hasStream('projection_flush_emitting_projection')) {
$eventStore->delete('projection_flush_emitting_projection');
}
}
};
}
}
Loading