Skip to content

dispatcher,event,cloudstorage: add DML two-stage ack#4263

Open
3AceShowHand wants to merge 7 commits intopingcap:masterfrom
3AceShowHand:storage-sink-two-stage-ack
Open

dispatcher,event,cloudstorage: add DML two-stage ack#4263
3AceShowHand wants to merge 7 commits intopingcap:masterfrom
3AceShowHand:storage-sink-two-stage-ack

Conversation

@3AceShowHand
Copy link
Collaborator

@3AceShowHand 3AceShowHand commented Feb 24, 2026

What problem does this PR solve?

Issue Number: close #4269

The dispatcher wake callback for DML was previously tied to PostFlush, so wake latency could be dominated by sink flush latency. This coupled scheduling progress too tightly with remote flush speed.

This PR introduces a two-stage ack model for DML:

  1. enqueue-stage ack for wake
  2. flush-stage ack for checkpoint

It also fixes three follow-up correctness/semantics issues tracked in #4269:

  1. Enqueue callback timing in cloud storage sink was too early.
  2. PostEnqueue exactly-once guard was not concurrency-safe.
  3. Filtered DML rebuild path did not preserve enqueue callbacks.

What is changed and how it works?

  • Added enqueue-stage callback support in DMLEvent:
    • PostTxnEnqueued []func()
    • PostEnqueue()
    • AddPostEnqueueFunc(...)
    • ClearPostEnqueueFunc(...)
  • PostFlush() still calls PostEnqueue() first as fallback, but PostEnqueue now uses atomic CAS to guarantee exactly-once execution under concurrency.
  • Switched dispatcher wake trigger from AddPostFlushFunc(...) to AddPostEnqueueFunc(...) in BasicDispatcher.handleEvents.
  • Cloud storage sink now triggers enqueue ack when an encoded/defragmented fragment is dispatched to writer input (instead of immediately after pushing into source queue).
  • FilterDMLEvent rebuild path now preserves enqueue callbacks/state when constructing filtered DMLEvent.
  • Added/updated tests for:
    • enqueue timing semantics in cloud storage sink
    • callback preservation through filtering path
    • concurrent PostEnqueue/PostFlush race and exactly-once behavior

Behavioral intent:

  • Wake can happen earlier (on enqueue), reducing scheduling delay.
  • Checkpoint semantics remain unchanged (still gated by flush completion).
  • Callback semantics are now consistent and race-free across paths.

Check List

Tests

  • Unit test

Questions

Will it cause performance regression or break compatibility?

No known compatibility break:

  • Checkpoint is still flush-bound.
  • Wake timing is moved earlier to enqueue stage with a stricter enqueue definition in storage sink.
  • Exactly-once enqueue callback execution is now concurrency-safe.

Expected performance effect:

  • Lower wake latency under slow flush conditions.
  • Better decoupling between dispatch scheduling and sink remote flush latency.
Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

`None`

Summary by CodeRabbit

  • New Features

    • Added enqueue-stage callbacks for DML events with single-execution guarantees.
  • Improvements

    • Enqueue callbacks now run at enqueue time and preserve ordering relative to flush callbacks.
    • Callback state is preserved when events are filtered and across sinks.
    • Simplified dispatch path to push fragments directly and removed a batching wrapper around emits.
    • Shared helper for row-level flush triggering; default cross-node table behavior now disabled.
  • Tests

    • Added tests covering enqueue callbacks, concurrency, ordering, and pipeline-start semantics.

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Feb 24, 2026
@ti-chi-bot
Copy link

ti-chi-bot bot commented Feb 24, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign sdojjy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/M Denotes a PR that changes 30-99 lines, ignoring generated files. label Feb 24, 2026
@gemini-code-assist
Copy link

Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a two-stage acknowledgment mechanism for DML events, separating the acknowledgment of an event being enqueued into the sink pipeline from its final flushing to the downstream. This change aims to improve the responsiveness of the dispatcher by allowing it to process subsequent events earlier, while still ensuring checkpoint advancement relies on full flushing.

