dispatcher,event,cloudstorage: add DML two-stage ack #4263

Open
3AceShowHand wants to merge 4 commits into pingcap:master from 3AceShowHand:storage-sink-two-stage-ack

Conversation

@3AceShowHand
Collaborator

@3AceShowHand 3AceShowHand commented Feb 24, 2026

What problem does this PR solve?

Issue Number: close #4269

The dispatcher wake callback for DML was previously tied to PostFlush, so wake latency could be dominated by sink flush latency. This coupled scheduling progress too tightly with remote flush speed.

This PR introduces a two-stage ack model for DML:

  1. enqueue-stage ack for wake
  2. flush-stage ack for checkpoint

It also fixes three follow-up correctness/semantics issues tracked in #4269:

  1. Enqueue callback timing in cloud storage sink was too early.
  2. PostEnqueue exactly-once guard was not concurrency-safe.
  3. Filtered DML rebuild path did not preserve enqueue callbacks.

What is changed and how it works?

  • Added enqueue-stage callback support in DMLEvent:
    • PostTxnEnqueued []func()
    • PostEnqueue()
    • AddPostEnqueueFunc(...)
    • ClearPostEnqueueFunc(...)
  • PostFlush() still calls PostEnqueue() first as fallback, but PostEnqueue now uses atomic CAS to guarantee exactly-once execution under concurrency.
  • Switched dispatcher wake trigger from AddPostFlushFunc(...) to AddPostEnqueueFunc(...) in BasicDispatcher.handleEvents.
  • Cloud storage sink now triggers enqueue ack when an encoded/defragmented fragment is dispatched to writer input (instead of immediately after pushing into source queue).
  • FilterDMLEvent rebuild path now preserves enqueue callbacks/state when constructing filtered DMLEvent.
  • Added/updated tests for:
    • enqueue timing semantics in cloud storage sink
    • callback preservation through filtering path
    • concurrent PostEnqueue/PostFlush race and exactly-once behavior
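
The callback model described in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `sketchEvent` is a hypothetical stand-in for `DMLEvent`, the guard is assumed to be an `atomic.Bool`, and `AddPostFlushFunc` is reduced to a plain append.

```go
package main

import "sync/atomic"

// sketchEvent is a hypothetical, stripped-down stand-in for DMLEvent.
type sketchEvent struct {
	PostTxnEnqueued   []func() // enqueue-stage callbacks (wake)
	PostTxnFlushed    []func() // flush-stage callbacks (checkpoint)
	postEnqueueCalled atomic.Bool // assumed guard type; zero value means "not yet run"
}

func (e *sketchEvent) AddPostEnqueueFunc(f func()) {
	e.PostTxnEnqueued = append(e.PostTxnEnqueued, f)
}

func (e *sketchEvent) AddPostFlushFunc(f func()) {
	e.PostTxnFlushed = append(e.PostTxnFlushed, f)
}

// PostEnqueue runs the enqueue callbacks only for the caller that wins the CAS.
func (e *sketchEvent) PostEnqueue() {
	if !e.postEnqueueCalled.CompareAndSwap(false, true) {
		return
	}
	for _, f := range e.PostTxnEnqueued {
		f()
	}
}

// PostFlush calls PostEnqueue first as a fallback, then runs the flush callbacks.
func (e *sketchEvent) PostFlush() {
	e.PostEnqueue()
	for _, f := range e.PostTxnFlushed {
		f()
	}
}
```

In this sketch, a sink that never calls `PostEnqueue` explicitly still wakes the dispatcher, because `PostFlush` triggers the enqueue stage on its way through.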

Behavioral intent:

  • Wake can happen earlier (on enqueue), reducing scheduling delay.
  • Checkpoint semantics remain unchanged (still gated by flush completion).
  • Callback semantics are now consistent and race-free across paths.
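
The intended timing can be illustrated with a small simulation. Everything here is hypothetical (`txn`, `runSink`, `simulate`, and the `release` channel standing in for a slow remote flush); it only shows that the wake callback can fire while the flush is still pending, and that the checkpoint callback fires only after the flush completes.

```go
package main

// txn is a hypothetical event carrying one callback per ack stage.
type txn struct {
	onEnqueue func() // wakes the dispatcher
	onFlush   func() // advances the checkpoint
}

// runSink is a hypothetical sink loop: it acks enqueue as soon as it takes an
// event, and acks flush only once release is closed (a slow remote write).
func runSink(in <-chan txn, release <-chan struct{}, done chan<- struct{}) {
	for t := range in {
		t.onEnqueue()
		<-release
		t.onFlush()
	}
	close(done)
}

// simulate reports whether the dispatcher was woken before the flush finished,
// and whether the checkpoint callback eventually ran.
func simulate() (wokeBeforeFlush, checkpointed bool) {
	in := make(chan txn)
	release := make(chan struct{})
	done := make(chan struct{})
	woken := make(chan struct{})
	flushed := false

	go runSink(in, release, done)
	in <- txn{
		onEnqueue: func() { close(woken) },
		onFlush:   func() { flushed = true },
	}
	close(in)

	<-woken                    // wake arrives while the flush is still blocked
	wokeBeforeFlush = !flushed // safe read: the sink cannot write before release closes
	close(release)             // let the "remote flush" complete
	<-done
	return wokeBeforeFlush, flushed
}
```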

Check List

Tests

  • Unit test

Questions

Will it cause performance regression or break compatibility?

No known compatibility break:

  • Checkpoint is still flush-bound.
  • Wake timing is moved earlier to enqueue stage with a stricter enqueue definition in storage sink.
  • Exactly-once enqueue callback execution is now concurrency-safe.

Expected performance effect:

  • Lower wake latency under slow flush conditions.
  • Better decoupling between dispatch scheduling and sink remote flush latency.

Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

`None`

Summary by CodeRabbit

  • New Features

    • Added enqueue-stage callbacks for DML events with lifecycle controls.
  • Improvements

    • Enqueue callbacks now run at enqueue time with single-execution guarantees and preserved ordering relative to flush callbacks.
    • Callback state is preserved when events are filtered.
    • Simplified event dispatch path to push fragments directly.
    • Default cross-node table behavior flipped to disabled.
  • Tests

    • Added tests covering enqueue callbacks, concurrency, ordering, and pipeline-start semantics.

@ti-chi-bot bot added the labels do-not-merge/needs-linked-issue and release-note (denotes a PR that will be considered when it comes time to generate release notes) on Feb 24, 2026
@ti-chi-bot
ti-chi-bot bot commented Feb 24, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign sdojjy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot bot added the label size/M (denotes a PR that changes 30-99 lines, ignoring generated files) on Feb 24, 2026
@gemini-code-assist
Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a two-stage acknowledgment mechanism for DML events, separating the acknowledgment of an event being enqueued into the sink pipeline from its final flushing to the downstream. This change aims to improve the responsiveness of the dispatcher by allowing it to process subsequent events earlier, while still ensuring checkpoint advancement relies on full flushing.

Highlights

  • Introduced Two-Stage DML Acknowledgment: Implemented PostTxnEnqueued callbacks for DML events, allowing for an earlier acknowledgment stage when events are enqueued into the sink pipeline, distinct from the PostTxnFlushed stage.
  • Optimized Dispatcher Wake-up Logic: Modified the BasicDispatcher to trigger its wakeCallback immediately after DML events are enqueued, rather than waiting for them to be fully flushed, improving event processing throughput.
  • Ensured Idempotent Enqueue Callbacks: Added a mechanism to ensure that PostTxnEnqueued callbacks are executed at most once, even if PostEnqueue is called multiple times.
  • Integrated Cloud Storage Sink: Updated the cloud storage DML writer to invoke the PostEnqueue callback once an event is successfully pushed to its internal message channel.

Changelog

  • downstreamadapter/dispatcher/basic_dispatcher.go
    • Updated the handleEvents function's wakeCallback trigger from PostFlushFunc to PostEnqueueFunc for DML events.
    • Revised comments to reflect the new two-stage acknowledgment behavior.
  • downstreamadapter/sink/cloudstorage/dml_writers.go
    • Added a call to event.PostEnqueue() after a DML event is pushed to the internal message channel.
  • pkg/common/event/dml_event.go
    • Added PostTxnEnqueued slice to store callbacks for the enqueue stage.
    • Introduced postEnqueueCalled boolean to ensure PostEnqueue callbacks are executed only once.
    • Implemented PostEnqueue() method to execute PostTxnEnqueued callbacks.
    • Added ClearPostEnqueueFunc() and AddPostEnqueueFunc() methods.
    • Modified PostFlush() to call PostEnqueue() first.
  • pkg/common/event/dml_event_test.go
    • Added TestDMLEventPostEnqueueFuncs to verify multiple enqueue callbacks are triggered.
    • Added TestDMLEventPostFlushTriggersPostEnqueueOnce to confirm PostEnqueue is called once when PostFlush is invoked multiple times.

Activity

  • No specific activity (comments, reviews, progress) was provided in the PR description or context.

@coderabbitai
Contributor

coderabbitai bot commented Feb 24, 2026

📝 Walkthrough

Walkthrough

Introduces a PostEnqueue stage for DMLEvent with atomic guarding, shifts enqueue callbacks to be invoked at enqueue time, preserves enqueue state across filtering, updates dispatcher/sink to use PostEnqueue, and adds tests validating concurrency and lifecycle.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Event callback infrastructure: pkg/common/event/dml_event.go | Added PostTxnEnqueued and postEnqueueCalled fields; implemented PostEnqueue()/Add/Clear helpers; made PostFlush() call PostEnqueue() first; added atomic guard and docs. |
| Event callback tests: pkg/common/event/dml_event_test.go | Added tests for PostEnqueue behavior, ordering with PostFlush, and concurrency guarantees using atomic counters. |
| Dispatcher integration: downstreamadapter/dispatcher/basic_dispatcher.go | Replaced PostFlush hook registration with AddPostEnqueueFunc and changed wake semantics to invoke wake callback on enqueue. |
| Cloud storage sink: downstreamadapter/sink/cloudstorage/defragmenter.go, downstreamadapter/sink/cloudstorage/dml_writers_test.go, downstreamadapter/sink/cloudstorage/dml_writers.go | Call PostEnqueue() after dispatching defragmented fragments; removed batching/metrics wrapper around AddDMLEvent; added test ensuring AddDMLEvent doesn't trigger PostEnqueue before pipeline starts. |
| Active-active filtering: pkg/common/event/active_active.go, pkg/common/event/active_active_test.go | newFilteredDMLEvent now copies PostTxnEnqueued and postEnqueueCalled atomic state to preserve enqueue/flush callback semantics; added test validating preservation. |
| Config default: pkg/sink/cloudstorage/config.go, pkg/sink/cloudstorage/config_test.go | Flipped default defaultEnableTableAcrossNodes from true to false and adjusted test expectation accordingly. |
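
The active-active filtering row is the least obvious of these changes, so here is a rough sketch of what preserving enqueue state across a rebuild might look like. `sketchEvent` and `newFilteredSketch` are hypothetical names, the guard is assumed to be an `atomic.Bool`, and the sketch assumes the source event is discarded after filtering (the two guards are independent once copied).

```go
package main

import "sync/atomic"

// sketchEvent is a hypothetical stand-in for DMLEvent.
type sketchEvent struct {
	PostTxnEnqueued   []func()
	postEnqueueCalled atomic.Bool // assumed guard type
}

// PostEnqueue runs the enqueue callbacks at most once per event value.
func (e *sketchEvent) PostEnqueue() {
	if e.postEnqueueCalled.CompareAndSwap(false, true) {
		for _, f := range e.PostTxnEnqueued {
			f()
		}
	}
}

// newFilteredSketch builds a filtered copy that keeps both the registered
// callbacks and the "already fired" state of the source event.
func newFilteredSketch(src *sketchEvent) *sketchEvent {
	dst := &sketchEvent{PostTxnEnqueued: src.PostTxnEnqueued}
	// Transfer the guard by value; atomic types must not be copied directly.
	dst.postEnqueueCalled.Store(src.postEnqueueCalled.Load())
	return dst
}
```

Without the guard transfer, a filtered copy of an already-acked event would wake the dispatcher a second time; without the slice copy, a not-yet-acked event would lose its wake callback entirely.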

Sequence Diagram

sequenceDiagram
    participant Dispatcher
    participant DMLEvent
    participant Worker
    participant Sink

    Dispatcher->>DMLEvent: AddPostEnqueueFunc(wakeCallback)
    Dispatcher->>Worker: Enqueue DML event (fragment)
    Worker->>DMLEvent: PostEnqueue()
    DMLEvent->>Dispatcher: Execute enqueue callbacks (once)
    Worker->>Sink: Process event (defragment/emit)
    Worker->>DMLEvent: PostFlush()
    DMLEvent->>DMLEvent: Ensure PostEnqueue() ran (no-op)
    DMLEvent->>Sink: Execute flush callbacks
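
The enqueue step in the diagram depends on where the sink places its ack. As a rough sketch of the stricter definition described in this PR (ack when a fragment reaches the writer input, not when it lands in the source queue), consider the hypothetical two-hop pipeline below. `fragment`, `sketchSink`, and `dispatchBuffered` are illustrative names only and do not mirror the real cloud storage sink.

```go
package main

// fragment is a hypothetical encoded unit carrying its enqueue ack.
type fragment struct {
	postEnqueue func()
}

// sketchSink models two hops: source queue -> writer input.
type sketchSink struct {
	source      chan fragment
	writerInput chan fragment
}

func newSketchSink(capacity int) *sketchSink {
	return &sketchSink{
		source:      make(chan fragment, capacity),
		writerInput: make(chan fragment, capacity),
	}
}

// AddDMLEvent only buffers the fragment; it deliberately does not ack.
func (s *sketchSink) AddDMLEvent(f fragment) { s.source <- f }

// dispatchBuffered moves every fragment currently in the source queue to the
// writer input and acks enqueue after each hand-off. It returns the number of
// fragments dispatched.
func (s *sketchSink) dispatchBuffered() int {
	n := 0
	for {
		select {
		case f := <-s.source:
			s.writerInput <- f
			f.postEnqueue()
			n++
		default:
			return n
		}
	}
}
```

With this placement, an event added before the pipeline starts stays unacknowledged until the dispatch loop actually runs, which is the property the new sink test is described as checking.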

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related issues

Suggested labels

lgtm, approved, release-note, size/XL

Suggested reviewers

  • flowbehappy
  • wk989898

Poem

🐰 I hopped through callbacks, one then two,
Enqueue first, then flush — order kept true,
Atomics guard the hop and keep it neat,
Events march onward on tidy little feet! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 9.09%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'dispatcher,event,cloudstorage: add DML two-stage ack' is concise, specific, and accurately reflects the main change: introducing a two-stage acknowledgement model for DML events across multiple components. |
| Description check | ✅ Passed | The PR description comprehensively addresses all required template sections: it identifies the issue (#4269), explains the problem and solution, includes test coverage, discusses compatibility, and provides a release note. All critical information is present and well-organized. |

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a two-stage acknowledgement mechanism for DML events by adding a PostEnqueue callback, which is triggered earlier than the existing PostFlush callback. This allows for better pipelining and potentially higher throughput. The changes are implemented across the dispatcher, event, and cloudstorage sink components.

The core logic change appears correct and is supported by new unit tests. However, I've identified a critical race condition in the implementation of PostEnqueue which could lead to callbacks being executed multiple times under concurrent access. I've provided suggestions to fix this using atomic operations. I've also suggested enhancing the tests to cover concurrent scenarios to prevent similar issues in the future.

Comment on lines +347 to +363
func TestDMLEventPostEnqueueFuncs(t *testing.T) {
	t.Parallel()

	event := &DMLEvent{}
	var called int64
	event.AddPostEnqueueFunc(func() {
		atomic.AddInt64(&called, 1)
	})
	event.AddPostEnqueueFunc(func() {
		atomic.AddInt64(&called, 1)
	})

	event.PostEnqueue()
	event.PostEnqueue()

	require.Equal(t, int64(2), atomic.LoadInt64(&called))
}

medium

This test is good for verifying the sequential logic, but it doesn't cover the case of concurrent calls to PostEnqueue, where a race condition exists in the current implementation. After addressing the race condition I pointed out in dml_event.go, it would be beneficial to add a test case that calls PostEnqueue and/or PostFlush from multiple goroutines to ensure the fix is effective and the implementation is thread-safe.
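
A concurrent check along the lines suggested here might look like the sketch below. It is written as a plain helper rather than a `testing` function so that it is self-contained; `sketchEvent` and `countEnqueueRuns` are hypothetical names, and the CAS guard is an assumption about how the race gets fixed.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// sketchEvent is a hypothetical stand-in for DMLEvent with a CAS guard.
type sketchEvent struct {
	PostTxnEnqueued   []func()
	postEnqueueCalled atomic.Bool
}

func (e *sketchEvent) PostEnqueue() {
	if e.postEnqueueCalled.CompareAndSwap(false, true) {
		for _, f := range e.PostTxnEnqueued {
			f()
		}
	}
}

// PostFlush only exercises the enqueue fallback; flush callbacks are omitted.
func (e *sketchEvent) PostFlush() { e.PostEnqueue() }

// countEnqueueRuns releases `workers` goroutines at once, alternating between
// PostEnqueue and PostFlush, and returns how often the enqueue callback ran.
func countEnqueueRuns(workers int) int64 {
	var called int64
	e := &sketchEvent{PostTxnEnqueued: []func(){
		func() { atomic.AddInt64(&called, 1) },
	}}

	start := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			<-start // hold every goroutine until all are ready
			if i%2 == 0 {
				e.PostEnqueue()
			} else {
				e.PostFlush()
			}
		}(i)
	}
	close(start)
	wg.Wait()
	return atomic.LoadInt64(&called)
}
```

In a real test this would be wrapped in a `func TestXxx(t *testing.T)` and run with `go test -race`, which is what would expose a plain-bool guard.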

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/common/event/dml_event.go`:
- Around line 395-402: The boolean guard postEnqueueCalled in the DML event
struct is not concurrency-safe and can cause double-invokes/races; replace the
bool guard with a sync.Once (or an atomic CAS) and update the PostEnqueue
execution paths to call once.Do(...) so callbacks in PostTxnEnqueued and
PostTxnFlushed are invoked exactly once; apply the same change to the other
occurrence referenced around lines 645-653, ensuring the struct holds a
sync.Once (e.g., postEnqueueOnce) and all code paths use that Once to run the
callbacks instead of checking/setting postEnqueueCalled.
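
For comparison, the `sync.Once` option mentioned in this comment could look like the following sketch. `onceEvent` is a hypothetical type and `postEnqueueOnce` is the field name proposed in the comment, not something confirmed to exist in the codebase.

```go
package main

import "sync"

// onceEvent is a hypothetical stand-in for DMLEvent guarded by sync.Once.
type onceEvent struct {
	PostTxnEnqueued []func()
	postEnqueueOnce sync.Once
}

// PostEnqueue runs the enqueue callbacks exactly once, however many
// goroutines call it.
func (e *onceEvent) PostEnqueue() {
	e.postEnqueueOnce.Do(func() {
		for _, f := range e.PostTxnEnqueued {
			f()
		}
	})
}

// PostFlush keeps the fallback behaviour: enqueue stage first.
func (e *onceEvent) PostFlush() { e.PostEnqueue() }
```

One difference worth weighing between the two options: `sync.Once.Do` makes concurrent callers wait until the first call returns, whereas a caller that loses a CAS returns immediately. A `sync.Once` also has no readable state to hand over, so carrying "already fired" across a rebuilt event would likely be easier with an atomic flag.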

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 75e291b and 92ee0e1.

📒 Files selected for processing (4)
  • downstreamadapter/dispatcher/basic_dispatcher.go
  • downstreamadapter/sink/cloudstorage/dml_writers.go
  • pkg/common/event/dml_event.go
  • pkg/common/event/dml_event_test.go

@ti-chi-bot bot added the label release-note-none (denotes a PR that doesn't merit a release note) and removed the label release-note (denotes a PR that will be considered when it comes time to generate release notes) on Feb 24, 2026
@3AceShowHand
Collaborator Author

/test all

@ti-chi-bot bot added the label size/L (denotes a PR that changes 100-499 lines, ignoring generated files) and removed the label size/M (denotes a PR that changes 30-99 lines, ignoring generated files) on Feb 25, 2026
Contributor

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
pkg/common/event/dml_event.go (1)

693-704: ClearPostEnqueueFunc inconsistency: method name suggests reset but doesn't reset the postEnqueueCalled guard.

While ClearPostEnqueueFunc clears the callback slice, it doesn't reset the atomic guard that prevents re-execution. If callbacks are cleared after PostEnqueue() has already fired and new ones are added, they will never execute. However, this method is not used anywhere in the codebase and DML events follow a create-once pattern, so this has no practical impact. Consider either documenting that ClearPostEnqueueFunc is a one-way operation (callbacks cannot be re-armed after enqueue), or reset the atomic flag if re-use is intended.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/common/event/dml_event.go` around lines 693 - 704, ClearPostEnqueueFunc
currently clears the PostTxnEnqueued slice but does not reset the atomic guard
(postEnqueueCalled) that PostEnqueue() uses to prevent re-execution, so clearing
then adding callbacks will never run them; update ClearPostEnqueueFunc to also
reset the atomic flag (postEnqueueCalled) to its zero/false state when intended
to allow re-use, or if re-arm is not desired, document that ClearPostEnqueueFunc
is one-way—reference the ClearPostEnqueueFunc method, the PostTxnEnqueued slice,
the postEnqueueCalled atomic guard, and the PostEnqueue() execution path when
making the change.
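
The one-way behaviour described in this nitpick can be reproduced with a small sketch. `sketchEvent` is a hypothetical stand-in for `DMLEvent`; the parameterless `ClearPostEnqueueFunc` and the `atomic.Bool` guard are assumptions made for illustration.

```go
package main

import "sync/atomic"

// sketchEvent is a hypothetical stand-in for DMLEvent.
type sketchEvent struct {
	PostTxnEnqueued   []func()
	postEnqueueCalled atomic.Bool
}

func (e *sketchEvent) AddPostEnqueueFunc(f func()) {
	e.PostTxnEnqueued = append(e.PostTxnEnqueued, f)
}

// ClearPostEnqueueFunc drops the registered callbacks but, as in the review
// comment, leaves the guard untouched.
func (e *sketchEvent) ClearPostEnqueueFunc() {
	e.PostTxnEnqueued = nil
}

// PostEnqueue runs the callbacks only while the guard is still unset.
func (e *sketchEvent) PostEnqueue() {
	if e.postEnqueueCalled.CompareAndSwap(false, true) {
		for _, f := range e.PostTxnEnqueued {
			f()
		}
	}
}
```

Once `PostEnqueue` has fired, clearing and re-adding callbacks has no effect in this sketch: the guard stays set, so the new callbacks never run. Resetting the guard inside `ClearPostEnqueueFunc` would re-arm the event, at the cost of weakening the exactly-once guarantee.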

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92ee0e1 and 01bd19a.

📒 Files selected for processing (6)
  • downstreamadapter/sink/cloudstorage/defragmenter.go
  • downstreamadapter/sink/cloudstorage/dml_writers_test.go
  • pkg/common/event/active_active.go
  • pkg/common/event/active_active_test.go
  • pkg/common/event/dml_event.go
  • pkg/common/event/dml_event_test.go

@3AceShowHand
Collaborator Author

/test all

@3AceShowHand
Collaborator Author

/test all

Contributor

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
downstreamadapter/sink/cloudstorage/dml_writers.go (1)

116-116: Fix typo in the inline comment.

Line 116 says encoupled; please change it to coupled for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/dml_writers.go` at line 116, Update the
inline comment that currently reads "emit a TxnCallbackableEvent encoupled with
a sequence number starting from one." to correct the typo by replacing
"encoupled" with "coupled" in the comment near the TxnCallbackableEvent emission
in dml_writers.go so it reads "emit a TxnCallbackableEvent coupled with a
sequence number starting from one."

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 01bd19a and 6a1be2d.

📒 Files selected for processing (3)
  • downstreamadapter/sink/cloudstorage/dml_writers.go
  • pkg/sink/cloudstorage/config.go
  • pkg/sink/cloudstorage/config_test.go

@3AceShowHand
Collaborator Author

/test all

@3AceShowHand
Collaborator Author

/test all

@ti-chi-bot

ti-chi-bot bot commented Feb 25, 2026

@3AceShowHand: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
| --- | --- | --- | --- | --- |
| pull-cdc-pulsar-integration-light | 6a1be2d | link | false | /test pull-cdc-pulsar-integration-light |
| pull-cdc-kafka-integration-light | 6a1be2d | link | true | /test pull-cdc-kafka-integration-light |
| pull-cdc-kafka-integration-heavy | 6a1be2d | link | true | /test pull-cdc-kafka-integration-heavy |
| pull-cdc-pulsar-integration-heavy | 6a1be2d | link | false | /test pull-cdc-pulsar-integration-heavy |
| pull-cdc-mysql-integration-heavy | 6a1be2d | link | true | /test pull-cdc-mysql-integration-heavy |
| pull-cdc-storage-integration-light | 6a1be2d | link | true | /test pull-cdc-storage-integration-light |
| pull-cdc-mysql-integration-light | 6a1be2d | link | true | /test pull-cdc-mysql-integration-light |
| pull-cdc-storage-integration-heavy | 6a1be2d | link | true | /test pull-cdc-storage-integration-heavy |

Labels

  • release-note-none: Denotes a PR that doesn't merit a release note.
  • size/L: Denotes a PR that changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

dispatcher,event,cloudstorage: fix enqueue callback semantics and race

1 participant