diff --git a/packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php b/packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php index 6c052edea..b13754153 100644 --- a/packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php +++ b/packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php @@ -53,7 +53,11 @@ public function show(DeadLetterGateway $deadLetterGateway, string $messageId, bo [ ['Message Id', $message->getHeaders()->getMessageId()], ['Failed At', $this->convertTimestampToReadableFormat($message->getHeaders()->getTimestamp())], - ['Channel Name', $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)], + ['Channel Name', $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) + ? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) + : ($message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL) + ? $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL) + : 'Unknown')], ['Type', $message->getHeaders()->containsKey(MessageHeaders::TYPE_ID) ? $message->getHeaders()->get(MessageHeaders::TYPE_ID) : 'Unknown'], ['Stacktrace', $this->getReadableStacktrace($message->getHeaders()->get(ErrorContext::EXCEPTION_STACKTRACE), $fullDetails)], ] diff --git a/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php b/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php index 160bc95fe..fb156c28f 100644 --- a/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php +++ b/packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php @@ -245,12 +245,18 @@ private function initialize(): void private function replyWithoutInitialization(string $messageId, MessagingEntrypointService $messagingEntrypoint): void { $message = $this->show($messageId); - if (! $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) && ! $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP)) { - throw InvalidArgumentException::create("Can not reply to message {$messageId}, as it does not contain either `polledChannelName` or `routingSlip` header. Please add one of them, so Message can be routed back to the original channel."); + $hasPolledChannel = $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME); + $hasInboundRequestChannel = $message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL); + $hasRoutingSlip = $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP); + + if (! $hasPolledChannel && ! $hasInboundRequestChannel && ! $hasRoutingSlip) { + throw InvalidArgumentException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::cannotReplyToDeadLetterMessage($messageId)); } - if ($message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME)) { + if ($hasPolledChannel) { $entrypoint = $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME); + } elseif ($hasInboundRequestChannel) { + $entrypoint = $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL); } else { // This allows to replay Error Message stored for synchronous calls (non asynchronous) $routingSlip = $message->getHeaders()->resolveRoutingSlip(); diff --git a/packages/Dbal/tests/Integration/DeadLetterTest.php b/packages/Dbal/tests/Integration/DeadLetterTest.php index 9dbc59663..9f1748b38 100644 --- a/packages/Dbal/tests/Integration/DeadLetterTest.php +++ b/packages/Dbal/tests/Integration/DeadLetterTest.php @@ -10,6 +10,7 @@ use Ecotone\Lite\Test\FlowTestSupport; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\Recoverability\ErrorContext; use Ecotone\Messaging\MessageHeaders; @@ -203,6 +204,81 @@ public function test_same_event_is_stored_in_dead_letter_twice_for_different_end $this->assertErrorMessageCount($ecotone, 0); } + public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler(): void + { + $handler = new class () { + public const ENDPOINT_ID = 'failingInboundAdapter'; + public const REQUEST_CHANNEL = 'failingInboundAdapterRequestChannel'; + + public bool $shouldFail = true; + public int $invocations = 0; + /** @var string[] */ + public array $processedPayloads = []; + private bool $hasEmitted = false; + + #[\Ecotone\Messaging\Attribute\Scheduled(self::REQUEST_CHANNEL, self::ENDPOINT_ID)] + #[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)] + public function emit(): ?string + { + if ($this->hasEmitted) { + return null; + } + $this->hasEmitted = true; + + return 'first-payload'; + } + + #[\Ecotone\Messaging\Attribute\ServiceActivator(self::REQUEST_CHANNEL)] + public function handle(string $payload): void + { + $this->invocations++; + if ($this->shouldFail) { + throw new \RuntimeException('simulated'); + } + $this->processedPayloads[] = $payload; + } + }; + $connectionFactory = $this->getConnectionFactory(); + + $ecotone = EcotoneLite::bootstrapFlowTesting( + containerOrAvailableServices: [ + $handler, + DbalConnectionFactory::class => $connectionFactory, + 'managerRegistry' => $connectionFactory, + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withEnvironment('prod') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL), + classesToResolve: [$handler::class], + pathToRootCatalog: __DIR__ . '/../../', + ); + + $ecotone->run('failingInboundAdapter', ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + $this->assertErrorMessageCount($ecotone, 1); + $this->assertSame(1, $handler->invocations, 'Handler must have been invoked once before the failure was captured'); + $this->assertSame([], $handler->processedPayloads); + + $handler->shouldFail = false; + $this->replyAllErrorMessages($ecotone); + + $this->assertErrorMessageCount($ecotone, 0); + $this->assertSame(2, $handler->invocations, 'replyAll() must synchronously re-invoke the handler via MessagingEntrypoint — no second run() needed'); + $this->assertSame(['first-payload'], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler'); + + $ecotone->run('failingInboundAdapter', ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + $this->assertSame(2, $handler->invocations, 'Subsequent polls must not re-process the replayed Message (emit() returns null after the first emission)'); + $this->assertSame(['first-payload'], $handler->processedPayloads); + } + private function assertErrorMessageCount(FlowTestSupport $ecotone, int $amount, string $deadLetterReference = DeadLetterGateway::class): void { $gateway = $ecotone->getGateway(DeadLetterGateway::class); diff --git a/packages/Ecotone/src/Messaging/Attribute/Asynchronous.php b/packages/Ecotone/src/Messaging/Attribute/Asynchronous.php index 023497f56..01d57ff32 100644 --- a/packages/Ecotone/src/Messaging/Attribute/Asynchronous.php +++ b/packages/Ecotone/src/Messaging/Attribute/Asynchronous.php @@ -15,17 +15,17 @@ class Asynchronous { private string|array $channelName; /** @var AsynchronousEndpointAttribute[] */ - private array $endpointAnnotations; + private array $asynchronousExecution; /** - * @param AsynchronousEndpointAttribute[] $endpointAnnotations + * @param AsynchronousEndpointAttribute[] $asynchronousExecution Attributes scoped to the asynchronous execution context — applied when the polling consumer processes the Message, not at the synchronous bus call. */ - public function __construct(string|array $channelName, array $endpointAnnotations = []) + public function __construct(string|array $channelName, array $asynchronousExecution = []) { Assert::notNullAndEmpty($channelName, 'Channel name can not be empty string'); - Assert::allInstanceOfType($endpointAnnotations, AsynchronousEndpointAttribute::class); + Assert::allInstanceOfType($asynchronousExecution, AsynchronousEndpointAttribute::class); $this->channelName = $channelName; - $this->endpointAnnotations = $endpointAnnotations; + $this->asynchronousExecution = $asynchronousExecution; } public function getChannelName(): array @@ -36,8 +36,8 @@ public function getChannelName(): array /** * @return AsynchronousEndpointAttribute[] */ - public function getEndpointAnnotations(): array + public function getAsynchronousExecution(): array { - return $this->endpointAnnotations; + return $this->asynchronousExecution; } } diff --git a/packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php b/packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php index f88c3fd3b..634735172 100644 --- a/packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php +++ b/packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php @@ -7,6 +7,6 @@ /** * licence Apache-2.0 */ -abstract class ChannelAdapter extends IdentifiedAnnotation +abstract class ChannelAdapter extends MessageConsumer { } diff --git a/packages/Ecotone/src/Messaging/Attribute/DelayedRetry.php b/packages/Ecotone/src/Messaging/Attribute/DelayedRetry.php new file mode 100644 index 000000000..d3d089e50 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Attribute/DelayedRetry.php @@ -0,0 +1,38 @@ + 0, 'DelayedRetry initialDelayMs must be greater than 0'); + Assert::isTrue($multiplier > 0, 'DelayedRetry multiplier must be greater than 0'); + Assert::isTrue($maxAttempts === null || $maxAttempts > 0, 'DelayedRetry maxAttempts must be null (unlimited) or greater than 0'); + Assert::isTrue($deadLetterChannel === null || $deadLetterChannel !== '', 'DelayedRetry deadLetterChannel must be null or a non-empty channel name'); + } + + public static function generateChannelName(string $handlerEndpointId): string + { + return 'ecotone.retry.' . $handlerEndpointId; + } + + public static function generateGatewayChannelName(string $gatewayInterfaceFqn): string + { + return 'ecotone.retry.gateway.' . str_replace('\\', '.', $gatewayInterfaceFqn); + } +} diff --git a/packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php b/packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php index 5fa5c08e5..137177995 100644 --- a/packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php +++ b/packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php @@ -11,7 +11,7 @@ * licence Enterprise */ #[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)] -class ErrorChannel +class ErrorChannel implements AsynchronousEndpointAttribute { /** * @param string $errorChannelName Name of the error channel to send Message too diff --git a/packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php b/packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php index 8f8128463..5fd09e1e7 100644 --- a/packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php +++ b/packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php @@ -8,19 +8,12 @@ #[Attribute(Attribute::TARGET_METHOD)] /** + * Base attribute for any Inbound Channel Adapter consuming from an external system + * (Kafka, AMQP, scheduled tasks, etc.). Subclasses include #[KafkaConsumer], #[RabbitConsumer], + * and #[ChannelAdapter] (the base for #[Scheduled]). + * * licence Apache-2.0 */ -class MessageConsumer +class MessageConsumer extends IdentifiedAnnotation { - private string $endpointId; - - public function __construct(string $endpointId) - { - $this->endpointId = $endpointId; - } - - public function getEndpointId(): string - { - return $this->endpointId; - } } diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php new file mode 100644 index 000000000..4c9a8e89b --- /dev/null +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/ErrorChannelExceptionMessages.php @@ -0,0 +1,82 @@ +findAnnotatedMethods(DelayedRetry::class) as $delayedRetryMethod) { + if (! $delayedRetryMethod->hasMethodAnnotation(\Ecotone\Messaging\Attribute\MessageConsumer::class)) { + continue; + } + throw ConfigurationException::create( + ErrorChannelExceptionMessages::delayedRetryOnInboundChannelAdapter($delayedRetryMethod->getClassName(), $delayedRetryMethod->getMethodName()) + ); + } + + $endpointMethods = $annotationRegistrationService->findAnnotatedMethods(\Ecotone\Messaging\Attribute\EndpointAnnotation::class); + $asynchronousMethods = $annotationRegistrationService->findAnnotatedMethods(Asynchronous::class); + + foreach ($asynchronousMethods as $asynchronousMethod) { + /** @var Asynchronous $asyncAnnotation */ + $asyncAnnotation = $asynchronousMethod->getAnnotationForMethod(); + + $delayedRetry = null; + $hasErrorChannel = false; + foreach ($asyncAnnotation->getAsynchronousExecution() as $endpointAnnotation) { + if ($endpointAnnotation instanceof DelayedRetry) { + $delayedRetry = $endpointAnnotation; + } elseif ($endpointAnnotation instanceof ErrorChannel) { + $hasErrorChannel = true; + } + } + if ($delayedRetry === null) { + continue; + } + + foreach ($endpointMethods as $endpoint) { + if ($endpoint->getClassName() !== $asynchronousMethod->getClassName() + || $endpoint->getMethodName() !== $asynchronousMethod->getMethodName()) { + continue; + } + + /** @var \Ecotone\Messaging\Attribute\EndpointAnnotation $endpointAttribute */ + $endpointAttribute = $endpoint->getAnnotationForMethod(); + $handlerEndpointId = $endpointAttribute->getEndpointId(); + + if ($hasErrorChannel) { + throw ConfigurationException::create( + ErrorChannelExceptionMessages::errorChannelAndDelayedRetryMutuallyExclusiveOnHandler($handlerEndpointId) + ); + } + + $generatedErrorChannelName = DelayedRetry::generateChannelName($handlerEndpointId); + $retryTemplateBuilder = new RetryTemplateBuilder( + $delayedRetry->initialDelayMs, + $delayedRetry->multiplier, + $delayedRetry->maxDelayMs, + $delayedRetry->maxAttempts, + ); + + $perHandlerRetryConfigurations[] = self::buildErrorHandlerConfig($generatedErrorChannelName, $retryTemplateBuilder, $delayedRetry->deadLetterChannel); + break; + } + } + + $gatewayInterfacesWithDelayedRetry = $annotationRegistrationService->findAnnotatedClasses(DelayedRetry::class); + foreach ($gatewayInterfacesWithDelayedRetry as $gatewayInterfaceFqn) { + /** @var DelayedRetry $delayedRetry */ + $delayedRetry = AnnotatedDefinitionReference::getSingleAnnotationForClass($annotationRegistrationService, $gatewayInterfaceFqn, DelayedRetry::class); + + $errorChannelOnGateway = $annotationRegistrationService->findAnnotatedClasses(ErrorChannel::class); + if (in_array($gatewayInterfaceFqn, $errorChannelOnGateway, true)) { + throw ConfigurationException::create( + ErrorChannelExceptionMessages::errorChannelAndDelayedRetryMutuallyExclusiveOnGateway($gatewayInterfaceFqn) + ); + } + + $generatedErrorChannelName = DelayedRetry::generateGatewayChannelName($gatewayInterfaceFqn); + $retryTemplateBuilder = new RetryTemplateBuilder( + $delayedRetry->initialDelayMs, + $delayedRetry->multiplier, + $delayedRetry->maxDelayMs, + $delayedRetry->maxAttempts, + ); + $perHandlerRetryConfigurations[] = self::buildErrorHandlerConfig($generatedErrorChannelName, $retryTemplateBuilder, $delayedRetry->deadLetterChannel); + } + + return new self($perHandlerRetryConfigurations); + } + + private static function buildErrorHandlerConfig(string $errorChannelName, RetryTemplateBuilder $retryTemplateBuilder, ?string $deadLetterChannel): ErrorHandlerConfiguration + { + return $deadLetterChannel !== null + ? ErrorHandlerConfiguration::createWithDeadLetterChannel($errorChannelName, $retryTemplateBuilder, $deadLetterChannel) + : ErrorHandlerConfiguration::create($errorChannelName, $retryTemplateBuilder); } /** @@ -51,13 +149,18 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO if (! $this->hasErrorConfiguration($extensionObjects)) { $extensionObjects = [ErrorHandlerConfiguration::createDefault()]; } + $extensionObjects = array_merge($extensionObjects, $this->perHandlerRetryConfigurations); + $messagingConfiguration->registerServiceDefinition(RetryRunner::class, [new Reference(EcotoneClockInterface::class), new Reference(LoggingGateway::class)]); + $hasAnyErrorHandlerConfiguration = false; + /** @var ErrorHandlerConfiguration $extensionObject */ foreach ($extensionObjects as $index => $extensionObject) { if (! ($extensionObject instanceof ErrorHandlerConfiguration)) { continue; } + $hasAnyErrorHandlerConfiguration = true; $errorHandler = ServiceActivatorBuilder::createWithDefinition( new Definition(DelayedRetryErrorHandler::class, [ @@ -80,15 +183,18 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $messagingConfiguration ->registerMessageHandler($errorHandler) - ->registerDefaultChannelFor(SimpleMessageChannelBuilder::createPublishSubscribeChannel($extensionObject->getErrorChannelName())) - ->registerMessageHandler( - RouterBuilder::create( - new Definition(HeaderRouter::class, [MessageHeaders::POLLED_CHANNEL_NAME]), - $interfaceToCallRegistry->getFor(HeaderRouter::class, 'route') - ) - ->withEndpointId('error_handler.' . $extensionObject->getErrorChannelName() . '.router') - ->withInputChannelName('ecotone.recoverability.reply') - ); + ->registerDefaultChannelFor(SimpleMessageChannelBuilder::createPublishSubscribeChannel($extensionObject->getErrorChannelName())); + } + + if ($hasAnyErrorHandlerConfiguration) { + $messagingConfiguration->registerMessageHandler( + RouterBuilder::create( + new Definition(HeaderRouter::class, [MessageHeaders::POLLED_CHANNEL_NAME]), + $interfaceToCallRegistry->getFor(HeaderRouter::class, 'route') + ) + ->withEndpointId('error_handler.recoverability.reply.router') + ->withInputChannelName('ecotone.recoverability.reply') + ); } } diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php index 41db89d7b..74718d59f 100644 --- a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/MessagingGatewayModule.php @@ -134,7 +134,11 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO } $errorChannel = $interfaceToCallRegistry->getFor($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class)); if ($errorChannel && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { - throw LicensingException::create("Gateway {$gatewayBuilder->getInterfaceName()}::{$gatewayBuilder->getRelatedMethodName()} is marked with synchronous Error Channel. This functionality is available as part of Ecotone Enterprise."); + throw LicensingException::create(ErrorChannelExceptionMessages::gatewayErrorChannelRequiresEnterprise($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())); + } + $delayedRetry = $interfaceToCallRegistry->getFor($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())->getAnnotationsByImportanceOrder(Type::attribute(\Ecotone\Messaging\Attribute\DelayedRetry::class)); + if ($delayedRetry && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { + throw LicensingException::create(ErrorChannelExceptionMessages::gatewayDelayedRetryRequiresEnterprise($gatewayBuilder->getInterfaceName(), $gatewayBuilder->getRelatedMethodName())); } $messagingConfiguration->registerGatewayBuilder($gatewayBuilder); diff --git a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php index 3d6cbff89..e48cab727 100644 --- a/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php +++ b/packages/Ecotone/src/Messaging/Config/MessagingSystemConfiguration.php @@ -434,13 +434,46 @@ private function configureAsynchronousEndpoints(InterfaceToCallRegistry $interfa } } } - $endpointAnnotations = $asyncAttribute ? $asyncAttribute->getEndpointAnnotations() : []; + $endpointAnnotations = $asyncAttribute ? $asyncAttribute->getAsynchronousExecution() : []; if ($endpointAnnotations && ! $this->isRunningForEnterpriseLicence) { - throw LicensingException::create("Endpoint annotations on #[Asynchronous] attribute for endpoint `{$targetEndpointId}` require Ecotone Enterprise licence."); + throw LicensingException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::asynchronousExecutionRequiresEnterprise($targetEndpointId)); + } + + $hasErrorChannel = false; + $hasDelayedRetry = false; + foreach ($endpointAnnotations as $endpointAnnotation) { + if ($endpointAnnotation instanceof \Ecotone\Messaging\Attribute\ErrorChannel) { + $hasErrorChannel = true; + } elseif ($endpointAnnotation instanceof \Ecotone\Messaging\Attribute\DelayedRetry) { + $hasDelayedRetry = true; + } + } + if ($hasErrorChannel && $hasDelayedRetry) { + throw ConfigurationException::create( + \Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::errorChannelAndDelayedRetryMutuallyExclusiveOnHandler($targetEndpointId) + ); + } + + foreach ($handlerInterface->getMethodAnnotations() as $methodAnnotation) { + if ($methodAnnotation instanceof \Ecotone\Messaging\Attribute\ErrorChannel) { + throw ConfigurationException::create( + \Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::errorChannelDirectlyOnAsyncHandlerMethod($targetEndpointId) + ); + } + if ($methodAnnotation instanceof \Ecotone\Messaging\Attribute\DelayedRetry) { + throw ConfigurationException::create( + \Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::delayedRetryDirectlyOnAsyncHandlerMethod($targetEndpointId) + ); + } + } + + $contextAnnotations = $endpointAnnotations; + if ($hasDelayedRetry) { + $contextAnnotations[] = new AsynchronousRunningEndpoint($targetEndpointId); } $asyncHandlerAnnotations[$handlerExecutionChannel] = array_map( fn ($a) => AttributeDefinition::fromObject($a), - $endpointAnnotations + $contextAnnotations ); $consequentialChannels = $asynchronousMessageChannels; diff --git a/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php b/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php index 122cc28ca..9a25e0326 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php +++ b/packages/Ecotone/src/Messaging/Endpoint/AcknowledgeConfirmationInterceptor.php @@ -36,7 +36,11 @@ public function __construct(private RetryRunner $retryRunner, private LoggingGat */ public function ack(MethodInvocation $methodInvocation, Message $message) { - $messageChannelName = $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) ? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) : 'unknown'; + $messageChannelName = $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) + ? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) + : ($message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL) + ? $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL) + : 'unknown'); $this->logger->info( sprintf( diff --git a/packages/Ecotone/src/Messaging/Endpoint/InboundChannelAdapter/InboundChannelAdapterBuilder.php b/packages/Ecotone/src/Messaging/Endpoint/InboundChannelAdapter/InboundChannelAdapterBuilder.php index 94fab5afa..442ab8b6c 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/InboundChannelAdapter/InboundChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Messaging/Endpoint/InboundChannelAdapter/InboundChannelAdapterBuilder.php @@ -142,6 +142,7 @@ public function compile(MessagingContainerBuilder $builder): Definition return new Definition(InvocationPollerAdapter::class, [ $objectReference, $methodName, + $this->requestChannelName, ]); } } diff --git a/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php b/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php index 313f2cba0..e8c0f9811 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Messaging/Endpoint/InterceptedChannelAdapterBuilder.php @@ -85,6 +85,7 @@ private function getErrorInterceptorReference(MessagingContainerBuilder $builder $builder->register(PollingConsumerErrorChannelInterceptor::class, new Definition(PollingConsumerErrorChannelInterceptor::class, [ Reference::to(ErrorChannelService::class), Reference::to(ChannelResolver::class), + Reference::to(AsyncEndpointAnnotationContext::class), ])); } return AroundInterceptorBuilder::create( diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php index 406770418..a8685e32f 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedPollingConsumerBuilder.php @@ -125,6 +125,7 @@ private function getErrorInterceptorReference(MessagingContainerBuilder $builder $builder->register(PollingConsumerErrorChannelInterceptor::class, new Definition(PollingConsumerErrorChannelInterceptor::class, [ Reference::to(ErrorChannelService::class), new Reference(ChannelResolver::class), + new Reference(AsyncEndpointAnnotationContext::class), ])); } return AroundInterceptorBuilder::create( diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/InvocationPollerAdapter.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/InvocationPollerAdapter.php index 8397024dc..ed7cb5806 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/InvocationPollerAdapter.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/MessagePoller/InvocationPollerAdapter.php @@ -4,6 +4,7 @@ use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Message; +use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\MessagePoller; use Ecotone\Messaging\Support\MessageBuilder; @@ -12,8 +13,11 @@ */ class InvocationPollerAdapter implements MessagePoller { - public function __construct(private object $serviceToCall, private string $methodName) - { + public function __construct( + private object $serviceToCall, + private string $methodName, + private ?string $inboundRequestChannelName = null, + ) { } public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message @@ -22,9 +26,15 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message if ($result === null) { return null; } - return $result instanceof Message - ? $result - : MessageBuilder::withPayload($result)->build(); + $message = $result instanceof Message + ? MessageBuilder::fromMessage($result) + : MessageBuilder::withPayload($result); + + if ($this->inboundRequestChannelName !== null) { + $message = $message->setHeader(MessageHeaders::INBOUND_REQUEST_CHANNEL, $this->inboundRequestChannelName); + } + + return $message->build(); } public function onConsumerStop(): void diff --git a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php index 37f0ede1d..6a548732c 100644 --- a/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php +++ b/packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/PollingConsumerErrorChannelInterceptor.php @@ -2,6 +2,9 @@ namespace Ecotone\Messaging\Endpoint\PollingConsumer; +use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint; +use Ecotone\Messaging\Attribute\ErrorChannel; +use Ecotone\Messaging\Attribute\DelayedRetry; use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\ChannelResolver; use Ecotone\Messaging\Handler\Gateway\ErrorChannelService; @@ -18,6 +21,7 @@ class PollingConsumerErrorChannelInterceptor public function __construct( private ErrorChannelService $errorChannelService, private ChannelResolver $channelResolver, + private AsyncEndpointAnnotationContext $asyncEndpointAnnotationContext, ) { } @@ -37,22 +41,76 @@ private function tryToSendToErrorChannel(Throwable $exception, Message $requestM if ($requestMessage->getHeaders()->containsKey(MessageHeaders::CONSUMER_POLLING_METADATA)) { /** @var PollingMetadata $pollingMetadata */ $pollingMetadata = $requestMessage->getHeaders()->get(MessageHeaders::CONSUMER_POLLING_METADATA); - $errorChannelName = $pollingMetadata->getErrorChannelName(); + + if ($pollingMetadata->isStoppedOnError()) { + return false; + } + + $errorChannelName = $this->resolveHandlerScopedErrorChannelName() + ?? $pollingMetadata->getErrorChannelName(); if (! $errorChannelName) { return false; } - $this->errorChannelService->handle( - $requestMessage, - $exception, - $this->channelResolver->resolve($errorChannelName), - $requestMessage->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) - ); + if ($this->isInboundChannelAdapter($requestMessage)) { + $this->errorChannelService->handle( + $requestMessage, + $exception, + $this->channelResolver->resolve($errorChannelName), + null, + $this->resolveInboundRequestChannelName($requestMessage, $pollingMetadata), + ); + } else { + $this->errorChannelService->handle( + $requestMessage, + $exception, + $this->channelResolver->resolve($errorChannelName), + $requestMessage->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME), + null, + ); + } return true; } return false; } + + private function isInboundChannelAdapter(Message $requestMessage): bool + { + return ! $requestMessage->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME); + } + + private function resolveInboundRequestChannelName(Message $requestMessage, PollingMetadata $pollingMetadata): string + { + if ($requestMessage->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL)) { + return $requestMessage->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL); + } + + return $pollingMetadata->getEndpointId(); + } + + private function resolveHandlerScopedErrorChannelName(): ?string + { + $handlerEndpointId = null; + $hasDelayedRetry = false; + + foreach ($this->asyncEndpointAnnotationContext->getCurrentAnnotations() as $annotation) { + if ($annotation instanceof ErrorChannel) { + return $annotation->errorChannelName; + } + if ($annotation instanceof DelayedRetry) { + $hasDelayedRetry = true; + } elseif ($annotation instanceof AsynchronousRunningEndpoint) { + $handlerEndpointId = $annotation->getEndpointId(); + } + } + + if ($hasDelayedRetry && $handlerEndpointId !== null) { + return DelayedRetry::generateChannelName($handlerEndpointId); + } + + return null; + } } diff --git a/packages/Ecotone/src/Messaging/Handler/Gateway/EnterpriseGatewayErrorChannelResolver.php b/packages/Ecotone/src/Messaging/Handler/Gateway/EnterpriseGatewayErrorChannelResolver.php index 42b5b33e2..e96471980 100644 --- a/packages/Ecotone/src/Messaging/Handler/Gateway/EnterpriseGatewayErrorChannelResolver.php +++ b/packages/Ecotone/src/Messaging/Handler/Gateway/EnterpriseGatewayErrorChannelResolver.php @@ -4,7 +4,9 @@ namespace Ecotone\Messaging\Handler\Gateway; +use Ecotone\Messaging\Attribute\DelayedRetry; use Ecotone\Messaging\Attribute\ErrorChannel; +use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Handler\InterfaceToCall; use Ecotone\Messaging\Handler\Type; @@ -19,33 +21,75 @@ public function getErrorChannel(InterfaceToCall $interfaceToCall, array $endpoin return $errorChannelName; } + $this->assertNotBothErrorChannelAndDelayedRetry($interfaceToCall, $endpointAnnotations); + foreach ($endpointAnnotations as $endpointAnnotation) { if ($endpointAnnotation->getClassName() === ErrorChannel::class) { return $endpointAnnotation->instance()->errorChannelName; } + if ($endpointAnnotation->getClassName() === DelayedRetry::class) { + return DelayedRetry::generateGatewayChannelName($interfaceToCall->getInterfaceName()); + } } /** @var ErrorChannel[] $errorChannel */ $errorChannel = $interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class)); + if ($errorChannel) { + return $errorChannel[0]->errorChannelName; + } + + /** @var DelayedRetry[] $delayedRetry */ + $delayedRetry = $interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(DelayedRetry::class)); + if ($delayedRetry) { + return DelayedRetry::generateGatewayChannelName($interfaceToCall->getInterfaceName()); + } - return $errorChannel ? $errorChannel[0]->errorChannelName : null; + return null; } public function getErrorChannelRoutingSlip(InterfaceToCall $interfaceToCall, array $endpointAnnotations, string $requestChannelName): ?string { - /** @var ErrorChannel[] $errorChannelAttributes */ - $errorChannelAttributes = $interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class)); + $this->assertNotBothErrorChannelAndDelayedRetry($interfaceToCall, $endpointAnnotations); foreach ($endpointAnnotations as $endpointAnnotation) { - if ($endpointAnnotation->getClassName() === ErrorChannel::class) { + if ($endpointAnnotation->getClassName() === ErrorChannel::class + || $endpointAnnotation->getClassName() === DelayedRetry::class) { return $requestChannelName; } } - if ($errorChannelAttributes) { + if ($interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class))) { + return $requestChannelName; + } + if ($interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(DelayedRetry::class))) { return $requestChannelName; } return null; } + + private function assertNotBothErrorChannelAndDelayedRetry(InterfaceToCall $interfaceToCall, array $endpointAnnotations): void + { + $hasErrorChannel = false; + $hasDelayedRetry = false; + foreach ($endpointAnnotations as $endpointAnnotation) { + if ($endpointAnnotation->getClassName() === ErrorChannel::class) { + $hasErrorChannel = true; + } elseif ($endpointAnnotation->getClassName() === DelayedRetry::class) { + $hasDelayedRetry = true; + } + } + if ($interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(ErrorChannel::class))) { + $hasErrorChannel = true; + } + if ($interfaceToCall->getAnnotationsByImportanceOrder(Type::attribute(DelayedRetry::class))) { + $hasDelayedRetry = true; + } + if ($hasErrorChannel && $hasDelayedRetry) { + throw ConfigurationException::create( + "Gateway `{$interfaceToCall->getInterfaceName()}` declares both #[ErrorChannel] and #[DelayedRetry] — these are mutually exclusive. " . + 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.' + ); + } + } } diff --git a/packages/Ecotone/src/Messaging/Handler/Recoverability/DelayedRetryErrorHandler.php b/packages/Ecotone/src/Messaging/Handler/Recoverability/DelayedRetryErrorHandler.php index d0e72e907..a0e112011 100644 --- a/packages/Ecotone/src/Messaging/Handler/Recoverability/DelayedRetryErrorHandler.php +++ b/packages/Ecotone/src/Messaging/Handler/Recoverability/DelayedRetryErrorHandler.php @@ -43,7 +43,7 @@ public function handle( $failedMessage, ); - throw MessageHandlingException::create('Failed to handle Error Message via Retry Configuration, as it does not contain information about origination channel from which it was polled. Original error message: ' . $failedMessage->getExceptionMessage()); + throw MessageHandlingException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::delayedRetryRequiresPolledChannelName($failedMessage->getExceptionMessage())); } /** @var MessageChannel $messageChannel */ $messageChannel = $channelResolver->resolve($failedMessage->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)); diff --git a/packages/Ecotone/src/Messaging/MessageHeaders.php b/packages/Ecotone/src/Messaging/MessageHeaders.php index d80e9e991..bf031b1ae 100644 --- a/packages/Ecotone/src/Messaging/MessageHeaders.php +++ b/packages/Ecotone/src/Messaging/MessageHeaders.php @@ -93,9 +93,15 @@ final class MessageHeaders */ public const CONSUMER_ACK_HEADER_LOCATION = 'consumerAcknowledgeCallbackHeader'; /** - * Consumed channel name + * Consumed channel name (set when the Message originates from a pollable Message Channel) */ public const POLLED_CHANNEL_NAME = 'polledChannelName'; + /** + * Inbound Channel Adapter request channel name (set when the Message originates from an Inbound Channel Adapter + * such as #[KafkaConsumer], AMQP inbound, #[Scheduled]). Carries the user-facing request channel where the Message + * is dispatched after polling, so it can be replayed back to the same handler. + */ + public const INBOUND_REQUEST_CHANNEL = 'inboundRequestChannel'; /** * Current polling metadata */ @@ -176,6 +182,7 @@ public static function getFrameworksHeaderNames(): array self::TIME_TO_LIVE, self::DELIVERY_DELAY, self::POLLED_CHANNEL_NAME, + self::INBOUND_REQUEST_CHANNEL, self::REPLY_CONTENT_TYPE, self::STREAM_BASED_SOURCED, MessagingEntrypointService::ENTRYPOINT, @@ -251,6 +258,7 @@ public static function unsetEnqueueMetadata(array $metadata): array $metadata[self::CONTENT_TYPE], $metadata[self::CONSUMER_ACK_HEADER_LOCATION], $metadata[self::POLLED_CHANNEL_NAME], + $metadata[self::INBOUND_REQUEST_CHANNEL], $metadata[self::CONSUMER_POLLING_METADATA], $metadata[self::REPLY_CHANNEL], $metadata[self::TEMPORARY_SPAN_CONTEXT_HEADER], diff --git a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php index 6b8d36c19..8d6d1f6a3 100644 --- a/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php +++ b/packages/Ecotone/src/Modelling/Config/InstantRetry/InstantRetryAttributeModule.php @@ -69,15 +69,15 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $annotatedMethods = $annotationRegistrationService->findAnnotatedMethods(InstantRetry::class); foreach ($annotatedMethods as $annotatedMethod) { if (! $annotatedMethod->hasMethodAnnotation(MessageConsumer::class)) { - throw new ConfigurationException(sprintf( - "InstantRetry attribute can only be used on methods annotated with MessageConsumer. '%s' is not annotated with MessageConsumer (e.g. RabbitConsumer, KafkaConsumer).", - $annotatedMethod->getClassName() . '::' . $annotatedMethod->getMethodName() + throw new ConfigurationException(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::instantRetryNotOnInboundChannelAdapter( + $annotatedMethod->getClassName(), + $annotatedMethod->getMethodName(), )); } - /** @var MessageConsumer $messageConsumer */ - $messageConsumer = $annotatedMethod->getMethodAnnotationsWithType(MessageConsumer::class)[0]; - $asynchronousEndpointsWithInstantRetry[$messageConsumer->getEndpointId()] = $annotatedMethod->getAnnotationForMethod(); + /** @var MessageConsumer $consumerAttribute */ + $consumerAttribute = $annotatedMethod->getMethodAnnotationsWithType(MessageConsumer::class)[0]; + $asynchronousEndpointsWithInstantRetry[$consumerAttribute->getEndpointId()] = $annotatedMethod->getAnnotationForMethod(); } return new self($commandBusesWithInstantRetry, $asynchronousEndpointsWithInstantRetry); @@ -93,7 +93,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO } if (! $messagingConfiguration->isRunningForEnterpriseLicence()) { - throw LicensingException::create('Instant retry attribute is available only for Ecotone Enterprise.'); + throw LicensingException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::instantRetryRequiresEnterprise()); } // Register interceptors for interfaces with InstantRetry attribute diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index c85012bcd..fb6b21821 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -126,7 +126,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $asyncAttribute = $projectionBuilder instanceof EcotoneProjectionExecutorBuilder ? $projectionBuilder->getAsyncAttribute() : null; if ($asyncAttribute !== null) { - $endpointAnnotations = $asyncAttribute->getEndpointAnnotations(); + $endpointAnnotations = $asyncAttribute->getAsynchronousExecution(); if ($messagingConfiguration->isRunningForEnterpriseLicence()) { $endpointAnnotations = array_merge($endpointAnnotations, [new WithoutDatabaseTransaction(), new WithoutMessageCollector()]); } diff --git a/packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/ErrorChannelCommandBus.php b/packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/ErrorChannelCommandBus.php index ff639640c..770d3455d 100644 --- a/packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/ErrorChannelCommandBus.php +++ b/packages/Ecotone/tests/Messaging/Fixture/Service/Gateway/ErrorChannelCommandBus.php @@ -7,6 +7,17 @@ use Ecotone\Messaging\Attribute\ErrorChannel; use Ecotone\Modelling\CommandBus; +/** + * Recommended pattern: extend CommandBus and place #[ErrorChannel] on the + * gateway interface. This wires the framework's ErrorChannelInterceptor at + * the gateway boundary, so any interceptor stack registered on the gateway + * (e.g. #[WithTransactional]) wraps the handler call. On failure, those + * gateway-level interceptors fully unwind their effects (transaction rollback, + * etc.) BEFORE the error message is captured to the configured error channel. + * + * The same pattern applies to EventBus, QueryBus, MessagePublisher and any + * #[BusinessMethod] interface — placement must be on the entry-point. + */ #[ErrorChannel('someErrorChannel')] interface ErrorChannelCommandBus extends CommandBus { diff --git a/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php b/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php index 76637394a..afa604c73 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Config/AsyncEndpointAnnotationTest.php @@ -45,7 +45,7 @@ public function test_around_interceptor_receives_handler_attribute_on_async_endp $collector->receivedAttribute = null; $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('test-value')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('test-value')])] #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] public function handle(string $payload): void { @@ -89,7 +89,7 @@ public function test_before_interceptor_receives_handler_attribute_on_async_endp $collector->receivedAttribute = null; $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('before-value')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('before-value')])] #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] public function handle(string $payload): void { @@ -133,13 +133,13 @@ public function test_multiple_handlers_on_same_channel_resolve_correct_attribute $collector->receivedAttributes = []; $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('handler-one')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('handler-one')])] #[CommandHandler('doWorkOne', endpointId: 'doWorkOne.endpoint')] public function handleOne(string $payload): void { } - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('handler-two')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('handler-two')])] #[CommandHandler('doWorkTwo', endpointId: 'doWorkTwo.endpoint')] public function handleTwo(string $payload): void { @@ -226,7 +226,7 @@ public function test_endpoint_annotations_require_enterprise_licence(): void $this->expectException(LicensingException::class); $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new CustomAsyncAttribute('test')])] + #[Asynchronous('async', asynchronousExecution: [new CustomAsyncAttribute('test')])] #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] public function handle(string $payload): void { @@ -289,7 +289,7 @@ public function handle(stdClass $event): void public function test_without_message_collector_events_are_sent_directly_and_survive_handler_failure(): void { $handler = new class () { - #[Asynchronous('async', endpointAnnotations: [new WithoutMessageCollector()])] + #[Asynchronous('async', asynchronousExecution: [new WithoutMessageCollector()])] #[CommandHandler('doWork', endpointId: 'doWork.endpoint')] public function handle(string $payload, EventBus $eventBus): void { diff --git a/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php b/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php index 9204c66c0..cb957ef00 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Handler/ErrorHandler/ErrorChannelTest.php @@ -10,8 +10,12 @@ use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\Endpoint\FinalFailureStrategy; +use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\Recoverability\ErrorHandlerConfiguration; use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder; +use Ecotone\Messaging\MessageHeaders; +use Ecotone\Messaging\PollableChannel; +use Ecotone\Test\LicenceTesting; use PHPUnit\Framework\TestCase; use Test\Ecotone\Messaging\Fixture\Handler\ErrorChannel\OrderService; @@ -159,4 +163,597 @@ public function test_using_custom_channel_for_error_handling(): void ; $this->assertSame(1, $ecotone->sendQueryWithRouting('getOrderAmount')); } + + public function test_inbound_channel_adapter_sends_failed_message_to_default_error_channel_using_routing_slip(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [FailingScheduledExample::class], + [new FailingScheduledExample()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel('customErrorChannel') + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel('customErrorChannel'), + ]), + ); + + $ecotone->run(FailingScheduledExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + /** @var PollableChannel $errorChannel */ + $errorChannel = $ecotone->getMessageChannel('customErrorChannel'); + $errorMessage = $errorChannel->receive(); + $this->assertNotNull($errorMessage, 'Expected failed message to be delivered to default error channel'); + + $headers = $errorMessage->getHeaders(); + $this->assertFalse( + $headers->containsKey(MessageHeaders::POLLED_CHANNEL_NAME), + 'Inbound Channel Adapter has no source pollable Message Channel; POLLED_CHANNEL_NAME must not be set' + ); + $this->assertTrue( + $headers->containsKey(MessageHeaders::ROUTING_SLIP), + 'Routing slip is required for replay back to an Inbound Channel Adapter consumer (Kafka, AMQP inbound, #[Scheduled], etc.)' + ); + $this->assertSame(FailingScheduledExample::REQUEST_CHANNEL, $headers->get(MessageHeaders::ROUTING_SLIP)); + } + + public function test_inbound_channel_adapter_with_delayed_retry_template_throws_clear_error_about_missing_polled_channel(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [FailingScheduledExample::class], + [new FailingScheduledExample()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel('retryErrorChannel') + ->withExtensionObjects([ + ErrorHandlerConfiguration::create( + 'retryErrorChannel', + RetryTemplateBuilder::exponentialBackoff(1, 1)->maxRetryAttempts(2) + ), + ]), + ); + + $this->expectException(\Ecotone\Messaging\Handler\MessageHandlingException::class); + $this->expectExceptionMessage('does not contain information about origination channel from which it was polled'); + + $ecotone->run(FailingScheduledExample::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + } + + public function test_async_handler_routes_failure_to_error_channel_declared_via_endpoint_annotations(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [AsyncFailingHandler::class], + [new AsyncFailingHandler()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::SHARED_ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_A), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_B), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(AsyncFailingHandler::ROUTING_KEY_A, 'payload-a'); + $ecotone->run(AsyncFailingHandler::SHARED_ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + /** @var PollableChannel $errorChannelA */ + $errorChannelA = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_A); + /** @var PollableChannel $errorChannelB */ + $errorChannelB = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_B); + + $this->assertNotNull($errorChannelA->receive(), 'Handler A failure must be routed to its declared error channel'); + $this->assertNull($errorChannelB->receive(), 'Handler B error channel must remain empty when only handler A failed'); + } + + public function test_two_async_handlers_sharing_channel_each_route_failures_to_their_own_error_channel(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [AsyncFailingHandler::class], + [new AsyncFailingHandler()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::SHARED_ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_A), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_B), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(AsyncFailingHandler::ROUTING_KEY_A, 'payload-a'); + $ecotone->sendCommandWithRoutingKey(AsyncFailingHandler::ROUTING_KEY_B, 'payload-b'); + + $ecotone->run(AsyncFailingHandler::SHARED_ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 2, + failAtError: false, + )); + + /** @var PollableChannel $errorChannelA */ + $errorChannelA = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_A); + /** @var PollableChannel $errorChannelB */ + $errorChannelB = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_B); + + $messageInA = $errorChannelA->receive(); + $messageInB = $errorChannelB->receive(); + + $this->assertNotNull($messageInA, 'Handler A failure must land in error channel A'); + $this->assertNotNull($messageInB, 'Handler B failure must land in error channel B'); + $this->assertNull($errorChannelA->receive(), 'Only one message expected in error channel A'); + $this->assertNull($errorChannelB->receive(), 'Only one message expected in error channel B'); + } + + public function test_async_handler_endpoint_annotation_error_channel_overrides_default_error_channel(): void + { + $ecotone = EcotoneLite::bootstrapFlowTesting( + [AsyncFailingHandler::class], + [new AsyncFailingHandler()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel('globalDefaultErrorChannel') + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::SHARED_ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(AsyncFailingHandler::ERROR_CHANNEL_A), + SimpleMessageChannelBuilder::createQueueChannel('globalDefaultErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(AsyncFailingHandler::ROUTING_KEY_A, 'payload-a'); + $ecotone->run(AsyncFailingHandler::SHARED_ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + /** @var PollableChannel $globalDefault */ + $globalDefault = $ecotone->getMessageChannel('globalDefaultErrorChannel'); + /** @var PollableChannel $errorChannelA */ + $errorChannelA = $ecotone->getMessageChannel(AsyncFailingHandler::ERROR_CHANNEL_A); + + $this->assertNotNull($errorChannelA->receive(), 'Per-handler #[ErrorChannel] must override the default error channel'); + $this->assertNull($globalDefault->receive(), 'Default error channel must not receive the failure when handler declares its own'); + } + + public function test_retry_policy_retries_handler_until_success(): void + { + $handler = new DelayedRetryHandler(); + $ecotone = EcotoneLite::bootstrapFlowTesting( + [DelayedRetryHandler::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::ASYNC_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(DelayedRetryHandler::ROUTING_KEY_RECOVERS, 'payload'); + + $ecotone->run(DelayedRetryHandler::ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + $this->assertSame(1, $ecotone->sendQueryWithRouting('retryHandler.attemptsRecovers')); + $this->assertFalse($ecotone->sendQueryWithRouting('retryHandler.finallyHandled')); + + $ecotone->run(DelayedRetryHandler::ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + $this->assertSame(2, $ecotone->sendQueryWithRouting('retryHandler.attemptsRecovers')); + $this->assertTrue($ecotone->sendQueryWithRouting('retryHandler.finallyHandled')); + } + + public function test_retry_policy_routes_to_dead_letter_when_retries_exhausted(): void + { + $handler = new DelayedRetryHandler(); + $ecotone = EcotoneLite::bootstrapFlowTesting( + [DelayedRetryHandler::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::DEAD_LETTER_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(DelayedRetryHandler::ROUTING_KEY_DEAD_LETTER, 'payload'); + + for ($i = 0; $i < 3; $i++) { + $ecotone->run(DelayedRetryHandler::ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + } + + $this->assertSame(3, $ecotone->sendQueryWithRouting('retryHandler.attemptsDeadLetter'), 'Handler invoked maxAttempts+1 times before exhaustion'); + + /** @var PollableChannel $deadLetter */ + $deadLetter = $ecotone->getMessageChannel(DelayedRetryHandler::DEAD_LETTER_CHANNEL); + $this->assertNotNull($deadLetter->receive(), 'Failed message must land in the dead letter channel after retries are exhausted'); + $this->assertNull($deadLetter->receive(), 'Only one failed message expected in the dead letter channel'); + } + + public function test_retry_policy_overrides_global_default_error_channel(): void + { + $handler = new DelayedRetryHandler(); + $ecotone = EcotoneLite::bootstrapFlowTesting( + [DelayedRetryHandler::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withDefaultErrorChannel('globalDefaultErrorChannel') + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::ASYNC_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetryHandler::DEAD_LETTER_CHANNEL), + SimpleMessageChannelBuilder::createQueueChannel('globalDefaultErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->sendCommandWithRoutingKey(DelayedRetryHandler::ROUTING_KEY_OVERRIDE, 'payload'); + + for ($i = 0; $i < 2; $i++) { + $ecotone->run(DelayedRetryHandler::ASYNC_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + } + + $this->assertSame(2, $ecotone->sendQueryWithRouting('retryHandler.attemptsOverride')); + + /** @var PollableChannel $deadLetter */ + $deadLetter = $ecotone->getMessageChannel(DelayedRetryHandler::DEAD_LETTER_CHANNEL); + /** @var PollableChannel $globalDefault */ + $globalDefault = $ecotone->getMessageChannel('globalDefaultErrorChannel'); + + $this->assertNotNull($deadLetter->receive(), '#[DelayedRetry] must route the failure to its own dead letter channel'); + $this->assertNull($globalDefault->receive(), 'Global default error channel must not receive the failure when handler declares #[DelayedRetry]'); + } + + public function test_async_handler_with_error_channel_directly_on_method_throws_descriptive_error(): void + { + $service = new class () { + #[\Ecotone\Messaging\Attribute\Asynchronous('asyncMisplacedErrorChannel')] + #[\Ecotone\Messaging\Attribute\ErrorChannel('someErrorChannel')] + #[\Ecotone\Modelling\Attribute\CommandHandler('misplaced.errorchannel', 'misplacedErrorChannelHandler')] + public function handle(string $payload): void + { + } + }; + + $this->expectException(\Ecotone\Messaging\Config\ConfigurationException::class); + $this->expectExceptionMessage('#[ErrorChannel]'); + $this->expectExceptionMessage('asynchronousExecution'); + + EcotoneLite::bootstrapFlowTesting( + [$service::class], + [$service], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel('asyncMisplacedErrorChannel'), + SimpleMessageChannelBuilder::createQueueChannel('someErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + public function test_async_handler_with_delayed_retry_directly_on_method_throws_descriptive_error(): void + { + $service = new class () { + #[\Ecotone\Messaging\Attribute\Asynchronous('asyncMisplacedDelayedRetry')] + #[\Ecotone\Messaging\Attribute\DelayedRetry(initialDelayMs: 1, maxAttempts: 2)] + #[\Ecotone\Modelling\Attribute\CommandHandler('misplaced.delayedretry', 'misplacedDelayedRetryHandler')] + public function handle(string $payload): void + { + } + }; + + $this->expectException(\Ecotone\Messaging\Config\ConfigurationException::class); + $this->expectExceptionMessage('#[DelayedRetry]'); + $this->expectExceptionMessage('asynchronousExecution'); + + EcotoneLite::bootstrapFlowTesting( + [$service::class], + [$service], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel('asyncMisplacedDelayedRetry'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + public function test_delayed_retry_on_inbound_channel_adapter_throws_descriptive_error(): void + { + $service = new class () { + #[\Ecotone\Messaging\Attribute\DelayedRetry(initialDelayMs: 1, maxAttempts: 2)] + #[\Ecotone\Messaging\Attribute\Scheduled('inboundDelayedRetryChannel', 'inboundDelayedRetry')] + #[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)] + public function emit(): string + { + return 'payload'; + } + }; + + $this->expectException(\Ecotone\Messaging\Config\ConfigurationException::class); + $this->expectExceptionMessage('#[DelayedRetry] cannot be used on an Inbound Channel Adapter'); + $this->expectExceptionMessage('#[ErrorChannel]'); + $this->expectExceptionMessage('#[InstantRetry]'); + + EcotoneLite::bootstrapFlowTesting( + [$service::class], + [$service], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + public function test_inbound_channel_adapter_with_instant_retry_recovers_within_in_process_retries(): void + { + $handler = new InboundChannelAdapterWithInstantRetryAndErrorChannel(); + $handler->maxFailures = 2; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [InboundChannelAdapterWithInstantRetryAndErrorChannel::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->run(InboundChannelAdapterWithInstantRetryAndErrorChannel::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + $this->assertSame(3, $handler->invocations, 'Handler must be invoked once + retried twice (retryTimes: 2) before succeeding on the third attempt'); + + /** @var PollableChannel $errorChannel */ + $errorChannel = $ecotone->getMessageChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL); + $this->assertNull($errorChannel->receive(), 'Error channel must remain empty when InstantRetry recovers within retry budget'); + } + + public function test_instant_retry_on_inbound_channel_adapter_requires_enterprise_licence(): void + { + $this->expectException(\Ecotone\Messaging\Support\LicensingException::class); + $this->expectExceptionMessage('Instant retry attribute is available only for Ecotone Enterprise'); + + EcotoneLite::bootstrapFlowTesting( + [InboundChannelAdapterWithInstantRetryAndErrorChannel::class], + [new InboundChannelAdapterWithInstantRetryAndErrorChannel()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL), + ]), + ); + } + + public function test_inbound_channel_adapter_with_instant_retry_forwards_to_error_channel_after_retries_exhausted(): void + { + $handler = new InboundChannelAdapterWithInstantRetryAndErrorChannel(); + $handler->maxFailures = PHP_INT_MAX; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [InboundChannelAdapterWithInstantRetryAndErrorChannel::class], + [$handler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->run(InboundChannelAdapterWithInstantRetryAndErrorChannel::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 1, + failAtError: false, + )); + + $this->assertSame(3, $handler->invocations, 'Handler must be invoked once + retried twice before forwarding to Error Channel'); + + /** @var PollableChannel $errorChannel */ + $errorChannel = $ecotone->getMessageChannel(InboundChannelAdapterWithInstantRetryAndErrorChannel::ERROR_CHANNEL); + $this->assertNotNull($errorChannel->receive(), 'Failed message must land in the configured Error Channel after InstantRetry retries are exhausted'); + } +} + +/** + * licence Apache-2.0 + * + * @internal + */ +final class FailingScheduledExample +{ + public const ENDPOINT_ID = 'failing.scheduler'; + public const REQUEST_CHANNEL = 'failing.scheduler.input'; + + #[\Ecotone\Messaging\Attribute\Scheduled(self::REQUEST_CHANNEL, self::ENDPOINT_ID)] + #[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)] + public function poll(): string + { + return 'payload'; + } + + #[\Ecotone\Messaging\Attribute\ServiceActivator(self::REQUEST_CHANNEL)] + public function handle(string $payload): void + { + throw new \InvalidArgumentException('boom'); + } +} + +/** + * licence Enterprise + * + * Two async command handlers share the same async transport channel, + * but each declares its own #[ErrorChannel] via #[Asynchronous] asynchronousExecution. + * + * @internal + */ +final class AsyncFailingHandler +{ + public const SHARED_ASYNC_CHANNEL = 'sharedAsync'; + public const ROUTING_KEY_A = 'async.handler.a'; + public const ROUTING_KEY_B = 'async.handler.b'; + public const ERROR_CHANNEL_A = 'errorChannelA'; + public const ERROR_CHANNEL_B = 'errorChannelB'; + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::SHARED_ASYNC_CHANNEL, asynchronousExecution: [new \Ecotone\Messaging\Attribute\ErrorChannel(self::ERROR_CHANNEL_A)])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_A, 'asyncHandlerA')] + public function handleA(string $payload): void + { + throw new \RuntimeException('handler-a-failure'); + } + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::SHARED_ASYNC_CHANNEL, asynchronousExecution: [new \Ecotone\Messaging\Attribute\ErrorChannel(self::ERROR_CHANNEL_B)])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_B, 'asyncHandlerB')] + public function handleB(string $payload): void + { + throw new \RuntimeException('handler-b-failure'); + } +} + +/** + * licence Enterprise + * + * @internal + */ +final class DelayedRetryHandler +{ + public const ASYNC_CHANNEL = 'delayedRetryAsync'; + public const ROUTING_KEY_RECOVERS = 'retry.recovers'; + public const ROUTING_KEY_DEAD_LETTER = 'retry.deadletter'; + public const ROUTING_KEY_OVERRIDE = 'retry.override'; + public const DEAD_LETTER_CHANNEL = 'retryDeadLetterChannel'; + + public int $attemptsRecovers = 0; + public int $attemptsDeadLetter = 0; + public int $attemptsOverride = 0; + public bool $finallyHandled = false; + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ + new \Ecotone\Messaging\Attribute\DelayedRetry(initialDelayMs: 1, multiplier: 1, maxAttempts: 3), + ])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_RECOVERS, 'retryRecovers')] + public function recovers(string $payload): void + { + $this->attemptsRecovers++; + if ($this->attemptsRecovers < 2) { + throw new \RuntimeException('transient'); + } + $this->finallyHandled = true; + } + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ + new \Ecotone\Messaging\Attribute\DelayedRetry( + initialDelayMs: 1, + multiplier: 1, + maxAttempts: 2, + deadLetterChannel: self::DEAD_LETTER_CHANNEL, + ), + ])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_DEAD_LETTER, 'retryDeadLetter')] + public function alwaysFails(string $payload): void + { + $this->attemptsDeadLetter++; + throw new \RuntimeException('permanent'); + } + + #[\Ecotone\Messaging\Attribute\Asynchronous(self::ASYNC_CHANNEL, asynchronousExecution: [ + new \Ecotone\Messaging\Attribute\DelayedRetry( + initialDelayMs: 1, + multiplier: 1, + maxAttempts: 1, + deadLetterChannel: self::DEAD_LETTER_CHANNEL, + ), + ])] + #[\Ecotone\Modelling\Attribute\CommandHandler(self::ROUTING_KEY_OVERRIDE, 'retryOverride')] + public function alwaysFailsOverridingDefault(string $payload): void + { + $this->attemptsOverride++; + throw new \RuntimeException('permanent'); + } + + #[\Ecotone\Modelling\Attribute\QueryHandler('retryHandler.attemptsRecovers')] + public function getAttemptsRecovers(): int + { + return $this->attemptsRecovers; + } + + #[\Ecotone\Modelling\Attribute\QueryHandler('retryHandler.attemptsDeadLetter')] + public function getAttemptsDeadLetter(): int + { + return $this->attemptsDeadLetter; + } + + #[\Ecotone\Modelling\Attribute\QueryHandler('retryHandler.attemptsOverride')] + public function getAttemptsOverride(): int + { + return $this->attemptsOverride; + } + + #[\Ecotone\Modelling\Attribute\QueryHandler('retryHandler.finallyHandled')] + public function isFinallyHandled(): bool + { + return $this->finallyHandled; + } +} + +/** + * licence Enterprise + * + * #[InstantRetry] retries the handler in-process before forwarding to #[ErrorChannel]. + * + * @internal + */ +final class InboundChannelAdapterWithInstantRetryAndErrorChannel +{ + public const ENDPOINT_ID = 'inboundInstantRetry'; + public const REQUEST_CHANNEL = 'inboundInstantRetryChannel'; + public const ERROR_CHANNEL = 'inboundInstantRetryErrorChannel'; + + public int $invocations = 0; + public int $maxFailures = 0; + public bool $hasEmitted = false; + + #[\Ecotone\Modelling\Attribute\InstantRetry(retryTimes: 2)] + #[\Ecotone\Messaging\Attribute\ErrorChannel(self::ERROR_CHANNEL)] + #[\Ecotone\Messaging\Attribute\Scheduled(self::REQUEST_CHANNEL, self::ENDPOINT_ID)] + #[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)] + public function emit(): ?string + { + if ($this->hasEmitted) { + return null; + } + $this->hasEmitted = true; + + return 'payload'; + } + + #[\Ecotone\Messaging\Attribute\ServiceActivator(self::REQUEST_CHANNEL)] + public function handle(string $payload): void + { + $this->invocations++; + if ($this->invocations <= $this->maxFailures) { + throw new \RuntimeException('simulated'); + } + } } diff --git a/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php b/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php index 772982196..ed4f62812 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Handler/Gateway/ErrorChannelCommandBusTest.php @@ -5,18 +5,24 @@ namespace Messaging\Unit\Handler\Gateway; use Ecotone\Lite\EcotoneLite; +use Ecotone\Messaging\Attribute\DelayedRetry; use Ecotone\Messaging\Attribute\ErrorChannel; use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder; use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\MessagingGatewayModule; +use Ecotone\Messaging\Config\ModulePackageList; +use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Conversion\MediaType; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\Handler\Recoverability\ErrorContext; use Ecotone\Messaging\MessageHeaders; +use Ecotone\Messaging\PollableChannel; use Ecotone\Messaging\Support\LicensingException; use Ecotone\Test\LicenceTesting; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use Ramsey\Uuid\Uuid; use Ramsey\Uuid\UuidInterface; +use RuntimeException; use Test\Ecotone\Messaging\Fixture\Service\Gateway\ErrorChannelCommandBus; use Test\Ecotone\Messaging\Fixture\Service\Gateway\ErrorChannelWithAsyncChannel; use Test\Ecotone\Messaging\Fixture\Service\Gateway\TicketService; @@ -115,4 +121,123 @@ public function test_using_custom_error_channel_with_reply_channel(): void $message = $ecotoneLite->getMessageChannel('async')->receive(); $this->assertNotNull($message); } + + public function test_error_channel_on_command_handler_is_silently_ignored_must_be_placed_on_gateway(): void + { + // Wrong placement: #[ErrorChannel] on the handler method has no effect. + // Must be on the messaging entry-point (CommandBus/EventBus/BusinessMethod). + $service = new class () { + public bool $sideEffectExecuted = false; + + #[\Ecotone\Modelling\Attribute\CommandHandler('handler.level.error.channel.test')] + #[\Ecotone\Messaging\Attribute\ErrorChannel('handlerLevelErrorChannel')] + public function handle(mixed $payload): void + { + $this->sideEffectExecuted = true; + throw new RuntimeException('handler-failure'); + } + }; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$service::class], + [$service], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel('handlerLevelErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $thrown = null; + try { + $ecotoneLite->sendCommandWithRoutingKey('handler.level.error.channel.test', 'payload'); + } catch (RuntimeException $exception) { + $thrown = $exception; + } + + $this->assertNotNull( + $thrown, + 'Exception must propagate to the bus caller — #[ErrorChannel] on a #[CommandHandler] is not wired by any resolver.' + ); + $this->assertSame('handler-failure', $thrown->getMessage()); + $this->assertTrue( + $service->sideEffectExecuted, + 'Handler ran and produced side effects with no rollback boundary in place.' + ); + $this->assertNull( + $ecotoneLite->getMessageChannel('handlerLevelErrorChannel')->receive(), + '#[ErrorChannel] on the handler is silently ignored — no message routed. ' + . 'Place it on the gateway entry-point (CommandBus/EventBus/BusinessMethod) so the framework can capture failures ' + . 'after gateway-level interceptors (e.g. transactional rollback) have fully unwound.' + ); + } + + public function test_delayed_retry_on_command_bus_throws_in_non_enterprise_mode(): void + { + $this->expectException(LicensingException::class); + $this->expectExceptionMessage('#[DelayedRetry]'); + + EcotoneLite::bootstrapFlowTesting( + [TicketService::class, DelayedRetryCommandBus::class], + [new TicketService()], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + SimpleMessageChannelBuilder::createQueueChannel(DelayedRetry::generateGatewayChannelName(DelayedRetryCommandBus::class)), + SimpleMessageChannelBuilder::createQueueChannel('gatewayRetryDeadLetter'), + ], + ); + } + + public function test_delayed_retry_on_command_bus_routes_failures_to_generated_channel(): void + { + $generatedRetryChannel = DelayedRetry::generateGatewayChannelName(DelayedRetryCommandBus::class); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [TicketService::class, DelayedRetryCommandBus::class], + [new TicketService()], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + SimpleMessageChannelBuilder::createQueueChannel($generatedRetryChannel), + SimpleMessageChannelBuilder::createQueueChannel('gatewayRetryDeadLetter'), + ], + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $commandBus = $ecotoneLite->getGateway(DelayedRetryCommandBus::class); + $payload = Uuid::uuid4(); + $commandBus->sendWithRouting( + 'createViaCommand', + $payload, + metadata: ['throwException' => true], + ); + + $this->assertEquals( + [], + $ecotoneLite->sendQueryWithRouting('getTickets'), + 'Handler must throw, leaving no ticket created' + ); + + /** @var PollableChannel $retryChannel */ + $retryChannel = $ecotoneLite->getMessageChannel($generatedRetryChannel); + $this->assertNotNull( + $retryChannel->receive(), + "#[DelayedRetry] on a CommandBus gateway must route failures to the auto-generated channel `{$generatedRetryChannel}`" + ); + } +} + +/** + * Custom Command Bus declaring a per-gateway #[DelayedRetry] policy. + * + * @internal + */ +#[DelayedRetry( + initialDelayMs: 1, + multiplier: 1, + maxAttempts: 1, + deadLetterChannel: 'gatewayRetryDeadLetter', +)] +interface DelayedRetryCommandBus extends \Ecotone\Modelling\CommandBus +{ } diff --git a/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php b/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php index c432f1cc7..faf12e483 100644 --- a/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php +++ b/packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php @@ -9,6 +9,7 @@ use Ecotone\Messaging\Endpoint\PollingMetadata; use Ecotone\Messaging\Handler\Logger\LoggingGateway; use Ecotone\Messaging\Message; +use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\MessagePoller; use Ecotone\Messaging\MessagingException; @@ -66,7 +67,9 @@ public function receiveWithTimeout(PollingMetadata $pollingMetadata): ?Message $message, $this->conversionService, $this->batchCommitCoordinator - )->build(); + ) + ->setHeader(MessageHeaders::INBOUND_REQUEST_CHANNEL, $channelName) + ->build(); } if (in_array($message->err, [RD_KAFKA_MSG_PARTITIONER_RANDOM, RD_KAFKA_MSG_PARTITIONER_CONSISTENT, RD_KAFKA_MSG_PARTITIONER_CONSISTENT_RANDOM, RD_KAFKA_MSG_PARTITIONER_MURMUR2, RD_KAFKA_MSG_PARTITIONER_MURMUR2_RANDOM])) { diff --git a/packages/Kafka/tests/Fixture/KafkaConsumer/KafkaConsumerFailingExample.php b/packages/Kafka/tests/Fixture/KafkaConsumer/KafkaConsumerFailingExample.php new file mode 100644 index 000000000..fd6698760 --- /dev/null +++ b/packages/Kafka/tests/Fixture/KafkaConsumer/KafkaConsumerFailingExample.php @@ -0,0 +1,21 @@ +assertNotNull($ecotoneLite->getMessageChannel('customErrorChannel')->receive()); } + public function test_default_custom_error_channel_on_consumer(): void + { + $topicName = Uuid::v7()->toRfc4122(); + $publisherReferenceName = 'kafka_publisher'; + $consumerReferenceName = 'kafka_consumer_attribute'; + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [KafkaConsumerFailingExample::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + new KafkaConsumerFailingExample(), + ], + ServiceConfiguration::createWithDefaults() + ->withDefaultErrorChannel('customErrorChannel') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName, $publisherReferenceName), + TopicConfiguration::createWithReferenceName('testTopicFailure', $topicName), + KafkaConsumerConfiguration::createWithDefaults($consumerReferenceName), + SimpleMessageChannelBuilder::createQueueChannel('customErrorChannel'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + /** @var KafkaAdmin $kafkaAdmin */ + $kafkaAdmin = $ecotoneLite->getServiceFromContainer(KafkaAdmin::class); + $kafkaAdmin->getTopicForProducer($publisherReferenceName . '.handler') + ->produce(RD_KAFKA_PARTITION_UA, 0, Uuid::v7()->toRfc4122()); + $kafkaAdmin->getProducer($publisherReferenceName. '.handler')->flush(8000); + + $ecotoneLite->run($consumerReferenceName, ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000 + )->withStopOnError(false)); + + $this->assertNotNull($ecotoneLite->getMessageChannel('customErrorChannel')->receive()); + } + + public function test_default_error_channel_on_consumer_with_dead_letter_and_replay(): void + { + $topicName = 'test_topic_dead_letter_' . Uuid::v7()->toRfc4122(); + $consumerReferenceName = 'kafka_consumer_attribute'; + $failureCount = 0; + + $consumer = new class () { + public int $failureCount = 0; + private array $processedMessages = []; + + #[KafkaConsumer('kafka_consumer_attribute', 'testTopicDeadLetter')] + public function handle(#[\Ecotone\Messaging\Attribute\Parameter\Payload] string $payload): void + { + if ($this->failureCount < 1) { + $this->failureCount++; + throw new \RuntimeException('Simulated failure'); + } + $this->processedMessages[] = $payload; + } + + #[QueryHandler('consumer.getProcessedMessages')] + public function getProcessedMessages(): array + { + return $this->processedMessages; + } + }; + + $dbalConnectionFactory = new DbalConnectionFactory(getenv('DATABASE_DSN') ?: 'pgsql://ecotone:secret@database:5432/ecotone'); + $this->cleanDeadLetterTable($dbalConnectionFactory); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$consumer::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + DbalConnectionFactory::class => $dbalConnectionFactory, + 'managerRegistry' => $dbalConnectionFactory, + $consumer, + ], + ServiceConfiguration::createWithDefaults() + ->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL) + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE, ModulePackageList::DBAL_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName) + ->withHeaderMapper('*'), + TopicConfiguration::createWithReferenceName('testTopicDeadLetter', $topicName), + KafkaConsumerConfiguration::createWithDefaults($consumerReferenceName), + DbalConfiguration::createWithDefaults() + ->withAutomaticTableInitialization(true), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + pathToRootCatalog: __DIR__ . '/../../', + ); + + $payload = Uuid::v7()->toRfc4122(); + $messagePublisher = $ecotoneLite->getGateway(MessagePublisher::class); + $messagePublisher->send($payload); + + $ecotoneLite->run($consumerReferenceName, ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + /** @var DeadLetterGateway $deadLetter */ + $deadLetter = $ecotoneLite->getGateway(DeadLetterGateway::class); + $this->assertEquals(1, $deadLetter->count()); + + $deadLetter->replyAll(); + $this->assertEquals(0, $deadLetter->count()); + + $ecotoneLite->run($consumerReferenceName, ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + $processedMessages = $ecotoneLite->sendQueryWithRouting('consumer.getProcessedMessages'); + $this->assertCount(1, $processedMessages); + $this->assertEquals($payload, $processedMessages[0]); + } + + public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler(): void + { + $topicName = 'test_topic_replayable_' . Uuid::v7()->toRfc4122(); + $handler = new class () { + public const ENDPOINT_ID = 'replayable_kafka_consumer'; + public const TOPIC_REFERENCE = 'replayableKafkaTopic'; + + public bool $shouldFail = true; + public int $invocations = 0; + /** @var string[] */ + public array $processedPayloads = []; + + #[KafkaConsumer(self::ENDPOINT_ID, self::TOPIC_REFERENCE)] + public function handle(#[\Ecotone\Messaging\Attribute\Parameter\Payload] string $payload): void + { + $this->invocations++; + if ($this->shouldFail) { + throw new \RuntimeException('simulated'); + } + $this->processedPayloads[] = $payload; + } + }; + + $dbalConnectionFactory = new DbalConnectionFactory(getenv('DATABASE_DSN') ?: 'pgsql://ecotone:secret@database:5432/ecotone'); + $this->cleanDeadLetterTable($dbalConnectionFactory); + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [$handler::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + DbalConnectionFactory::class => $dbalConnectionFactory, + 'managerRegistry' => $dbalConnectionFactory, + $handler, + ], + ServiceConfiguration::createWithDefaults() + ->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL) + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE, ModulePackageList::DBAL_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName) + ->withHeaderMapper('*'), + TopicConfiguration::createWithReferenceName('replayableKafkaTopic', $topicName), + KafkaConsumerConfiguration::createWithDefaults('replayable_kafka_consumer'), + DbalConfiguration::createWithDefaults() + ->withAutomaticTableInitialization(true), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + pathToRootCatalog: __DIR__ . '/../../', + ); + + $payload = Uuid::v7()->toRfc4122(); + $messagePublisher = $ecotoneLite->getGateway(MessagePublisher::class); + $messagePublisher->send($payload); + + $ecotoneLite->run('replayable_kafka_consumer', ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + /** @var DeadLetterGateway $deadLetter */ + $deadLetter = $ecotoneLite->getGateway(DeadLetterGateway::class); + $this->assertEquals(1, $deadLetter->count(), 'Failed Kafka Message must land in DBAL Dead Letter'); + $this->assertSame(1, $handler->invocations); + $this->assertSame([], $handler->processedPayloads); + + $handler->shouldFail = false; + $deadLetter->replyAll(); + + $this->assertEquals(0, $deadLetter->count()); + $this->assertSame(2, $handler->invocations, 'replyAll() must synchronously re-invoke the handler via MessagingEntrypoint — no second run() needed'); + $this->assertSame([$payload], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler'); + + $ecotoneLite->run('replayable_kafka_consumer', ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000, + failAtError: false, + )); + + $this->assertSame(2, $handler->invocations, 'Replayed Message must not be re-consumed from the Kafka topic (committed)'); + $this->assertSame([$payload], $handler->processedPayloads, 'Processed payloads must remain unchanged after a second poll'); + } + public function test_convert_and_send(): void { $topicName = Uuid::v7()->toRfc4122(); @@ -499,4 +701,13 @@ public function test_kafka_publisher_works_without_explicit_configuration(): voi $this->assertTrue(true); } + + private function cleanDeadLetterTable(DbalConnectionFactory $connectionFactory): void + { + $connection = $connectionFactory->createContext()->getDbalConnection(); + $schemaManager = method_exists($connection, 'getSchemaManager') ? $connection->getSchemaManager() : $connection->createSchemaManager(); + if ($schemaManager->tablesExist(['ecotone_error_messages'])) { + $connection->executeStatement('DELETE FROM ecotone_error_messages'); + } + } } diff --git a/packages/OpenTelemetry/src/TracerInterceptor.php b/packages/OpenTelemetry/src/TracerInterceptor.php index 7d23c3a79..3db50d770 100644 --- a/packages/OpenTelemetry/src/TracerInterceptor.php +++ b/packages/OpenTelemetry/src/TracerInterceptor.php @@ -43,7 +43,9 @@ public function traceAsynchronousEndpoint(MethodInvocation $methodInvocation, Me $trace = $this->trace( $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) ? 'Receiving from channel: ' . $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) - : 'Endpoint: ' . $message->getHeaders()->get(MessageHeaders::CONSUMER_POLLING_METADATA)->getEndpointId() . ' produced Message', + : ($message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL) + ? 'Receiving from inbound channel adapter: ' . $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL) + : 'Endpoint: ' . $message->getHeaders()->get(MessageHeaders::CONSUMER_POLLING_METADATA)->getEndpointId() . ' produced Message'), $methodInvocation, $message, spanKind: SpanKind::KIND_CONSUMER, diff --git a/packages/OpenTelemetry/tests/Integration/TracingTreeTest.php b/packages/OpenTelemetry/tests/Integration/TracingTreeTest.php index 6d27dae02..02762a273 100644 --- a/packages/OpenTelemetry/tests/Integration/TracingTreeTest.php +++ b/packages/OpenTelemetry/tests/Integration/TracingTreeTest.php @@ -119,11 +119,11 @@ public function test_tracing_scheduled_handler() self::compareTreesByDetails( [ [ - 'details' => ['name' => 'Endpoint: scheduled_handler produced Message'], + 'details' => ['name' => 'Receiving from inbound channel adapter: nullChannel'], 'children' => [], ], [ - 'details' => ['name' => 'Endpoint: scheduled_handler produced Message'], + 'details' => ['name' => 'Receiving from inbound channel adapter: nullChannel'], 'children' => [], ], ], @@ -146,7 +146,7 @@ public function test_tracing_workflow_scheduled_handler() self::compareTreesByDetails( [ [ - 'details' => ['name' => 'Endpoint: scheduled_handler produced Message'], + 'details' => ['name' => 'Receiving from inbound channel adapter: add'], 'children' => [ [ 'details' => ['name' => 'Message Handler: ' . WorkflowScheduledHandler::class . '::add'], @@ -155,7 +155,7 @@ public function test_tracing_workflow_scheduled_handler() ], ], [ - 'details' => ['name' => 'Endpoint: scheduled_handler produced Message'], + 'details' => ['name' => 'Receiving from inbound channel adapter: add'], 'children' => [ [ 'details' => ['name' => 'Message Handler: ' . WorkflowScheduledHandler::class . '::add'], diff --git a/packages/PdoEventSourcing/tests/Integration/MultiTenantTest.php b/packages/PdoEventSourcing/tests/Integration/MultiTenantTest.php index 8e3b462ac..9ce86dff2 100644 --- a/packages/PdoEventSourcing/tests/Integration/MultiTenantTest.php +++ b/packages/PdoEventSourcing/tests/Integration/MultiTenantTest.php @@ -9,7 +9,9 @@ use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\Support\InvalidArgumentException; +use Ecotone\Projecting\ProjectionRegistry; use Test\Ecotone\EventSourcing\EventSourcingMessagingTestCase; use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\CloseTicket; use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\RegisterTicket; @@ -61,7 +63,10 @@ public function test_building_asynchronous_event_driven_projection_with_multi_te new RegisterTicket('122', 'Johnny', 'alert'), metadata: ['tenant' => 'tenant_b'] ); - $ecotone->run(InProgressTicketList::PROJECTION_CHANNEL); + $ecotone->run(InProgressTicketList::PROJECTION_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 2, + maxExecutionTimeInMilliseconds: 5000, + )); self::assertEquals( [['ticket_id' => '123', 'ticket_type' => 'alert']], @@ -81,7 +86,10 @@ public function test_building_asynchronous_event_driven_projection_with_multi_te metadata: ['tenant' => 'tenant_a'] ); - $ecotone->run(InProgressTicketList::PROJECTION_CHANNEL); + $ecotone->run(InProgressTicketList::PROJECTION_CHANNEL, ExecutionPollingMetadata::createWithTestingSetup( + amountOfMessagesToHandle: 2, + maxExecutionTimeInMilliseconds: 5000, + )); self::assertEquals( [],