Highlights

  • Introduced Two-Stage DML Acknowledgment: Implemented PostTxnEnqueued callbacks for DML events, allowing for an earlier acknowledgment stage when events are enqueued into the sink pipeline, distinct from the PostTxnFlushed stage.
  • Optimized Dispatcher Wake-up Logic: Modified the BasicDispatcher to trigger its wakeCallback immediately after DML events are enqueued, rather than waiting for them to be fully flushed, improving event processing throughput.
  • Ensured Idempotent Enqueue Callbacks: Added a mechanism to ensure that PostTxnEnqueued callbacks are executed at most once, even if PostEnqueue is called multiple times.
  • Integrated Cloud Storage Sink: Updated the cloud storage DML writer to invoke the PostEnqueue callback once an event is successfully pushed to its internal message channel.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • downstreamadapter/dispatcher/basic_dispatcher.go
    • Updated the handleEvents function's wakeCallback trigger from PostFlushFunc to PostEnqueueFunc for DML events.
    • Revised comments to reflect the new two-stage acknowledgment behavior.
  • downstreamadapter/sink/cloudstorage/dml_writers.go
    • Added a call to event.PostEnqueue() after a DML event is pushed to the internal message channel.
  • pkg/common/event/dml_event.go
    • Added PostTxnEnqueued slice to store callbacks for the enqueue stage.
    • Introduced postEnqueueCalled boolean to ensure PostEnqueue callbacks are executed only once.
    • Implemented PostEnqueue() method to execute PostTxnEnqueued callbacks.
    • Added ClearPostEnqueueFunc() and AddPostEnqueueFunc() methods.
    • Modified PostFlush() to call PostEnqueue() first.
  • pkg/common/event/dml_event_test.go
    • Added TestDMLEventPostEnqueueFuncs to verify multiple enqueue callbacks are triggered.
    • Added TestDMLEventPostFlushTriggersPostEnqueueOnce to confirm PostEnqueue is called once when PostFlush is invoked multiple times.
Activity
  • No specific activity (comments, reviews, progress) was provided in the PR description or context.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 24, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a new PostEnqueue stage to DMLEvent (atomic, idempotent), invokes enqueue callbacks at enqueue completion across sinks/dispatchers, preserves enqueue state through filtering, replaces several inline row-callbacks with a shared helper, and adds tests for ordering and concurrency.

Changes

Cohort / File(s) Summary
Event callback infrastructure
pkg/common/event/dml_event.go, pkg/common/event/dml_event_test.go
Introduce PostTxnEnqueued and postEnqueueCalled with atomic guarding; add PostEnqueue/Add/Clear helpers; ensure PostFlush calls PostEnqueue first; add tests for ordering and concurrency.
Active-active filtering
pkg/common/event/active_active.go, pkg/common/event/active_active_test.go
Preserve PostTxnEnqueued and postEnqueueCalled when filtering DMLEvent; add test verifying callbacks survive filtering.
Dispatcher behavior & tests
downstreamadapter/dispatcher/basic_dispatcher.go, downstreamadapter/dispatcher/event_dispatcher_test.go
Switch from waking on PostFlush to waking on PostEnqueue for DML enqueue completion; add test ensuring correct wake timing per sink type.
CloudStorage sink integration & config
downstreamadapter/sink/cloudstorage/defragmenter.go, downstreamadapter/sink/cloudstorage/dml_writers.go, downstreamadapter/sink/cloudstorage/dml_writers_test.go, pkg/sink/cloudstorage/config.go, pkg/sink/cloudstorage/config_test.go
Call PostEnqueue after enqueue for defragmented fragments and CloudStorage mock; remove batching wrapper around AddDMLEvent; flip default EnableTableAcrossNodes and update test; add test ensuring AddDMLEvent doesn't call PostEnqueue before pipeline runs.
Sinks: row-callback consolidation
downstreamadapter/sink/kafka/sink.go, downstreamadapter/sink/pulsar/sink.go, downstreamadapter/sink/redo/sink.go, downstreamadapter/sink/helper/row_callback.go, downstreamadapter/sink/helper/row_callback_test.go
Replace inline per-row atomic closures with helper.NewTxnPostFlushRowCallback(event, rowsCount); add helper and tests validating flush-on-final-row behavior.
Mock sink
downstreamadapter/sink/mock_sink.go
Invoke event.PostEnqueue() for CloudStorage sink type after appending event (enqueue-time side-effect).

