dispatcher,event,cloudstorage: add DML two-stage ack #4263
3AceShowHand wants to merge 7 commits into pingcap:master from
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a two-stage acknowledgment mechanism for DML events, separating the acknowledgment of an event being enqueued into the sink pipeline from its final flush to the downstream. This change aims to improve the responsiveness of the dispatcher by allowing it to process subsequent events earlier, while still ensuring that checkpoint advancement relies on a full flush.
Activity
📝 Walkthrough

Adds a new PostEnqueue stage to DMLEvent (atomic, idempotent), invokes enqueue callbacks at enqueue completion across sinks/dispatchers, preserves enqueue state through filtering, replaces several inline row callbacks with a shared helper, and adds tests for ordering and concurrency.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Dispatcher as Dispatcher
    participant Worker as Worker
    participant DMLEvent as DMLEvent
    participant Sink as Sink
    Dispatcher->>Worker: enqueue DML fragment (AddPostEnqueueFunc(wake))
    Worker->>DMLEvent: PostEnqueue()
    DMLEvent->>Dispatcher: execute enqueue callbacks (once)
    Worker->>Sink: deliver/process event
    Worker->>DMLEvent: PostFlush()
    DMLEvent->>DMLEvent: ensure PostEnqueue() ran (no-op if already)
    DMLEvent->>Sink: execute flush callbacks
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Code Review
This pull request introduces a two-stage acknowledgement mechanism for DML events by adding a PostEnqueue callback, which is triggered earlier than the existing PostFlush callback. This allows for better pipelining and potentially higher throughput. The changes are implemented across the dispatcher, event, and cloudstorage sink components.
The core logic change appears correct and is supported by new unit tests. However, I've identified a critical race condition in the implementation of PostEnqueue which could lead to callbacks being executed multiple times under concurrent access. I've provided suggestions to fix this using atomic operations. I've also suggested enhancing the tests to cover concurrent scenarios to prevent similar issues in the future.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/common/event/dml_event.go`:
- Around line 395-402: The boolean guard postEnqueueCalled in the DML event
struct is not concurrency-safe and can cause double-invokes/races; replace the
bool guard with a sync.Once (or an atomic CAS) and update the PostEnqueue
execution paths to call once.Do(...) so callbacks in PostTxnEnqueued and
PostTxnFlushed are invoked exactly once; apply the same change to the other
occurrence referenced around lines 645-653, ensuring the struct holds a
sync.Once (e.g., postEnqueueOnce) and all code paths use that Once to run the
callbacks instead of checking/setting postEnqueueCalled.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- downstreamadapter/dispatcher/basic_dispatcher.go
- downstreamadapter/sink/cloudstorage/dml_writers.go
- pkg/common/event/dml_event.go
- pkg/common/event/dml_event_test.go
/test all
🧹 Nitpick comments (1)
pkg/common/event/dml_event.go (1)
693-704: ClearPostEnqueueFunc inconsistency: the method name suggests a reset, but it doesn't reset the postEnqueueCalled guard.

While ClearPostEnqueueFunc clears the callback slice, it doesn't reset the atomic guard that prevents re-execution. If callbacks are cleared after PostEnqueue() has already fired and new ones are added, they will never execute. However, this method is not used anywhere in the codebase, and DML events follow a create-once pattern, so this has no practical impact. Consider either documenting that ClearPostEnqueueFunc is a one-way operation (callbacks cannot be re-armed after enqueue), or resetting the atomic flag if re-use is intended.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/common/event/dml_event.go` around lines 693 - 704, ClearPostEnqueueFunc currently clears the PostTxnEnqueued slice but does not reset the atomic guard (postEnqueueCalled) that PostEnqueue() uses to prevent re-execution, so clearing then adding callbacks will never run them; update ClearPostEnqueueFunc to also reset the atomic flag (postEnqueueCalled) to its zero/false state when intended to allow re-use, or if re-arm is not desired, document that ClearPostEnqueueFunc is one-way—reference the ClearPostEnqueueFunc method, the PostTxnEnqueued slice, the postEnqueueCalled atomic guard, and the PostEnqueue() execution path when making the change.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- downstreamadapter/sink/cloudstorage/defragmenter.go
- downstreamadapter/sink/cloudstorage/dml_writers_test.go
- pkg/common/event/active_active.go
- pkg/common/event/active_active_test.go
- pkg/common/event/dml_event.go
- pkg/common/event/dml_event_test.go
/test all

/test all
🧹 Nitpick comments (1)
downstreamadapter/sink/cloudstorage/dml_writers.go (1)
116: Fix typo in the inline comment.

