diff --git a/.gitignore b/.gitignore index 394ebcb07..dd5205b0d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea/ +docs/superpowers/ vendor/ tests/coverage !tests/coverage/.gitkeep diff --git a/packages/Dbal/src/Attribute/WithTenantResolver.php b/packages/Dbal/src/Attribute/WithTenantResolver.php new file mode 100644 index 000000000..4fbda77a9 --- /dev/null +++ b/packages/Dbal/src/Attribute/WithTenantResolver.php @@ -0,0 +1,23 @@ +expression; + } +} diff --git a/packages/Dbal/src/MultiTenant/Module/MultiTenantConnectionFactoryModule.php b/packages/Dbal/src/MultiTenant/Module/MultiTenantConnectionFactoryModule.php index f91c0383b..8e6266058 100644 --- a/packages/Dbal/src/MultiTenant/Module/MultiTenantConnectionFactoryModule.php +++ b/packages/Dbal/src/MultiTenant/Module/MultiTenantConnectionFactoryModule.php @@ -5,10 +5,14 @@ namespace Ecotone\Dbal\MultiTenant\Module; use Ecotone\AnnotationFinder\AnnotationFinder; +use Ecotone\Dbal\Attribute\WithTenantResolver; use Ecotone\Dbal\MultiTenant\HeaderBasedMultiTenantConnectionFactory; use Ecotone\Dbal\MultiTenant\MultiTenantConfiguration; use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; +use Ecotone\Dbal\MultiTenant\MultiTenantHeaderResolver; use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint; +use Ecotone\Messaging\Attribute\ChannelAdapter; +use Ecotone\Messaging\Attribute\MessageConsumer; use Ecotone\Messaging\Attribute\MessageGateway; use Ecotone\Messaging\Attribute\ModuleAnnotation; use Ecotone\Messaging\Attribute\PropagateHeaders; @@ -18,16 +22,19 @@ use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ExtensionObjectResolver; use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\NoExternalConfigurationModule; use Ecotone\Messaging\Config\Configuration; +use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Config\Container\Definition; use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ModuleReferenceSearchService; use Ecotone\Messaging\Gateway\MessagingEntrypointService; +use Ecotone\Messaging\Handler\ExpressionEvaluationService; use Ecotone\Messaging\Handler\InterfaceToCallRegistry; use Ecotone\Messaging\Handler\Logger\LoggingGateway; use Ecotone\Messaging\Handler\Processor\MethodInvoker\AroundInterceptorBuilder; use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInterceptorBuilder; use Ecotone\Messaging\Precedence; +use Ecotone\Messaging\Support\LicensingException; use Ecotone\Modelling\CommandBus; use Ecotone\Modelling\EventBus; use Ecotone\Modelling\MessageHandling\MetadataPropagator\MessageHeadersPropagatorInterceptor; @@ -40,13 +47,55 @@ */ final class MultiTenantConnectionFactoryModule extends NoExternalConfigurationModule implements AnnotationModule { + /** + * @param array $tenantResolverPlacements + * @param array $invalidTenantResolverPlacements + */ + private function __construct( + private array $tenantResolverPlacements, + private array $invalidTenantResolverPlacements, + ) { + } + public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static { - return new self(); + $allPlacements = []; + $invalid = []; + foreach ($annotationRegistrationService->findAnnotatedMethods(WithTenantResolver::class) as $annotatedMethod) { + $location = $annotatedMethod->getClassName() . '::' . $annotatedMethod->getMethodName(); + $allPlacements[] = $location; + + $isOnInboundAdapter = false; + foreach ($annotatedMethod->getMethodAnnotations() as $annotation) { + if ($annotation instanceof ChannelAdapter || $annotation instanceof MessageConsumer) { + $isOnInboundAdapter = true; + break; + } + } + if (! $isOnInboundAdapter) { + $invalid[] = $location; + } + } + + return new self($allPlacements, $invalid); } public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void { + if ($this->invalidTenantResolverPlacements !== []) { + throw ConfigurationException::create(sprintf( + "WithTenantResolver attribute on %s is invalid. WithTenantResolver may only be applied to inbound channel adapter methods (e.g. #[KafkaConsumer], #[AmqpConsumer], #[Scheduled]) where messages may arrive from outside the application without a tenant header. Internal Message Channels — including those used by synchronous and asynchronous CommandHandler / EventHandler / QueryHandler / ServiceActivator handlers — already carry the tenant context propagated from the originating bus call, so there is no header to derive there. If an asynchronous handler is processing externally-arrived messages, attach #[WithTenantResolver] to the inbound channel adapter that produces those messages, not to the handler.", + implode(', ', $this->invalidTenantResolverPlacements) + )); + } + + if ($this->tenantResolverPlacements !== [] && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { + throw LicensingException::create(sprintf( + 'WithTenantResolver attribute on %s requires Ecotone Enterprise licence.', + implode(', ', $this->tenantResolverPlacements) + )); + } + $messagingConfiguration->registerMessageChannel( SimpleMessageChannelBuilder::createPublishSubscribeChannel(HeaderBasedMultiTenantConnectionFactory::TENANT_ACTIVATED_CHANNEL_NAME) ); @@ -118,6 +167,28 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO MessageGateway::class ) ); + + $resolverReference = 'multi_tenant_header_resolver.' . $multiTenantConfig->getReferenceName(); + $messagingConfiguration->registerServiceDefinition( + $resolverReference, + new Definition( + MultiTenantHeaderResolver::class, + [ + $multiTenantConfig->getTenantHeaderName(), + Reference::to(ExpressionEvaluationService::REFERENCE), + ] + ) + ); + + $messagingConfiguration->registerBeforeMethodInterceptor( + MethodInterceptorBuilder::create( + Reference::to($resolverReference), + $interfaceToCallRegistry->getFor(MultiTenantHeaderResolver::class, 'resolve'), + Precedence::DEFAULT_PRECEDENCE, + WithTenantResolver::class, + true + ) + ); } } diff --git a/packages/Dbal/src/MultiTenant/MultiTenantHeaderResolver.php b/packages/Dbal/src/MultiTenant/MultiTenantHeaderResolver.php new file mode 100644 index 000000000..87eb395f0 --- /dev/null +++ b/packages/Dbal/src/MultiTenant/MultiTenantHeaderResolver.php @@ -0,0 +1,56 @@ +getHeaders()->containsKey($this->tenantHeaderName)) { + return []; + } + + $value = $this->expressionEvaluationService->evaluate( + $config->getExpression(), + [ + 'payload' => $message->getPayload(), + 'headers' => $message->getHeaders()->headers(), + ] + ); + + if ($value === null) { + return []; + } + + if (! is_string($value) && ! is_int($value)) { + $type = is_object($value) ? $value::class : gettype($value); + throw InvalidArgumentException::create(sprintf( + 'WithTenantResolver expression for tenant header "%s" must evaluate to string|int|null, got %s. Expression: %s', + $this->tenantHeaderName, + $type, + $config->getExpression() + )); + } + + return [$this->tenantHeaderName => $value]; + } +} diff --git a/packages/Dbal/tests/Integration/MultiTenant/ScheduledTenantResolverDatabaseRoutingTest.php b/packages/Dbal/tests/Integration/MultiTenant/ScheduledTenantResolverDatabaseRoutingTest.php new file mode 100644 index 000000000..6398e671c --- /dev/null +++ b/packages/Dbal/tests/Integration/MultiTenant/ScheduledTenantResolverDatabaseRoutingTest.php @@ -0,0 +1,211 @@ +resetTenantTables(); + + $poller = $this->newPoller(); + + $handler = new class () { + #[Asynchronous('persons_processing')] + #[CommandHandler('insertPerson', endpointId: 'insertPersonEndpoint')] + public function handle( + int $personId, + #[Header('person_name')] string $name, + #[Reference(DbalConnectionFactory::class)] ConnectionFactory $factory, + ): void { + $factory->createContext()->getDbalConnection()->executeStatement( + 'INSERT INTO persons (person_id, name) VALUES (?, ?)', + [$personId, $name] + ); + } + }; + + $ecotone = $this->bootstrap([$poller, $handler], asynchronous: true); + + $ecotone->run('externalPersonPoller', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + $ecotone->run('persons_processing', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + $ecotone->run('externalPersonPoller', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + $ecotone->run('persons_processing', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + + $this->assertTenantTablesIsolated($tenantADbal, $tenantBDbal); + } + + public function test_inserts_routed_to_per_tenant_database_when_handler_is_synchronous(): void + { + [$tenantADbal, $tenantBDbal] = $this->resetTenantTables(); + + $poller = $this->newPoller(); + + $handler = new class () { + #[CommandHandler('insertPerson', endpointId: 'insertPersonEndpoint')] + public function handle( + int $personId, + #[Header('person_name')] string $name, + #[Reference(DbalConnectionFactory::class)] ConnectionFactory $factory, + ): void { + $factory->createContext()->getDbalConnection()->executeStatement( + 'INSERT INTO persons (person_id, name) VALUES (?, ?)', + [$personId, $name] + ); + } + }; + + $ecotone = $this->bootstrap([$poller, $handler], asynchronous: false); + + $ecotone->run('externalPersonPoller', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + $ecotone->run('externalPersonPoller', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + + $this->assertTenantTablesIsolated($tenantADbal, $tenantBDbal); + } + + /** + * @return array{0: Connection, 1: Connection} + */ + private function resetTenantTables(): array + { + $tenantADbal = $this->connectionForTenantA()->createContext()->getDbalConnection(); + $tenantBDbal = $this->connectionForTenantB()->createContext()->getDbalConnection(); + $tenantADbal->executeStatement('DROP TABLE IF EXISTS persons'); + $tenantBDbal->executeStatement('DROP TABLE IF EXISTS persons'); + $this->setupUserTable($tenantADbal); + $this->setupUserTable($tenantBDbal); + + return [$tenantADbal, $tenantBDbal]; + } + + private function newPoller(): object + { + return new class ([ + ['source' => 'tenant_b', 'personId' => 200, 'name' => 'Bob'], + ['source' => 'tenant_a', 'personId' => 100, 'name' => 'Alice'], + ]) { + public function __construct(private array $pending) + { + } + + #[Scheduled(requestChannelName: 'insertPerson', endpointId: 'externalPersonPoller')] + #[WithTenantResolver(expression: "headers['source']")] + public function poll(): ?Message + { + if ($this->pending === []) { + return null; + } + $event = array_shift($this->pending); + return MessageBuilder::withPayload($event['personId']) + ->setHeader('person_name', $event['name']) + ->setHeader('source', $event['source']) + ->build(); + } + }; + } + + /** + * @param object[] $services + */ + private function bootstrap(array $services, bool $asynchronous): FlowTestSupport + { + $extensionObjects = [ + PollingMetadata::create('externalPersonPoller') + ->setExecutionAmountLimit(1) + ->setHandledMessageLimit(1), + MultiTenantConfiguration::create( + tenantHeaderName: 'tenant', + tenantToConnectionMapping: [ + 'tenant_a' => 'tenant_a_connection', + 'tenant_b' => 'tenant_b_connection', + ], + ), + DbalConfiguration::createWithDefaults() + ->withTransactionOnCommandBus(false) + ->withTransactionOnAsynchronousEndpoints(false) + ->withClearAndFlushObjectManagerOnCommandBus(false) + ->withDeduplication(false), + ]; + if ($asynchronous) { + $extensionObjects[] = PollingMetadata::create('persons_processing') + ->setExecutionAmountLimit(1) + ->setHandledMessageLimit(1); + } + + return EcotoneLite::bootstrapFlowTesting( + array_map(static fn (object $service): string => $service::class, $services), + array_merge( + $services, + [ + 'tenant_a_connection' => $this->connectionForTenantA(), + 'tenant_b_connection' => $this->connectionForTenantB(), + ], + ), + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects($extensionObjects), + enableAsynchronousProcessing: $asynchronous + ? [SimpleMessageChannelBuilder::createQueueChannel('persons_processing')] + : true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + private function assertTenantTablesIsolated(Connection $tenantADbal, Connection $tenantBDbal): void + { + $this->assertSame( + [['person_id' => 100, 'name' => 'Alice']], + $this->fetchPersons($tenantADbal), + 'tenant_a database must contain only the tenant_a record. WithTenantResolver routes the inbound message via headers[source] -> tenant header -> tenant_a connection.' + ); + $this->assertSame( + [['person_id' => 200, 'name' => 'Bob']], + $this->fetchPersons($tenantBDbal), + 'tenant_b database must contain only the tenant_b record. Cross-tenant leakage would mean tenant routing failed.' + ); + } + + /** + * @return array + */ + private function fetchPersons(Connection $connection): array + { + $rows = $connection->fetchAllAssociative('SELECT person_id, name FROM persons ORDER BY person_id'); + return array_map( + fn (array $row): array => ['person_id' => (int) $row['person_id'], 'name' => (string) $row['name']], + $rows + ); + } +} diff --git a/packages/Dbal/tests/Integration/MultiTenant/ScheduledTenantResolverTest.php b/packages/Dbal/tests/Integration/MultiTenant/ScheduledTenantResolverTest.php new file mode 100644 index 000000000..e390b176a --- /dev/null +++ b/packages/Dbal/tests/Integration/MultiTenant/ScheduledTenantResolverTest.php @@ -0,0 +1,309 @@ + 'tenant_a', 'payload' => 'first'], + ['source' => 'tenant_b', 'payload' => 'second'], + ]) { + public function __construct(private array $pending) + { + } + + #[Scheduled(requestChannelName: 'externalEventArrived', endpointId: 'externalEventPoller')] + #[WithTenantResolver(expression: "headers['source']")] + public function poll(): ?Message + { + if ($this->pending === []) { + return null; + } + $event = array_shift($this->pending); + return MessageBuilder::withPayload($event['payload']) + ->setHeader('source', $event['source']) + ->build(); + } + }; + $receiver = $this->newReceiver(); + $ecotone = $this->bootstrap([$poller, $receiver], [$poller::class, $receiver::class]); + + $this->pollOnce($ecotone); + $this->drainProcessing($ecotone); + + $first = $ecotone->sendQueryWithRouting('lastCapturedHeaders'); + $this->assertNotNull($first, 'Handler was never invoked - tenant resolution likely blocked the chain.'); + $this->assertSame('tenant_a', $first['tenant'] ?? null, 'Resolver should have derived tenant_a from headers[source].'); + + $this->pollOnce($ecotone); + $this->drainProcessing($ecotone); + + $second = $ecotone->sendQueryWithRouting('lastCapturedHeaders'); + $this->assertNotNull($second); + $this->assertSame('tenant_b', $second['tenant'] ?? null, 'Resolver should derive a fresh tenant per inbound message.'); + } + + public function test_explicit_tenant_header_takes_precedence_over_resolver(): void + { + $poller = new class ([ + 'source' => 'tenant_a', + 'payload' => 'first', + 'tenant' => 'tenant_b', + ]) { + public function __construct(private ?array $next) + { + } + + #[Scheduled(requestChannelName: 'externalEventArrived', endpointId: 'externalEventPoller')] + #[WithTenantResolver(expression: "headers['source']")] + public function poll(): ?Message + { + $event = $this->next; + $this->next = null; + if ($event === null) { + return null; + } + return MessageBuilder::withPayload($event['payload']) + ->setHeader('source', $event['source']) + ->setHeader('tenant', $event['tenant']) + ->build(); + } + }; + $receiver = $this->newReceiver(); + $ecotone = $this->bootstrap([$poller, $receiver], [$poller::class, $receiver::class]); + + $this->pollOnce($ecotone); + $this->drainProcessing($ecotone); + + $captured = $ecotone->sendQueryWithRouting('lastCapturedHeaders'); + $this->assertNotNull($captured); + $this->assertSame('tenant_b', $captured['tenant'] ?? null, 'Explicit tenant header must win over the resolver expression.'); + } + + public function test_no_tenant_header_when_resolver_attribute_missing(): void + { + $poller = new class () { + #[Scheduled(requestChannelName: 'externalEventArrived', endpointId: 'externalEventPoller')] + public function poll(): ?Message + { + static $emitted = false; + if ($emitted) { + return null; + } + $emitted = true; + return MessageBuilder::withPayload('first')->setHeader('source', 'tenant_a')->build(); + } + }; + $receiver = $this->newReceiver(); + $ecotone = $this->bootstrap([$poller, $receiver], [$poller::class, $receiver::class]); + + $this->pollOnce($ecotone); + $this->drainProcessing($ecotone); + + $captured = $ecotone->sendQueryWithRouting('lastCapturedHeaders'); + $this->assertNotNull($captured); + $this->assertArrayNotHasKey('tenant', $captured, 'Without #[WithTenantResolver], no tenant header should be injected.'); + } + + public function test_no_tenant_header_when_expression_evaluates_to_null(): void + { + $poller = new class () { + #[Scheduled(requestChannelName: 'externalEventArrived', endpointId: 'externalEventPoller')] + #[WithTenantResolver(expression: "headers['source'] ?? null")] + public function poll(): ?Message + { + static $emitted = false; + if ($emitted) { + return null; + } + $emitted = true; + return MessageBuilder::withPayload('first')->build(); + } + }; + $receiver = $this->newReceiver(); + $ecotone = $this->bootstrap([$poller, $receiver], [$poller::class, $receiver::class]); + + $this->pollOnce($ecotone); + $this->drainProcessing($ecotone); + + $captured = $ecotone->sendQueryWithRouting('lastCapturedHeaders'); + $this->assertNotNull($captured); + $this->assertArrayNotHasKey('tenant', $captured, 'Null expression result must not inject any tenant header.'); + } + + public function test_resolver_interceptor_fires_exactly_once_per_inbound_message(): void + { + $poller = new class () { + #[Scheduled(requestChannelName: 'externalEventArrived', endpointId: 'externalEventPoller')] + #[WithTenantResolver(expression: "headers['source']")] + public function poll(): ?Message + { + static $emitted = false; + if ($emitted) { + return null; + } + $emitted = true; + return MessageBuilder::withPayload('first')->setHeader('source', 'tenant_a')->build(); + } + }; + $receiver = $this->newReceiver(); + $counter = new class () { + private int $count = 0; + + #[Before(pointcut: WithTenantResolver::class)] + public function increment(): void + { + $this->count++; + } + + #[QueryHandler('counter.invocations')] + public function invocations(): int + { + return $this->count; + } + }; + $ecotone = $this->bootstrap( + [$poller, $receiver, $counter], + [$poller::class, $receiver::class, $counter::class], + ); + + $this->pollOnce($ecotone); + $this->drainProcessing($ecotone); + + $this->assertSame( + 1, + $ecotone->sendQueryWithRouting('counter.invocations'), + 'Inbound channel adapter must trigger the WithTenantResolver Before interceptor exactly once per message; double-firing would mean propagating handler annotations into endpoint annotations is causing the same pointcut to match twice.' + ); + } + + public function test_throws_when_resolver_expression_returns_non_scalar(): void + { + $poller = new class () { + #[Scheduled(requestChannelName: 'externalEventArrived', endpointId: 'externalEventPoller')] + #[WithTenantResolver(expression: 'payload')] + public function poll(): ?Message + { + static $emitted = false; + if ($emitted) { + return null; + } + $emitted = true; + return MessageBuilder::withPayload(['source' => 'tenant_a'])->build(); + } + }; + $receiver = $this->newReceiver(); + $ecotone = $this->bootstrap([$poller, $receiver], [$poller::class, $receiver::class]); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('must evaluate to string|int|null'); + + $this->pollOnce($ecotone); + } + + private function newReceiver(): object + { + return new class () { + /** @var array> */ + private array $captured = []; + + #[Asynchronous('external_processing')] + #[CommandHandler('externalEventArrived', endpointId: 'externalEventArrivedEndpoint')] + public function handle(mixed $payload, #[Headers] array $headers): void + { + $this->captured[] = $headers; + } + + /** + * @return array|null + */ + #[QueryHandler('lastCapturedHeaders')] + public function lastCapturedHeaders(): ?array + { + return array_shift($this->captured); + } + }; + } + + /** + * @param object[] $services + * @param class-string[] $classes + */ + private function bootstrap(array $services, array $classes): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTesting( + $classes, + array_merge($services, ['tenant_a_connection' => new FakeConnectionFactory()]), + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + PollingMetadata::create('externalEventPoller') + ->setExecutionAmountLimit(1) + ->setHandledMessageLimit(1), + PollingMetadata::create('external_processing') + ->setExecutionAmountLimit(1) + ->setHandledMessageLimit(1), + MultiTenantConfiguration::createWithDefaultConnection( + 'tenant', + ['tenant_a' => 'tenant_a_connection', 'tenant_b' => 'tenant_a_connection'], + 'tenant_a_connection', + DbalConnectionFactory::class, + ), + DbalConfiguration::createWithDefaults() + ->withTransactionOnCommandBus(false) + ->withTransactionOnAsynchronousEndpoints(false) + ->withClearAndFlushObjectManagerOnCommandBus(false) + ->withDeduplication(false), + ]), + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('external_processing'), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + private function pollOnce(FlowTestSupport $ecotone): void + { + $ecotone->run('externalEventPoller', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + } + + private function drainProcessing(FlowTestSupport $ecotone): void + { + $ecotone->run('external_processing', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + } +} diff --git a/packages/Dbal/tests/Integration/MultiTenant/WithTenantResolverLicensingTest.php b/packages/Dbal/tests/Integration/MultiTenant/WithTenantResolverLicensingTest.php new file mode 100644 index 000000000..0993062a9 --- /dev/null +++ b/packages/Dbal/tests/Integration/MultiTenant/WithTenantResolverLicensingTest.php @@ -0,0 +1,89 @@ +newTenantResolvingPoller(); + + $this->expectException(LicensingException::class); + $this->expectExceptionMessage('WithTenantResolver'); + $this->expectExceptionMessage($service::class . '::poll'); + $this->expectExceptionMessage('Enterprise licence'); + + $this->bootstrap($service, null); + } + + public function test_bootstraps_successfully_with_enterprise_licence(): void + { + $this->bootstrap($this->newTenantResolvingPoller(), LicenceTesting::VALID_LICENCE); + + $this->assertTrue(true, 'Bootstrap with valid Enterprise licence must succeed when WithTenantResolver is in use.'); + } + + private function newTenantResolvingPoller(): object + { + return new class () { + #[Scheduled(requestChannelName: 'externalEventArrived', endpointId: 'externalEventPoller')] + #[WithTenantResolver(expression: "headers['source']")] + public function poll(): ?Message + { + return MessageBuilder::withPayload('payload')->setHeader('source', 'tenant_a')->build(); + } + }; + } + + private function bootstrap(object $service, ?string $licenceKey): void + { + EcotoneLite::bootstrapFlowTesting( + [$service::class], + [$service, 'tenant_a_connection' => new FakeConnectionFactory()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + MultiTenantConfiguration::createWithDefaultConnection( + 'tenant', + ['tenant_a' => 'tenant_a_connection'], + 'tenant_a_connection', + DbalConnectionFactory::class, + ), + DbalConfiguration::createWithDefaults() + ->withTransactionOnCommandBus(false) + ->withTransactionOnAsynchronousEndpoints(false) + ->withClearAndFlushObjectManagerOnCommandBus(false) + ->withDeduplication(false), + ]), + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('external_processing'), + ], + licenceKey: $licenceKey, + ); + } +} diff --git a/packages/Dbal/tests/Integration/MultiTenant/WithTenantResolverPlacementValidationTest.php b/packages/Dbal/tests/Integration/MultiTenant/WithTenantResolverPlacementValidationTest.php new file mode 100644 index 000000000..f60122d55 --- /dev/null +++ b/packages/Dbal/tests/Integration/MultiTenant/WithTenantResolverPlacementValidationTest.php @@ -0,0 +1,131 @@ +expectException(ConfigurationException::class); + $this->expectExceptionMessage($service::class . '::handle'); + $this->expectExceptionMessage('inbound channel adapter'); + $this->expectExceptionMessage('Internal Message Channels'); + + $this->bootstrap([$service::class], [$service]); + } + + public function test_throws_when_tenant_resolver_placed_on_event_handler(): void + { + $service = new class () { + #[EventHandler] + #[WithTenantResolver(expression: "headers['source']")] + public function on(stdClass $event): void + { + } + }; + + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage($service::class . '::on'); + + $this->bootstrap([$service::class], [$service]); + } + + public function test_throws_when_tenant_resolver_placed_on_asynchronous_handler(): void + { + $service = new class () { + #[Asynchronous('async_invalid_channel')] + #[CommandHandler('asyncInvalidPlacement', endpointId: 'asyncInvalidPlacementEndpoint')] + #[WithTenantResolver(expression: "headers['source']")] + public function handle(string $payload): void + { + } + }; + + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage($service::class . '::handle'); + $this->expectExceptionMessage('inbound channel adapter'); + + $this->bootstrap([$service::class], [$service]); + } + + public function test_does_not_throw_when_tenant_resolver_placed_on_inbound_channel_adapter(): void + { + $service = new class () { + #[Scheduled(requestChannelName: 'externalEventArrived', endpointId: 'externalEventPoller')] + #[WithTenantResolver(expression: "headers['source']")] + public function poll(): ?Message + { + return MessageBuilder::withPayload('payload')->setHeader('source', 'tenant_a')->build(); + } + }; + + $ecotone = $this->bootstrap([$service::class], [$service]); + + $this->assertNotNull($ecotone, 'Bootstrap should succeed when WithTenantResolver is placed on a #[Scheduled] inbound adapter.'); + } + + /** + * @param class-string[] $classes + * @param object[] $services + */ + private function bootstrap(array $classes, array $services): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTesting( + $classes, + array_merge($services, ['tenant_a_connection' => new FakeConnectionFactory()]), + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + MultiTenantConfiguration::createWithDefaultConnection( + 'tenant', + ['tenant_a' => 'tenant_a_connection'], + 'tenant_a_connection', + DbalConnectionFactory::class, + ), + DbalConfiguration::createWithDefaults() + ->withTransactionOnCommandBus(false) + ->withTransactionOnAsynchronousEndpoints(false) + ->withClearAndFlushObjectManagerOnCommandBus(false) + ->withDeduplication(false), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } +} diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ScheduledModule.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ScheduledModule.php index f990c98d4..1b3a0c1e7 100644 --- a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ScheduledModule.php +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ScheduledModule.php @@ -41,7 +41,8 @@ public static function createConsumerFrom(AnnotatedFinding $annotationRegistrati $interfaceToCallRegistry->getFor($annotationRegistration->getClassName(), $annotationRegistration->getMethodName()) ) ->withEndpointId($annotation->getEndpointId()) - ->withRequiredInterceptorNames($annotation->getRequiredInterceptorNames()); + ->withRequiredInterceptorNames($annotation->getRequiredInterceptorNames()) + ->withEndpointAnnotations($annotationRegistration->getAllAnnotationDefinitions()); } public function getModulePackageName(): string diff --git a/packages/Ecotone/tests/Messaging/Unit/Config/Annotation/ModuleConfiguration/ScheduledModuleTest.php b/packages/Ecotone/tests/Messaging/Unit/Config/Annotation/ModuleConfiguration/ScheduledModuleTest.php new file mode 100644 index 000000000..4f0b7a9c6 --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Unit/Config/Annotation/ModuleConfiguration/ScheduledModuleTest.php @@ -0,0 +1,77 @@ +count++; + } + + #[QueryHandler('scheduledMarker.count')] + public function count(): int + { + return $this->count; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$service::class, $counter::class], + [$service, $counter], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([])), + ); + + $ecotone->run('scheduledWithMarker', ExecutionPollingMetadata::createWithTestingSetup(1, 1)); + + $this->assertSame( + 1, + $ecotone->sendQueryWithRouting('scheduledMarker.count'), + 'Method-level attributes on a #[Scheduled] method must reach the channel adapter so attribute-pointcut interceptors fire.', + ); + } +} diff --git a/packages/Kafka/composer.json b/packages/Kafka/composer.json index ff69c1be6..f2a2c121a 100644 --- a/packages/Kafka/composer.json +++ b/packages/Kafka/composer.json @@ -48,7 +48,9 @@ "phpunit/phpunit": "^10.5|^11.0", "phpstan/phpstan": "^1.8", "psr/container": "^1.1.1|^2.0.1", - "kwn/php-rdkafka-stubs": "^2.2" + "kwn/php-rdkafka-stubs": "^2.2", + "ecotone/dbal": "~1.311.0", + "symfony/expression-language": "^6.4|^7.0|^8.0" }, "scripts": { "tests:phpstan": "vendor/bin/phpstan", diff --git a/packages/Kafka/tests/Integration/MultiTenant/KafkaTenantResolverTest.php b/packages/Kafka/tests/Integration/MultiTenant/KafkaTenantResolverTest.php new file mode 100644 index 000000000..f2b126293 --- /dev/null +++ b/packages/Kafka/tests/Integration/MultiTenant/KafkaTenantResolverTest.php @@ -0,0 +1,145 @@ +toRfc4122(); + $tenantBTopic = 'tenant_b_' . Uuid::v7()->toRfc4122(); + + $consumer = new class () { + /** @var array> */ + private array $captured = []; + + #[KafkaConsumer('tenantTopicConsumer', topics: ['tenant_a_topic', 'tenant_b_topic'])] + #[WithTenantResolver(expression: "headers['kafka_topic']")] + public function handle(string $payload, #[Headers] array $headers): void + { + $this->captured[] = $headers; + } + + /** + * @return array|null + */ + #[QueryHandler('consumer.lastCapturedHeaders')] + public function lastCapturedHeaders(): ?array + { + return array_shift($this->captured); + } + }; + + $stubConnection = new class () implements ConnectionFactory { + public function createContext(): Context + { + throw new LogicException('Tenant resolver test does not exercise downstream connection use.'); + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$consumer::class], + [ + $consumer, + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + 'tenant_a_connection' => $stubConnection, + 'tenant_b_connection' => $stubConnection, + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::KAFKA_PACKAGE])) + ->withExtensionObjects([ + TopicConfiguration::createWithReferenceName('tenant_a_topic', $tenantATopic), + TopicConfiguration::createWithReferenceName('tenant_b_topic', $tenantBTopic), + MultiTenantConfiguration::create( + 'tenant', + [$tenantATopic => 'tenant_a_connection', $tenantBTopic => 'tenant_b_connection'], + DbalConnectionFactory::class, + ), + DbalConfiguration::createWithDefaults() + ->withTransactionOnCommandBus(false) + ->withTransactionOnAsynchronousEndpoints(false) + ->withClearAndFlushObjectManagerOnCommandBus(false) + ->withDeduplication(false), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $this->publishToTopic($tenantATopic, 'payload_a'); + $this->publishToTopic($tenantBTopic, 'payload_b'); + + $ecotoneLite->run('tenantTopicConsumer', ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 2, + maxExecutionTimeInMilliseconds: 30000, + )); + + $headersList = []; + while (($captured = $ecotoneLite->sendQueryWithRouting('consumer.lastCapturedHeaders')) !== null) { + $headersList[] = $captured; + } + + $this->assertCount(2, $headersList, 'Both Kafka messages should have been consumed.'); + + $byTenant = []; + foreach ($headersList as $headers) { + $this->assertArrayHasKey('tenant', $headers, 'Resolver should inject tenant header derived from kafka_topic.'); + $byTenant[$headers['tenant']] = $headers; + } + + $this->assertArrayHasKey($tenantATopic, $byTenant, 'Message from tenant_a topic should land with tenant=' . $tenantATopic); + $this->assertArrayHasKey($tenantBTopic, $byTenant, 'Message from tenant_b topic should land with tenant=' . $tenantBTopic); + $this->assertSame($tenantATopic, $byTenant[$tenantATopic]['kafka_topic'] ?? null, 'Resolved tenant must equal the originating kafka_topic header.'); + $this->assertSame($tenantBTopic, $byTenant[$tenantBTopic]['kafka_topic'] ?? null); + } + + private function publishToTopic(string $topic, string $payload): void + { + $brokerList = ConnectionTestCase::getConnection()->getBootstrapServers()[0]; + + $conf = new Conf(); + $conf->set('metadata.broker.list', $brokerList); + $conf->set('socket.timeout.ms', '50'); + $producer = new Producer($conf); + + $kafkaTopic = $producer->newTopic($topic); + $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload); + $producer->poll(0); + + for ($i = 0; $i < 50 && $producer->getOutQLen() > 0; $i++) { + $producer->poll(50); + } + } +}