Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public function show(DeadLetterGateway $deadLetterGateway, string $messageId, bo
[
['Message Id', $message->getHeaders()->getMessageId()],
['Failed At', $this->convertTimestampToReadableFormat($message->getHeaders()->getTimestamp())],
['Channel Name', $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)],
['Channel Name', $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME)
? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)
: ($message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL)
? $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL)
: 'Unknown')],
['Type', $message->getHeaders()->containsKey(MessageHeaders::TYPE_ID) ? $message->getHeaders()->get(MessageHeaders::TYPE_ID) : 'Unknown'],
['Stacktrace', $this->getReadableStacktrace($message->getHeaders()->get(ErrorContext::EXCEPTION_STACKTRACE), $fullDetails)],
]
Expand Down
12 changes: 9 additions & 3 deletions packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,18 @@ private function initialize(): void
private function replyWithoutInitialization(string $messageId, MessagingEntrypointService $messagingEntrypoint): void
{
$message = $this->show($messageId);
if (! $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) && ! $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP)) {
throw InvalidArgumentException::create("Can not reply to message {$messageId}, as it does not contain either `polledChannelName` or `routingSlip` header. Please add one of them, so Message can be routed back to the original channel.");
$hasPolledChannel = $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME);
$hasInboundRequestChannel = $message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL);
$hasRoutingSlip = $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP);

if (! $hasPolledChannel && ! $hasInboundRequestChannel && ! $hasRoutingSlip) {
throw InvalidArgumentException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::cannotReplyToDeadLetterMessage($messageId));
}

if ($message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME)) {
if ($hasPolledChannel) {
$entrypoint = $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME);
} elseif ($hasInboundRequestChannel) {
$entrypoint = $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL);
} else {
// This allows to replay Error Message stored for synchronous calls (non asynchronous)
$routingSlip = $message->getHeaders()->resolveRoutingSlip();
Expand Down
76 changes: 76 additions & 0 deletions packages/Dbal/tests/Integration/DeadLetterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Ecotone\Lite\Test\FlowTestSupport;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Ecotone\Messaging\Endpoint\PollingMetadata;
use Ecotone\Messaging\Handler\Recoverability\ErrorContext;
use Ecotone\Messaging\MessageHeaders;
Expand Down Expand Up @@ -203,6 +204,81 @@ public function test_same_event_is_stored_in_dead_letter_twice_for_different_end
$this->assertErrorMessageCount($ecotone, 0);
}

public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler(): void
{
$handler = new class () {
public const ENDPOINT_ID = 'failingInboundAdapter';
public const REQUEST_CHANNEL = 'failingInboundAdapterRequestChannel';

public bool $shouldFail = true;
public int $invocations = 0;
/** @var string[] */
public array $processedPayloads = [];
private bool $hasEmitted = false;

#[\Ecotone\Messaging\Attribute\Scheduled(self::REQUEST_CHANNEL, self::ENDPOINT_ID)]
#[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)]
public function emit(): ?string
{
if ($this->hasEmitted) {
return null;
}
$this->hasEmitted = true;

return 'first-payload';
}

#[\Ecotone\Messaging\Attribute\ServiceActivator(self::REQUEST_CHANNEL)]
public function handle(string $payload): void
{
$this->invocations++;
if ($this->shouldFail) {
throw new \RuntimeException('simulated');
}
$this->processedPayloads[] = $payload;
}
};
$connectionFactory = $this->getConnectionFactory();

$ecotone = EcotoneLite::bootstrapFlowTesting(
containerOrAvailableServices: [
$handler,
DbalConnectionFactory::class => $connectionFactory,
'managerRegistry' => $connectionFactory,
],
configuration: ServiceConfiguration::createWithDefaults()
->withEnvironment('prod')
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL),
classesToResolve: [$handler::class],
pathToRootCatalog: __DIR__ . '/../../',
);

$ecotone->run('failingInboundAdapter', ExecutionPollingMetadata::createWithTestingSetup(
amountOfMessagesToHandle: 1,
failAtError: false,
));

$this->assertErrorMessageCount($ecotone, 1);
$this->assertSame(1, $handler->invocations, 'Handler must have been invoked once before the failure was captured');
$this->assertSame([], $handler->processedPayloads);

$handler->shouldFail = false;
$this->replyAllErrorMessages($ecotone);