Line 116 says "encoupled"; please change it to "coupled" for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/dml_writers.go` at line 116, Update the inline comment that currently reads "emit a TxnCallbackableEvent encoupled with a sequence number starting from one." to correct the typo by replacing "encoupled" with "coupled" in the comment near the TxnCallbackableEvent emission in dml_writers.go so it reads "emit a TxnCallbackableEvent coupled with a sequence number starting from one."
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- downstreamadapter/sink/cloudstorage/dml_writers.go
- pkg/sink/cloudstorage/config.go
- pkg/sink/cloudstorage/config_test.go
/test all

/test all
- extract shared txn post-flush row callback helper for kafka/pulsar/redo
- keep sink-specific wake behavior explicit via dispatcher test
- streamline callback coverage to avoid redundant sink-level assertions
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
downstreamadapter/sink/mock_sink.go (1)
31-38: ⚠️ Potential issue | 🟠 Major

Do not invoke PostEnqueue while holding s.mu.

event.PostEnqueue() runs external callbacks; executing it under the sink mutex can deadlock via re-entrant/mock interactions.

💡 Proposed fix

```diff
 func (s *mockSink) AddDMLEvent(event *commonEvent.DMLEvent) {
 	s.mu.Lock()
-	defer s.mu.Unlock()
 	s.dmls = append(s.dmls, event)
+	shouldPostEnqueue := s.sinkType == common.CloudStorageSinkType
+	s.mu.Unlock()
+	// CloudStorage sink wakes dispatcher on enqueue stage.
-	if s.sinkType == common.CloudStorageSinkType {
+	if shouldPostEnqueue {
 		event.PostEnqueue()
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/mock_sink.go` around lines 31 - 38, The AddDMLEvent method currently calls event.PostEnqueue() while holding s.mu, which can deadlock because PostEnqueue runs external callbacks; modify mockSink.AddDMLEvent so it only holds s.mu to append to s.dmls and determine whether PostEnqueue needs to be called (check s.sinkType == common.CloudStorageSinkType), then release the lock and call event.PostEnqueue() outside the critical section; reference the mockSink.AddDMLEvent method, s.mu, s.dmls, s.sinkType and PostEnqueue to locate the change.
🧹 Nitpick comments (1)
downstreamadapter/sink/helper/row_callback_test.go (1)
24-42: Add a regression test for zero-row callback count.

Please add a totalCount == 0 case to verify txn flush completion behavior explicitly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/helper/row_callback_test.go` around lines 24 - 42, Add a test case in TestTxnPostFlushRowCallback to cover totalCount == 0: create an event and register the post-flush hook via event.AddPostFlushFunc (same flushCount variable), then construct rowCallback := NewTxnPostFlushRowCallback(event, 0), invoke rowCallback() once and assert flushCount == 1 to verify flush completion behavior for zero-row transactions; keep this adjacent to the existing cases for clarity.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@downstreamadapter/sink/helper/row_callback.go`:
- Around line 23-30: NewTxnPostFlushRowCallback currently never calls
event.PostFlush when totalCount == 0; update the function so that if totalCount
is zero it triggers event.PostFlush immediately (or returns a no-op func after
calling PostFlush) to ensure txn flush completion is not missed. Locate
NewTxnPostFlushRowCallback and modify the logic around totalCount and
calledCount so a zero totalCount results in an immediate PostFlush call (or the
returned closure becomes a harmless no-op) while preserving the existing atomic
increment behavior for totalCount > 0.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- downstreamadapter/dispatcher/basic_dispatcher.go
- downstreamadapter/dispatcher/event_dispatcher_test.go
- downstreamadapter/sink/helper/row_callback.go
- downstreamadapter/sink/helper/row_callback_test.go
- downstreamadapter/sink/kafka/sink.go
- downstreamadapter/sink/mock_sink.go
- downstreamadapter/sink/pulsar/sink.go
- downstreamadapter/sink/redo/sink.go
🚧 Files skipped from review as they are similar to previous changes (1)
- downstreamadapter/dispatcher/basic_dispatcher.go
/test all
@3AceShowHand: The following tests failed, say
Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: close #4269
The dispatcher wake callback for DML was previously tied to PostFlush, so wake latency could be dominated by sink flush latency. This coupled scheduling progress too tightly with remote flush speed.

This PR introduces a two-stage ack model for DML:
It also fixes three follow-up correctness/semantics issues tracked in #4269:
- The PostEnqueue exactly-once guard was not concurrency-safe.
DMLEvent gains:

- PostTxnEnqueued []func()
- PostEnqueue()
- AddPostEnqueueFunc(...)
- ClearPostEnqueueFunc(...)
- PostFlush() still calls PostEnqueue() first as a fallback, but PostEnqueue now uses an atomic CAS to guarantee exactly-once execution under concurrency.
- Switched AddPostFlushFunc(...) to AddPostEnqueueFunc(...) in BasicDispatcher.handleEvents.
- The FilterDMLEvent rebuild path now preserves enqueue callbacks/state when constructing the filtered DMLEvent.
- Tests cover the PostEnqueue/PostFlush race and exactly-once behavior.

Behavioral intent:
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No known compatibility break:
Expected performance effect:
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note