From 10a732f17ea5c06e81f7b951ae8d019984083a59 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 18:11:18 +0200 Subject: [PATCH 01/10] additional features --- .../src/Messaging/Attribute/Asynchronous.php | 14 +- .../src/Messaging/Attribute/DelayedRetry.php | 38 +++ .../src/Messaging/Attribute/ErrorChannel.php | 2 +- .../ErrorHandlerModule.php | 121 +++++++- .../MessagingGatewayModule.php | 4 + .../Config/MessagingSystemConfiguration.php | 25 +- .../InterceptedChannelAdapterBuilder.php | 1 + .../InterceptedPollingConsumerBuilder.php | 1 + ...PollingConsumerErrorChannelInterceptor.php | 42 ++- .../EnterpriseGatewayErrorChannelResolver.php | 54 +++- .../Projecting/Config/ProjectingModule.php | 2 +- .../ErrorChannel/FailingScheduledExample.php | 31 ++ .../ErrorChannelAsync/AsyncFailingHandler.php | 41 +++ .../ErrorChannelAsync/DelayedRetryHandler.php | 95 +++++++ .../Gateway/DelayedRetryCommandBus.php | 25 ++ .../Gateway/ErrorChannelCommandBus.php | 11 + .../HandlerLevelErrorChannelService.php | 34 +++ .../Config/AsyncEndpointAnnotationTest.php | 12 +- .../Handler/ErrorHandler/ErrorChannelTest.php | 265 ++++++++++++++++++ .../Gateway/ErrorChannelCommandBusTest.php | 100 +++++++ .../KafkaConsumerFailingExample.php | 21 ++ .../Integration/KafkaChannelAdapterTest.php | 131 +++++++++ 22 files changed, 1035 insertions(+), 35 deletions(-) create mode 100644 packages/Ecotone/src/Messaging/Attribute/DelayedRetry.php create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannel/FailingScheduledExample.php create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/AsyncFailingHandler.php create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/DelayedRetryHandler.php create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/DelayedRetryCommandBus.php create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/HandlerLevelErrorChannelService.php create mode 100644 packages/Kafka/tests/Fixture/KafkaConsumer/KafkaConsumerFailingExample.php diff --git a/packages/Ecotone/src/Messaging/Attribute/Asynchronous.php b/packages/Ecotone/src/Messaging/Attribute/Asynchronous.php index 023497f56..01d57ff32 100644 --- a/packages/Ecotone/src/Messaging/Attribute/Asynchronous.php +++ b/packages/Ecotone/src/Messaging/Attribute/Asynchronous.php @@ -15,17 +15,17 @@ class Asynchronous { private string|array $channelName; /** @var AsynchronousEndpointAttribute[] */ - private array $endpointAnnotations; + private array $asynchronousExecution; /** - * @param AsynchronousEndpointAttribute[] $endpointAnnotations + * @param AsynchronousEndpointAttribute[] $asynchronousExecution Attributes scoped to the asynchronous execution context — applied when the polling consumer processes the Message, not at the synchronous bus call. */ - public function __construct(string|array $channelName, array $endpointAnnotations = []) + public function __construct(string|array $channelName, array $asynchronousExecution = []) { Assert::notNullAndEmpty($channelName, 'Channel name can not be empty string'); - Assert::allInstanceOfType($endpointAnnotations, AsynchronousEndpointAttribute::class); + Assert::allInstanceOfType($asynchronousExecution, AsynchronousEndpointAttribute::class); $this->channelName = $channelName; - $this->endpointAnnotations = $endpointAnnotations; + $this->asynchronousExecution = $asynchronousExecution; } public function getChannelName(): array @@ -36,8 +36,8 @@ public function getChannelName(): array /** * @return AsynchronousEndpointAttribute[] */ - public function getEndpointAnnotations(): array + public function getAsynchronousExecution(): array { - return $this->endpointAnnotations; + return $this->asynchronousExecution; } } diff --git a/packages/Ecotone/src/Messaging/Attribute/DelayedRetry.php b/packages/Ecotone/src/Messaging/Attribute/DelayedRetry.php new file mode 100644 index 000000000..d3d089e50 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Attribute/DelayedRetry.php @@ -0,0 +1,38 @@ + 0, 'DelayedRetry initialDelayMs must be greater than 0'); + Assert::isTrue($multiplier > 0, 'DelayedRetry multiplier must be greater than 0'); + Assert::isTrue($maxAttempts === null || $maxAttempts > 0, 'DelayedRetry maxAttempts must be null (unlimited) or greater than 0'); + Assert::isTrue($deadLetterChannel === null || $deadLetterChannel !== '', 'DelayedRetry deadLetterChannel must be null or a non-empty channel name'); + } + + public static function generateChannelName(string $handlerEndpointId): string + { + return 'ecotone.retry.' . $handlerEndpointId; + } + + public static function generateGatewayChannelName(string $gatewayInterfaceFqn): string + { + return 'ecotone.retry.gateway.' . str_replace('\\', '.', $gatewayInterfaceFqn); + } +} diff --git a/packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php b/packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php index 5fa5c08e5..137177995 100644 --- a/packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php +++ b/packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php @@ -11,7 +11,7 @@ * licence Enterprise */ #[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)] -class ErrorChannel +class ErrorChannel implements AsynchronousEndpointAttribute { /** * @param string $errorChannelName Name of the error channel to send Message too diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorHandlerModule.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorHandlerModule.php index 93961fa2e..2c9a65c39 100644 --- a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorHandlerModule.php +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorHandlerModule.php @@ -5,10 +5,15 @@ namespace Ecotone\Messaging\Config\Annotation\ModuleConfiguration; use Ecotone\AnnotationFinder\AnnotationFinder; +use Ecotone\Messaging\Attribute\Asynchronous; +use Ecotone\Messaging\Attribute\ErrorChannel; use Ecotone\Messaging\Attribute\ModuleAnnotation; +use Ecotone\Messaging\Attribute\DelayedRetry; use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder; +use Ecotone\Messaging\Config\Annotation\AnnotatedDefinitionReference; use Ecotone\Messaging\Config\Annotation\AnnotationModule; use Ecotone\Messaging\Config\Configuration; +use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Config\Container\Definition; use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Messaging\Config\ModulePackageList; @@ -19,6 +24,7 @@ use Ecotone\Messaging\Handler\Recoverability\DelayedRetryErrorHandler; use Ecotone\Messaging\Handler\Recoverability\ErrorHandlerConfiguration; use Ecotone\Messaging\Handler\Recoverability\RetryRunner; +use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder; use Ecotone\Messaging\Handler\Router\HeaderRouter; use Ecotone\Messaging\Handler\Router\RouterBuilder; use Ecotone\Messaging\Handler\ServiceActivator\ServiceActivatorBuilder; @@ -31,7 +37,10 @@ */ class ErrorHandlerModule extends NoExternalConfigurationModule implements AnnotationModule { - private function __construct() + /** + * @param ErrorHandlerConfiguration[] $perHandlerRetryConfigurations + */ + private function __construct(private array $perHandlerRetryConfigurations) { } @@ -40,7 +49,89 @@ private function __construct() */ public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static { - return new self(); + $perHandlerRetryConfigurations = []; + + $endpointMethods = $annotationRegistrationService->findAnnotatedMethods(\Ecotone\Messaging\Attribute\EndpointAnnotation::class); + $asynchronousMethods = $annotationRegistrationService->findAnnotatedMethods(Asynchronous::class); + + foreach ($asynchronousMethods as $asynchronousMethod) { + /** @var Asynchronous $asyncAnnotation */ + $asyncAnnotation = $asynchronousMethod->getAnnotationForMethod(); + + $delayedRetry = null; + $hasErrorChannel = false; + foreach ($asyncAnnotation->getAsynchronousExecution() as $endpointAnnotation) { + if ($endpointAnnotation instanceof DelayedRetry) { + $delayedRetry = $endpointAnnotation; + } elseif ($endpointAnnotation instanceof ErrorChannel) { + $hasErrorChannel = true; + } + } + if ($delayedRetry === null) { + continue; + } + + foreach ($endpointMethods as $endpoint) { + if ($endpoint->getClassName() !== $asynchronousMethod->getClassName() + || $endpoint->getMethodName() !== $asynchronousMethod->getMethodName()) { + continue; + } + + /** @var \Ecotone\Messaging\Attribute\EndpointAnnotation $endpointAttribute */ + $endpointAttribute = $endpoint->getAnnotationForMethod(); + $handlerEndpointId = $endpointAttribute->getEndpointId(); + + if ($hasErrorChannel) { + throw ConfigurationException::create( + "Handler `{$handlerEndpointId}` declares both #[ErrorChannel] and #[DelayedRetry] in #[Asynchronous] asynchronousExecution — these are mutually exclusive. " . + 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.' + ); + } + + $generatedErrorChannelName = DelayedRetry::generateChannelName($handlerEndpointId); + $retryTemplateBuilder = new RetryTemplateBuilder( + $delayedRetry->initialDelayMs, + $delayedRetry->multiplier, + $delayedRetry->maxDelayMs, + $delayedRetry->maxAttempts, + ); + + $perHandlerRetryConfigurations[] = self::buildErrorHandlerConfig($generatedErrorChannelName, $retryTemplateBuilder, $delayedRetry->deadLetterChannel); + break; + } + } + + $gatewayInterfacesWithDelayedRetry = $annotationRegistrationService->findAnnotatedClasses(DelayedRetry::class); + foreach ($gatewayInterfacesWithDelayedRetry as $gatewayInterfaceFqn) { + /** @var DelayedRetry $delayedRetry */ + $delayedRetry = AnnotatedDefinitionReference::getSingleAnnotationForClass($annotationRegistrationService, $gatewayInterfaceFqn, DelayedRetry::class); + + $errorChannelOnGateway = $annotationRegistrationService->findAnnotatedClasses(ErrorChannel::class); + if (in_array($gatewayInterfaceFqn, $errorChannelOnGateway, true)) { + throw ConfigurationException::create( + "Gateway `{$gatewayInterfaceFqn}` declares both #[ErrorChannel] and #[DelayedRetry] — these are mutually exclusive. " . + 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.' + ); + } + + $generatedErrorChannelName = DelayedRetry::generateGatewayChannelName($gatewayInterfaceFqn); + $retryTemplateBuilder = new RetryTemplateBuilder( + $delayedRetry->initialDelayMs, + $delayedRetry->multiplier, + $delayedRetry->maxDelayMs, + $delayedRetry->maxAttempts, + ); + $perHandlerRetryConfigurations[] = self::buildErrorHandlerConfig($generatedErrorChannelName, $retryTemplateBuilder, $delayedRetry->deadLetterChannel); + } + + return new self($perHandlerRetryConfigurations); + } + + private static function buildErrorHandlerConfig(string $errorChannelName, RetryTemplateBuilder $retryTemplateBuilder, ?string $deadLetterChannel): ErrorHandlerConfiguration + { + return $deadLetterChannel !== null + ? ErrorHandlerConfiguration::createWithDeadLetterChannel($errorChannelName, $retryTemplateBuilder, $deadLetterChannel) + : ErrorHandlerConfiguration::create($errorChannelName, $retryTemplateBuilder); } /** @@ -51,13 +142,18 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO if (! $this->hasErrorConfiguration($extensionObjects)) { $extensionObjects = [ErrorHandlerConfiguration::createDefault()]; } + $extensionObjects = array_merge($extensionObjects, $this->perHandlerRetryConfigurations); + $messagingConfiguration->registerServiceDefinition(RetryRunner::class, [new Reference(EcotoneClockInterface::class), new Reference(LoggingGateway::class)]); + $hasAnyErrorHandlerConfiguration = false; + /** @var ErrorHandlerConfiguration $extensionObject */ foreach ($extensionObjects as $index => $extensionObject) { if (! ($extensionObject instanceof ErrorHandlerConfiguration)) { continue; } + $hasAnyErrorHandlerConfiguration = true; $errorHandler = ServiceActivatorBuilder::createWithDefinition( new Definition(DelayedRetryErrorHandler::class, [ @@ -80,15 +176,18 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $messagingConfiguration ->registerMessageHandler($errorHandler) - ->registerDefaultChannelFor(SimpleMessageChannelBuilder::createPublishSubscribeChannel($extensionObject->getErrorChannelName())) - ->registerMessageHandler( - RouterBuilder::create( - new Definition(HeaderRouter::class, [MessageHeaders::POLLED_CHANNEL_NAME]), - $interfaceToCallRegistry->getFor(HeaderRouter::class, 'route') - ) - ->withEndpointId('error_handler.' . $extensionObject->getErrorChannelName() . '.router') - ->withInputChannelName('ecotone.recoverability.reply') - ); + ->registerDefaultChannelFor(SimpleMessageChannelBuilder::createPublishSubscribeChannel($extensionObject->getErrorChannelName())); + } + + if ($hasAnyErrorHandlerConfiguration) { + $messagingConfiguration->registerMessageHandler( + RouterBuilder::create( + new Definition(HeaderRouter::class, [MessageHeaders::POLLED_CHANNEL_NAME]), + $interfaceToCallRegistry->getFor(HeaderRouter::class, 'route') + ) + ->withEndpointId('error_handler.recoverability.reply.router') + ->withInputChannelName('ecotone.recoverability.reply') + ); } } diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php index 41db89d7b..bfcdb6976 100644 --- a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php @@ -136,6 +136,10 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO if ($errorChannel && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { throw LicensingException::create("Gateway {$gatewayBuilder->getInterfaceName()}::{$gatewayBuilder->getRelatedMethodName()} is marked with synchronous Error Channel. This functionality is available as part of Ecotone Enterprise."); } + $delayedRetry = $interfaceToCallRegistry->getFor($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())->getAnnotationsByImportanceOrder(Type::attribute(\Ecotone\Messaging\Attribute\DelayedRetry::class)); + if ($delayedRetry && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { + throw LicensingException::create("Gateway {$gatewayBuilder->getInterfaceName()}::{$gatewayBuilder->getRelatedMethodName()} is marked with #[DelayedRetry]. This functionality is available as part of Ecotone Enterprise."); + } $messagingConfiguration->registerGatewayBuilder($gatewayBuilder); } diff --git a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php index 3d6cbff89..d39a85122 100644 --- a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php +++ b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php @@ -434,13 +434,34 @@ private function configureAsynchronousEndpoints(InterfaceToCallRegistry $interfa } } } - $endpointAnnotations = $asyncAttribute ? $asyncAttribute->getEndpointAnnotations() : []; + $endpointAnnotations = $asyncAttribute ? $asyncAttribute->getAsynchronousExecution() : []; if ($endpointAnnotations && ! $this->isRunningForEnterpriseLicence) { throw LicensingException::create("Endpoint annotations on #[Asynchronous] attribute for endpoint `{$targetEndpointId}` require Ecotone Enterprise licence."); } + + $hasErrorChannel = false; + $hasDelayedRetry = false; + foreach ($endpointAnnotations as $endpointAnnotation) { + if ($endpointAnnotation instanceof \Ecotone\Messaging\Attribute\ErrorChannel) { + $hasErrorChannel = true; + } elseif ($endpointAnnotation instanceof \Ecotone\Messaging\Attribute\DelayedRetry) { + $hasDelayedRetry = true; + } + } + if ($hasErrorChannel && $hasDelayedRetry) { + throw ConfigurationException::create( + "Handler `{$targetEndpointId}` declares both #[ErrorChannel] and #[DelayedRetry] in #[Asynchronous] asynchronousExecution — these are mutually exclusive. " . + "Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel." + ); + } + + $contextAnnotations = $endpointAnnotations; + if ($hasDelayedRetry) { + $contextAnnotations[] = new AsynchronousRunningEndpoint($targetEndpointId); + } $asyncHandlerAnnotations[$handlerExecutionChannel] = array_map( fn ($a) => AttributeDefinition::fromObject($a), - $endpointAnnotations + $contextAnnotations ); $consequentialChannels = $asynchronousMessageChannels; diff --git a/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php b/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php index 313f2cba0..e8c0f9811 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php @@ -85,6 +85,7 @@ private function getErrorInterceptorReference(MessagingContainerBuilder $builder $builder->register(PollingConsumerErrorChannelInterceptor::class, new Definition(PollingConsumerErrorChannelInterceptor::class, [ Reference::to(ErrorChannelService::class), Reference::to(ChannelResolver::class), + Reference::to(AsyncEndpointAnnotationContext::class), ])); } return AroundInterceptorBuilder::create( diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php index 406770418..a8685e32f 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php @@ -125,6 +125,7 @@ private function getErrorInterceptorReference(MessagingContainerBuilder $builder $builder->register(PollingConsumerErrorChannelInterceptor::class, new Definition(PollingConsumerErrorChannelInterceptor::class, [ Reference::to(ErrorChannelService::class), new Reference(ChannelResolver::class), + new Reference(AsyncEndpointAnnotationContext::class), ])); } return AroundInterceptorBuilder::create( diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php index 37f0ede1d..409327a43 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php @@ -2,6 +2,9 @@ namespace Ecotone\Messaging\Endpoint\PollingConsumer; +use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint; +use Ecotone\Messaging\Attribute\ErrorChannel; +use Ecotone\Messaging\Attribute\DelayedRetry; use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\ChannelResolver; use Ecotone\Messaging\Handler\Gateway\ErrorChannelService; @@ -18,6 +21,7 @@ class PollingConsumerErrorChannelInterceptor public function __construct( private ErrorChannelService $errorChannelService, private ChannelResolver $channelResolver, + private AsyncEndpointAnnotationContext $asyncEndpointAnnotationContext, ) { } @@ -37,17 +41,28 @@ private function tryToSendToErrorChannel(Throwable $exception, Message $requestM if ($requestMessage->getHeaders()->containsKey(MessageHeaders::CONSUMER_POLLING_METADATA)) { /** @var PollingMetadata $pollingMetadata */ $pollingMetadata = $requestMessage->getHeaders()->get(MessageHeaders::CONSUMER_POLLING_METADATA); - $errorChannelName = $pollingMetadata->getErrorChannelName(); + + if ($pollingMetadata->isStoppedOnError()) { + return false; + } + + $errorChannelName = $this->resolveHandlerScopedErrorChannelName() + ?? $pollingMetadata->getErrorChannelName(); if (! $errorChannelName) { return false; } + $polledChannelName = $requestMessage->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) + ? $requestMessage->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) + : null; + $this->errorChannelService->handle( $requestMessage, $exception, $this->channelResolver->resolve($errorChannelName), - $requestMessage->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) + $polledChannelName, + $polledChannelName === null ? $pollingMetadata->getEndpointId() : null, ); return true; @@ -55,4 +70,27 @@ private function tryToSendToErrorChannel(Throwable $exception, Message $requestM return false; } + + private function resolveHandlerScopedErrorChannelName(): ?string + { + $handlerEndpointId = null; + $hasDelayedRetry = false; + + foreach ($this->asyncEndpointAnnotationContext->getCurrentAnnotations() as $annotation) { + if ($annotation instanceof ErrorChannel) { + return $annotation->errorChannelName; + } + if ($annotation instanceof DelayedRetry) { + $hasDelayedRetry = true; + } elseif ($annotation instanceof AsynchronousRunningEndpoint) { + $handlerEndpointId = $annotation->getEndpointId(); + } + } + + if ($hasDelayedRetry && $handlerEndpointId !== null) { + return DelayedRetry::generateChannelName($handlerEndpointId); + } + + return null; + } } diff --git a/packages/Ecotone/src/Messaging/Handler/Gateway/EnterpriseGatewayErrorChannelResolver.php b/packages/Ecotone/src/Messaging/Handler/Gateway/EnterpriseGatewayErrorChannelResolver.php index 42b5b33e2..e96471980 100644 --- a/packages/Ecotone/src/Messaging/Handler/Gateway/EnterpriseGatewayErrorChannelResolver.php +++ b/packages/Ecotone/src/Messaging/Handler/Gateway/EnterpriseGatewayErrorChannelResolver.php @@ -4,7 +4,9 @@ namespace Ecotone\Messaging\Handler\Gateway; +use Ecotone\Messaging\Attribute\DelayedRetry; use Ecotone\Messaging\Attribute\ErrorChannel; +use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Handler\InterfaceToCall; use Ecotone\Messaging\Handler\Type; @@ -19,33 +21,75 @@ public function getErrorChannel(InterfaceToCall $interfaceToCall, array $endpoin return $errorChannelName; } + $this->assertNotBothErrorChannelAndDelayedRetry($interfaceToCall, $endpointAnnotations); + foreach ($endpointAnnotations as $endpointAnnotation) { if ($endpointAnnotation->getClassName() === ErrorChannel::class) { return $endpointAnnotation->instance()->errorChannelName; } + if ($endpointAnnotation->getClassName() === DelayedRetry::class) { + return DelayedRetry::generateGatewayChannelName($interfaceToCall->getInterfaceName()); + } } /** @var ErrorChannel[] $errorChannel */ $errorChannel = $interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class)); + if ($errorChannel) { + return $errorChannel[0]->errorChannelName; + } + + /** @var DelayedRetry[] $delayedRetry */ + $delayedRetry = $interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(DelayedRetry::class)); + if ($delayedRetry) { + return DelayedRetry::generateGatewayChannelName($interfaceToCall->getInterfaceName()); + } - return $errorChannel ? $errorChannel[0]->errorChannelName : null; + return null; } public function getErrorChannelRoutingSlip(InterfaceToCall $interfaceToCall, array $endpointAnnotations, string $requestChannelName): ?string { - /** @var ErrorChannel[] $errorChannelAttributes */ - $errorChannelAttributes = $interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class)); + $this->assertNotBothErrorChannelAndDelayedRetry($interfaceToCall, $endpointAnnotations); foreach ($endpointAnnotations as $endpointAnnotation) { - if ($endpointAnnotation->getClassName() === ErrorChannel::class) { + if ($endpointAnnotation->getClassName() === ErrorChannel::class + || $endpointAnnotation->getClassName() === DelayedRetry::class) { return $requestChannelName; } } - if ($errorChannelAttributes) { + if ($interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class))) { + return $requestChannelName; + } + if ($interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(DelayedRetry::class))) { return $requestChannelName; } return null; } + + private function assertNotBothErrorChannelAndDelayedRetry(InterfaceToCall $interfaceToCall, array $endpointAnnotations): void + { + $hasErrorChannel = false; + $hasDelayedRetry = false; + foreach ($endpointAnnotations as $endpointAnnotation) { + if ($endpointAnnotation->getClassName() === ErrorChannel::class) { + $hasErrorChannel = true; + } elseif ($endpointAnnotation->getClassName() === DelayedRetry::class) { + $hasDelayedRetry = true; + } + } + if ($interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class))) { + $hasErrorChannel = true; + } + if ($interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(DelayedRetry::class))) { + $hasDelayedRetry = true; + } + if ($hasErrorChannel && $hasDelayedRetry) { + throw ConfigurationException::create( + "Gateway `{$interfaceToCall->getInterfaceName()}` declares both #[ErrorChannel] and #[DelayedRetry] — these are mutually exclusive. " . + 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.' + ); + } + } } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index c85012bcd..fb6b21821 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -126,7 +126,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $asyncAttribute = $projectionBuilder instanceof EcotoneProjectionExecutorBuilder ? $projectionBuilder->getAsyncAttribute() : null; if ($asyncAttribute !== null) { - $endpointAnnotations = $asyncAttribute->getEndpointAnnotations(); + $endpointAnnotations = $asyncAttribute->getAsynchronousExecution(); if ($messagingConfiguration->isRunningForEnterpriseLicence()) { $endpointAnnotations = array_merge($endpointAnnotations, [new WithoutDatabaseTransaction(), new WithoutMessageCollector()]); } diff --git a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannel/FailingScheduledExample.php b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannel/FailingScheduledExample.php new file mode 100644 index 000000000..ebacd3a4e --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannel/FailingScheduledExample.php @@ -0,0 +1,31 @@ +attemptsRecovers++; + if ($this->attemptsRecovers < 2) { + throw new RuntimeException('transient'); + } + $this->finallyHandled = true; + } + + #[Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ + new DelayedRetry( + initialDelayMs: 1, + multiplier: 1, + maxAttempts: 2, + deadLetterChannel: self::DEAD_LETTER_CHANNEL, + ), + ])] + #[CommandHandler(self::ROUTING_KEY_DEAD_LETTER, 'retryDeadLetter')] + public function alwaysFails(string $payload): void + { + $this->attemptsDeadLetter++; + throw new RuntimeException('permanent'); + } + + #[Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ + new DelayedRetry( + initialDelayMs: 1, + multiplier: 1, + maxAttempts: 1, + deadLetterChannel: self::DEAD_LETTER_CHANNEL, + ), + ])] + #[CommandHandler(self::ROUTING_KEY_OVERRIDE, 'retryOverride')] + public function alwaysFailsOverridingDefault(string $payload): void + { + $this->attemptsOverride++; + throw new RuntimeException('permanent'); + } + + #[QueryHandler('retryHandler.attemptsRecovers')] + public function getAttemptsRecovers(): int + { + return $this->attemptsRecovers; + } + + #[QueryHandler('retryHandler.attemptsDeadLetter')] + public function getAttemptsDeadLetter(): int + { + return $this->attemptsDeadLetter; + } + + #[QueryHandler('retryHandler.attemptsOverride')] + public function getAttemptsOverride(): int + { + return $this->attemptsOverride; + } + + #[QueryHandler('retryHandler.finallyHandled')] + public function isFinallyHandled(): bool + { + return $this->finallyHandled; + } +} diff --git a/packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/DelayedRetryCommandBus.php b/packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/DelayedRetryCommandBus.php new file mode 100644 index 000000000..39e2ac988 --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/DelayedRetryCommandBus.php @@ -0,0 +1,25 @@ +sideEffectExecuted = true; + throw new RuntimeException('handler-failure'); + } +} diff --git a/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php b/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php index 76637394a..afa604c73 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php @@ -45,7 +45,7 @@ public function test_around_interceptor_receives_handler_attribute_on_async_endp $collector->receivedAttribute = null; $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('test-value')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('test-value')])] #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] public function handle(string $payload): void { @@ -89,7 +89,7 @@ public function test_before_interceptor_receives_handler_attribute_on_async_endp $collector->receivedAttribute = null; $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('before-value')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('before-value')])] #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] public function handle(string $payload): void { @@ -133,13 +133,13 @@ public function test_multiple_handlers_on_same_channel_resolve_correct_attribute $collector->receivedAttributes = []; $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('handler-one')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('handler-one')])] #[CommandHandler('doWorkOne', endpointId: 'doWorkOne.endpoint')] public function handleOne(string $payload): void { } - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('handler-two')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('handler-two')])] #[CommandHandler('doWorkTwo', endpointId: 'doWorkTwo.endpoint')] public function handleTwo(string $payload): void { @@ -226,7 +226,7 @@ public function test_endpoint_annotations_require_enterprise_licence(): void $this->expectException(LicensingException::class); $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('test')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('test')])] #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] public function handle(string $payload): void { @@ -289,7 +289,7 @@ public function handle(stdClass $event): void public function test_without_message_collector_events_are_sent_directly_and_survive_handler_failure(): void { $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new WithoutMessageCollector()])] + #[Asynchronous('async', asynchronousExecution: [new WithoutMessageCollector()])] #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] public function handle(string $payload, EventBus $eventBus): void { diff --git a/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php b/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php index 9204c66c0..5b16c5301 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php @@ -10,9 +10,16 @@ use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\Endpoint\FinalFailureStrategy; +use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\Recoverability\ErrorHandlerConfiguration; use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder; +use Ecotone\Messaging\MessageHeaders; +use Ecotone\Messaging\PollableChannel; +use Ecotone\Test\LicenceTesting; use PHPUnit\Framework\TestCase; +use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannel\FailingScheduledExample; +use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannelAsync\AsyncFailingHandler; +use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannelAsync\DelayedRetryHandler; use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannel\OrderService; /** @@ -159,4 +166,262 @@ public function test_using_custom_channel_for_error_handling(): void ; $this->assertSame(1, $ecotone->sendQueryWithRouting('getOrderAmount')); } + + public function test_channel_adapter_sends_failed_message_to_default_error_channel_using_routing_slip(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [FailingScheduledExample::class], + [new FailingScheduledExample()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel('customErrorChannel') + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel('customErrorChannel'), + ]), + ); + + $ecotone->run(FailingScheduledExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + /** @var PollableChannel $errorChannel */ + $errorChannel = $ecotone->getMessageChannel('customErrorChannel'); + $errorMessage = $errorChannel->receive(); + $this->assertNotNull($errorMessage, 'Expected failed message to be delivered to default error channel'); + + $headers = $errorMessage->getHeaders(); + $this->assertFalse( + $headers->containsKey(MessageHeaders::POLLED_CHANNEL_NAME), + 'Channel adapter has no source pollable channel; POLLED_CHANNEL_NAME must not be set' + ); + $this->assertTrue( + $headers->containsKey(MessageHeaders::ROUTING_SLIP), + 'Routing slip is required for replay back to a channel adapter consumer' + ); + $this->assertSame(FailingScheduledExample::ENDPOINT_ID, $headers->get(MessageHeaders::ROUTING_SLIP)); + } + + public function test_channel_adapter_with_delayed_retry_template_throws_clear_error_about_missing_polled_channel(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [FailingScheduledExample::class], + [new FailingScheduledExample()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel('retryErrorChannel') + ->withExtensionObjects([ + ErrorHandlerConfiguration::create( + 'retryErrorChannel', + RetryTemplateBuilder::exponentialBackoff(1, 1)->maxRetryAttempts(2) + ), + ]), + ); + + $this->expectException(\Ecotone\Messaging\Handler\MessageHandlingException::class); + $this->expectExceptionMessage('does not contain information about origination channel from which it was polled'); + + $ecotone->run(FailingScheduledExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + } + + public function test_async_handler_routes_failure_to_error_channel_declared_via_endpoint_annotations(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [AsyncFailingHandler::class], + [new AsyncFailingHandler()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::SHARED_ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_A), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_B), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(AsyncFailingHandler::ROUTING_KEY_A, 'payload-a'); + $ecotone->run(AsyncFailingHandler::SHARED_ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + /** @var PollableChannel $errorChannelA */ + $errorChannelA = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_A); + /** @var PollableChannel $errorChannelB */ + $errorChannelB = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_B); + + $this->assertNotNull($errorChannelA->receive(), 'Handler A failure must be routed to its declared error channel'); + $this->assertNull($errorChannelB->receive(), 'Handler B error channel must remain empty when only handler A failed'); + } + + public function test_two_async_handlers_sharing_channel_each_route_failures_to_their_own_error_channel(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [AsyncFailingHandler::class], + [new AsyncFailingHandler()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::SHARED_ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_A), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_B), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(AsyncFailingHandler::ROUTING_KEY_A, 'payload-a'); + $ecotone->sendCommandWithRoutingKey(AsyncFailingHandler::ROUTING_KEY_B, 'payload-b'); + + $ecotone->run(AsyncFailingHandler::SHARED_ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 2, + failAtError: false, + )); + + /** @var PollableChannel $errorChannelA */ + $errorChannelA = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_A); + /** @var PollableChannel $errorChannelB */ + $errorChannelB = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_B); + + $messageInA = $errorChannelA->receive(); + $messageInB = $errorChannelB->receive(); + + $this->assertNotNull($messageInA, 'Handler A failure must land in error channel A'); + $this->assertNotNull($messageInB, 'Handler B failure must land in error channel B'); + $this->assertNull($errorChannelA->receive(), 'Only one message expected in error channel A'); + $this->assertNull($errorChannelB->receive(), 'Only one message expected in error channel B'); + } + + public function test_async_handler_endpoint_annotation_error_channel_overrides_default_error_channel(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [AsyncFailingHandler::class], + [new AsyncFailingHandler()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel('globalDefaultErrorChannel') + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::SHARED_ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_A), + SimpleMessageChannelBuilder::createQueueChannel('globalDefaultErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(AsyncFailingHandler::ROUTING_KEY_A, 'payload-a'); + $ecotone->run(AsyncFailingHandler::SHARED_ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + /** @var PollableChannel $globalDefault */ + $globalDefault = $ecotone->getMessageChannel('globalDefaultErrorChannel'); + /** @var PollableChannel $errorChannelA */ + $errorChannelA = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_A); + + $this->assertNotNull($errorChannelA->receive(), 'Per-handler #[ErrorChannel] must override the default error channel'); + $this->assertNull($globalDefault->receive(), 'Default error channel must not receive the failure when handler declares its own'); + } + + public function test_retry_policy_retries_handler_until_success(): void + { + $handler = new DelayedRetryHandler(); + $ecotone = EcotoneLite::bootstrapFlowTesting( + [DelayedRetryHandler::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::ASYNC_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(DelayedRetryHandler::ROUTING_KEY_RECOVERS, 'payload'); + + $ecotone->run(DelayedRetryHandler::ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + $this->assertSame(1, $ecotone->sendQueryWithRouting('retryHandler.attemptsRecovers')); + $this->assertFalse($ecotone->sendQueryWithRouting('retryHandler.finallyHandled')); + + $ecotone->run(DelayedRetryHandler::ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + $this->assertSame(2, $ecotone->sendQueryWithRouting('retryHandler.attemptsRecovers')); + $this->assertTrue($ecotone->sendQueryWithRouting('retryHandler.finallyHandled')); + } + + public function test_retry_policy_routes_to_dead_letter_when_retries_exhausted(): void + { + $handler = new DelayedRetryHandler(); + $ecotone = EcotoneLite::bootstrapFlowTesting( + [DelayedRetryHandler::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::DEAD_LETTER_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(DelayedRetryHandler::ROUTING_KEY_DEAD_LETTER, 'payload'); + + for ($i = 0; $i < 3; $i++) { + $ecotone->run(DelayedRetryHandler::ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + } + + $this->assertSame(3, $ecotone->sendQueryWithRouting('retryHandler.attemptsDeadLetter'), 'Handler invoked maxAttempts+1 times before exhaustion'); + + /** @var PollableChannel $deadLetter */ + $deadLetter = $ecotone->getMessageChannel(DelayedRetryHandler::DEAD_LETTER_CHANNEL); + $this->assertNotNull($deadLetter->receive(), 'Failed message must land in the dead letter channel after retries are exhausted'); + $this->assertNull($deadLetter->receive(), 'Only one failed message expected in the dead letter channel'); + } + + public function test_retry_policy_overrides_global_default_error_channel(): void + { + $handler = new DelayedRetryHandler(); + $ecotone = EcotoneLite::bootstrapFlowTesting( + [DelayedRetryHandler::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel('globalDefaultErrorChannel') + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::DEAD_LETTER_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel('globalDefaultErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(DelayedRetryHandler::ROUTING_KEY_OVERRIDE, 'payload'); + + for ($i = 0; $i < 2; $i++) { + $ecotone->run(DelayedRetryHandler::ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + } + + $this->assertSame(2, $ecotone->sendQueryWithRouting('retryHandler.attemptsOverride')); + + /** @var PollableChannel $deadLetter */ + $deadLetter = $ecotone->getMessageChannel(DelayedRetryHandler::DEAD_LETTER_CHANNEL); + /** @var PollableChannel $globalDefault */ + $globalDefault = $ecotone->getMessageChannel('globalDefaultErrorChannel'); + + $this->assertNotNull($deadLetter->receive(), '#[DelayedRetry] must route the failure to its own dead letter channel'); + $this->assertNull($globalDefault->receive(), 'Global default error channel must not receive the failure when handler declares #[DelayedRetry]'); + } } diff --git a/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php b/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php index 772982196..74ca8c361 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php @@ -5,20 +5,28 @@ namespace Messaging\Unit\Handler\Gateway; use Ecotone\Lite\EcotoneLite; +use Ecotone\Messaging\Attribute\DelayedRetry; use Ecotone\Messaging\Attribute\ErrorChannel; use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder; use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\MessagingGatewayModule; +use Ecotone\Messaging\Config\ModulePackageList; +use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Conversion\MediaType; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\Handler\Recoverability\ErrorContext; use Ecotone\Messaging\MessageHeaders; +use Ecotone\Messaging\PollableChannel; use Ecotone\Messaging\Support\LicensingException; use Ecotone\Test\LicenceTesting; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use Ramsey\Uuid\Uuid; use Ramsey\Uuid\UuidInterface; +use RuntimeException; +use Test\Ecotone\Messaging\Fixture\Service\Gateway\DelayedRetryCommandBus; use Test\Ecotone\Messaging\Fixture\Service\Gateway\ErrorChannelCommandBus; use Test\Ecotone\Messaging\Fixture\Service\Gateway\ErrorChannelWithAsyncChannel; +use Test\Ecotone\Messaging\Fixture\Service\Gateway\HandlerLevelErrorChannelService; use Test\Ecotone\Messaging\Fixture\Service\Gateway\TicketService; use Test\Ecotone\Messaging\SerializationSupport; @@ -115,4 +123,96 @@ public function test_using_custom_error_channel_with_reply_channel(): void $message = $ecotoneLite->getMessageChannel('async')->receive(); $this->assertNotNull($message); } + + public function test_error_channel_on_command_handler_is_silently_ignored_must_be_placed_on_gateway(): void + { + $service = new HandlerLevelErrorChannelService(); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [HandlerLevelErrorChannelService::class], + [$service], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel('handlerLevelErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $thrown = null; + try { + $ecotoneLite->sendCommandWithRoutingKey('handler.level.error.channel.test', 'payload'); + } catch (RuntimeException $exception) { + $thrown = $exception; + } + + $this->assertNotNull( + $thrown, + 'Exception must propagate to the bus caller — #[ErrorChannel] on a #[CommandHandler] is not wired by any resolver.' + ); + $this->assertSame('handler-failure', $thrown->getMessage()); + $this->assertTrue( + $service->sideEffectExecuted, + 'Handler ran and produced side effects with no rollback boundary in place.' + ); + $this->assertNull( + $ecotoneLite->getMessageChannel('handlerLevelErrorChannel')->receive(), + '#[ErrorChannel] on the handler is silently ignored — no message routed. ' + . 'Place it on the gateway entry-point (CommandBus/EventBus/BusinessMethod) so the framework can capture failures ' + . 'after gateway-level interceptors (e.g. transactional rollback) have fully unwound.' + ); + } + + public function test_delayed_retry_on_command_bus_throws_in_non_enterprise_mode(): void + { + $this->expectException(LicensingException::class); + $this->expectExceptionMessage('#[DelayedRetry]'); + + EcotoneLite::bootstrapFlowTesting( + [TicketService::class, DelayedRetryCommandBus::class], + [new TicketService()], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetry::generateGatewayChannelName(DelayedRetryCommandBus::class)), + SimpleMessageChannelBuilder::createQueueChannel('gatewayRetryDeadLetter'), + ], + ); + } + + public function test_delayed_retry_on_command_bus_routes_failures_to_generated_channel(): void + { + $generatedRetryChannel = DelayedRetry::generateGatewayChannelName(DelayedRetryCommandBus::class); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [TicketService::class, DelayedRetryCommandBus::class], + [new TicketService()], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + SimpleMessageChannelBuilder::createQueueChannel($generatedRetryChannel), + SimpleMessageChannelBuilder::createQueueChannel('gatewayRetryDeadLetter'), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $commandBus = $ecotoneLite->getGateway(DelayedRetryCommandBus::class); + $payload = Uuid::uuid4(); + $commandBus->sendWithRouting( + 'createViaCommand', + $payload, + metadata: ['throwException' => true], + ); + + $this->assertEquals( + [], + $ecotoneLite->sendQueryWithRouting('getTickets'), + 'Handler must throw, leaving no ticket created' + ); + + /** @var PollableChannel $retryChannel */ + $retryChannel = $ecotoneLite->getMessageChannel($generatedRetryChannel); + $this->assertNotNull( + $retryChannel->receive(), + "#[DelayedRetry] on a CommandBus gateway must route failures to the auto-generated channel `{$generatedRetryChannel}`" + ); + } } diff --git a/packages/Kafka/tests/Fixture/KafkaConsumer/KafkaConsumerFailingExample.php b/packages/Kafka/tests/Fixture/KafkaConsumer/KafkaConsumerFailingExample.php new file mode 100644 index 000000000..fd6698760 --- /dev/null +++ b/packages/Kafka/tests/Fixture/KafkaConsumer/KafkaConsumerFailingExample.php @@ -0,0 +1,21 @@ +assertNotNull($ecotoneLite->getMessageChannel('customErrorChannel')->receive()); } + public function test_default_custom_error_channel_on_consumer(): void + { + $topicName = Uuid::v7()->toRfc4122(); + $publisherReferenceName = 'kafka_publisher'; + $consumerReferenceName = 'kafka_consumer_attribute'; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [KafkaConsumerFailingExample::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + new KafkaConsumerFailingExample(), + ], + ServiceConfiguration::createWithDefaults() + ->withDefaultErrorChannel('customErrorChannel') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName, $publisherReferenceName), + TopicConfiguration::createWithReferenceName('testTopicFailure', $topicName), + KafkaConsumerConfiguration::createWithDefaults($consumerReferenceName), + SimpleMessageChannelBuilder::createQueueChannel('customErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + /** @var KafkaAdmin $kafkaAdmin */ + $kafkaAdmin = $ecotoneLite->getServiceFromContainer(KafkaAdmin::class); + $kafkaAdmin->getTopicForProducer($publisherReferenceName . '.handler') + ->produce(RD_KAFKA_PARTITION_UA, 0, Uuid::v7()->toRfc4122()); + $kafkaAdmin->getProducer($publisherReferenceName. '.handler')->flush(8000); + + $ecotoneLite->run($consumerReferenceName, ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000 + )->withStopOnError(false)); + + $this->assertNotNull($ecotoneLite->getMessageChannel('customErrorChannel')->receive()); + } + + public function test_default_error_channel_on_consumer_with_dead_letter_and_replay(): void + { + $topicName = 'test_topic_dead_letter_' . Uuid::v7()->toRfc4122(); + $consumerReferenceName = 'kafka_consumer_attribute'; + $failureCount = 0; + + $consumer = new class () { + public int $failureCount = 0; + private array $processedMessages = []; + + #[KafkaConsumer('kafka_consumer_attribute', 'testTopicDeadLetter')] + public function handle(#[\Ecotone\Messaging\Attribute\Parameter\Payload] string $payload): void + { + if ($this->failureCount < 1) { + $this->failureCount++; + throw new \RuntimeException('Simulated failure'); + } + $this->processedMessages[] = $payload; + } + + #[QueryHandler('consumer.getProcessedMessages')] + public function getProcessedMessages(): array + { + return $this->processedMessages; + } + }; + + $dbalConnectionFactory = new DbalConnectionFactory(getenv('DATABASE_DSN') ?: 'pgsql://ecotone:secret@database:5432/ecotone'); + $this->cleanDeadLetterTable($dbalConnectionFactory); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$consumer::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + DbalConnectionFactory::class => $dbalConnectionFactory, + 'managerRegistry' => $dbalConnectionFactory, + $consumer, + ], + ServiceConfiguration::createWithDefaults() + ->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL) + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE, ModulePackageList::DBAL_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName) + ->withHeaderMapper('*'), + TopicConfiguration::createWithReferenceName('testTopicDeadLetter', $topicName), + KafkaConsumerConfiguration::createWithDefaults($consumerReferenceName), + DbalConfiguration::createWithDefaults() + ->withAutomaticTableInitialization(true), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + pathToRootCatalog: __DIR__ . '/../../', + ); + + $payload = Uuid::v7()->toRfc4122(); + $messagePublisher = $ecotoneLite->getGateway(MessagePublisher::class); + $messagePublisher->send($payload); + + $ecotoneLite->run($consumerReferenceName, ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + /** @var DeadLetterGateway $deadLetter */ + $deadLetter = $ecotoneLite->getGateway(DeadLetterGateway::class); + $this->assertEquals(1, $deadLetter->count()); + + $deadLetter->replyAll(); + $this->assertEquals(0, $deadLetter->count()); + + $ecotoneLite->run($consumerReferenceName, ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + $processedMessages = $ecotoneLite->sendQueryWithRouting('consumer.getProcessedMessages'); + $this->assertCount(1, $processedMessages); + $this->assertEquals($payload, $processedMessages[0]); + } + public function test_convert_and_send(): void { $topicName = Uuid::v7()->toRfc4122(); @@ -499,4 +621,13 @@ public function test_kafka_publisher_works_without_explicit_configuration(): voi $this->assertTrue(true); } + + private function cleanDeadLetterTable(DbalConnectionFactory $connectionFactory): void + { + $connection = $connectionFactory->createContext()->getDbalConnection(); + $schemaManager = method_exists($connection, 'getSchemaManager') ? $connection->getSchemaManager() : $connection->createSchemaManager(); + if ($schemaManager->tablesExist(['ecotone_error_messages'])) { + $connection->executeStatement('DELETE FROM ecotone_error_messages'); + } + } } From cee1c068b8fcfa215126ea69ecefac86c9bbd3e6 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 19:27:50 +0200 Subject: [PATCH 02/10] delayed retry --- .../DbalDeadLetterConsoleCommand.php | 6 ++- .../Recoverability/DbalDeadLetterHandler.php | 12 +++-- ...ReplayableInboundChannelAdapterExample.php | 47 +++++++++++++++++++ .../Dbal/tests/Integration/DeadLetterTest.php | 38 +++++++++++++++ .../AcknowledgeConfirmationInterceptor.php | 6 ++- .../InboundChannelAdapterBuilder.php | 1 + .../MessagePoller/InvocationPollerAdapter.php | 20 ++++++-- ...PollingConsumerErrorChannelInterceptor.php | 42 ++++++++++++----- .../Ecotone/src/Messaging/MessageHeaders.php | 10 +++- .../Handler/ErrorHandler/ErrorChannelTest.php | 10 ++-- .../Inbound/KafkaInboundChannelAdapter.php | 5 +- .../OpenTelemetry/src/TracerInterceptor.php | 4 +- 12 files changed, 172 insertions(+), 29 deletions(-) create mode 100644 packages/Dbal/tests/Fixture/DeadLetter/InboundChannelAdapter/ReplayableInboundChannelAdapterExample.php diff --git a/packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php b/packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php index 6c052edea..b13754153 100644 --- a/packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php +++ b/packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php @@ -53,7 +53,11 @@ public function show(DeadLetterGateway $deadLetterGateway, string $messageId, bo [ ['Message Id', $message->getHeaders()->getMessageId()], ['Failed At', $this->convertTimestampToReadableFormat($message->getHeaders()->getTimestamp())], - ['Channel Name', $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)], + ['Channel Name', $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) + ? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) + : ($message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL) + ? $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL) + : 'Unknown')], ['Type', $message->getHeaders()->containsKey(MessageHeaders::TYPE_ID) ? $message->getHeaders()->get(MessageHeaders::TYPE_ID) : 'Unknown'], ['Stacktrace', $this->getReadableStacktrace($message->getHeaders()->get(ErrorContext::EXCEPTION_STACKTRACE), $fullDetails)], ] diff --git a/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php b/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php index 160bc95fe..472dce52e 100644 --- a/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php +++ b/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php @@ -245,12 +245,18 @@ private function initialize(): void private function replyWithoutInitialization(string $messageId, MessagingEntrypointService $messagingEntrypoint): void { $message = $this->show($messageId); - if (! $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) && ! $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP)) { - throw InvalidArgumentException::create("Can not reply to message {$messageId}, as it does not contain either `polledChannelName` or `routingSlip` header. Please add one of them, so Message can be routed back to the original channel."); + $hasPolledChannel = $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME); + $hasInboundRequestChannel = $message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL); + $hasRoutingSlip = $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP); + + if (! $hasPolledChannel && ! $hasInboundRequestChannel && ! $hasRoutingSlip) { + throw InvalidArgumentException::create("Can not reply to message {$messageId}, as it does not contain `polledChannelName`, `inboundRequestChannel` or `routingSlip` header. Please add one of them, so Message can be routed back to the original channel."); } - if ($message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME)) { + if ($hasPolledChannel) { $entrypoint = $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME); + } elseif ($hasInboundRequestChannel) { + $entrypoint = $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL); } else { // This allows to replay Error Message stored for synchronous calls (non asynchronous) $routingSlip = $message->getHeaders()->resolveRoutingSlip(); diff --git a/packages/Dbal/tests/Fixture/DeadLetter/InboundChannelAdapter/ReplayableInboundChannelAdapterExample.php b/packages/Dbal/tests/Fixture/DeadLetter/InboundChannelAdapter/ReplayableInboundChannelAdapterExample.php new file mode 100644 index 000000000..373a1466c --- /dev/null +++ b/packages/Dbal/tests/Fixture/DeadLetter/InboundChannelAdapter/ReplayableInboundChannelAdapterExample.php @@ -0,0 +1,47 @@ +hasEmitted) { + return null; + } + $this->hasEmitted = true; + + return 'first-payload'; + } + + #[ServiceActivator(self::REQUEST_CHANNEL)] + public function handle(string $payload): void + { + $this->invocations++; + if ($this->shouldFail) { + throw new RuntimeException('simulated'); + } + $this->processedPayloads[] = $payload; + } +} diff --git a/packages/Dbal/tests/Integration/DeadLetterTest.php b/packages/Dbal/tests/Integration/DeadLetterTest.php index 9dbc59663..4b9101c8a 100644 --- a/packages/Dbal/tests/Integration/DeadLetterTest.php +++ b/packages/Dbal/tests/Integration/DeadLetterTest.php @@ -10,6 +10,7 @@ use Ecotone\Lite\Test\FlowTestSupport; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\Recoverability\ErrorContext; use Ecotone\Messaging\MessageHeaders; @@ -21,6 +22,7 @@ use Test\Ecotone\Dbal\Fixture\DeadLetter\Example\ErrorConfigurationContext; use Test\Ecotone\Dbal\Fixture\DeadLetter\Example\OrderGateway; use Test\Ecotone\Dbal\Fixture\DeadLetter\Example\OrderService; +use Test\Ecotone\Dbal\Fixture\DeadLetter\InboundChannelAdapter\ReplayableInboundChannelAdapterExample; /** * @internal @@ -203,6 +205,42 @@ public function test_same_event_is_stored_in_dead_letter_twice_for_different_end $this->assertErrorMessageCount($ecotone, 0); } + public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler(): void + { + $handler = new ReplayableInboundChannelAdapterExample(); + $connectionFactory = $this->getConnectionFactory(); + + $ecotone = EcotoneLite::bootstrapFlowTesting( + containerOrAvailableServices: [ + $handler, + DbalConnectionFactory::class => $connectionFactory, + 'managerRegistry' => $connectionFactory, + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withEnvironment('prod') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL), + classesToResolve: [ReplayableInboundChannelAdapterExample::class], + pathToRootCatalog: __DIR__ . '/../../', + ); + + $ecotone->run(ReplayableInboundChannelAdapterExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + $this->assertErrorMessageCount($ecotone, 1); + $this->assertSame(1, $handler->invocations, 'Handler must have been invoked once before the failure was captured'); + $this->assertSame([], $handler->processedPayloads); + + $handler->shouldFail = false; + $this->replyAllErrorMessages($ecotone); + + $this->assertErrorMessageCount($ecotone, 0); + $this->assertSame(2, $handler->invocations, 'Replay must re-invoke the handler synchronously via the routing slip stored on the failed Message'); + $this->assertSame(['first-payload'], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler'); + } + private function assertErrorMessageCount(FlowTestSupport $ecotone, int $amount, string $deadLetterReference = DeadLetterGateway::class): void { $gateway = $ecotone->getGateway(DeadLetterGateway::class); diff --git a/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php b/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php index 122cc28ca..9a25e0326 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php +++ b/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php @@ -36,7 +36,11 @@ public function __construct(private RetryRunner $retryRunner, private LoggingGat */ public function ack(MethodInvocation $methodInvocation, Message $message) { - $messageChannelName = $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) ? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) : 'unknown'; + $messageChannelName = $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) + ? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) + : ($message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL) + ? $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL) + : 'unknown'); $this->logger->info( sprintf( diff --git a/packages/Ecotone/src/Messaging/Endpoint/InboundChannelAdapter/InboundChannelAdapterBuilder.php b/packages/Ecotone/src/Messaging/Endpoint/InboundChannelAdapter/InboundChannelAdapterBuilder.php index 94fab5afa..442ab8b6c 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/InboundChannelAdapter/InboundChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Messaging/Endpoint/InboundChannelAdapter/InboundChannelAdapterBuilder.php @@ -142,6 +142,7 @@ public function compile(MessagingContainerBuilder $builder): Definition return new Definition(InvocationPollerAdapter::class, [ $objectReference, $methodName, + $this->requestChannelName, ]); } } diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/InvocationPollerAdapter.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/InvocationPollerAdapter.php index 8397024dc..ed7cb5806 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/InvocationPollerAdapter.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/InvocationPollerAdapter.php @@ -4,6 +4,7 @@ use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Message; +use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\MessagePoller; use Ecotone\Messaging\Support\MessageBuilder; @@ -12,8 +13,11 @@ */ class InvocationPollerAdapter implements MessagePoller { - public function __construct(private object $serviceToCall, private string $methodName) - { + public function __construct( + private object $serviceToCall, + private string $methodName, + private ?string $inboundRequestChannelName = null, + ) { } public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message @@ -22,9 +26,15 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message if ($result === null) { return null; } - return $result instanceof Message - ? $result - : MessageBuilder::withPayload($result)->build(); + $message = $result instanceof Message + ? MessageBuilder::fromMessage($result) + : MessageBuilder::withPayload($result); + + if ($this->inboundRequestChannelName !== null) { + $message = $message->setHeader(MessageHeaders::INBOUND_REQUEST_CHANNEL, $this->inboundRequestChannelName); + } + + return $message->build(); } public function onConsumerStop(): void diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php index 409327a43..6a548732c 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php @@ -53,17 +53,23 @@ private function tryToSendToErrorChannel(Throwable $exception, Message $requestM return false; } - $polledChannelName = $requestMessage->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) - ? $requestMessage->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) - : null; - - $this->errorChannelService->handle( - $requestMessage, - $exception, - $this->channelResolver->resolve($errorChannelName), - $polledChannelName, - $polledChannelName === null ? $pollingMetadata->getEndpointId() : null, - ); + if ($this->isInboundChannelAdapter($requestMessage)) { + $this->errorChannelService->handle( + $requestMessage, + $exception, + $this->channelResolver->resolve($errorChannelName), + null, + $this->resolveInboundRequestChannelName($requestMessage, $pollingMetadata), + ); + } else { + $this->errorChannelService->handle( + $requestMessage, + $exception, + $this->channelResolver->resolve($errorChannelName), + $requestMessage->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME), + null, + ); + } return true; } @@ -71,6 +77,20 @@ private function tryToSendToErrorChannel(Throwable $exception, Message $requestM return false; } + private function isInboundChannelAdapter(Message $requestMessage): bool + { + return ! $requestMessage->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME); + } + + private function resolveInboundRequestChannelName(Message $requestMessage, PollingMetadata $pollingMetadata): string + { + if ($requestMessage->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL)) { + return $requestMessage->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL); + } + + return $pollingMetadata->getEndpointId(); + } + private function resolveHandlerScopedErrorChannelName(): ?string { $handlerEndpointId = null; diff --git a/packages/Ecotone/src/Messaging/MessageHeaders.php b/packages/Ecotone/src/Messaging/MessageHeaders.php index d80e9e991..bf031b1ae 100644 --- a/packages/Ecotone/src/Messaging/MessageHeaders.php +++ b/packages/Ecotone/src/Messaging/MessageHeaders.php @@ -93,9 +93,15 @@ final class MessageHeaders */ public const CONSUMER_ACK_HEADER_LOCATION = 'consumerAcknowledgeCallbackHeader'; /** - * Consumed channel name + * Consumed channel name (set when the Message originates from a pollable Message Channel) */ public const POLLED_CHANNEL_NAME = 'polledChannelName'; + /** + * Inbound Channel Adapter request channel name (set when the Message originates from an Inbound Channel Adapter + * such as #[KafkaConsumer], AMQP inbound, #[Scheduled]). Carries the user-facing request channel where the Message + * is dispatched after polling, so it can be replayed back to the same handler. + */ + public const INBOUND_REQUEST_CHANNEL = 'inboundRequestChannel'; /** * Current polling metadata */ @@ -176,6 +182,7 @@ public static function getFrameworksHeaderNames(): array self::TIME_TO_LIVE, self::DELIVERY_DELAY, self::POLLED_CHANNEL_NAME, + self::INBOUND_REQUEST_CHANNEL, self::REPLY_CONTENT_TYPE, self::STREAM_BASED_SOURCED, MessagingEntrypointService::ENTRYPOINT, @@ -251,6 +258,7 @@ public static function unsetEnqueueMetadata(array $metadata): array $metadata[self::CONTENT_TYPE], $metadata[self::CONSUMER_ACK_HEADER_LOCATION], $metadata[self::POLLED_CHANNEL_NAME], + $metadata[self::INBOUND_REQUEST_CHANNEL], $metadata[self::CONSUMER_POLLING_METADATA], $metadata[self::REPLY_CHANNEL], $metadata[self::TEMPORARY_SPAN_CONTEXT_HEADER], diff --git a/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php b/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php index 5b16c5301..46f9daebb 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php @@ -167,7 +167,7 @@ public function test_using_custom_channel_for_error_handling(): void $this->assertSame(1, $ecotone->sendQueryWithRouting('getOrderAmount')); } - public function test_channel_adapter_sends_failed_message_to_default_error_channel_using_routing_slip(): void + public function test_inbound_channel_adapter_sends_failed_message_to_default_error_channel_using_routing_slip(): void { $ecotone = EcotoneLite::bootstrapFlowTesting( [FailingScheduledExample::class], @@ -193,16 +193,16 @@ public function test_channel_adapter_sends_failed_message_to_default_error_chann $headers = $errorMessage->getHeaders(); $this->assertFalse( $headers->containsKey(MessageHeaders::POLLED_CHANNEL_NAME), - 'Channel adapter has no source pollable channel; POLLED_CHANNEL_NAME must not be set' + 'Inbound Channel Adapter has no source pollable Message Channel; POLLED_CHANNEL_NAME must not be set' ); $this->assertTrue( $headers->containsKey(MessageHeaders::ROUTING_SLIP), - 'Routing slip is required for replay back to a channel adapter consumer' + 'Routing slip is required for replay back to an Inbound Channel Adapter consumer (Kafka, AMQP inbound, #[Scheduled], etc.)' ); - $this->assertSame(FailingScheduledExample::ENDPOINT_ID, $headers->get(MessageHeaders::ROUTING_SLIP)); + $this->assertSame(FailingScheduledExample::REQUEST_CHANNEL, $headers->get(MessageHeaders::ROUTING_SLIP)); } - public function test_channel_adapter_with_delayed_retry_template_throws_clear_error_about_missing_polled_channel(): void + public function test_inbound_channel_adapter_with_delayed_retry_template_throws_clear_error_about_missing_polled_channel(): void { $ecotone = EcotoneLite::bootstrapFlowTesting( [FailingScheduledExample::class], diff --git a/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php b/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php index c432f1cc7..faf12e483 100644 --- a/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php +++ b/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php @@ -9,6 +9,7 @@ use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\Logger\LoggingGateway; use Ecotone\Messaging\Message; +use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\MessagePoller; use Ecotone\Messaging\MessagingException; @@ -66,7 +67,9 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message $message, $this->conversionService, $this->batchCommitCoordinator - )->build(); + ) + ->setHeader(MessageHeaders::INBOUND_REQUEST_CHANNEL, $channelName) + ->build(); } if (in_array($message->err, [RD_KAFKA_MSG_PARTITIONER_RANDOM, RD_KAFKA_MSG_PARTITIONER_CONSISTENT, RD_KAFKA_MSG_PARTITIONER_CONSISTENT_RANDOM, RD_KAFKA_MSG_PARTITIONER_MURMUR2, RD_KAFKA_MSG_PARTITIONER_MURMUR2_RANDOM])) { diff --git a/packages/OpenTelemetry/src/TracerInterceptor.php b/packages/OpenTelemetry/src/TracerInterceptor.php index 7d23c3a79..3db50d770 100644 --- a/packages/OpenTelemetry/src/TracerInterceptor.php +++ b/packages/OpenTelemetry/src/TracerInterceptor.php @@ -43,7 +43,9 @@ public function traceAsynchronousEndpoint(MethodInvocation $methodInvocation, Me $trace = $this->trace( $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) ? 'Receiving from channel: ' . $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) - : 'Endpoint: ' . $message->getHeaders()->get(MessageHeaders::CONSUMER_POLLING_METADATA)->getEndpointId() . ' produced Message', + : ($message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL) + ? 'Receiving from inbound channel adapter: ' . $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL) + : 'Endpoint: ' . $message->getHeaders()->get(MessageHeaders::CONSUMER_POLLING_METADATA)->getEndpointId() . ' produced Message'), $methodInvocation, $message, spanKind: SpanKind::KIND_CONSUMER, From 1e69c4e53542f126a29a8457efea98716cc9e881 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 19:34:05 +0200 Subject: [PATCH 03/10] fail-fast on #[ErrorChannel] / #[DelayedRetry] placed directly on async handler methods Both attributes have no effect when placed alongside #[Asynchronous] on a handler method; they must be passed via #[Asynchronous(asynchronousExecution: [...])] for the polling consumer to pick them up. Detect the misplacement at compile time and throw a ConfigurationException pointing the user at the correct form. --- .../Config/MessagingSystemConfiguration.php | 15 +++++++ ...andlerWithDelayedRetryDirectlyOnMethod.php | 28 +++++++++++++ ...andlerWithErrorChannelDirectlyOnMethod.php | 28 +++++++++++++ .../Handler/ErrorHandler/ErrorChannelTest.php | 39 +++++++++++++++++++ 4 files changed, 110 insertions(+) create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithDelayedRetryDirectlyOnMethod.php create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithErrorChannelDirectlyOnMethod.php diff --git a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php index d39a85122..c612351f2 100644 --- a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php +++ b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php @@ -455,6 +455,21 @@ private function configureAsynchronousEndpoints(InterfaceToCallRegistry $interfa ); } + foreach ($handlerInterface->getMethodAnnotations() as $methodAnnotation) { + if ($methodAnnotation instanceof \Ecotone\Messaging\Attribute\ErrorChannel) { + throw ConfigurationException::create( + "Asynchronous handler `{$targetEndpointId}` has `#[ErrorChannel]` placed directly on the handler method — this has no effect on async handlers. " . + "Pass it via the #[Asynchronous] attribute instead: `#[Asynchronous('channel', asynchronousExecution: [new ErrorChannel('...')])]` so the polling consumer routes failures correctly." + ); + } + if ($methodAnnotation instanceof \Ecotone\Messaging\Attribute\DelayedRetry) { + throw ConfigurationException::create( + "Asynchronous handler `{$targetEndpointId}` has `#[DelayedRetry]` placed directly on the handler method — this has no effect on async handlers. " . + "Pass it via the #[Asynchronous] attribute instead: `#[Asynchronous('channel', asynchronousExecution: [new DelayedRetry(...)])]` so the polling consumer applies the retry policy correctly." + ); + } + } + $contextAnnotations = $endpointAnnotations; if ($hasDelayedRetry) { $contextAnnotations[] = new AsynchronousRunningEndpoint($targetEndpointId); diff --git a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithDelayedRetryDirectlyOnMethod.php b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithDelayedRetryDirectlyOnMethod.php new file mode 100644 index 000000000..565064c03 --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithDelayedRetryDirectlyOnMethod.php @@ -0,0 +1,28 @@ +assertNotNull($deadLetter->receive(), '#[DelayedRetry] must route the failure to its own dead letter channel'); $this->assertNull($globalDefault->receive(), 'Global default error channel must not receive the failure when handler declares #[DelayedRetry]'); } + + public function test_async_handler_with_error_channel_directly_on_method_throws_descriptive_error(): void + { + $this->expectException(\Ecotone\Messaging\Config\ConfigurationException::class); + $this->expectExceptionMessage('#[ErrorChannel]'); + $this->expectExceptionMessage('asynchronousExecution'); + + EcotoneLite::bootstrapFlowTesting( + [AsyncHandlerWithErrorChannelDirectlyOnMethod::class], + [new AsyncHandlerWithErrorChannelDirectlyOnMethod()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(AsyncHandlerWithErrorChannelDirectlyOnMethod::ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel('someErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + public function test_async_handler_with_delayed_retry_directly_on_method_throws_descriptive_error(): void + { + $this->expectException(\Ecotone\Messaging\Config\ConfigurationException::class); + $this->expectExceptionMessage('#[DelayedRetry]'); + $this->expectExceptionMessage('asynchronousExecution'); + + EcotoneLite::bootstrapFlowTesting( + [AsyncHandlerWithDelayedRetryDirectlyOnMethod::class], + [new AsyncHandlerWithDelayedRetryDirectlyOnMethod()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(AsyncHandlerWithDelayedRetryDirectlyOnMethod::ASYNC_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } } From 393992f7379ed0e7af21a693e09e425de8a7229f Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 19:53:40 +0200 Subject: [PATCH 04/10] reject #[DelayedRetry] on inbound channel adapters; allow #[InstantRetry] on #[Scheduled] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Inbound Channel Adapters (Kafka, AMQP inbound, #[Scheduled]) consume from external systems and have no source Message Channel for the framework to reschedule a delayed retry into — fail-fast at bootstrap with a descriptive error pointing at #[ErrorChannel] and/or #[InstantRetry] as the supported alternatives. InstantRetryAttributeModule now also accepts #[ChannelAdapter] (parent of #[Scheduled]) in addition to #[MessageConsumer], so the recommended workaround works docker-free. --- .../ErrorHandlerModule.php | 13 +++ .../InstantRetryAttributeModule.php | 13 +-- ...AdapterWithInstantRetryAndErrorChannel.php | 52 +++++++++++ .../InboundChannelAdapterWithDelayedRetry.php | 26 ++++++ .../Handler/ErrorHandler/ErrorChannelTest.php | 90 +++++++++++++++++++ 5 files changed, 189 insertions(+), 5 deletions(-) create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/InboundChannelAdapterWithInstantRetryAndErrorChannel.php create mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/InboundChannelAdapterWithDelayedRetry.php diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorHandlerModule.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorHandlerModule.php index 2c9a65c39..abe1f5c82 100644 --- a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorHandlerModule.php +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorHandlerModule.php @@ -51,6 +51,19 @@ public static function create(AnnotationFinder $annotationRegistrationService, I { $perHandlerRetryConfigurations = []; + foreach ($annotationRegistrationService->findAnnotatedMethods(DelayedRetry::class) as $delayedRetryMethod) { + $isInboundChannelAdapter = $delayedRetryMethod->hasMethodAnnotation(\Ecotone\Messaging\Attribute\MessageConsumer::class) + || $delayedRetryMethod->hasMethodAnnotation(\Ecotone\Messaging\Attribute\ChannelAdapter::class); + if (! $isInboundChannelAdapter) { + continue; + } + throw ConfigurationException::create( + "#[DelayedRetry] cannot be used on an Inbound Channel Adapter `{$delayedRetryMethod->getClassName()}::{$delayedRetryMethod->getMethodName()}`. " . + 'Inbound Channel Adapters consume from external systems (Kafka, AMQP, scheduled tasks) and have no source Message Channel for the framework to reschedule a delayed retry into. ' . + 'Use #[ErrorChannel] to capture the failure for later replay (e.g. from a Dead Letter), and optionally combine it with #[InstantRetry] for in-process retries before forwarding to the Error Channel.' + ); + } + $endpointMethods = $annotationRegistrationService->findAnnotatedMethods(\Ecotone\Messaging\Attribute\EndpointAnnotation::class); $asynchronousMethods = $annotationRegistrationService->findAnnotatedMethods(Asynchronous::class); diff --git a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php index 6b8d36c19..b7b5fba69 100644 --- a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php +++ b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php @@ -68,16 +68,19 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $asynchronousEndpointsWithInstantRetry = []; $annotatedMethods = $annotationRegistrationService->findAnnotatedMethods(InstantRetry::class); foreach ($annotatedMethods as $annotatedMethod) { - if (! $annotatedMethod->hasMethodAnnotation(MessageConsumer::class)) { + $hasMessageConsumer = $annotatedMethod->hasMethodAnnotation(MessageConsumer::class); + $hasChannelAdapter = $annotatedMethod->hasMethodAnnotation(\Ecotone\Messaging\Attribute\ChannelAdapter::class); + if (! $hasMessageConsumer && ! $hasChannelAdapter) { throw new ConfigurationException(sprintf( - "InstantRetry attribute can only be used on methods annotated with MessageConsumer. '%s' is not annotated with MessageConsumer (e.g. RabbitConsumer, KafkaConsumer).", + "InstantRetry attribute can only be used on Inbound Channel Adapter methods (annotated with MessageConsumer e.g. KafkaConsumer, RabbitConsumer; or with ChannelAdapter subclass e.g. #[Scheduled]). '%s' has neither.", $annotatedMethod->getClassName() . '::' . $annotatedMethod->getMethodName() )); } - /** @var MessageConsumer $messageConsumer */ - $messageConsumer = $annotatedMethod->getMethodAnnotationsWithType(MessageConsumer::class)[0]; - $asynchronousEndpointsWithInstantRetry[$messageConsumer->getEndpointId()] = $annotatedMethod->getAnnotationForMethod(); + $consumerAttribute = $hasMessageConsumer + ? $annotatedMethod->getMethodAnnotationsWithType(MessageConsumer::class)[0] + : $annotatedMethod->getMethodAnnotationsWithType(\Ecotone\Messaging\Attribute\ChannelAdapter::class)[0]; + $asynchronousEndpointsWithInstantRetry[$consumerAttribute->getEndpointId()] = $annotatedMethod->getAnnotationForMethod(); } return new self($commandBusesWithInstantRetry, $asynchronousEndpointsWithInstantRetry); diff --git a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/InboundChannelAdapterWithInstantRetryAndErrorChannel.php b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/InboundChannelAdapterWithInstantRetryAndErrorChannel.php new file mode 100644 index 000000000..1b236e8e1 --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/InboundChannelAdapterWithInstantRetryAndErrorChannel.php @@ -0,0 +1,52 @@ +hasEmitted) { + return null; + } + $this->hasEmitted = true; + + return 'payload'; + } + + #[ServiceActivator(self::REQUEST_CHANNEL)] + public function handle(string $payload): void + { + $this->invocations++; + if ($this->invocations <= $this->maxFailures) { + throw new RuntimeException('simulated'); + } + } +} diff --git a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/InboundChannelAdapterWithDelayedRetry.php b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/InboundChannelAdapterWithDelayedRetry.php new file mode 100644 index 000000000..11fb0d2e8 --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/InboundChannelAdapterWithDelayedRetry.php @@ -0,0 +1,26 @@ +expectException(\Ecotone\Messaging\Config\ConfigurationException::class); + $this->expectExceptionMessage('#[DelayedRetry] cannot be used on an Inbound Channel Adapter'); + $this->expectExceptionMessage('#[ErrorChannel]'); + $this->expectExceptionMessage('#[InstantRetry]'); + + EcotoneLite::bootstrapFlowTesting( + [InboundChannelAdapterWithDelayedRetry::class], + [new InboundChannelAdapterWithDelayedRetry()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + public function test_inbound_channel_adapter_with_instant_retry_recovers_within_in_process_retries(): void + { + $handler = new InboundChannelAdapterWithInstantRetryAndErrorChannel(); + $handler->maxFailures = 2; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [InboundChannelAdapterWithInstantRetryAndErrorChannel::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->run(InboundChannelAdapterWithInstantRetryAndErrorChannel::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + $this->assertSame(3, $handler->invocations, 'Handler must be invoked once + retried twice (retryTimes: 2) before succeeding on the third attempt'); + + /** @var PollableChannel $errorChannel */ + $errorChannel = $ecotone->getMessageChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL); + $this->assertNull($errorChannel->receive(), 'Error channel must remain empty when InstantRetry recovers within retry budget'); + } + + public function test_instant_retry_on_inbound_channel_adapter_requires_enterprise_licence(): void + { + $this->expectException(\Ecotone\Messaging\Support\LicensingException::class); + $this->expectExceptionMessage('Instant retry attribute is available only for Ecotone Enterprise'); + + EcotoneLite::bootstrapFlowTesting( + [InboundChannelAdapterWithInstantRetryAndErrorChannel::class], + [new InboundChannelAdapterWithInstantRetryAndErrorChannel()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL), + ]), + ); + } + + public function test_inbound_channel_adapter_with_instant_retry_forwards_to_error_channel_after_retries_exhausted(): void + { + $handler = new InboundChannelAdapterWithInstantRetryAndErrorChannel(); + $handler->maxFailures = PHP_INT_MAX; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [InboundChannelAdapterWithInstantRetryAndErrorChannel::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->run(InboundChannelAdapterWithInstantRetryAndErrorChannel::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + $this->assertSame(3, $handler->invocations, 'Handler must be invoked once + retried twice before forwarding to Error Channel'); + + /** @var PollableChannel $errorChannel */ + $errorChannel = $ecotone->getMessageChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL); + $this->assertNotNull($errorChannel->receive(), 'Failed message must land in the configured Error Channel after InstantRetry retries are exhausted'); + } } From 642019702ecc80d67619f15198dafd429d85f686 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 20:06:22 +0200 Subject: [PATCH 05/10] test: KafkaConsumer dead letter replay parity with Dbal scheduled test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors Test\Ecotone\Dbal\Integration\DeadLetterTest::test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler — same shape, real Kafka transport. Verifies that a failure on a #[KafkaConsumer] lands in the DBAL Dead Letter, replyAll() routes it back to the same consumer's handler with the original payload, and the handler runs successfully on the second attempt. --- .../ReplayableKafkaConsumerExample.php | 33 ++++++++++ .../Integration/KafkaChannelAdapterTest.php | 60 +++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 packages/Kafka/tests/Fixture/KafkaConsumer/ReplayableKafkaConsumerExample.php diff --git a/packages/Kafka/tests/Fixture/KafkaConsumer/ReplayableKafkaConsumerExample.php b/packages/Kafka/tests/Fixture/KafkaConsumer/ReplayableKafkaConsumerExample.php new file mode 100644 index 000000000..065b31ed1 --- /dev/null +++ b/packages/Kafka/tests/Fixture/KafkaConsumer/ReplayableKafkaConsumerExample.php @@ -0,0 +1,33 @@ +invocations++; + if ($this->shouldFail) { + throw new RuntimeException('simulated'); + } + $this->processedPayloads[] = $payload; + } +} diff --git a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php index 400efdda9..b1df4028a 100644 --- a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php +++ b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php @@ -42,6 +42,7 @@ use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithDelayedRetryExample; use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithFailStrategyExample; use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithInstantRetryAndErrorChannelExample; +use Test\Ecotone\Kafka\Fixture\KafkaConsumer\ReplayableKafkaConsumerExample; use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithInstantRetryExample; use Test\Ecotone\Kafka\Fixture\MediaTypeConverter\JsonEncodingConverter; @@ -393,6 +394,65 @@ public function getProcessedMessages(): array $this->assertEquals($payload, $processedMessages[0]); } + public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler(): void + { + $topicName = 'test_topic_replayable_' . Uuid::v7()->toRfc4122(); + $handler = new ReplayableKafkaConsumerExample(); + + $dbalConnectionFactory = new DbalConnectionFactory(getenv('DATABASE_DSN') ?: 'pgsql://ecotone:secret@database:5432/ecotone'); + $this->cleanDeadLetterTable($dbalConnectionFactory); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [ReplayableKafkaConsumerExample::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + DbalConnectionFactory::class => $dbalConnectionFactory, + 'managerRegistry' => $dbalConnectionFactory, + $handler, + ], + ServiceConfiguration::createWithDefaults() + ->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL) + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE, ModulePackageList::DBAL_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName) + ->withHeaderMapper('*'), + TopicConfiguration::createWithReferenceName(ReplayableKafkaConsumerExample::TOPIC_REFERENCE, $topicName), + KafkaConsumerConfiguration::createWithDefaults(ReplayableKafkaConsumerExample::ENDPOINT_ID), + DbalConfiguration::createWithDefaults() + ->withAutomaticTableInitialization(true), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + pathToRootCatalog: __DIR__ . '/../../', + ); + + $payload = Uuid::v7()->toRfc4122(); + $messagePublisher = $ecotoneLite->getGateway(MessagePublisher::class); + $messagePublisher->send($payload); + + $ecotoneLite->run(ReplayableKafkaConsumerExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + /** @var DeadLetterGateway $deadLetter */ + $deadLetter = $ecotoneLite->getGateway(DeadLetterGateway::class); + $this->assertEquals(1, $deadLetter->count(), 'Failed Kafka Message must land in DBAL Dead Letter'); + $this->assertSame(1, $handler->invocations); + $this->assertSame([], $handler->processedPayloads); + + $handler->shouldFail = false; + $deadLetter->replyAll(); + $this->assertEquals(0, $deadLetter->count()); + + $ecotoneLite->run(ReplayableKafkaConsumerExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + $this->assertSame(2, $handler->invocations, 'Replayed Message must re-invoke the handler exactly once'); + $this->assertSame([$payload], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler'); + } + public function test_convert_and_send(): void { $topicName = Uuid::v7()->toRfc4122(); From 5559e59f8886ef096e0d2bc8ad350518cb21660d Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 20:08:05 +0200 Subject: [PATCH 06/10] verify replyAll() triggers KafkaConsumer handler synchronously MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The DLQ handler routes via MessagingEntrypoint, which is a synchronous in-process dispatch — invocations should be 2 immediately after replyAll(), no need to run() the consumer again. Drop the redundant second poll and assert directly. Cuts the test runtime from ~35s to ~4s. --- .../Kafka/tests/Integration/KafkaChannelAdapterTest.php | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php index b1df4028a..eaed87352 100644 --- a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php +++ b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php @@ -442,14 +442,9 @@ public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_re $handler->shouldFail = false; $deadLetter->replyAll(); - $this->assertEquals(0, $deadLetter->count()); - $ecotoneLite->run(ReplayableKafkaConsumerExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( - maxExecutionTimeInMilliseconds: 30000, - failAtError: false, - )); - - $this->assertSame(2, $handler->invocations, 'Replayed Message must re-invoke the handler exactly once'); + $this->assertEquals(0, $deadLetter->count()); + $this->assertSame(2, $handler->invocations, 'replyAll() must synchronously re-invoke the handler via MessagingEntrypoint — no second run() needed'); $this->assertSame([$payload], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler'); } From 353f925a74aa33813162d3ced45e5908a9ccb641 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 20:19:10 +0200 Subject: [PATCH 07/10] refactor: consolidate ChannelAdapter into MessageConsumer; inline test fixtures; extract exception factory - ChannelAdapter now extends MessageConsumer (and MessageConsumer extends IdentifiedAnnotation), so checks throughout the framework reference a single base class instead of two. - Move all introduced test fixtures inline (anonymous classes for single-use, named classes within the same test file for multi-use). Drops 9 separate fixture files. - Extract ConfigurationException/LicensingException messages for Error Channel + Delayed Retry placement validations into an ErrorChannelExceptionMessages factory; keeps the validation logic readable. - Add a second run() check to dead-letter replay tests (Kafka + Dbal) verifying the replayed message is not re-consumed. --- ...ReplayableInboundChannelAdapterExample.php | 47 ---- .../Dbal/tests/Integration/DeadLetterTest.php | 48 +++- .../Messaging/Attribute/ChannelAdapter.php | 2 +- .../Messaging/Attribute/MessageConsumer.php | 17 +- .../ErrorChannelExceptionMessages.php | 44 ++++ .../ErrorHandlerModule.php | 14 +- .../Config/MessagingSystemConfiguration.php | 9 +- .../InstantRetryAttributeModule.php | 11 +- .../ErrorChannel/FailingScheduledExample.php | 31 --- .../ErrorChannelAsync/AsyncFailingHandler.php | 41 --- .../ErrorChannelAsync/DelayedRetryHandler.php | 95 ------- ...AdapterWithInstantRetryAndErrorChannel.php | 52 ---- ...andlerWithDelayedRetryDirectlyOnMethod.php | 28 --- ...andlerWithErrorChannelDirectlyOnMethod.php | 28 --- .../InboundChannelAdapterWithDelayedRetry.php | 26 -- .../Gateway/DelayedRetryCommandBus.php | 25 -- .../HandlerLevelErrorChannelService.php | 34 --- .../Handler/ErrorHandler/ErrorChannelTest.php | 233 ++++++++++++++++-- .../Gateway/ErrorChannelCommandBusTest.php | 33 ++- .../ReplayableKafkaConsumerExample.php | 33 --- .../Integration/KafkaChannelAdapterTest.php | 37 ++- 21 files changed, 382 insertions(+), 506 deletions(-) delete mode 100644 packages/Dbal/tests/Fixture/DeadLetter/InboundChannelAdapter/ReplayableInboundChannelAdapterExample.php create mode 100644 packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannel/FailingScheduledExample.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/AsyncFailingHandler.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/DelayedRetryHandler.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/InboundChannelAdapterWithInstantRetryAndErrorChannel.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithDelayedRetryDirectlyOnMethod.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithErrorChannelDirectlyOnMethod.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/InboundChannelAdapterWithDelayedRetry.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/DelayedRetryCommandBus.php delete mode 100644 packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/HandlerLevelErrorChannelService.php delete mode 100644 packages/Kafka/tests/Fixture/KafkaConsumer/ReplayableKafkaConsumerExample.php diff --git a/packages/Dbal/tests/Fixture/DeadLetter/InboundChannelAdapter/ReplayableInboundChannelAdapterExample.php b/packages/Dbal/tests/Fixture/DeadLetter/InboundChannelAdapter/ReplayableInboundChannelAdapterExample.php deleted file mode 100644 index 373a1466c..000000000 --- a/packages/Dbal/tests/Fixture/DeadLetter/InboundChannelAdapter/ReplayableInboundChannelAdapterExample.php +++ /dev/null @@ -1,47 +0,0 @@ -hasEmitted) { - return null; - } - $this->hasEmitted = true; - - return 'first-payload'; - } - - #[ServiceActivator(self::REQUEST_CHANNEL)] - public function handle(string $payload): void - { - $this->invocations++; - if ($this->shouldFail) { - throw new RuntimeException('simulated'); - } - $this->processedPayloads[] = $payload; - } -} diff --git a/packages/Dbal/tests/Integration/DeadLetterTest.php b/packages/Dbal/tests/Integration/DeadLetterTest.php index 4b9101c8a..9f1748b38 100644 --- a/packages/Dbal/tests/Integration/DeadLetterTest.php +++ b/packages/Dbal/tests/Integration/DeadLetterTest.php @@ -22,7 +22,6 @@ use Test\Ecotone\Dbal\Fixture\DeadLetter\Example\ErrorConfigurationContext; use Test\Ecotone\Dbal\Fixture\DeadLetter\Example\OrderGateway; use Test\Ecotone\Dbal\Fixture\DeadLetter\Example\OrderService; -use Test\Ecotone\Dbal\Fixture\DeadLetter\InboundChannelAdapter\ReplayableInboundChannelAdapterExample; /** * @internal @@ -207,7 +206,38 @@ public function test_same_event_is_stored_in_dead_letter_twice_for_different_end public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler(): void { - $handler = new ReplayableInboundChannelAdapterExample(); + $handler = new class () { + public const ENDPOINT_ID = 'failingInboundAdapter'; + public const REQUEST_CHANNEL = 'failingInboundAdapterRequestChannel'; + + public bool $shouldFail = true; + public int $invocations = 0; + /** @var string[] */ + public array $processedPayloads = []; + private bool $hasEmitted = false; + + #[\Ecotone\Messaging\Attribute\Scheduled(self::REQUEST_CHANNEL, self::ENDPOINT_ID)] + #[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)] + public function emit(): ?string + { + if ($this->hasEmitted) { + return null; + } + $this->hasEmitted = true; + + return 'first-payload'; + } + + #[\Ecotone\Messaging\Attribute\ServiceActivator(self::REQUEST_CHANNEL)] + public function handle(string $payload): void + { + $this->invocations++; + if ($this->shouldFail) { + throw new \RuntimeException('simulated'); + } + $this->processedPayloads[] = $payload; + } + }; $connectionFactory = $this->getConnectionFactory(); $ecotone = EcotoneLite::bootstrapFlowTesting( @@ -220,11 +250,11 @@ public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_re ->withEnvironment('prod') ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) ->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL), - classesToResolve: [ReplayableInboundChannelAdapterExample::class], + classesToResolve: [$handler::class], pathToRootCatalog: __DIR__ . '/../../', ); - $ecotone->run(ReplayableInboundChannelAdapterExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + $ecotone->run('failingInboundAdapter', ExecutionPollingMetadata::createWithTestingSetup( amountOfMessagesToHandle: 1, failAtError: false, )); @@ -237,8 +267,16 @@ classesToResolve: [ReplayableInboundChannelAdapterExample::class], $this->replyAllErrorMessages($ecotone); $this->assertErrorMessageCount($ecotone, 0); - $this->assertSame(2, $handler->invocations, 'Replay must re-invoke the handler synchronously via the routing slip stored on the failed Message'); + $this->assertSame(2, $handler->invocations, 'replyAll() must synchronously re-invoke the handler via MessagingEntrypoint — no second run() needed'); $this->assertSame(['first-payload'], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler'); + + $ecotone->run('failingInboundAdapter', ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + $this->assertSame(2, $handler->invocations, 'Subsequent polls must not re-process the replayed Message (emit() returns null after the first emission)'); + $this->assertSame(['first-payload'], $handler->processedPayloads); } private function assertErrorMessageCount(FlowTestSupport $ecotone, int $amount, string $deadLetterReference = DeadLetterGateway::class): void diff --git a/packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php b/packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php index f88c3fd3b..634735172 100644 --- a/packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php +++ b/packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php @@ -7,6 +7,6 @@ /** * licence Apache-2.0 */ -abstract class ChannelAdapter extends IdentifiedAnnotation +abstract class ChannelAdapter extends MessageConsumer { } diff --git a/packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php b/packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php index 8f8128463..5fd09e1e7 100644 --- a/packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php +++ b/packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php @@ -8,19 +8,12 @@ #[Attribute(Attribute::TARGET_METHOD)] /** + * Base attribute for any Inbound Channel Adapter consuming from an external system + * (Kafka, AMQP, scheduled tasks, etc.). Subclasses include #[KafkaConsumer], #[RabbitConsumer], + * and #[ChannelAdapter] (the base for #[Scheduled]). + * * licence Apache-2.0 */ -class MessageConsumer +class MessageConsumer extends IdentifiedAnnotation { - private string $endpointId; - - public function __construct(string $endpointId) - { - $this->endpointId = $endpointId; - } - - public function getEndpointId(): string - { - return $this->endpointId; - } } diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php new file mode 100644 index 000000000..a5ee0d6e5 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php @@ -0,0 +1,44 @@ +findAnnotatedMethods(DelayedRetry::class) as $delayedRetryMethod) { - $isInboundChannelAdapter = $delayedRetryMethod->hasMethodAnnotation(\Ecotone\Messaging\Attribute\MessageConsumer::class) - || $delayedRetryMethod->hasMethodAnnotation(\Ecotone\Messaging\Attribute\ChannelAdapter::class); - if (! $isInboundChannelAdapter) { + if (! $delayedRetryMethod->hasMethodAnnotation(\Ecotone\Messaging\Attribute\MessageConsumer::class)) { continue; } throw ConfigurationException::create( - "#[DelayedRetry] cannot be used on an Inbound Channel Adapter `{$delayedRetryMethod->getClassName()}::{$delayedRetryMethod->getMethodName()}`. " . - 'Inbound Channel Adapters consume from external systems (Kafka, AMQP, scheduled tasks) and have no source Message Channel for the framework to reschedule a delayed retry into. ' . - 'Use #[ErrorChannel] to capture the failure for later replay (e.g. from a Dead Letter), and optionally combine it with #[InstantRetry] for in-process retries before forwarding to the Error Channel.' + ErrorChannelExceptionMessages::delayedRetryOnInboundChannelAdapter($delayedRetryMethod->getClassName(), $delayedRetryMethod->getMethodName()) ); } @@ -96,8 +92,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I if ($hasErrorChannel) { throw ConfigurationException::create( - "Handler `{$handlerEndpointId}` declares both #[ErrorChannel] and #[DelayedRetry] in #[Asynchronous] asynchronousExecution — these are mutually exclusive. " . - 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.' + ErrorChannelExceptionMessages::errorChannelAndDelayedRetryMutuallyExclusiveOnHandler($handlerEndpointId) ); } @@ -122,8 +117,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $errorChannelOnGateway = $annotationRegistrationService->findAnnotatedClasses(ErrorChannel::class); if (in_array($gatewayInterfaceFqn, $errorChannelOnGateway, true)) { throw ConfigurationException::create( - "Gateway `{$gatewayInterfaceFqn}` declares both #[ErrorChannel] and #[DelayedRetry] — these are mutually exclusive. " . - 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.' + ErrorChannelExceptionMessages::errorChannelAndDelayedRetryMutuallyExclusiveOnGateway($gatewayInterfaceFqn) ); } diff --git a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php index c612351f2..bc6a0e161 100644 --- a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php +++ b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php @@ -450,22 +450,19 @@ private function configureAsynchronousEndpoints(InterfaceToCallRegistry $interfa } if ($hasErrorChannel && $hasDelayedRetry) { throw ConfigurationException::create( - "Handler `{$targetEndpointId}` declares both #[ErrorChannel] and #[DelayedRetry] in #[Asynchronous] asynchronousExecution — these are mutually exclusive. " . - "Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel." + \Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::errorChannelAndDelayedRetryMutuallyExclusiveOnHandler($targetEndpointId) ); } foreach ($handlerInterface->getMethodAnnotations() as $methodAnnotation) { if ($methodAnnotation instanceof \Ecotone\Messaging\Attribute\ErrorChannel) { throw ConfigurationException::create( - "Asynchronous handler `{$targetEndpointId}` has `#[ErrorChannel]` placed directly on the handler method — this has no effect on async handlers. " . - "Pass it via the #[Asynchronous] attribute instead: `#[Asynchronous('channel', asynchronousExecution: [new ErrorChannel('...')])]` so the polling consumer routes failures correctly." + \Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::errorChannelDirectlyOnAsyncHandlerMethod($targetEndpointId) ); } if ($methodAnnotation instanceof \Ecotone\Messaging\Attribute\DelayedRetry) { throw ConfigurationException::create( - "Asynchronous handler `{$targetEndpointId}` has `#[DelayedRetry]` placed directly on the handler method — this has no effect on async handlers. " . - "Pass it via the #[Asynchronous] attribute instead: `#[Asynchronous('channel', asynchronousExecution: [new DelayedRetry(...)])]` so the polling consumer applies the retry policy correctly." + \Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::delayedRetryDirectlyOnAsyncHandlerMethod($targetEndpointId) ); } } diff --git a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php index b7b5fba69..bd6e021e2 100644 --- a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php +++ b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php @@ -68,18 +68,15 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $asynchronousEndpointsWithInstantRetry = []; $annotatedMethods = $annotationRegistrationService->findAnnotatedMethods(InstantRetry::class); foreach ($annotatedMethods as $annotatedMethod) { - $hasMessageConsumer = $annotatedMethod->hasMethodAnnotation(MessageConsumer::class); - $hasChannelAdapter = $annotatedMethod->hasMethodAnnotation(\Ecotone\Messaging\Attribute\ChannelAdapter::class); - if (! $hasMessageConsumer && ! $hasChannelAdapter) { + if (! $annotatedMethod->hasMethodAnnotation(MessageConsumer::class)) { throw new ConfigurationException(sprintf( - "InstantRetry attribute can only be used on Inbound Channel Adapter methods (annotated with MessageConsumer e.g. KafkaConsumer, RabbitConsumer; or with ChannelAdapter subclass e.g. #[Scheduled]). '%s' has neither.", + "InstantRetry attribute can only be used on Inbound Channel Adapter methods (annotated with MessageConsumer e.g. #[KafkaConsumer], #[RabbitConsumer], #[Scheduled]). '%s' has none.", $annotatedMethod->getClassName() . '::' . $annotatedMethod->getMethodName() )); } - $consumerAttribute = $hasMessageConsumer - ? $annotatedMethod->getMethodAnnotationsWithType(MessageConsumer::class)[0] - : $annotatedMethod->getMethodAnnotationsWithType(\Ecotone\Messaging\Attribute\ChannelAdapter::class)[0]; + /** @var MessageConsumer $consumerAttribute */ + $consumerAttribute = $annotatedMethod->getMethodAnnotationsWithType(MessageConsumer::class)[0]; $asynchronousEndpointsWithInstantRetry[$consumerAttribute->getEndpointId()] = $annotatedMethod->getAnnotationForMethod(); } diff --git a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannel/FailingScheduledExample.php b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannel/FailingScheduledExample.php deleted file mode 100644 index ebacd3a4e..000000000 --- a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannel/FailingScheduledExample.php +++ /dev/null @@ -1,31 +0,0 @@ -attemptsRecovers++; - if ($this->attemptsRecovers < 2) { - throw new RuntimeException('transient'); - } - $this->finallyHandled = true; - } - - #[Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ - new DelayedRetry( - initialDelayMs: 1, - multiplier: 1, - maxAttempts: 2, - deadLetterChannel: self::DEAD_LETTER_CHANNEL, - ), - ])] - #[CommandHandler(self::ROUTING_KEY_DEAD_LETTER, 'retryDeadLetter')] - public function alwaysFails(string $payload): void - { - $this->attemptsDeadLetter++; - throw new RuntimeException('permanent'); - } - - #[Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ - new DelayedRetry( - initialDelayMs: 1, - multiplier: 1, - maxAttempts: 1, - deadLetterChannel: self::DEAD_LETTER_CHANNEL, - ), - ])] - #[CommandHandler(self::ROUTING_KEY_OVERRIDE, 'retryOverride')] - public function alwaysFailsOverridingDefault(string $payload): void - { - $this->attemptsOverride++; - throw new RuntimeException('permanent'); - } - - #[QueryHandler('retryHandler.attemptsRecovers')] - public function getAttemptsRecovers(): int - { - return $this->attemptsRecovers; - } - - #[QueryHandler('retryHandler.attemptsDeadLetter')] - public function getAttemptsDeadLetter(): int - { - return $this->attemptsDeadLetter; - } - - #[QueryHandler('retryHandler.attemptsOverride')] - public function getAttemptsOverride(): int - { - return $this->attemptsOverride; - } - - #[QueryHandler('retryHandler.finallyHandled')] - public function isFinallyHandled(): bool - { - return $this->finallyHandled; - } -} diff --git a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/InboundChannelAdapterWithInstantRetryAndErrorChannel.php b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/InboundChannelAdapterWithInstantRetryAndErrorChannel.php deleted file mode 100644 index 1b236e8e1..000000000 --- a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsync/InboundChannelAdapterWithInstantRetryAndErrorChannel.php +++ /dev/null @@ -1,52 +0,0 @@ -hasEmitted) { - return null; - } - $this->hasEmitted = true; - - return 'payload'; - } - - #[ServiceActivator(self::REQUEST_CHANNEL)] - public function handle(string $payload): void - { - $this->invocations++; - if ($this->invocations <= $this->maxFailures) { - throw new RuntimeException('simulated'); - } - } -} diff --git a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithDelayedRetryDirectlyOnMethod.php b/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithDelayedRetryDirectlyOnMethod.php deleted file mode 100644 index 565064c03..000000000 --- a/packages/Ecotone/tests/Messaging/Fixture/Handler/ErrorChannelAsyncMisplaced/AsyncHandlerWithDelayedRetryDirectlyOnMethod.php +++ /dev/null @@ -1,28 +0,0 @@ -sideEffectExecuted = true; - throw new RuntimeException('handler-failure'); - } -} diff --git a/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php b/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php index 36f5b564b..cb957ef00 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php @@ -17,14 +17,7 @@ use Ecotone\Messaging\PollableChannel; use Ecotone\Test\LicenceTesting; use PHPUnit\Framework\TestCase; -use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannel\FailingScheduledExample; -use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannelAsync\AsyncFailingHandler; -use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannelAsync\DelayedRetryHandler; use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannel\OrderService; -use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannelAsync\InboundChannelAdapterWithInstantRetryAndErrorChannel; -use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannelAsyncMisplaced\AsyncHandlerWithDelayedRetryDirectlyOnMethod; -use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannelAsyncMisplaced\AsyncHandlerWithErrorChannelDirectlyOnMethod; -use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannelAsyncMisplaced\InboundChannelAdapterWithDelayedRetry; /** * @internal @@ -431,17 +424,26 @@ public function test_retry_policy_overrides_global_default_error_channel(): void public function test_async_handler_with_error_channel_directly_on_method_throws_descriptive_error(): void { + $service = new class () { + #[\Ecotone\Messaging\Attribute\Asynchronous('asyncMisplacedErrorChannel')] + #[\Ecotone\Messaging\Attribute\ErrorChannel('someErrorChannel')] + #[\Ecotone\Modelling\Attribute\CommandHandler('misplaced.errorchannel', 'misplacedErrorChannelHandler')] + public function handle(string $payload): void + { + } + }; + $this->expectException(\Ecotone\Messaging\Config\ConfigurationException::class); $this->expectExceptionMessage('#[ErrorChannel]'); $this->expectExceptionMessage('asynchronousExecution'); EcotoneLite::bootstrapFlowTesting( - [AsyncHandlerWithErrorChannelDirectlyOnMethod::class], - [new AsyncHandlerWithErrorChannelDirectlyOnMethod()], + [$service::class], + [$service], ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) ->withExtensionObjects([ - SimpleMessageChannelBuilder::createQueueChannel(AsyncHandlerWithErrorChannelDirectlyOnMethod::ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel('asyncMisplacedErrorChannel'), SimpleMessageChannelBuilder::createQueueChannel('someErrorChannel'), ]), licenceKey: LicenceTesting::VALID_LICENCE, @@ -450,17 +452,26 @@ public function test_async_handler_with_error_channel_directly_on_method_throws_ public function test_async_handler_with_delayed_retry_directly_on_method_throws_descriptive_error(): void { + $service = new class () { + #[\Ecotone\Messaging\Attribute\Asynchronous('asyncMisplacedDelayedRetry')] + #[\Ecotone\Messaging\Attribute\DelayedRetry(initialDelayMs: 1, maxAttempts: 2)] + #[\Ecotone\Modelling\Attribute\CommandHandler('misplaced.delayedretry', 'misplacedDelayedRetryHandler')] + public function handle(string $payload): void + { + } + }; + $this->expectException(\Ecotone\Messaging\Config\ConfigurationException::class); $this->expectExceptionMessage('#[DelayedRetry]'); $this->expectExceptionMessage('asynchronousExecution'); EcotoneLite::bootstrapFlowTesting( - [AsyncHandlerWithDelayedRetryDirectlyOnMethod::class], - [new AsyncHandlerWithDelayedRetryDirectlyOnMethod()], + [$service::class], + [$service], ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) ->withExtensionObjects([ - SimpleMessageChannelBuilder::createQueueChannel(AsyncHandlerWithDelayedRetryDirectlyOnMethod::ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel('asyncMisplacedDelayedRetry'), ]), licenceKey: LicenceTesting::VALID_LICENCE, ); @@ -468,14 +479,24 @@ public function test_async_handler_with_delayed_retry_directly_on_method_throws_ public function test_delayed_retry_on_inbound_channel_adapter_throws_descriptive_error(): void { + $service = new class () { + #[\Ecotone\Messaging\Attribute\DelayedRetry(initialDelayMs: 1, maxAttempts: 2)] + #[\Ecotone\Messaging\Attribute\Scheduled('inboundDelayedRetryChannel', 'inboundDelayedRetry')] + #[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)] + public function emit(): string + { + return 'payload'; + } + }; + $this->expectException(\Ecotone\Messaging\Config\ConfigurationException::class); $this->expectExceptionMessage('#[DelayedRetry] cannot be used on an Inbound Channel Adapter'); $this->expectExceptionMessage('#[ErrorChannel]'); $this->expectExceptionMessage('#[InstantRetry]'); EcotoneLite::bootstrapFlowTesting( - [InboundChannelAdapterWithDelayedRetry::class], - [new InboundChannelAdapterWithDelayedRetry()], + [$service::class], + [$service], ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])), licenceKey: LicenceTesting::VALID_LICENCE, @@ -554,3 +575,185 @@ public function test_inbound_channel_adapter_with_instant_retry_forwards_to_erro $this->assertNotNull($errorChannel->receive(), 'Failed message must land in the configured Error Channel after InstantRetry retries are exhausted'); } } + +/** + * licence Apache-2.0 + * + * @internal + */ +final class FailingScheduledExample +{ + public const ENDPOINT_ID = 'failing.scheduler'; + public const REQUEST_CHANNEL = 'failing.scheduler.input'; + + #[\Ecotone\Messaging\Attribute\Scheduled(self::REQUEST_CHANNEL, self::ENDPOINT_ID)] + #[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)] + public function poll(): string + { + return 'payload'; + } + + #[\Ecotone\Messaging\Attribute\ServiceActivator(self::REQUEST_CHANNEL)] + public function handle(string $payload): void + { + throw new \InvalidArgumentException('boom'); + } +} + +/** + * licence Enterprise + * + * Two async command handlers share the same async transport channel, + * but each declares its own #[ErrorChannel] via #[Asynchronous] asynchronousExecution. + * + * @internal + */ +final class AsyncFailingHandler +{ + public const SHARED_ASYNC_CHANNEL = 'sharedAsync'; + public const ROUTING_KEY_A = 'async.handler.a'; + public const ROUTING_KEY_B = 'async.handler.b'; + public const ERROR_CHANNEL_A = 'errorChannelA'; + public const ERROR_CHANNEL_B = 'errorChannelB'; + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::SHARED_ASYNC_CHANNEL, asynchronousExecution: [new \Ecotone\Messaging\Attribute\ErrorChannel(self::ERROR_CHANNEL_A)])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_A, 'asyncHandlerA')] + public function handleA(string $payload): void + { + throw new \RuntimeException('handler-a-failure'); + } + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::SHARED_ASYNC_CHANNEL, asynchronousExecution: [new \Ecotone\Messaging\Attribute\ErrorChannel(self::ERROR_CHANNEL_B)])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_B, 'asyncHandlerB')] + public function handleB(string $payload): void + { + throw new \RuntimeException('handler-b-failure'); + } +} + +/** + * licence Enterprise + * + * @internal + */ +final class DelayedRetryHandler +{ + public const ASYNC_CHANNEL = 'delayedRetryAsync'; + public const ROUTING_KEY_RECOVERS = 'retry.recovers'; + public const ROUTING_KEY_DEAD_LETTER = 'retry.deadletter'; + public const ROUTING_KEY_OVERRIDE = 'retry.override'; + public const DEAD_LETTER_CHANNEL = 'retryDeadLetterChannel'; + + public int $attemptsRecovers = 0; + public int $attemptsDeadLetter = 0; + public int $attemptsOverride = 0; + public bool $finallyHandled = false; + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ + new \Ecotone\Messaging\Attribute\DelayedRetry(initialDelayMs: 1, multiplier: 1, maxAttempts: 3), + ])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_RECOVERS, 'retryRecovers')] + public function recovers(string $payload): void + { + $this->attemptsRecovers++; + if ($this->attemptsRecovers < 2) { + throw new \RuntimeException('transient'); + } + $this->finallyHandled = true; + } + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ + new \Ecotone\Messaging\Attribute\DelayedRetry( + initialDelayMs: 1, + multiplier: 1, + maxAttempts: 2, + deadLetterChannel: self::DEAD_LETTER_CHANNEL, + ), + ])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_DEAD_LETTER, 'retryDeadLetter')] + public function alwaysFails(string $payload): void + { + $this->attemptsDeadLetter++; + throw new \RuntimeException('permanent'); + } + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ + new \Ecotone\Messaging\Attribute\DelayedRetry( + initialDelayMs: 1, + multiplier: 1, + maxAttempts: 1, + deadLetterChannel: self::DEAD_LETTER_CHANNEL, + ), + ])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_OVERRIDE, 'retryOverride')] + public function alwaysFailsOverridingDefault(string $payload): void + { + $this->attemptsOverride++; + throw new \RuntimeException('permanent'); + } + + #[\Ecotone\Modelling\Attribute\QueryHandler('retryHandler.attemptsRecovers')] + public function getAttemptsRecovers(): int + { + return $this->attemptsRecovers; + } + + #[\Ecotone\Modelling\Attribute\QueryHandler('retryHandler.attemptsDeadLetter')] + public function getAttemptsDeadLetter(): int + { + return $this->attemptsDeadLetter; + } + + #[\Ecotone\Modelling\Attribute\QueryHandler('retryHandler.attemptsOverride')] + public function getAttemptsOverride(): int + { + return $this->attemptsOverride; + } + + #[\Ecotone\Modelling\Attribute\QueryHandler('retryHandler.finallyHandled')] + public function isFinallyHandled(): bool + { + return $this->finallyHandled; + } +} + +/** + * licence Enterprise + * + * #[InstantRetry] retries the handler in-process before forwarding to #[ErrorChannel]. + * + * @internal + */ +final class InboundChannelAdapterWithInstantRetryAndErrorChannel +{ + public const ENDPOINT_ID = 'inboundInstantRetry'; + public const REQUEST_CHANNEL = 'inboundInstantRetryChannel'; + public const ERROR_CHANNEL = 'inboundInstantRetryErrorChannel'; + + public int $invocations = 0; + public int $maxFailures = 0; + public bool $hasEmitted = false; + + #[\Ecotone\Modelling\Attribute\InstantRetry(retryTimes: 2)] + #[\Ecotone\Messaging\Attribute\ErrorChannel(self::ERROR_CHANNEL)] + #[\Ecotone\Messaging\Attribute\Scheduled(self::REQUEST_CHANNEL, self::ENDPOINT_ID)] + #[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)] + public function emit(): ?string + { + if ($this->hasEmitted) { + return null; + } + $this->hasEmitted = true; + + return 'payload'; + } + + #[\Ecotone\Messaging\Attribute\ServiceActivator(self::REQUEST_CHANNEL)] + public function handle(string $payload): void + { + $this->invocations++; + if ($this->invocations <= $this->maxFailures) { + throw new \RuntimeException('simulated'); + } + } +} diff --git a/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php b/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php index 74ca8c361..ed4f62812 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php @@ -23,10 +23,8 @@ use Ramsey\Uuid\Uuid; use Ramsey\Uuid\UuidInterface; use RuntimeException; -use Test\Ecotone\Messaging\Fixture\Service\Gateway\DelayedRetryCommandBus; use Test\Ecotone\Messaging\Fixture\Service\Gateway\ErrorChannelCommandBus; use Test\Ecotone\Messaging\Fixture\Service\Gateway\ErrorChannelWithAsyncChannel; -use Test\Ecotone\Messaging\Fixture\Service\Gateway\HandlerLevelErrorChannelService; use Test\Ecotone\Messaging\Fixture\Service\Gateway\TicketService; use Test\Ecotone\Messaging\SerializationSupport; @@ -126,10 +124,22 @@ public function test_using_custom_error_channel_with_reply_channel(): void public function test_error_channel_on_command_handler_is_silently_ignored_must_be_placed_on_gateway(): void { - $service = new HandlerLevelErrorChannelService(); + // Wrong placement: #[ErrorChannel] on the handler method has no effect. + // Must be on the messaging entry-point (CommandBus/EventBus/BusinessMethod). + $service = new class () { + public bool $sideEffectExecuted = false; + + #[\Ecotone\Modelling\Attribute\CommandHandler('handler.level.error.channel.test')] + #[\Ecotone\Messaging\Attribute\ErrorChannel('handlerLevelErrorChannel')] + public function handle(mixed $payload): void + { + $this->sideEffectExecuted = true; + throw new RuntimeException('handler-failure'); + } + }; $ecotoneLite = EcotoneLite::bootstrapFlowTesting( - [HandlerLevelErrorChannelService::class], + [$service::class], [$service], ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) @@ -216,3 +226,18 @@ public function test_delayed_retry_on_command_bus_routes_failures_to_generated_c ); } } + +/** + * Custom Command Bus declaring a per-gateway #[DelayedRetry] policy. + * + * @internal + */ +#[DelayedRetry( + initialDelayMs: 1, + multiplier: 1, + maxAttempts: 1, + deadLetterChannel: 'gatewayRetryDeadLetter', +)] +interface DelayedRetryCommandBus extends \Ecotone\Modelling\CommandBus +{ +} diff --git a/packages/Kafka/tests/Fixture/KafkaConsumer/ReplayableKafkaConsumerExample.php b/packages/Kafka/tests/Fixture/KafkaConsumer/ReplayableKafkaConsumerExample.php deleted file mode 100644 index 065b31ed1..000000000 --- a/packages/Kafka/tests/Fixture/KafkaConsumer/ReplayableKafkaConsumerExample.php +++ /dev/null @@ -1,33 +0,0 @@ -invocations++; - if ($this->shouldFail) { - throw new RuntimeException('simulated'); - } - $this->processedPayloads[] = $payload; - } -} diff --git a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php index eaed87352..ea399ad2f 100644 --- a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php +++ b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php @@ -42,7 +42,6 @@ use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithDelayedRetryExample; use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithFailStrategyExample; use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithInstantRetryAndErrorChannelExample; -use Test\Ecotone\Kafka\Fixture\KafkaConsumer\ReplayableKafkaConsumerExample; use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithInstantRetryExample; use Test\Ecotone\Kafka\Fixture\MediaTypeConverter\JsonEncodingConverter; @@ -397,13 +396,31 @@ public function getProcessedMessages(): array public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler(): void { $topicName = 'test_topic_replayable_' . Uuid::v7()->toRfc4122(); - $handler = new ReplayableKafkaConsumerExample(); + $handler = new class () { + public const ENDPOINT_ID = 'replayable_kafka_consumer'; + public const TOPIC_REFERENCE = 'replayableKafkaTopic'; + + public bool $shouldFail = true; + public int $invocations = 0; + /** @var string[] */ + public array $processedPayloads = []; + + #[KafkaConsumer(self::ENDPOINT_ID, self::TOPIC_REFERENCE)] + public function handle(#[\Ecotone\Messaging\Attribute\Parameter\Payload] string $payload): void + { + $this->invocations++; + if ($this->shouldFail) { + throw new \RuntimeException('simulated'); + } + $this->processedPayloads[] = $payload; + } + }; $dbalConnectionFactory = new DbalConnectionFactory(getenv('DATABASE_DSN') ?: 'pgsql://ecotone:secret@database:5432/ecotone'); $this->cleanDeadLetterTable($dbalConnectionFactory); $ecotoneLite = EcotoneLite::bootstrapFlowTesting( - [ReplayableKafkaConsumerExample::class], + [$handler::class], [ KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), DbalConnectionFactory::class => $dbalConnectionFactory, @@ -416,8 +433,8 @@ public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_re ->withExtensionObjects([ KafkaPublisherConfiguration::createWithDefaults($topicName) ->withHeaderMapper('*'), - TopicConfiguration::createWithReferenceName(ReplayableKafkaConsumerExample::TOPIC_REFERENCE, $topicName), - KafkaConsumerConfiguration::createWithDefaults(ReplayableKafkaConsumerExample::ENDPOINT_ID), + TopicConfiguration::createWithReferenceName('replayableKafkaTopic', $topicName), + KafkaConsumerConfiguration::createWithDefaults('replayable_kafka_consumer'), DbalConfiguration::createWithDefaults() ->withAutomaticTableInitialization(true), ]), @@ -429,7 +446,7 @@ public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_re $messagePublisher = $ecotoneLite->getGateway(MessagePublisher::class); $messagePublisher->send($payload); - $ecotoneLite->run(ReplayableKafkaConsumerExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + $ecotoneLite->run('replayable_kafka_consumer', ExecutionPollingMetadata::createWithTestingSetup( maxExecutionTimeInMilliseconds: 30000, failAtError: false, )); @@ -446,6 +463,14 @@ public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_re $this->assertEquals(0, $deadLetter->count()); $this->assertSame(2, $handler->invocations, 'replyAll() must synchronously re-invoke the handler via MessagingEntrypoint — no second run() needed'); $this->assertSame([$payload], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler'); + + $ecotoneLite->run('replayable_kafka_consumer', ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + $this->assertSame(2, $handler->invocations, 'Replayed Message must not be re-consumed from the Kafka topic (committed)'); + $this->assertSame([$payload], $handler->processedPayloads, 'Processed payloads must remain unchanged after a second poll'); } public function test_convert_and_send(): void From d7062c36e7ef33631285cc34cb17c9810bd1af8d Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 20:23:39 +0200 Subject: [PATCH 08/10] extract more exception messages into ErrorChannelExceptionMessages factory Move 6 more inline exception strings into the factory so the validation logic stays focused on intent rather than text: - DbalDeadLetterHandler: "cannot reply ... no polledChannelName/inboundRequestChannel/routingSlip" - DelayedRetryErrorHandler: "Failed to handle Error Message via Retry Configuration ..." - InstantRetryAttributeModule: "InstantRetry only on Inbound Channel Adapter" + Enterprise licence check - MessagingSystemConfiguration: "asynchronousExecution requires Enterprise" - MessagingGatewayModule: gateway-level Error Channel + DelayedRetry licence checks --- .../Recoverability/DbalDeadLetterHandler.php | 2 +- .../ErrorChannelExceptionMessages.php | 38 +++++++++++++++++++ .../MessagingGatewayModule.php | 4 +- .../Config/MessagingSystemConfiguration.php | 2 +- .../DelayedRetryErrorHandler.php | 2 +- .../InstantRetryAttributeModule.php | 8 ++-- 6 files changed, 47 insertions(+), 9 deletions(-) diff --git a/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php b/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php index 472dce52e..fb156c28f 100644 --- a/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php +++ b/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php @@ -250,7 +250,7 @@ private function replyWithoutInitialization(string $messageId, MessagingEntrypoi $hasRoutingSlip = $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP); if (! $hasPolledChannel && ! $hasInboundRequestChannel && ! $hasRoutingSlip) { - throw InvalidArgumentException::create("Can not reply to message {$messageId}, as it does not contain `polledChannelName`, `inboundRequestChannel` or `routingSlip` header. Please add one of them, so Message can be routed back to the original channel."); + throw InvalidArgumentException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::cannotReplyToDeadLetterMessage($messageId)); } if ($hasPolledChannel) { diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php index a5ee0d6e5..4c9a8e89b 100644 --- a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php @@ -41,4 +41,42 @@ public static function errorChannelAndDelayedRetryMutuallyExclusiveOnGateway(str return "Gateway `{$gatewayInterfaceFqn}` declares both #[ErrorChannel] and #[DelayedRetry] — these are mutually exclusive. " . 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.'; } + + public static function cannotReplyToDeadLetterMessage(string $messageId): string + { + return "Can not reply to message {$messageId}, as it does not contain `polledChannelName`, `inboundRequestChannel` or `routingSlip` header. " + . 'Please add one of them, so Message can be routed back to the original channel.'; + } + + public static function delayedRetryRequiresPolledChannelName(string $originalErrorMessage): string + { + return 'Failed to handle Error Message via Retry Configuration, as it does not contain information about origination channel from which it was polled. Original error message: ' + . $originalErrorMessage; + } + + public static function instantRetryNotOnInboundChannelAdapter(string $className, string $methodName): string + { + return "InstantRetry attribute can only be used on Inbound Channel Adapter methods (annotated with MessageConsumer e.g. #[KafkaConsumer], #[RabbitConsumer], #[Scheduled]). " + . "'{$className}::{$methodName}' has none."; + } + + public static function instantRetryRequiresEnterprise(): string + { + return 'Instant retry attribute is available only for Ecotone Enterprise.'; + } + + public static function asynchronousExecutionRequiresEnterprise(string $endpointId): string + { + return "Endpoint annotations on #[Asynchronous] attribute for endpoint `{$endpointId}` require Ecotone Enterprise licence."; + } + + public static function gatewayErrorChannelRequiresEnterprise(string $interfaceFqn, string $methodName): string + { + return "Gateway {$interfaceFqn}::{$methodName} is marked with synchronous Error Channel. This functionality is available as part of Ecotone Enterprise."; + } + + public static function gatewayDelayedRetryRequiresEnterprise(string $interfaceFqn, string $methodName): string + { + return "Gateway {$interfaceFqn}::{$methodName} is marked with #[DelayedRetry]. This functionality is available as part of Ecotone Enterprise."; + } } diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php index bfcdb6976..74718d59f 100644 --- a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php @@ -134,11 +134,11 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO } $errorChannel = $interfaceToCallRegistry->getFor($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class)); if ($errorChannel && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { - throw LicensingException::create("Gateway {$gatewayBuilder->getInterfaceName()}::{$gatewayBuilder->getRelatedMethodName()} is marked with synchronous Error Channel. This functionality is available as part of Ecotone Enterprise."); + throw LicensingException::create(ErrorChannelExceptionMessages::gatewayErrorChannelRequiresEnterprise($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())); } $delayedRetry = $interfaceToCallRegistry->getFor($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())->getAnnotationsByImportanceOrder(Type::attribute(\Ecotone\Messaging\Attribute\DelayedRetry::class)); if ($delayedRetry && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { - throw LicensingException::create("Gateway {$gatewayBuilder->getInterfaceName()}::{$gatewayBuilder->getRelatedMethodName()} is marked with #[DelayedRetry]. This functionality is available as part of Ecotone Enterprise."); + throw LicensingException::create(ErrorChannelExceptionMessages::gatewayDelayedRetryRequiresEnterprise($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())); } $messagingConfiguration->registerGatewayBuilder($gatewayBuilder); diff --git a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php index bc6a0e161..e48cab727 100644 --- a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php +++ b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php @@ -436,7 +436,7 @@ private function configureAsynchronousEndpoints(InterfaceToCallRegistry $interfa } $endpointAnnotations = $asyncAttribute ? $asyncAttribute->getAsynchronousExecution() : []; if ($endpointAnnotations && ! $this->isRunningForEnterpriseLicence) { - throw LicensingException::create("Endpoint annotations on #[Asynchronous] attribute for endpoint `{$targetEndpointId}` require Ecotone Enterprise licence."); + throw LicensingException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::asynchronousExecutionRequiresEnterprise($targetEndpointId)); } $hasErrorChannel = false; diff --git a/packages/Ecotone/src/Messaging/Handler/Recoverability/DelayedRetryErrorHandler.php b/packages/Ecotone/src/Messaging/Handler/Recoverability/DelayedRetryErrorHandler.php index d0e72e907..a0e112011 100644 --- a/packages/Ecotone/src/Messaging/Handler/Recoverability/DelayedRetryErrorHandler.php +++ b/packages/Ecotone/src/Messaging/Handler/Recoverability/DelayedRetryErrorHandler.php @@ -43,7 +43,7 @@ public function handle( $failedMessage, ); - throw MessageHandlingException::create('Failed to handle Error Message via Retry Configuration, as it does not contain information about origination channel from which it was polled. Original error message: ' . $failedMessage->getExceptionMessage()); + throw MessageHandlingException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::delayedRetryRequiresPolledChannelName($failedMessage->getExceptionMessage())); } /** @var MessageChannel $messageChannel */ $messageChannel = $channelResolver->resolve($failedMessage->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)); diff --git a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php index bd6e021e2..8d6d1f6a3 100644 --- a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php +++ b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php @@ -69,9 +69,9 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $annotatedMethods = $annotationRegistrationService->findAnnotatedMethods(InstantRetry::class); foreach ($annotatedMethods as $annotatedMethod) { if (! $annotatedMethod->hasMethodAnnotation(MessageConsumer::class)) { - throw new ConfigurationException(sprintf( - "InstantRetry attribute can only be used on Inbound Channel Adapter methods (annotated with MessageConsumer e.g. #[KafkaConsumer], #[RabbitConsumer], #[Scheduled]). '%s' has none.", - $annotatedMethod->getClassName() . '::' . $annotatedMethod->getMethodName() + throw new ConfigurationException(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::instantRetryNotOnInboundChannelAdapter( + $annotatedMethod->getClassName(), + $annotatedMethod->getMethodName(), )); } @@ -93,7 +93,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO } if (! $messagingConfiguration->isRunningForEnterpriseLicence()) { - throw LicensingException::create('Instant retry attribute is available only for Ecotone Enterprise.'); + throw LicensingException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::instantRetryRequiresEnterprise()); } // Register interceptors for interfaces with InstantRetry attribute From 42dfe0026c3dd4f71a4cad24f6a815e748dfb59a Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 20:43:12 +0200 Subject: [PATCH 09/10] update OpenTelemetry tracing tests for new inbound channel adapter span name #[Scheduled] handlers now set INBOUND_REQUEST_CHANNEL, so TracerInterceptor produces "Receiving from inbound channel adapter: " rather than the old fallback "Endpoint: produced Message". The new format is consistent with pollable channel spans ("Receiving from channel: ") and tells the user where the message went, not just which endpoint produced it. --- .../OpenTelemetry/tests/Integration/TracingTreeTest.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/OpenTelemetry/tests/Integration/TracingTreeTest.php b/packages/OpenTelemetry/tests/Integration/TracingTreeTest.php index 6d27dae02..02762a273 100644 --- a/packages/OpenTelemetry/tests/Integration/TracingTreeTest.php +++ b/packages/OpenTelemetry/tests/Integration/TracingTreeTest.php @@ -119,11 +119,11 @@ public function test_tracing_scheduled_handler() self::compareTreesByDetails( [ [ - 'details' => ['name' => 'Endpoint: scheduled_handler produced Message'], + 'details' => ['name' => 'Receiving from inbound channel adapter: nullChannel'], 'children' => [], ], [ - 'details' => ['name' => 'Endpoint: scheduled_handler produced Message'], + 'details' => ['name' => 'Receiving from inbound channel adapter: nullChannel'], 'children' => [], ], ], @@ -146,7 +146,7 @@ public function test_tracing_workflow_scheduled_handler() self::compareTreesByDetails( [ [ - 'details' => ['name' => 'Endpoint: scheduled_handler produced Message'], + 'details' => ['name' => 'Receiving from inbound channel adapter: add'], 'children' => [ [ 'details' => ['name' => 'Message Handler: ' . WorkflowScheduledHandler::class . '::add'], @@ -155,7 +155,7 @@ public function test_tracing_workflow_scheduled_handler() ], ], [ - 'details' => ['name' => 'Endpoint: scheduled_handler produced Message'], + 'details' => ['name' => 'Receiving from inbound channel adapter: add'], 'children' => [ [ 'details' => ['name' => 'Message Handler: ' . WorkflowScheduledHandler::class . '::add'], From 7f67bb9156b7ca5891ce82cdeb067acef1688ec0 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 7 May 2026 21:23:09 +0200 Subject: [PATCH 10/10] make multi-tenant projection test deterministic by polling all queued messages The async multi-tenant projection test relies on \$ecotone->run() processing both tenants' queued messages on a single call so each tenant's first message triggers lazy projection initialization. The default polling metadata was not guaranteed to drain the queue in CI, leaving tenant_b's projection state UNINITIALIZED and the in_progress_tickets table absent in tenant_b's database. Pass explicit ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 2) so both tenants' events are reliably processed in one run() call, ensuring lazy init fires for each tenant before queries run. --- .../tests/Integration/MultiTenantTest.php | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/PdoEventSourcing/tests/Integration/MultiTenantTest.php b/packages/PdoEventSourcing/tests/Integration/MultiTenantTest.php index 8e3b462ac..9ce86dff2 100644 --- a/packages/PdoEventSourcing/tests/Integration/MultiTenantTest.php +++ b/packages/PdoEventSourcing/tests/Integration/MultiTenantTest.php @@ -9,7 +9,9 @@ use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\Support\InvalidArgumentException; +use Ecotone\Projecting\ProjectionRegistry; use Test\Ecotone\EventSourcing\EventSourcingMessagingTestCase; use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\CloseTicket; use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\RegisterTicket; @@ -61,7 +63,10 @@ public function test_building_asynchronous_event_driven_projection_with_multi_te new RegisterTicket('122', 'Johnny', 'alert'), metadata: ['tenant' => 'tenant_b'] ); - $ecotone->run(InProgressTicketList::PROJECTION_CHANNEL); + $ecotone->run(InProgressTicketList::PROJECTION_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 2, + maxExecutionTimeInMilliseconds: 5000, + )); self::assertEquals( [['ticket_id' => '123', 'ticket_type' => 'alert']], @@ -81,7 +86,10 @@ public function test_building_asynchronous_event_driven_projection_with_multi_te metadata: ['tenant' => 'tenant_a'] ); - $ecotone->run(InProgressTicketList::PROJECTION_CHANNEL); + $ecotone->run(InProgressTicketList::PROJECTION_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 2, + maxExecutionTimeInMilliseconds: 5000, + )); self::assertEquals( [],