
feat: per-handler error channel and delayed retry for async endpoints (fixes #659)#665

Merged
dgafka merged 11 commits into main from dead-letter-with-channel-adapter on May 7, 2026
Conversation

@dgafka
Member

dgafka commented May 7, 2026

Why is this change proposed?

Originally reported in #659: configuring withDefaultErrorChannel(...) on a #[KafkaConsumer] (or any inbound channel adapter) crashed at runtime with MessageHeaderDoesNotExistsException because the framework assumed every polled message carried POLLED_CHANNEL_NAME, which inbound channel adapters don't set. While fixing that, two related gaps surfaced: there was no way to scope an Error Channel or a delayed-retry policy to a single async handler when multiple handlers shared a transport, and dead-letter replay couldn't route messages back to inbound-adapter handlers because the framework had no header carrying the actual request channel.

This PR fixes the original crash and turns the existing asynchronousExecution: [...] parameter on #[Asynchronous] into a real per-handler error-handling story — #[ErrorChannel] and the new #[DelayedRetry] both work there, with full Dead Letter replay support for inbound channel adapters.

Usage examples

Per-handler Error Channel for handlers sharing one transport

final class OrderProjectors
{
    #[Asynchronous('orders', asynchronousExecution: [new ErrorChannel('paymentsErrors')])]
    #[CommandHandler('order.process_payment', 'paymentsHandler')]
    public function processPayment(ProcessPayment $command): void { /* ... */ }

    #[Asynchronous('orders', asynchronousExecution: [new ErrorChannel('shippingErrors')])]
    #[CommandHandler('order.ship', 'shippingHandler')]
    public function ship(ShipOrder $command): void { /* ... */ }
}

#[DelayedRetry] — retries with backoff and dead letter, per handler

#[Asynchronous('orders', asynchronousExecution: [
    new DelayedRetry(
        initialDelayMs: 1000,
        multiplier: 2,
        maxAttempts: 3,
        deadLetterChannel: 'dbal_dead_letter',
    ),
])]
#[CommandHandler('order.charge', 'chargeHandler')]
public function charge(ChargeOrder $command): void { /* ... */ }
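To make the parameters above concrete, here is a minimal sketch (the helper name is hypothetical, not framework API) of the delay schedule they imply, assuming maxAttempts counts delayed retries and each retry waits initialDelayMs multiplied by multiplier raised to the attempt index:

```php
<?php

// Hypothetical helper: the delay schedule implied by the DelayedRetry
// parameters above. Attempt N (0-based) waits initialDelayMs * multiplier^N;
// once the schedule is exhausted, the message goes to the dead letter channel.
function delaySchedule(int $initialDelayMs, float $multiplier, int $maxAttempts): array
{
    $delays = [];
    for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
        $delays[] = (int) round($initialDelayMs * ($multiplier ** $attempt));
    }
    return $delays;
}

// With the configuration above, retries fire after 1000ms, 2000ms, 4000ms:
var_dump(delaySchedule(1000, 2, 3)); // [1000, 2000, 4000]
```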

#[DelayedRetry] on a gateway

#[DelayedRetry(initialDelayMs: 1000, multiplier: 2, maxAttempts: 3, deadLetterChannel: 'dbal_dead_letter')]
interface ResilientCommandBus extends CommandBus
{
}

Combining attributes (e.g. skip transactions for a notification handler)

#[Asynchronous('orders', asynchronousExecution: [
    new WithoutDatabaseTransaction(),
    new ErrorChannel('emailDeliveryErrors'),
])]
#[CommandHandler('order.send_confirmation_email', 'emailHandler')]
public function sendConfirmation(SendConfirmation $command): void { /* ... */ }

Inbound Channel Adapter (#[KafkaConsumer]) with #[ErrorChannel]

Failures are forwarded to the configured Error Channel — typically a Dead Letter for later replay:

#[ErrorChannel('dbal_dead_letter')]
#[KafkaConsumer(endpointId: 'orderConsumer', topics: ['orders'])]
public function handle(Order $payload): void { /* ... */ }

Inbound Channel Adapter (#[KafkaConsumer]) with #[InstantRetry] + #[ErrorChannel]

In-process retries before forwarding the failure to the Error Channel:

#[InstantRetry(retryTimes: 2)]
#[ErrorChannel('dbal_dead_letter')]
#[KafkaConsumer(endpointId: 'orderConsumer', topics: ['orders'])]
public function handle(Order $payload): void { /* ... */ }

Inbound Channel Adapter (#[Scheduled]) with #[ErrorChannel]

#[ErrorChannel('schedulerErrors')]
#[Scheduled(requestChannelName: 'pollPayments', endpointId: 'paymentsPoller')]
#[Poller(fixedRateInMilliseconds: 60000)]
public function poll(): PaymentsBatch { /* ... */ }

Inbound Channel Adapter (#[Scheduled]) with #[InstantRetry] + #[ErrorChannel]

#[InstantRetry(retryTimes: 3)]
#[ErrorChannel('schedulerErrors')]
#[Scheduled(requestChannelName: 'pollPayments', endpointId: 'paymentsPoller')]
#[Poller(fixedRateInMilliseconds: 60000)]
public function poll(): PaymentsBatch { /* ... */ }
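The in-process semantics of #[InstantRetry] can be sketched as plain PHP — this is an illustrative model, not the framework's implementation, assuming retryTimes counts re-invocations on top of the original attempt:

```php
<?php

// Illustrative sketch of #[InstantRetry(retryTimes: N)]: the original
// invocation plus up to N immediate re-invocations with no delay; the
// last exception is rethrown (and then forwarded to the Error Channel).
function invokeWithInstantRetry(callable $handler, int $retryTimes): mixed
{
    $lastException = null;
    for ($attempt = 0; $attempt <= $retryTimes; $attempt++) {
        try {
            return $handler();
        } catch (\Throwable $e) {
            $lastException = $e; // retry immediately
        }
    }
    throw $lastException; // exhausted: failure leaves the handler
}

// A handler failing twice succeeds on the third invocation:
$calls = 0;
$result = invokeWithInstantRetry(function () use (&$calls) {
    if (++$calls < 3) {
        throw new \RuntimeException('transient failure');
    }
    return 'ok';
}, retryTimes: 3);
// $calls === 3, $result === 'ok'
```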

⚠️ #[DelayedRetry] is not supported on Inbound Channel Adapters. They consume from external systems (Kafka, AMQP, scheduled tasks) and have no source Message Channel to reschedule into.

Use case scenarios

  1. Different SLAs per handler on a shared queue — payment handlers route to a Dead Letter for manual review, while shipping handlers retry with backoff. Both consume from the same orders channel; no need to split queues for different policies.
  2. Kafka consumer with default error channel — withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL) now works for #[KafkaConsumer] (this was the original crash). Failed messages are stored in the DBAL Dead Letter, and DeadLetterGateway::replyAll() correctly replays them back to the same Kafka consumer endpoint.
  3. Resilient gateways — extend CommandBus with #[DelayedRetry(...)] once and reuse it across an application: callers transparently get retries + dead-letter without each handler having to implement try/catch.

Failure flow with #[DelayedRetry]

sequenceDiagram
    participant User
    participant CommandBus
    participant AsyncChannel as Async Channel
    participant Consumer as Polling Consumer
    participant Handler
    participant Retry as DelayedRetryErrorHandler
    participant DLQ as Dead Letter

    User->>CommandBus: send(Command)
    CommandBus->>AsyncChannel: enqueue
    Consumer->>AsyncChannel: poll
    AsyncChannel-->>Consumer: Message
    Consumer->>Handler: invoke
    Handler--xConsumer: throws
    Consumer->>Retry: failed Message
    alt retries remaining
        Retry->>AsyncChannel: reschedule with delay
    else retries exhausted
        Retry->>DLQ: store
        User->>DLQ: replyAll()
        DLQ->>Handler: replay (via INBOUND_REQUEST_CHANNEL or POLLED_CHANNEL_NAME)
    end
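The replay branch above can be sketched as plain PHP. The header names come from this PR (polledChannelName, inboundRequestChannel); the resolver itself is illustrative, not the framework's real API:

```php
<?php

// Illustrative sketch of the replay routing in the diagram: the dead-letter
// handler picks the channel to re-dispatch into from the stored headers.
function resolveReplayChannel(array $headers): string
{
    // Inbound channel adapters (#[KafkaConsumer], #[Scheduled], ...) now
    // stamp the actual request channel, so replay can reach them too.
    if (isset($headers['inboundRequestChannel'])) {
        return $headers['inboundRequestChannel'];
    }
    // Messages polled from an async Message Channel carry this instead.
    if (isset($headers['polledChannelName'])) {
        return $headers['polledChannelName'];
    }
    throw new \RuntimeException('Cannot replay: no channel header present');
}

echo resolveReplayChannel(['inboundRequestChannel' => 'orderConsumer']); // orderConsumer
echo resolveReplayChannel(['polledChannelName' => 'orders']);            // orders
```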

Pull Request Contribution Terms

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

dgafka added 11 commits May 7, 2026 18:11
…nc handler methods

Both attributes have no effect when placed alongside #[Asynchronous] on a handler method;
they must be passed via #[Asynchronous(asynchronousExecution: [...])] for the polling
consumer to pick them up. Detect the misplacement at compile time and throw a
ConfigurationException pointing the user at the correct form.
…try] on #[Scheduled]

Inbound Channel Adapters (Kafka, AMQP inbound, #[Scheduled]) consume from external
systems and have no source Message Channel for the framework to reschedule a delayed
retry into — fail-fast at bootstrap with a descriptive error pointing at #[ErrorChannel]
and/or #[InstantRetry] as the supported alternatives.

InstantRetryAttributeModule now also accepts #[ChannelAdapter] (parent of #[Scheduled])
in addition to #[MessageConsumer], so the recommended workaround works docker-free.
Mirrors Test\Ecotone\Dbal\Integration\DeadLetterTest::test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler — same shape, real Kafka transport. Verifies that a failure on a #[KafkaConsumer] lands in the DBAL Dead Letter, replyAll() routes it back to the same consumer's handler with the original payload, and the handler runs successfully on the second attempt.
The DLQ handler routes via MessagingEntrypoint, which is a synchronous in-process dispatch — the invocation count should already be 2 immediately after replyAll(), with no need to run() the consumer again. Drop the redundant second poll and assert directly. Cuts the test runtime from ~35s to ~4s.
…t fixtures; extract exception factory

- ChannelAdapter now extends MessageConsumer (and MessageConsumer extends IdentifiedAnnotation), so checks throughout the framework reference a single base class instead of two.
- Move all introduced test fixtures inline (anonymous classes for single-use, named classes within the same test file for multi-use). Drops 9 separate fixture files.
- Extract ConfigurationException/LicensingException messages for Error Channel + Delayed Retry placement validations into an ErrorChannelExceptionMessages factory; keeps the validation logic readable.
- Add a second run() check to dead-letter replay tests (Kafka + Dbal) verifying the replayed message is not re-consumed.
…ctory

Move 6 more inline exception strings into the factory so the validation logic stays focused on intent rather than text:

- DbalDeadLetterHandler: "cannot reply ... no polledChannelName/inboundRequestChannel/routingSlip"
- DelayedRetryErrorHandler: "Failed to handle Error Message via Retry Configuration ..."
- InstantRetryAttributeModule: "InstantRetry only on Inbound Channel Adapter" + Enterprise licence check
- MessagingSystemConfiguration: "asynchronousExecution requires Enterprise"
- MessagingGatewayModule: gateway-level Error Channel + DelayedRetry licence checks
…an name

#[Scheduled] handlers now set INBOUND_REQUEST_CHANNEL, so TracerInterceptor produces "Receiving from inbound channel adapter: <channel>" rather than the old fallback "Endpoint: <endpointId> produced Message". The new format is consistent with pollable channel spans ("Receiving from channel: <channel>") and tells the user where the message went, not just which endpoint produced it.
… messages

The async multi-tenant projection test relies on $ecotone->run() processing both tenants' queued messages in a single call, so that each tenant's first message triggers lazy projection initialization. The default polling metadata was not guaranteed to drain the queue in CI, leaving tenant_b's projection state UNINITIALIZED and the in_progress_tickets table absent from tenant_b's database.

Pass explicit ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 2) so both tenants' events are reliably processed in one run() call, ensuring lazy init fires for each tenant before queries run.
@dgafka dgafka merged commit 4a70a01 into main May 7, 2026
8 checks passed
@dgafka dgafka deleted the dead-letter-with-channel-adapter branch May 7, 2026 19:42