$this->assertErrorMessageCount($ecotone, 0);
$this->assertSame(2, $handler->invocations, 'replyAll() must synchronously re-invoke the handler via MessagingEntrypoint — no second run() needed');
$this->assertSame(['first-payload'], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler');

$ecotone->run('failingInboundAdapter', ExecutionPollingMetadata::createWithTestingSetup(
amountOfMessagesToHandle: 1,
failAtError: false,
));

$this->assertSame(2, $handler->invocations, 'Subsequent polls must not re-process the replayed Message (emit() returns null after the first emission)');
$this->assertSame(['first-payload'], $handler->processedPayloads);
}

private function assertErrorMessageCount(FlowTestSupport $ecotone, int $amount, string $deadLetterReference = DeadLetterGateway::class): void
{
$gateway = $ecotone->getGateway(DeadLetterGateway::class);
Expand Down
14 changes: 7 additions & 7 deletions packages/Ecotone/src/Messaging/Attribute/Asynchronous.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ class Asynchronous
{
private string|array $channelName;
/** @var AsynchronousEndpointAttribute[] */
private array $endpointAnnotations;
private array $asynchronousExecution;

/**
* @param AsynchronousEndpointAttribute[] $endpointAnnotations
* @param AsynchronousEndpointAttribute[] $asynchronousExecution Attributes scoped to the asynchronous execution context — applied when the polling consumer processes the Message, not at the synchronous bus call.
*/
public function __construct(string|array $channelName, array $endpointAnnotations = [])
public function __construct(string|array $channelName, array $asynchronousExecution = [])
{
Assert::notNullAndEmpty($channelName, 'Channel name can not be empty string');
Assert::allInstanceOfType($endpointAnnotations, AsynchronousEndpointAttribute::class);
Assert::allInstanceOfType($asynchronousExecution, AsynchronousEndpointAttribute::class);
$this->channelName = $channelName;
$this->endpointAnnotations = $endpointAnnotations;
$this->asynchronousExecution = $asynchronousExecution;
}

public function getChannelName(): array
Expand All @@ -36,8 +36,8 @@ public function getChannelName(): array
/**
* @return AsynchronousEndpointAttribute[]
*/
public function getEndpointAnnotations(): array
public function getAsynchronousExecution(): array
{
return $this->endpointAnnotations;
return $this->asynchronousExecution;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
/**
* licence Apache-2.0
*/
abstract class ChannelAdapter extends IdentifiedAnnotation
abstract class ChannelAdapter extends MessageConsumer
{
}
38 changes: 38 additions & 0 deletions packages/Ecotone/src/Messaging/Attribute/DelayedRetry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Attribute;

use Attribute;
use Ecotone\Messaging\Support\Assert;

/**
* licence Enterprise
*/
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
final class DelayedRetry implements AsynchronousEndpointAttribute
{
public function __construct(
public readonly int $initialDelayMs,
public readonly int $multiplier = 1,
public readonly ?int $maxDelayMs = null,
public readonly ?int $maxAttempts = 3,
public readonly ?string $deadLetterChannel = null,
) {
Assert::isTrue($initialDelayMs > 0, 'DelayedRetry initialDelayMs must be greater than 0');
Assert::isTrue($multiplier > 0, 'DelayedRetry multiplier must be greater than 0');
Assert::isTrue($maxAttempts === null || $maxAttempts > 0, 'DelayedRetry maxAttempts must be null (unlimited) or greater than 0');
Assert::isTrue($deadLetterChannel === null || $deadLetterChannel !== '', 'DelayedRetry deadLetterChannel must be null or a non-empty channel name');
}

public static function generateChannelName(string $handlerEndpointId): string
{
return 'ecotone.retry.' . $handlerEndpointId;
}

public static function generateGatewayChannelName(string $gatewayInterfaceFqn): string
{
return 'ecotone.retry.gateway.' . str_replace('\\', '.', $gatewayInterfaceFqn);
}
}
2 changes: 1 addition & 1 deletion packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* licence Enterprise
*/
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
class ErrorChannel
class ErrorChannel implements AsynchronousEndpointAttribute
{
/**
* @param string $errorChannelName Name of the error channel to send Message too
Expand Down
17 changes: 5 additions & 12 deletions packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,12 @@

#[Attribute(Attribute::TARGET_METHOD)]
/**
* Base attribute for any Inbound Channel Adapter consuming from an external system
* (Kafka, AMQP, scheduled tasks, etc.). Subclasses include #[KafkaConsumer], #[RabbitConsumer],
* and #[ChannelAdapter] (the base for #[Scheduled]).
*
* licence Apache-2.0
*/
class MessageConsumer
class MessageConsumer extends IdentifiedAnnotation
{
private string $endpointId;

public function __construct(string $endpointId)
{
$this->endpointId = $endpointId;
}

public function getEndpointId(): string
{
return $this->endpointId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Config\Annotation\ModuleConfiguration;

/**
* licence Apache-2.0
*
* @internal
*/
final class ErrorChannelExceptionMessages
{
public static function delayedRetryOnInboundChannelAdapter(string $className, string $methodName): string
{
return "#[DelayedRetry] cannot be used on an Inbound Channel Adapter `{$className}::{$methodName}`. "
. 'Inbound Channel Adapters consume from external systems (Kafka, AMQP, scheduled tasks) and have no source Message Channel for the framework to reschedule a delayed retry into. '
. 'Use #[ErrorChannel] to capture the failure for later replay (e.g. from a Dead Letter), and optionally combine it with #[InstantRetry] for in-process retries before forwarding to the Error Channel.';
}

public static function errorChannelDirectlyOnAsyncHandlerMethod(string $endpointId): string
{
return "Asynchronous handler `{$endpointId}` has `#[ErrorChannel]` placed directly on the handler method — this has no effect on async handlers. "
. "Pass it via the #[Asynchronous] attribute instead: `#[Asynchronous('channel', asynchronousExecution: [new ErrorChannel('...')])]` so the polling consumer routes failures correctly.";
}

public static function delayedRetryDirectlyOnAsyncHandlerMethod(string $endpointId): string
{
return "Asynchronous handler `{$endpointId}` has `#[DelayedRetry]` placed directly on the handler method — this has no effect on async handlers. "
. "Pass it via the #[Asynchronous] attribute instead: `#[Asynchronous('channel', asynchronousExecution: [new DelayedRetry(...)])]` so the polling consumer applies the retry policy correctly.";
}

public static function errorChannelAndDelayedRetryMutuallyExclusiveOnHandler(string $endpointId): string
{
return "Handler `{$endpointId}` declares both #[ErrorChannel] and #[DelayedRetry] in #[Asynchronous] asynchronousExecution — these are mutually exclusive. "
. 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.';
}

public static function errorChannelAndDelayedRetryMutuallyExclusiveOnGateway(string $gatewayInterfaceFqn): string
{
return "Gateway `{$gatewayInterfaceFqn}` declares both #[ErrorChannel] and #[DelayedRetry] — these are mutually exclusive. "
. 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.';
}

public static function cannotReplyToDeadLetterMessage(string $messageId): string
{
return "Can not reply to message {$messageId}, as it does not contain `polledChannelName`, `inboundRequestChannel` or `routingSlip` header. "
. 'Please add one of them, so Message can be routed back to the original channel.';
}

public static function delayedRetryRequiresPolledChannelName(string $originalErrorMessage): string
{
return 'Failed to handle Error Message via Retry Configuration, as it does not contain information about origination channel from which it was polled. Original error message: '
. $originalErrorMessage;
}

public static function instantRetryNotOnInboundChannelAdapter(string $className, string $methodName): string
{
return "InstantRetry attribute can only be used on Inbound Channel Adapter methods (annotated with MessageConsumer e.g. #[KafkaConsumer], #[RabbitConsumer], #[Scheduled]). "
. "'{$className}::{$methodName}' has none.";
}

public static function instantRetryRequiresEnterprise(): string
{
return 'Instant retry attribute is available only for Ecotone Enterprise.';
}

public static function asynchronousExecutionRequiresEnterprise(string $endpointId): string
{
return "Endpoint annotations on #[Asynchronous] attribute for endpoint `{$endpointId}` require Ecotone Enterprise licence.";
}

public static function gatewayErrorChannelRequiresEnterprise(string $interfaceFqn, string $methodName): string
{
return "Gateway {$interfaceFqn}::{$methodName} is marked with synchronous Error Channel. This functionality is available as part of Ecotone Enterprise.";
}

public static function gatewayDelayedRetryRequiresEnterprise(string $interfaceFqn, string $methodName): string
{
return "Gateway {$interfaceFqn}::{$methodName} is marked with #[DelayedRetry]. This functionality is available as part of Ecotone Enterprise.";
}
}
Loading
Loading