feat: per-handler error channel and delayed retry for async endpoints (fixes #659)#665
Merged
Conversation
…nc handler methods Both attributes have no effect when placed alongside #[Asynchronous] on a handler method; they must be passed via #[Asynchronous(asynchronousExecution: [...])] for the polling consumer to pick them up. Detect the misplacement at compile time and throw a ConfigurationException pointing the user at the correct form.
…try] on #[Scheduled] Inbound Channel Adapters (Kafka, AMQP inbound, #[Scheduled]) consume from external systems and have no source Message Channel for the framework to reschedule a delayed retry into — fail-fast at bootstrap with a descriptive error pointing at #[ErrorChannel] and/or #[InstantRetry] as the supported alternatives. InstantRetryAttributeModule now also accepts #[ChannelAdapter] (parent of #[Scheduled]) in addition to #[MessageConsumer], so the recommended workaround works docker-free.
Mirrors Test\Ecotone\Dbal\Integration\DeadLetterTest::test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler — same shape, real Kafka transport. Verifies that a failure on a #[KafkaConsumer] lands in the DBAL Dead Letter, replyAll() routes it back to the same consumer's handler with the original payload, and the handler runs successfully on the second attempt.
The DLQ handler routes via MessagingEntrypoint, which is a synchronous in-process dispatch — invocations should be 2 immediately after replyAll(), no need to run() the consumer again. Drop the redundant second poll and assert directly. Cuts the test runtime from ~35s to ~4s.
…t fixtures; extract exception factory - ChannelAdapter now extends MessageConsumer (and MessageConsumer extends IdentifiedAnnotation), so checks throughout the framework reference a single base class instead of two. - Move all introduced test fixtures inline (anonymous classes for single-use, named classes within the same test file for multi-use). Drops 9 separate fixture files. - Extract ConfigurationException/LicensingException messages for Error Channel + Delayed Retry placement validations into an ErrorChannelExceptionMessages factory; keeps the validation logic readable. - Add a second run() check to dead-letter replay tests (Kafka + Dbal) verifying the replayed message is not re-consumed.
…ctory Move 6 more inline exception strings into the factory so the validation logic stays focused on intent rather than text: - DbalDeadLetterHandler: "cannot reply ... no polledChannelName/inboundRequestChannel/routingSlip" - DelayedRetryErrorHandler: "Failed to handle Error Message via Retry Configuration ..." - InstantRetryAttributeModule: "InstantRetry only on Inbound Channel Adapter" + Enterprise licence check - MessagingSystemConfiguration: "asynchronousExecution requires Enterprise" - MessagingGatewayModule: gateway-level Error Channel + DelayedRetry licence checks
…an name
#[Scheduled] handlers now set INBOUND_REQUEST_CHANNEL, so TracerInterceptor produces "Receiving from inbound channel adapter: <channel>" rather than the old fallback "Endpoint: <endpointId> produced Message". The new format is consistent with pollable channel spans ("Receiving from channel: <channel>") and tells the user where the message went, not just which endpoint produced it.
… messages The async multi-tenant projection test relies on \$ecotone->run() processing both tenants' queued messages on a single call so each tenant's first message triggers lazy projection initialization. The default polling metadata was not guaranteed to drain the queue in CI, leaving tenant_b's projection state UNINITIALIZED and the in_progress_tickets table absent in tenant_b's database. Pass explicit ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 2) so both tenants' events are reliably processed in one run() call, ensuring lazy init fires for each tenant before queries run.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why is this change proposed?
Originally reported in #659: configuring
withDefaultErrorChannel(...)on a#[KafkaConsumer](or any inbound channel adapter) crashed at runtime withMessageHeaderDoesNotExistsExceptionbecause the framework assumed every polled message carriedPOLLED_CHANNEL_NAME, which inbound channel adapters don't set. While fixing that, two related gaps surfaced: there was no way to scope an Error Channel or a delayed-retry policy to a single async handler when multiple handlers shared a transport, and dead-letter replay couldn't route messages back to inbound-adapter handlers because the framework had no header carrying the actual request channel.This PR fixes the original crash and turns the existing
asynchronousExecution: [...]parameter on#[Asynchronous]into a real per-handler error-handling story —#[ErrorChannel]and the new#[DelayedRetry]both work there, with full Dead Letter replay support for inbound channel adapters.Usage examples
Per-handler Error Channel for handlers sharing one transport
#[DelayedRetry]— retries with backoff and dead letter, per handler#[DelayedRetry]on a gatewayCombining attributes (e.g. skip transactions for a notification handler)
Inbound Channel Adapter (
#[KafkaConsumer]) with#[ErrorChannel]Failures are forwarded to the configured Error Channel — typically a Dead Letter for later replay:
Inbound Channel Adapter (
#[KafkaConsumer]) with#[InstantRetry]+#[ErrorChannel]In-process retries before forwarding the failure to the Error Channel:
Inbound Channel Adapter (
#[Scheduled]) with#[ErrorChannel]Inbound Channel Adapter (
#[Scheduled]) with#[InstantRetry]+#[ErrorChannel]Use case scenarios
orderschannel; no need to split queues for different policies.withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL)now works for#[KafkaConsumer](this was the original crash). Failed messages are stored in the DBAL Dead Letter andDeadLetterGateway::replyAll()correctly replays them back to the same Kafka consumer endpoint.CommandBuswith#[DelayedRetry(...)]once and reuse it across an application: callers transparently get retries + dead-letter without each handler having to implement try/catch.Failure flow with
#[DelayedRetry]sequenceDiagram participant User participant CommandBus participant AsyncChannel as Async Channel participant Consumer as Polling Consumer participant Handler participant Retry as DelayedRetryErrorHandler participant DLQ as Dead Letter User->>CommandBus: send(Command) CommandBus->>AsyncChannel: enqueue Consumer->>AsyncChannel: poll AsyncChannel-->>Consumer: Message Consumer->>Handler: invoke Handler--xConsumer: throws Consumer->>Retry: failed Message alt retries remaining Retry->>AsyncChannel: reschedule with delay else retries exhausted Retry->>DLQ: store User->>DLQ: replyAll() DLQ->>Handler: replay (via INBOUND_REQUEST_CHANNEL or POLLED_CHANNEL_NAME) endPull Request Contribution Terms