Sequence Diagram

sequenceDiagram
    participant Dispatcher as Dispatcher
    participant Worker as Worker
    participant DMLEvent as DMLEvent
    participant Sink as Sink

    Dispatcher->>Worker: enqueue DML fragment (AddPostEnqueueFunc(wake))
    Worker->>DMLEvent: PostEnqueue()
    DMLEvent->>Dispatcher: execute enqueue callbacks (once)
    Worker->>Sink: deliver/process event
    Worker->>DMLEvent: PostFlush()
    DMLEvent->>DMLEvent: ensure PostEnqueue() ran (no-op if already)
    DMLEvent->>Sink: execute flush callbacks
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related issues

Possibly related PRs

Suggested labels

lgtm, approved

Suggested reviewers

  • hongyunyan
  • wk989898
  • asddongmen

Poem

🐰 I hopped the enqueue then gave a cheer,
Callbacks lined up, no races to fear,
Atomics kept count as events sailed through,
Enqueue rings the bell — flush finishes true,
A tiny hop for code, a carrot for you! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically summarizes the main change: introducing a two-stage acknowledgement model for DML events with enqueue and flush stages.
Description check ✅ Passed The description is comprehensive and follows the template structure: it clearly states the problem (#4269), explains the two-stage ack model in detail, lists all changes made, covers testing (unit tests added), addresses performance and compatibility concerns, and confirms no documentation updates needed.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a two-stage acknowledgement mechanism for DML events by adding a PostEnqueue callback, which is triggered earlier than the existing PostFlush callback. This allows for better pipelining and potentially higher throughput. The changes are implemented across the dispatcher, event, and cloudstorage sink components.

The core logic change appears correct and is supported by new unit tests. However, I've identified a critical race condition in the implementation of PostEnqueue which could lead to callbacks being executed multiple times under concurrent access. I've provided suggestions to fix this using atomic operations. I've also suggested enhancing the tests to cover concurrent scenarios to prevent similar issues in the future.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/common/event/dml_event.go`:
- Around line 395-402: The boolean guard postEnqueueCalled in the DML event
struct is not concurrency-safe and can cause double-invokes/races; replace the
bool guard with a sync.Once (or an atomic CAS) and update the PostEnqueue
execution paths to call once.Do(...) so callbacks in PostTxnEnqueued and
PostTxnFlushed are invoked exactly once; apply the same change to the other
occurrence referenced around lines 645-653, ensuring the struct holds a
sync.Once (e.g., postEnqueueOnce) and all code paths use that Once to run the
callbacks instead of checking/setting postEnqueueCalled.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 75e291b and 92ee0e1.

📒 Files selected for processing (4)
  • downstreamadapter/dispatcher/basic_dispatcher.go
  • downstreamadapter/sink/cloudstorage/dml_writers.go
  • pkg/common/event/dml_event.go
  • pkg/common/event/dml_event_test.go

@ti-chi-bot ti-chi-bot bot added release-note-none Denotes a PR that doesn't merit a release note. and removed release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Feb 24, 2026
@3AceShowHand
Copy link
Collaborator Author

/test all

@ti-chi-bot ti-chi-bot bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels Feb 25, 2026
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
pkg/common/event/dml_event.go (1)

693-704: ClearPostEnqueueFunc inconsistency: method name suggests reset but doesn't reset the postEnqueueCalled guard.

While ClearPostEnqueueFunc clears the callback slice, it doesn't reset the atomic guard that prevents re-execution. If callbacks are cleared after PostEnqueue() has already fired and new ones are added, they will never execute. However, this method is not used anywhere in the codebase and DML events follow a create-once pattern, so this has no practical impact. Consider either documenting that ClearPostEnqueueFunc is a one-way operation (callbacks cannot be re-armed after enqueue), or reset the atomic flag if re-use is intended.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/common/event/dml_event.go` around lines 693 - 704, ClearPostEnqueueFunc
currently clears the PostTxnEnqueued slice but does not reset the atomic guard
(postEnqueueCalled) that PostEnqueue() uses to prevent re-execution, so clearing
then adding callbacks will never run them; update ClearPostEnqueueFunc to also
reset the atomic flag (postEnqueueCalled) to its zero/false state when intended
to allow re-use, or if re-arm is not desired, document that ClearPostEnqueueFunc
is one-way—reference the ClearPostEnqueueFunc method, the PostTxnEnqueued slice,
the postEnqueueCalled atomic guard, and the PostEnqueue() execution path when
making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@pkg/common/event/dml_event.go`:
- Around line 693-704: ClearPostEnqueueFunc currently clears the PostTxnEnqueued
slice but does not reset the atomic guard (postEnqueueCalled) that PostEnqueue()
uses to prevent re-execution, so clearing then adding callbacks will never run
them; update ClearPostEnqueueFunc to also reset the atomic flag
(postEnqueueCalled) to its zero/false state when intended to allow re-use, or if
re-arm is not desired, document that ClearPostEnqueueFunc is one-way—reference
the ClearPostEnqueueFunc method, the PostTxnEnqueued slice, the
postEnqueueCalled atomic guard, and the PostEnqueue() execution path when making
the change.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92ee0e1 and 01bd19a.

📒 Files selected for processing (6)
  • downstreamadapter/sink/cloudstorage/defragmenter.go
  • downstreamadapter/sink/cloudstorage/dml_writers_test.go
  • pkg/common/event/active_active.go
  • pkg/common/event/active_active_test.go
  • pkg/common/event/dml_event.go
  • pkg/common/event/dml_event_test.go

@3AceShowHand
Copy link
Collaborator Author

/test all

@3AceShowHand
Copy link
Collaborator Author

/test all

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
downstreamadapter/sink/cloudstorage/dml_writers.go (1)

116-116: Fix typo in the inline comment.

Line 116 says encoupled; please change it to coupled for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/dml_writers.go` at line 116, Update the
inline comment that currently reads "emit a TxnCallbackableEvent encoupled with
a sequence number starting from one." to correct the typo by replacing
"encoupled" with "coupled" in the comment near the TxnCallbackableEvent emission
in dml_writers.go so it reads "emit a TxnCallbackableEvent coupled with a
sequence number starting from one."
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@downstreamadapter/sink/cloudstorage/dml_writers.go`:
- Line 116: Update the inline comment that currently reads "emit a
TxnCallbackableEvent encoupled with a sequence number starting from one." to
correct the typo by replacing "encoupled" with "coupled" in the comment near the
TxnCallbackableEvent emission in dml_writers.go so it reads "emit a
TxnCallbackableEvent coupled with a sequence number starting from one."

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 01bd19a and 6a1be2d.

📒 Files selected for processing (3)
  • downstreamadapter/sink/cloudstorage/dml_writers.go
  • pkg/sink/cloudstorage/config.go
  • pkg/sink/cloudstorage/config_test.go

@3AceShowHand
Copy link
Collaborator Author

/test all

@3AceShowHand
Copy link
Collaborator Author

/test all

- extract shared txn post-flush row callback helper for kafka/pulsar/redo\n- keep sink-specific wake behavior explicit via dispatcher test\n- streamline callback coverage to avoid redundant sink-level assertions
@ti-chi-bot ti-chi-bot bot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Feb 26, 2026
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
downstreamadapter/sink/mock_sink.go (1)

31-38: ⚠️ Potential issue | 🟠 Major

Do not invoke PostEnqueue while holding s.mu.

event.PostEnqueue() runs external callbacks; executing it under the sink mutex can deadlock via re-entrant/mock interactions.

💡 Proposed fix
 func (s *mockSink) AddDMLEvent(event *commonEvent.DMLEvent) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
+	s.mu.Lock()
 	s.dmls = append(s.dmls, event)
+	shouldPostEnqueue := s.sinkType == common.CloudStorageSinkType
+	s.mu.Unlock()
+
 	// CloudStorage sink wakes dispatcher on enqueue stage.
-	if s.sinkType == common.CloudStorageSinkType {
+	if shouldPostEnqueue {
 		event.PostEnqueue()
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/mock_sink.go` around lines 31 - 38, The AddDMLEvent
method currently calls event.PostEnqueue() while holding s.mu, which can
deadlock because PostEnqueue runs external callbacks; modify
mockSink.AddDMLEvent so it only holds s.mu to append to s.dmls and determine
whether PostEnqueue needs to be called (check s.sinkType ==
common.CloudStorageSinkType), then release the lock and call event.PostEnqueue()
outside the critical section; reference the mockSink.AddDMLEvent method, s.mu,
s.dmls, s.sinkType and PostEnqueue to locate the change.
🧹 Nitpick comments (1)
downstreamadapter/sink/helper/row_callback_test.go (1)

24-42: Add a regression test for zero-row callback count.

Please add a totalCount == 0 case to verify txn flush completion behavior explicitly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/helper/row_callback_test.go` around lines 24 - 42, Add
a test case in TestTxnPostFlushRowCallback to cover totalCount == 0: create an
event and register the post-flush hook via event.AddPostFlushFunc (same
flushCount variable), then construct rowCallback :=
NewTxnPostFlushRowCallback(event, 0), invoke rowCallback() once and assert
flushCount == 1 to verify flush completion behavior for zero-row transactions;
keep this adjacent to the existing cases for clarity.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/sink/helper/row_callback.go`:
- Around line 23-30: NewTxnPostFlushRowCallback currently never calls
event.PostFlush when totalCount == 0; update the function so that if totalCount
is zero it triggers event.PostFlush immediately (or returns a no-op func after
calling PostFlush) to ensure txn flush completion is not missed. Locate
NewTxnPostFlushRowCallback and modify the logic around totalCount and
calledCount so a zero totalCount results in an immediate PostFlush call (or the
returned closure becomes a harmless no-op) while preserving the existing atomic
increment behavior for totalCount > 0.

---

Outside diff comments:
In `@downstreamadapter/sink/mock_sink.go`:
- Around line 31-38: The AddDMLEvent method currently calls event.PostEnqueue()
while holding s.mu, which can deadlock because PostEnqueue runs external
callbacks; modify mockSink.AddDMLEvent so it only holds s.mu to append to s.dmls
and determine whether PostEnqueue needs to be called (check s.sinkType ==
common.CloudStorageSinkType), then release the lock and call event.PostEnqueue()
outside the critical section; reference the mockSink.AddDMLEvent method, s.mu,
s.dmls, s.sinkType and PostEnqueue to locate the change.

---

Nitpick comments:
In `@downstreamadapter/sink/helper/row_callback_test.go`:
- Around line 24-42: Add a test case in TestTxnPostFlushRowCallback to cover
totalCount == 0: create an event and register the post-flush hook via
event.AddPostFlushFunc (same flushCount variable), then construct rowCallback :=
NewTxnPostFlushRowCallback(event, 0), invoke rowCallback() once and assert
flushCount == 1 to verify flush completion behavior for zero-row transactions;
keep this adjacent to the existing cases for clarity.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6a1be2d and 2f8b51c.

📒 Files selected for processing (8)
  • downstreamadapter/dispatcher/basic_dispatcher.go
  • downstreamadapter/dispatcher/event_dispatcher_test.go
  • downstreamadapter/sink/helper/row_callback.go
  • downstreamadapter/sink/helper/row_callback_test.go
  • downstreamadapter/sink/kafka/sink.go
  • downstreamadapter/sink/mock_sink.go
  • downstreamadapter/sink/pulsar/sink.go
  • downstreamadapter/sink/redo/sink.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • downstreamadapter/dispatcher/basic_dispatcher.go

@3AceShowHand
Copy link
Collaborator Author

/test all

@ti-chi-bot
Copy link

ti-chi-bot bot commented Feb 26, 2026

@3AceShowHand: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-cdc-mysql-integration-heavy 841178b link true /test pull-cdc-mysql-integration-heavy
pull-cdc-pulsar-integration-heavy 841178b link false /test pull-cdc-pulsar-integration-heavy
pull-cdc-pulsar-integration-light 841178b link false /test pull-cdc-pulsar-integration-light
pull-cdc-storage-integration-heavy 841178b link true /test pull-cdc-storage-integration-heavy
pull-cdc-storage-integration-light 841178b link true /test pull-cdc-storage-integration-light

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-note-none Denotes a PR that doesn't merit a release note. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

dispatcher,event,cloudstorage: fix enqueue callback semantics and race

1 participant