cdc: add active-active failpoints to construct and record data loss, data inconsistency, data redundancy and LWW violation #4245
Conversation
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
📝 Walkthrough

Adds per-message failpoints for the CloudStorage sink (drop/mutate), helpers to split and convert DML events to row records, a failpoint recorder that writes JSONL row records, canal-json mutation logic for tests, and MySQL sink integration to emit row records under an LWW bypass failpoint.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Enc as CloudStorageEncoder
    participant FP as FailpointSwitch
    participant Ext as RowExtractor
    participant Mut as MessageMutator
    participant Rec as FailpointRecorder
    participant File as JSONLFile
    Enc->>FP: check cloudStorageSinkMessageFailpointSwitch
    alt failpoint active
        FP->>Ext: dmlEventToRowRecords / splitRowRecordsByMessages
        Ext-->>FP: per-message RowRecord slices
        FP->>Mut: applyFailpointsOnEncodedMessages (drop/mutate)
        Mut-->>FP: mutated/dropped messages + RowRecords
        FP->>Rec: Write(failpoint, rows)
        Rec->>File: ensureFile() and append JSONL
        File-->>Rec: write ACK
        Rec-->>FP: done
    else inactive
        FP-->>Enc: continue normal flow
    end
    Enc-->>Downstream: send message(s)
```
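For context on the failpoint mechanism driving this flow, here is a minimal, hypothetical sketch of the pingcap/failpoint pattern; the failpoint name and message type are illustrative, not the PR's actual identifiers:

```go
package sink

import "github.com/pingcap/failpoint"

type message struct {
	Key, Value []byte
}

// dropMessageForFailpoint clears a message to simulate data loss when the
// hypothetical failpoint "exampleDropMessage" is active. failpoint.Inject
// is a marker that failpoint-ctl rewrites into real evaluation code in
// failpoint-enabled builds; in normal builds it is a no-op.
func dropMessageForFailpoint(msg *message) {
	failpoint.Inject("exampleDropMessage", func() {
		msg.Key, msg.Value = nil, nil
	})
}
```

A test run would then activate it through the GO_FAILPOINTS environment variable, along the lines of `GO_FAILPOINTS='<package-path>/exampleDropMessage=return(true)'`.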
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Summary of Changes

Hello @Leavrth, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the testing capabilities for active-active replication scenarios by introducing several failpoints across the TiCDC sink components. These failpoints are designed to programmatically inject conditions that simulate various data integrity issues, such as data loss, inconsistency, redundancy, and Last-Write-Wins violations. The changes also include a new utility to record these simulated events, facilitating the development and validation of multi-cluster consistency checkers.
Code Review
This pull request introduces several failpoints to simulate various data consistency issues in active-active replication scenarios, such as data loss, inconsistency, and LWW violations. A new package failpointrecord is added to log these injected failures, which is a good approach for external verification. The changes are well-contained and focused on improving testability.
I've found a couple of areas for improvement:

- A data race in the new `failpointrecord` package that should be fixed.
- A potential issue in the `mutateMessageValueForFailpoint` logic where an ignored error could lead to incorrect test behavior.
Details are in the specific comments. Overall, this is a valuable addition for testing the robustness of the system.
```go
func Write(failpoint string, rows []RowRecord) {
	if disabled {
		return
	}
	ensureFile()
	if file == nil {
		return
	}
```
There's a data race on the disabled package-level variable. It's read on line 83 without synchronization, while it might be written concurrently within ensureFile (lines 64, 72). This can lead to unpredictable behavior.
You can fix this race and simplify the code by removing the disabled variable entirely. The check if file == nil is sufficient to guard against writing when the file isn't opened, as os.OpenFile returns nil on failure.
To fix this, please apply the suggestion below, and also remove the disabled variable declaration on line 57 and the assignments to it on lines 64 and 72.
```go
func Write(failpoint string, rows []RowRecord) {
	ensureFile()
	if file == nil {
		return
	}
```

```go
if raw, ok := m["pkNames"]; ok {
	_ = json.Unmarshal(raw, &pkNames)
}
```
The error from json.Unmarshal is ignored here. If pkNames is present in the JSON but malformed, unmarshaling will fail, pkNames will be empty, and pkSet will be empty. This will cause the logic below to potentially mutate a primary key column, which contradicts the goal of this failpoint (mutating a non-PK column to cause a 'lost' and 'redundant' row with the same PK).
To ensure the failpoint works as intended, you should handle this error, for example by skipping this message part.
```go
if raw, ok := m["pkNames"]; ok {
	if err := json.Unmarshal(raw, &pkNames); err != nil {
		continue
	}
}
```
Actionable comments posted: 2
🧹 Nitpick comments (4)
downstreamadapter/sink/cloudstorage/writer.go (1)
265-279: Non-deterministic column selection due to Go map iteration order.
`for col := range row` iterates map keys in random order, so the "first non-PK column" mutated will differ across runs. This is acceptable for a failpoint (the goal is to introduce some inconsistency), but if reproducibility matters for debugging, consider sorting the keys first.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 265 - 279, the loop that picks "the first non-PK column" uses Go map iteration (for col := range row), which is non-deterministic; make selection deterministic by listing the keys of each row into a slice, sorting that slice, then iterating the sorted keys to find the first key not in pkSet and setting row[col] = nil (updating the existing mutated flag logic). Locate the block using the names "data", "row", "pkSet", and "mutated" and replace the map-range with a sorted-key approach so the same column is chosen consistently across runs.
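If reproducibility is wanted, a minimal sketch of the sorted-key selection described above; the names `row` and `pkSet` follow the review's description and are not the PR's actual code:

```go
package cloudstorage

import "sort"

// firstNonPKColumn returns the lexicographically smallest non-PK column
// name, so the mutated column is stable across runs.
func firstNonPKColumn(row map[string]any, pkSet map[string]struct{}) (string, bool) {
	keys := make([]string, 0, len(row))
	for col := range row {
		keys = append(keys, col)
	}
	sort.Strings(keys) // fixed order instead of random map iteration
	for _, col := range keys {
		if _, isPK := pkSet[col]; !isPK {
			return col, true
		}
	}
	return "", false
}
```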
pkg/sink/mysql/mysql_writer_dml.go (1)

194-222: Duplicated row-record conversion logic across packages.

`dmlEventsToRowRecords` here and `dmlEventToRowRecords` in `encoding_group.go` do essentially the same thing: extract PK values from DML events into `failpointrecord.RowRecord` slices. They differ only in how PK columns are discovered (`findPKColumns` vs `PrimaryKeyColumn()`) and singular vs plural event input. Consider extracting a shared helper (e.g., into the `failpointrecord` package or a common utility) to keep these in sync and avoid divergence.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 194 - 222, both dmlEventsToRowRecords and dmlEventToRowRecords duplicate logic for extracting primary key values into failpointrecord.RowRecord; extract a single helper (e.g., failpointrecord.FromDMLEvents or failpointrecord.FromDMLEvent) that accepts either a single *commonEvent.DMLEvent or a slice plus a PK-discovery function, move the loop and PK extraction into that helper, and update both callers to use it; unify PK column discovery behind a single accessor (wrapping findPKColumns/PrimaryKeyColumn in a small adapter if needed) so both callers pass the same PK column format to the helper.
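One possible shape for such a shared helper, sketched with hypothetical names (`PKExtractor`, `FromRows`) and a simplified `RowRecord`; the real struct and signatures in the PR may differ:

```go
package failpointrecord

// RowRecord fields follow the names mentioned in this review
// (CommitTs, OriginTs, PrimaryKeys); the real struct may differ.
type RowRecord struct {
	CommitTs    uint64
	OriginTs    uint64
	PrimaryKeys map[string]any
}

// PKExtractor hides how a sink discovers primary key values for one row
// (findPKColumns in the MySQL sink vs PrimaryKeyColumn() in cloud storage).
type PKExtractor[R any] func(row R) map[string]any

// FromRows is a sketch of the shared conversion loop: each sink would
// call it with its own row type and extractor, keeping the record
// construction logic in a single place.
func FromRows[R any](commitTs, originTs uint64, rows []R, extract PKExtractor[R]) []RowRecord {
	records := make([]RowRecord, 0, len(rows))
	for _, row := range rows {
		records = append(records, RowRecord{
			CommitTs:    commitTs,
			OriginTs:    originTs,
			PrimaryKeys: extract(row),
		})
	}
	return records
}
```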
downstreamadapter/sink/cloudstorage/encoding_group.go (1)

128-146: Both `cloudStorageSinkDropMessage` and `cloudStorageSinkMutateValue` fire on every message.

When both failpoints are enabled, every message is first dropped (Key/Value set to nil, row count zeroed) and then mutated. Mutating a nil Value is a no-op in `mutateMessageValueForFailpoint` (it returns early on empty Value), so the second failpoint becomes dead code when the first is active. This is likely fine for independent use, but worth documenting that these two failpoints are mutually exclusive in practice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 128 - 146, both failpoints cloudStorageSinkDropMessage and cloudStorageSinkMutateValue currently run for every message, making cloudStorageSinkMutateValue a no-op when cloudStorageSinkDropMessage has already nulled the message; fix this by making the mutate failpoint conditional so it only runs when the message still has a payload (e.g., check msg.Value != nil and msg.RowsCount() > 0 before calling mutateMessageValueForFailpoint), or alternatively swap/chain the failpoints so mutate runs before drop, and document the mutual exclusivity in comments next to the two failpoints.
pkg/sink/failpointrecord/record.go (1)

53-77: No `Close` function; the file handle leaks until process exit.

The file opened by `ensureFile` is never closed. While for a long-running daemon this is mostly benign (the OS reclaims on exit), adding a `Close()` function would be good hygiene and helpful for tests that want to verify file contents after `Write` calls.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/failpointrecord/record.go` around lines 53 - 77, add a safe Close function that releases the opened file handle to avoid leaks: implement Close() which acquires the package mutex (mu), checks whether the package-level file is non-nil, calls file.Close(), sets file = nil (and optionally disabled = true to prevent further writes), and returns any close error; make Close idempotent so repeated calls are safe. Place Close next to the package-level vars (initOnce, mu, file, disabled) and ensureFile; if ensureFile should be able to reopen after Close, reset the sync.Once accordingly before reusing.
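A minimal sketch of such a `Close`, assuming the plain-bool `disabled` and the package-level `mu`/`file` variables quoted earlier in this review:

```go
package failpointrecord

import (
	"os"
	"sync"
)

var (
	mu       sync.Mutex
	file     *os.File
	disabled bool
)

// Close releases the record file handle. It is idempotent: after the
// first call, file is nil and later calls (and Writes) are no-ops.
func Close() error {
	mu.Lock()
	defer mu.Unlock()
	if file == nil {
		return nil // already closed or never opened
	}
	err := file.Close()
	file = nil
	disabled = true // keep Write from touching a closed handle
	return err
}
```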
📒 Files selected for processing (4)

- downstreamadapter/sink/cloudstorage/encoding_group.go
- downstreamadapter/sink/cloudstorage/writer.go
- pkg/sink/failpointrecord/record.go
- pkg/sink/mysql/mysql_writer_dml.go
```go
var (
	initOnce sync.Once
	mu       sync.Mutex
	file     *os.File
	disabled bool
)
```
Data race on disabled: unsynchronized read before ensureFile().
disabled is written inside initOnce.Do (line 64/72) but read on line 83 outside any synchronization. The Go memory model only guarantees visibility for subsequent once.Do callers, not for bare reads of disabled. The -race detector will flag this.
The simplest fix: always call ensureFile() first, then check disabled.
🐛 Proposed fix

```diff
-var (
-	initOnce sync.Once
-	mu       sync.Mutex
-	file     *os.File
-	disabled bool
-)
+var (
+	initOnce sync.Once
+	mu       sync.Mutex
+	file     *os.File
+	disabled atomic.Bool
+)

 func ensureFile() {
 	initOnce.Do(func() {
 		path := os.Getenv(envKey)
 		if path == "" {
-			disabled = true
+			disabled.Store(true)
 			return
 		}
 		var err error
 		file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
 		if err != nil {
 			log.Warn("failed to open failpoint record file, recording disabled",
 				zap.String("path", path), zap.Error(err))
-			disabled = true
+			disabled.Store(true)
 			return
 		}
 		log.Info("failpoint record file opened", zap.String("path", path))
 	})
 }

 func Write(failpoint string, rows []RowRecord) {
-	if disabled {
+	if disabled.Load() {
 		return
 	}
 	ensureFile()
-	if file == nil {
+	if disabled.Load() || file == nil {
 		return
 	}
```

This requires adding `"sync/atomic"` to imports (or using `go.uber.org/atomic`, which is already a transitive dep in this repo).
Alternatively, the simpler approach: just move ensureFile() before the disabled check and drop the early-return shortcut, relying on sync.Once for the fast path:
♻️ Simpler alternative
```diff
 func Write(failpoint string, rows []RowRecord) {
-	if disabled {
-		return
-	}
 	ensureFile()
-	if file == nil {
+	if disabled || file == nil {
 		return
 	}
```

After `ensureFile()` returns, `disabled` is safely visible because `sync.Once` guarantees the happens-before relationship for subsequent `Do` calls (which are no-ops but still synchronize).
Also applies to: 82-88
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/sink/failpointrecord/record.go` around lines 53 - 58, The read of the
package-level variable `disabled` is unsynchronized and can race with its write
inside the `initOnce.Do` used by `ensureFile()`; to fix this move the call to
`ensureFile()` before any bare reads of `disabled` (or use atomic loads/stores
on `disabled`), e.g., ensure callers of `ensureFile()` (the code reading
`disabled` around the current checks at lines referencing `disabled`) always
call `ensureFile()` first so the `sync.Once` establishes the happens‑before
relationship and the subsequent read of `disabled` is safe; update all affected
spots (the checks around `disabled` and the `ensureFile()`/`initOnce.Do` usage)
accordingly.
♻️ Duplicate comments (1)
downstreamadapter/sink/cloudstorage/encoding_group.go (1)
178-188: ⚠️ Potential issue | 🔴 Critical: Rewind must happen before row iteration.

`dmlEventToRowRecords` still starts reading from the current cursor. If `event` was already consumed before this call, the first `GetNextRow()` returns `ok == false`, and you return no records for failpoint writing.

🐛 Proposed fix

```diff
 func dmlEventToRowRecords(event *commonEvent.DMLEvent) []failpointrecord.RowRecord {
 	if event == nil || event.TableInfo == nil {
 		return nil
 	}
+	event.Rewind()
 	indexes, columns := (&commonEvent.RowEvent{TableInfo: event.TableInfo}).PrimaryKeyColumn()
 	rowRecords := make([]failpointrecord.RowRecord, 0, event.Len())
 	for {
 		row, ok := event.GetNextRow()
 		if !ok {
 			event.Rewind()
 			break
 		}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 178 - 188, the function dmlEventToRowRecords reads rows starting from the event's current cursor, but Rewind() is called only after iteration, so if the event was already consumed it yields no records; call event.Rewind() before the loop (before the first GetNextRow()) to reset the cursor, then iterate with GetNextRow() and keep the existing nil checks and return behavior.
🧹 Nitpick comments (2)
downstreamadapter/sink/cloudstorage/writer.go (1)
226-228: Align function comment with actual mutation strategy.

The comment says the code mutates the first non-PK column in `data[0]`, but the implementation selects a non-PK column via `selectColumnToMutate` and can mutate a later row if needed.

📝 Proposed comment fix

```diff
-// 2. Pick the first non-PK column in data[0] and replace its value with nil.
+// 2. Find the first row containing a non-PK column, choose one non-PK column,
+//    and replace its value with nil.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 226 - 228, the function comment is inaccurate: instead of always mutating the first non-PK column in data[0], the code uses selectColumnToMutate to choose a column which may be found in a later row and then mutates that selected row/column. Update the comment to state that the JSON is parsed to extract "pkNames" and "data", selectColumnToMutate is used to pick a non-PK column (which may come from a later row), and the chosen cell's value is set to nil before re-marshaling the message.
downstreamadapter/sink/cloudstorage/encoding_group.go (1)

129-144: Use neutral log messages and put failpoint identity in fields.

These warning messages currently embed failpoint/function names in the log text. Move that into a structured field and keep the message text generic. As per coding guidelines: "Use structured logs via `github.com/pingcap/log` with `zap` fields in Go; log message strings should not include function names and should avoid hyphens (use spaces instead)".

♻️ Proposed fix

```diff
-		log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss",
+		log.Warn("dropping message to simulate data loss",
+			zap.String("failpoint", "cloudStorageSinkDropMessage"),
 			zap.String("keyspace", eg.changeFeedID.Keyspace()),
 			zap.Stringer("changefeed", eg.changeFeedID.ID()),
 			zap.Any("rows", rowRecords))
@@
-		log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency",
+		log.Warn("mutating message value to simulate data inconsistency",
+			zap.String("failpoint", "cloudStorageSinkMutateValue"),
 			zap.String("keyspace", eg.changeFeedID.Keyspace()),
 			zap.Stringer("changefeed", eg.changeFeedID.ID()),
 			zap.Any("rows", rowRecords))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 129 - 144, the log messages inside the failpoint.Inject blocks for "cloudStorageSinkDropMessage" and "cloudStorageSinkMutateValue" embed the failpoint names in the message text; change them to neutral, hyphen-free messages and move the failpoint identity into a structured zap field such as zap.String("failpoint", "cloudStorageSinkDropMessage"), keeping the existing structured fields (keyspace, changefeed, rows) unchanged; update both log.Warn calls to follow the same pattern.
📒 Files selected for processing (3)

- downstreamadapter/sink/cloudstorage/encoding_group.go
- downstreamadapter/sink/cloudstorage/writer.go
- downstreamadapter/sink/cloudstorage/writer_test.go
Actionable comments posted: 3
🧹 Nitpick comments (2)
downstreamadapter/sink/cloudstorage/writer_test.go (1)
136-169: Test only covers the `string` representation of `_tidb_origin_ts`; the numeric JSON case is untested.

The `_tidb_origin_ts` value in the test is `"100"` (a quoted JSON string). A real canal-json payload may encode it as a JSON number (`100`, no quotes), which `json.Unmarshal` decodes as `float64`. The `float64` branch of `incrementOriginTSValue` returns `float64(101)`, which `json.Marshal` renders as `101`, so a `bytes.Contains` check on `msg.Value` expecting `"_tidb_origin_ts":"101"` would fail for that input. Consider adding a second sub-test (or a table-driven case) with `"_tidb_origin_ts":100` (unquoted) to exercise that branch and pin the expected serialization format.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer_test.go` around lines 136 - 169, add a second sub-test to TestMutateMessageValueForFailpointRecordClassification that supplies an unquoted numeric _tidb_origin_ts (e.g. "_tidb_origin_ts":100) to exercise the float64 branch in incrementOriginTSValue and mutateMessageValueForFailpoint; assert mutatedRows and originTsMutatedRows lengths and contents as in the existing assertions, and change the bytes.Contains check for the numeric case to expect the serialized numeric form (e.g. "_tidb_origin_ts":101 without quotes) so the test verifies both string and numeric JSON encodings.
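A sketch of a complementary test that pins both reachable branches of `incrementOriginTSValue` (as quoted later in this thread); the float64 expectation would change if the simplification proposed below is adopted:

```go
package cloudstorage

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// Exercises both JSON encodings of _tidb_origin_ts: canal-json may carry
// the value as a quoted string or as a bare number, which json.Unmarshal
// into map[string]any decodes as float64.
func TestIncrementOriginTSValueBothEncodings(t *testing.T) {
	// Quoted JSON string: parsed with strconv, formatted back as a string.
	got, ok := incrementOriginTSValue("100")
	require.True(t, ok)
	require.Equal(t, "101", got)

	// Bare JSON number: decoded as float64, incremented in place.
	got, ok = incrementOriginTSValue(float64(100))
	require.True(t, ok)
	require.Equal(t, float64(101), got)
}
```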
downstreamadapter/sink/cloudstorage/writer.go (1)

332-349: `crypto/rand` is unnecessarily heavy for a failpoint-only path.

`crypto/rand` is an OS-level entropy source intended for cryptographic use. Inside a failpoint block that deliberately corrupts data for testing, `math/rand` (or even a fixed index) is sufficient and avoids the overhead and potential error path of `cryptorand.Int`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 332 - 349, the function selectColumnToMutate uses crypto/rand, which is unnecessary for a failpoint path; replace the cryptorand.Int call with math/rand's fast non-crypto RNG (e.g., rand.Intn(len(columns))) or a deterministic/fixed index for tests, remove the cryptorand error branch and simply pick an index from columns, and update imports to use math/rand (seeding at init if variability is needed).
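For reference, a minimal sketch of the non-crypto alternative using `math/rand/v2` (suggested again in a later round of this review), which needs no explicit seeding; `columns` is assumed to be a `[]string`:

```go
package cloudstorage

import "math/rand/v2"

// pickColumnToMutate selects a random candidate column without pulling
// in crypto/rand; rand/v2's top-level IntN is automatically seeded.
func pickColumnToMutate(columns []string) (string, bool) {
	if len(columns) == 0 {
		return "", false
	}
	return columns[rand.IntN(len(columns))], true
}
```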
📒 Files selected for processing (2)

- downstreamadapter/sink/cloudstorage/writer.go
- downstreamadapter/sink/cloudstorage/writer_test.go
```go
if err := json.Unmarshal(part, &m); err != nil {
	continue
}

// Extract pkNames so we can skip PK columns.
var pkNames []string
if raw, ok := m["pkNames"]; ok {
	_ = json.Unmarshal(raw, &pkNames)
}
pkSet := make(map[string]struct{}, len(pkNames))
for _, pk := range pkNames {
	pkSet[pk] = struct{}{}
}

// Extract the "data" array.
rawData, ok := m["data"]
if !ok {
	continue
}
var data []map[string]any
if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 {
	continue
}
```
rowOffset is not incremented when a part fails to parse, corrupting subsequent offset lookups
When json.Unmarshal(part, &m) fails (line 251), when "data" is absent (line 267), or when the data array fails to unmarshal or is empty (line 271), the code continues without adding any value to rowOffset. For every subsequent successfully-parsed part, rowOffset + mutatedRowOffset will be smaller than the actual row position, causing extractMutatedRowRecordsByOffset to return the wrong RowRecord.
Because rowRecords is built in parallel with parts by the caller, a failed parse implies the matching rowRecords entry still occupies a slot. The offset must always advance by the expected number of rows (or at least by 1 per failed part) to stay aligned.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 251 - 273, The
code currently continues on parse/unmarshal/data-missing errors without
advancing rowOffset, which desynchronizes it from the parallel rowRecords slice;
update the error/continue paths so rowOffset is always advanced: when
json.Unmarshal(part, &m) fails (failed part), increment rowOffset by 1 before
continue; when "data" is missing or unmarshal of data fails, likewise increment
rowOffset by 1 before continue; when data is successfully unmarshaled, after
processing that part increment rowOffset by len(data) (or otherwise by the
number of rows that part represents). Apply these changes around the
json.Unmarshal(part, &m) block and the "data" extraction so mutatedRowOffset +
rowOffset stays aligned for extractMutatedRowRecordsByOffset and references to
rowRecords.
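A sketch of the alignment invariant the prompt describes, where the offset advances on every path (a failed part is assumed to occupy one record slot, as the prompt suggests):

```go
package cloudstorage

import "encoding/json"

// advanceAlignedOffsets walks encoded parts and returns, for each part,
// its starting row offset, keeping offsets aligned with the parallel
// rowRecords slice even when a part fails to parse.
func advanceAlignedOffsets(parts [][]byte) []int {
	offsets := make([]int, 0, len(parts))
	rowOffset := 0
	for _, part := range parts {
		offsets = append(offsets, rowOffset)
		var m map[string]json.RawMessage
		if err := json.Unmarshal(part, &m); err != nil {
			rowOffset++ // failed part still occupies one record slot
			continue
		}
		rawData, ok := m["data"]
		if !ok {
			rowOffset++
			continue
		}
		var data []map[string]any
		if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 {
			rowOffset++
			continue
		}
		rowOffset += len(data) // advance by the rows this part represents
	}
	return offsets
}
```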
```go
mutated = true
mutatedRowOffset = rowIdx
mutatedColumn = col
if mutated {
	break
}
```
Dead code: if mutated { break } is always true
mutated is unconditionally set to true on line 293; the if mutated guard is always satisfied. This is a no-op conditional that should be replaced with a plain break.
🐛 Proposed fix

```diff
 mutated = true
 mutatedRowOffset = rowIdx
 mutatedColumn = col
-if mutated {
-	break
-}
+break
```
+ break📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| mutated = true | |
| mutatedRowOffset = rowIdx | |
| mutatedColumn = col | |
| if mutated { | |
| break | |
| } | |
| mutated = true | |
| mutatedRowOffset = rowIdx | |
| mutatedColumn = col | |
| break |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 293 - 298, The
conditional "if mutated { break }" is dead because "mutated" is set just above;
replace that guarded break with a plain "break" (remove the redundant if) in the
loop where mutated, mutatedRowOffset, and mutatedColumn are assigned (the block
using rowIdx and col) so control exits immediately without the always-true
guard.
```go
func incrementOriginTSValue(v any) (any, bool) {
	switch value := v.(type) {
	case string:
		originTS, err := strconv.ParseUint(value, 10, 64)
		if err != nil {
			return nil, false
		}
		return strconv.FormatUint(originTS+1, 10), true
	case float64:
		return value + 1, true
	case json.Number:
		originTS, err := value.Int64()
		if err != nil {
			return nil, false
		}
		return json.Number(strconv.FormatInt(originTS+1, 10)), true
	case int:
		return value + 1, true
	case int8:
		return value + 1, true
	case int16:
		return value + 1, true
	case int32:
		return value + 1, true
	case int64:
		return value + 1, true
	case uint:
		return value + 1, true
	case uint8:
		return value + 1, true
	case uint16:
		return value + 1, true
	case uint32:
		return value + 1, true
	case uint64:
		return value + 1, true
	default:
		return nil, false
	}
}
```
🛠️ Refactor suggestion | 🟠 Major
Most branches in incrementOriginTSValue are unreachable
json.Unmarshal into a map[string]any always decodes JSON numbers as float64; it never produces json.Number, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, or uint64. The json.Number type is only produced by json.Decoder when UseNumber() is set, which is not used here.
Consequently, only the string case (line 370) and the float64 case (line 376) are ever reached at runtime; the remaining ten branches are dead code and add maintenance burden.
A secondary concern: the float64 path can silently lose precision for _tidb_origin_ts values larger than 2^53 (the float64 mantissa limit), and json.Marshal may render it with a decimal point (e.g., 101 vs 101.0) depending on the value.
♻️ Proposed simplification

```diff
 func incrementOriginTSValue(v any) (any, bool) {
 	switch value := v.(type) {
 	case string:
 		originTS, err := strconv.ParseUint(value, 10, 64)
 		if err != nil {
 			return nil, false
 		}
 		return strconv.FormatUint(originTS+1, 10), true
 	case float64:
-		return value + 1, true
-	case json.Number:
-		originTS, err := value.Int64()
-		if err != nil {
-			return nil, false
-		}
-		return json.Number(strconv.FormatInt(originTS+1, 10)), true
-	case int:
-		return value + 1, true
-	case int8:
-		return value + 1, true
-	case int16:
-		return value + 1, true
-	case int32:
-		return value + 1, true
-	case int64:
-		return value + 1, true
-	case uint:
-		return value + 1, true
-	case uint8:
-		return value + 1, true
-	case uint16:
-		return value + 1, true
-	case uint32:
-		return value + 1, true
-	case uint64:
-		return value + 1, true
+		// json.Unmarshal into map[string]any always uses float64 for numbers.
+		// Format as integer string to avoid floating-point representation noise.
+		return strconv.FormatUint(uint64(value)+1, 10), true
 	default:
 		return nil, false
 	}
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 368 - 407, The
function incrementOriginTSValue contains many unreachable
integer/uint/json.Number branches; keep only the cases that json.Unmarshal can
produce (string and float64) and handle float64 safely: in
incrementOriginTSValue, remove the int*/uint*/json.Number cases, keep the string
case (parsing with strconv.ParseUint) and the float64 case but change it to
check that the float64 represents an exact non-negative integer within uint64
safe range (no fractional part and <= math.MaxInt53 or use 1<<53-1), then
increment and return as a string (or json.Number) formatted without a decimal
point; if the float64 is not an exact integer or out of safe range, return
nil,false. Ensure you reference the incrementOriginTSValue function and update
only its switch handling.
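A sketch of the safe float64 handling the prompt asks for; 2^53 is the largest integer a float64 can represent exactly, so values outside that range (or with a fractional part) are rejected rather than silently rounded:

```go
package cloudstorage

import (
	"math"
	"strconv"
)

// incrementFloat64OriginTS increments a JSON-number origin ts only when
// the float64 is an exact, non-negative integer in the float64-safe range,
// returning it formatted as an integer string (no decimal point).
func incrementFloat64OriginTS(v float64) (string, bool) {
	const maxExact = float64(1 << 53) // last exactly representable integer
	if v < 0 || v > maxExact || v != math.Trunc(v) {
		return "", false
	}
	return strconv.FormatUint(uint64(v)+1, 10), true
}
```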
♻️ Duplicate comments (3)
downstreamadapter/sink/cloudstorage/writer.go (3)
294-299: `if mutated { break }` is dead code; existing issue still unresolved.

`mutated` is unconditionally set to `true` at line 294 immediately before the guard. The `if mutated` branch is always taken.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 294 - 299, the code sets mutated = true unconditionally just before checking if mutated to break, making the guard dead; either set mutated (and mutatedRowOffset/mutatedColumn) only inside the branch that actually performs a mutation, or, if the intent was to always break, remove the redundant if-check and keep a direct break.

380-419: Unreachable branches in `incrementOriginTSValue`; existing issue still unresolved.

`json.Unmarshal` into `map[string]any` always decodes JSON numbers as `float64`; branches for `json.Number`, `int`, `int8`, `int16`, `int32`, `int64`, `uint`, `uint8`, `uint16`, `uint32`, and `uint64` are never reachable through this call path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 380 - 419, incrementOriginTSValue contains integer and uint branches that are unreachable when JSON is unmarshaled into map[string]any, because numbers become float64; remove the dead branches and handle string and float64 only (or, if json.Number support is intended, switch the decoding path to json.Decoder with UseNumber so json.Number values are actually produced), converting and incrementing float64 and string safely.

252-274: `rowOffset` not incremented on parse/unmarshal failures; existing issue still unresolved.

The `json.Unmarshal` failure path (line 253), missing `"data"` field path (lines 268-270), and data-unmarshal/empty-data path (lines 272-274) all `continue` without advancing `rowOffset`, desynchronizing it from the `rowRecords` slice for all subsequent successfully-parsed parts.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 252 - 274, the loop skips malformed parts without advancing rowOffset, causing it to fall out of sync with rowRecords; advance rowOffset on every early continue (by one for a failed part) and by len(data) after successfully processing a part so rowOffset and rowRecords remain aligned.
🧹 Nitpick comments (2)
downstreamadapter/sink/cloudstorage/writer_test.go (1)
136-169: `TestMutateMessageValueForFailpointRecordClassification` only exercises the `string` case of `incrementOriginTSValue`.

The second message encodes `_tidb_origin_ts` as a JSON string (`"100"`), so `json.Unmarshal` decodes it as a Go `string`, hitting the `string` branch of `incrementOriginTSValue` and producing `"101"` (a JSON string). The `float64` path, triggered when the source has a bare JSON number (`"_tidb_origin_ts":100`), returns `float64(101)`, which marshals as the JSON number `101` (not `"101"`). This distinct serialized form isn't tested and could silently break downstream consumers expecting a consistent type. Consider adding a second sub-case where the `_tidb_origin_ts` value is a JSON number, e.g. `{"pkNames":["id"],"data":[{"id":"3","_tidb_origin_ts":100}]}`, and assert that `msg.Value` contains `"_tidb_origin_ts":101` (no quotes around the number).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer_test.go` around lines 136 - 169, the test only covers the string branch of incrementOriginTSValue; add a complementary sub-case in the same test that supplies a message whose data object encodes _tidb_origin_ts as a JSON number, call mutateMessageValueForFailpoint, assert that the mutated msg.Value now contains the numeric form _tidb_origin_ts:101 (without quotes), and verify the mutatedRows/originTsMutatedRows expectations for that numeric case so both the string and float64 branches of incrementOriginTSValue are exercised.
downstreamadapter/sink/cloudstorage/writer.go (1)

355-360: `math/big` is heavyweight for a simple bounded random index.

`cryptorand.Int(cryptorand.Reader, big.NewInt(int64(len(columns))))` imports `math/big` solely to pass a modulus. Consider using `crypto/rand.Read` with simple rejection sampling, or using `rand/v2` (Go 1.22+), which offers `rand.IntN`. Since this is failpoint-only code and a best-effort fallback already exists, the overhead is cosmetic, but the `math/big` dependency bloats the import graph.

♻️ Proposed simplification using `crypto/rand.Read`

```diff
-idx, err := cryptorand.Int(cryptorand.Reader, big.NewInt(int64(len(columns))))
-if err != nil {
-	return columns[0], true
-}
-return columns[idx.Int64()], true
+var b [8]byte
+if _, err := cryptorand.Read(b[:]); err != nil {
+	return columns[0], true
+}
+idx := int(binary.LittleEndian.Uint64(b[:]) % uint64(len(columns)))
+return columns[idx], true
```

(Add `"encoding/binary"` to imports and drop the `"math/big"` import.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 355 - 360, replace the math/big-based cryptorand.Int call with a bounded crypto/rand.Read approach: read a uint64, map it into [0, len(columns)) with rejection sampling to avoid bias, and return columns[idx] as before, keeping the existing fallback of columns[0] on error; this removes the math/big dependency while preserving cryptographic randomness.
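If unbiased selection mattered, the rejection-sampling variant mentioned in the prompt would look roughly like this (the modulo form above is slightly biased whenever len(columns) does not divide 2^64):

```go
package cloudstorage

import (
	cryptorand "crypto/rand"
	"encoding/binary"
	"fmt"
)

// uniformIndex draws an unbiased index in [0, n) from crypto/rand by
// discarding draws that land in the final partial bucket of the uint64 range.
func uniformIndex(n int) (int, error) {
	if n <= 0 {
		return 0, fmt.Errorf("uniformIndex: n must be positive, got %d", n)
	}
	limit := (^uint64(0) / uint64(n)) * uint64(n) // largest multiple of n
	var b [8]byte
	for {
		if _, err := cryptorand.Read(b[:]); err != nil {
			return 0, err
		}
		v := binary.LittleEndian.Uint64(b[:])
		if v < limit {
			return int(v % uint64(n)), nil
		}
	}
}
```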
📒 Files selected for processing (2)

- downstreamadapter/sink/cloudstorage/writer.go
- downstreamadapter/sink/cloudstorage/writer_test.go
Actionable comments posted: 2
♻️ Duplicate comments (2)
pkg/sink/failpointrecord/record.go (1)
148-153: ⚠️ Potential issue | 🔴 Critical: Race condition: `disabled` is read before `ensureFile()` synchronization.

`Write` does a bare read of `disabled` before `ensureFile()`, while `ensureFile()` may concurrently write it inside `initOnce.Do`.

🐛 Proposed fix

```diff
 func Write(failpoint string, rows []RowRecord) {
-	if disabled {
-		return
-	}
 	ensureFile()
-	if file == nil {
+	if disabled || file == nil {
 		return
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/failpointrecord/record.go` around lines 148 - 153, the Write path reads the package-level variable disabled before calling ensureFile(), causing a race because ensureFile() (via initOnce.Do) may set disabled; move the disabled check to after ensureFile() (or use atomic loads/stores on disabled) so the sync.Once establishes the happens-before relationship and no unsynchronized read occurs, keeping any assignment to disabled inside the initOnce.Do path.

downstreamadapter/sink/cloudstorage/encoding_group.go (1)
178-191: ⚠️ Potential issue | 🔴 Critical: Missing pre-loop `event.Rewind()` can yield zero records.

If the event cursor was already consumed before this function runs, the loop exits immediately and records nothing.

🐛 Proposed fix

```diff
 func dmlEventToRowRecords(event *commonEvent.DMLEvent) []failpointrecord.RowRecord {
 	if event == nil || event.TableInfo == nil {
 		return nil
 	}
+	event.Rewind()
 	indexes, columns := (&commonEvent.RowEvent{TableInfo: event.TableInfo}).PrimaryKeyColumn()
 	originTsCol, hasOriginTsCol := event.TableInfo.GetColumnInfoByName(commonEvent.OriginTsColumn)
 	originTsOffset, hasOriginTsOffset := event.TableInfo.GetColumnOffsetByName(commonEvent.OriginTsColumn)
 	rowRecords := make([]failpointrecord.RowRecord, 0, event.Len())
 	for {
 		row, ok := event.GetNextRow()
 		if !ok {
-			event.Rewind()
 			break
 		}
 @@
 	}
+	event.Rewind()
 	return rowRecords
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 178 - 191, in dmlEventToRowRecords, ensure the event cursor is reset before iterating: call event.Rewind() right after the nil checks and before the loop so GetNextRow() always starts from the first row; this prevents returning zero records when the event cursor was previously consumed (retain a Rewind() call after the loop).
🧹 Nitpick comments (2)
pkg/sink/mysql/mysql_writer_dml.go (1)
194-231: Consider deduplicating row-record conversion logic across sinks.
`dmlEventsToRowRecords` here and `dmlEventToRowRecords` in `downstreamadapter/sink/cloudstorage/encoding_group.go` are very similar. A shared helper would reduce drift and future bug reintroduction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 194 - 231, extract the duplicated conversion logic in dmlEventsToRowRecords and dmlEventToRowRecords into a single shared helper (e.g., ConvertDMLEventsToRowRecords) that accepts the same inputs (tableInfo *common.TableInfo, events []*commonEvent.DMLEvent) and returns []failpointrecord.RowRecord; move the core loop that extracts pkCols (using findPKColumns), originTs (using tableInfo.GetColumnInfoByName/GetColumnOffsetByName and failpointrecord.NormalizeOriginTs), and builds RowRecord (CommitTs, OriginTs, PrimaryKeys) into that helper, replace both function bodies with calls to it, and adjust imports so behavior remains identical.

downstreamadapter/sink/cloudstorage/encoding_group.go (1)
156-176: Add mismatch handling for unassigned row records in splitter.
When `sum(msg.GetRowsCount()) < len(rows)`, tail rows are dropped silently. A warning or explicit remainder handling would make failpoint evidence more reliable.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 156 - 176, splitRowRecordsByMessages currently drops trailing rows when sum(msg.GetRowsCount()) < len(rows); after the loop check if rowIdx < len(rows) and handle the remainder explicitly: either append the leftover rows as an additional entry in ret (so callers can see unassigned rows) or append them to the last message slice, and emit a warning log that includes the counts (len(messages), total requested rows sum, and len(rows)) so evidence isn't lost; update splitRowRecordsByMessages to perform this remainder-check using the existing variables (messages, rows, ret, rowIdx) and call the project's logger or return a clear signal to the caller about the leftover rows.
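A minimal sketch of the suggested remainder handling, assuming a hypothetical message type in place of the encoded sink messages and plain ints in place of row records:

package main

import (
	"fmt"
	"log"
)

// message is a hypothetical stand-in for an encoded sink message that reports
// how many rows it carries.
type message struct{ rowsCount int }

func (m message) GetRowsCount() int { return m.rowsCount }

// splitByMessages assigns rows to messages by each message's declared row
// count and, per the suggestion, surfaces leftover rows instead of silently
// dropping them.
func splitByMessages(messages []message, rows []int) [][]int {
	ret := make([][]int, 0, len(messages))
	rowIdx := 0
	for _, msg := range messages {
		end := rowIdx + msg.GetRowsCount()
		if end > len(rows) {
			end = len(rows)
		}
		ret = append(ret, rows[rowIdx:end])
		rowIdx = end
	}
	if rowIdx < len(rows) {
		// Remainder handling: warn with the counts and keep the tail as an
		// extra entry so failpoint evidence is never lost.
		log.Printf("split mismatch: %d of %d rows unassigned across %d messages",
			len(rows)-rowIdx, len(rows), len(messages))
		ret = append(ret, rows[rowIdx:])
	}
	return ret
}

func main() {
	msgs := []message{{rowsCount: 2}, {rowsCount: 1}}
	fmt.Println(splitByMessages(msgs, []int{1, 2, 3, 4, 5})) // [[1 2] [3] [4 5]]
}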
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/sink/failpointrecord/record.go`:
- Around line 27-36: The file fails the repository formatting checks
(gofumports/gci) — update formatting for the import block and entire file by
running the repository formatter: execute make fmt (which runs gci, gofumports,
shfmt) and then re-run make check; ensure the import block in record.go is
reordered/normalized and commit the resulting changes so the CI formatting check
passes.
In `@pkg/sink/mysql/mysql_writer_dml.go`:
- Around line 184-191: findPKColumns currently only checks HasPriKeyFlag and so
misses not-null unique-key handle columns; change it to call
tableInfo.GetOrderedHandleKeyColumnIDs() and iterate those column IDs to build
the []pkColInfo (using the existing tableInfo.GetColumns() slice to map each
column ID to its index and Name.O). Ensure you preserve the pkColInfo fields
(index and name) and return the columns in the ordered handle-key order
(matching the pattern used in mysql_writer_helper.go and elsewhere) so UK-only
tables are included in PrimaryKeys.
---
Duplicate comments:
In `@downstreamadapter/sink/cloudstorage/encoding_group.go`:
- Around line 178-191: In dmlEventToRowRecords, ensure the event cursor is reset
before iterating: call event.Rewind() (or otherwise reset the iterator) right
after the nil checks and before the loop so GetNextRow() always starts from the
first row; this prevents returning zero records when the event cursor was
previously consumed—update the function dmlEventToRowRecords to invoke
event.Rewind() prior to the for { row, ok := event.GetNextRow() ... } loop
(retain the existing Rewind() call after the loop).
In `@pkg/sink/failpointrecord/record.go`:
- Around line 148-153: The Write path reads the package-level variable
`disabled` before calling `ensureFile()`, causing a race because `ensureFile()`
(via `initOnce.Do`) may set `disabled`; to fix, move the `disabled` check to
after `ensureFile()` and/or protect reads/writes to `disabled` with the same
synchronization used in `ensureFile()` (e.g., perform `initOnce.Do` first inside
`ensureFile()` and only then read `disabled`), updating the `Write` function to
call `ensureFile()` before checking `disabled` (and ensure any assignment to
`disabled` remains inside the `initOnce.Do` path) so no unsynchronized read
occurs.
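For the record.go race, here is a minimal sketch of the suggested ordering, using a reduced hypothetical recorder type rather than the package's actual layout; the key point is that sync.Once.Do synchronizes-before every later return of Do, so reading disabled after ensureFile() is race-free.

package main

import (
	"fmt"
	"os"
	"sync"
)

// recorder is a hypothetical reduction of the failpoint recorder: initOnce
// both opens the JSONL output file and decides whether recording is disabled.
type recorder struct {
	initOnce sync.Once
	disabled bool
	file     *os.File
}

func (r *recorder) ensureFile() {
	r.initOnce.Do(func() {
		f, err := os.CreateTemp("", "failpoint-*.jsonl")
		if err != nil {
			// The only write to disabled stays inside the Once.
			r.disabled = true
			return
		}
		r.file = f
	})
}

// Write calls ensureFile first, so the read of disabled is ordered after the
// write inside the Once; real code would also serialize concurrent appends.
func (r *recorder) Write(line string) {
	r.ensureFile()
	if r.disabled {
		return
	}
	fmt.Fprintln(r.file, line)
}

func main() {
	r := &recorder{}
	r.Write(`{"commit_ts":1,"origin_ts":1}`)
}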
---
Nitpick comments:
In `@downstreamadapter/sink/cloudstorage/encoding_group.go`:
- Around line 156-176: splitRowRecordsByMessages currently drops trailing rows
when sum(msg.GetRowsCount()) < len(rows); after the loop check if rowIdx <
len(rows) and handle the remainder explicitly: either append the leftover rows
as an additional entry in ret (so callers can see unassigned rows) or append
them to the last message slice, and emit a warning log that includes the counts
(len(messages), total requested rows sum, and len(rows)) so evidence isn't lost;
update splitRowRecordsByMessages to perform this remainder-check using the
existing variables (messages, rows, ret, rowIdx) and call the project's logger
or return a clear signal to the caller about the leftover rows.
In `@pkg/sink/mysql/mysql_writer_dml.go`:
- Around line 194-231: Extract the duplicated conversion logic in
dmlEventsToRowRecords and dmlEventToRowRecords into a single shared helper
(e.g., ConvertDMLEventsToRowRecords) that accepts the same inputs (tableInfo
*common.TableInfo, events []*commonEvent.DMLEvent) and returns
[]failpointrecord.RowRecord; move the core loop that extracts pkCols (using
findPKColumns), originTs (using
tableInfo.GetColumnInfoByName/GetColumnOffsetByName and
failpointrecord.NormalizeOriginTs), and builds RowRecord (CommitTs, OriginTs,
PrimaryKeys) into that helper, replace both dmlEventsToRowRecords and
dmlEventToRowRecords bodies with calls to the new helper, and ensure all callers
are updated and imports adjusted so behavior remains identical.
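A structural sketch of that extraction, with simplified stand-in types rather than the project's definitions; the point is that both sink paths funnel through one conversion helper:

package main

import "fmt"

// Simplified stand-ins for the project's types; the field names follow the
// review text (CommitTs, OriginTs, PrimaryKeys) but the shapes are assumed.
type RowRecord struct {
	CommitTs    uint64
	OriginTs    uint64
	PrimaryKeys map[string]string
}

type row struct {
	commitTs, originTs uint64
	pks                map[string]string
}

type dmlEvent struct{ rows []row }

// convertDMLEventsToRowRecords is the single shared conversion path: the PK
// and origin-ts extraction would live here, so the MySQL and cloud-storage
// sinks cannot drift apart.
func convertDMLEventsToRowRecords(events []*dmlEvent) []RowRecord {
	var out []RowRecord
	for _, ev := range events {
		for _, r := range ev.rows {
			out = append(out, RowRecord{CommitTs: r.commitTs, OriginTs: r.originTs, PrimaryKeys: r.pks})
		}
	}
	return out
}

// The per-event helper becomes a thin wrapper over the shared function.
func dmlEventToRowRecords(ev *dmlEvent) []RowRecord {
	return convertDMLEventsToRowRecords([]*dmlEvent{ev})
}

func main() {
	ev := &dmlEvent{rows: []row{{commitTs: 2, originTs: 1, pks: map[string]string{"id": "7"}}}}
	fmt.Printf("%+v\n", dmlEventToRowRecords(ev))
}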
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
downstreamadapter/sink/cloudstorage/encoding_group.go
pkg/sink/failpointrecord/record.go
pkg/sink/mysql/mysql_writer_dml.go
import (
	"encoding/json"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)
CI is failing on formatting for this file.
`make check` already reports a gofumports failure here; please run `make fmt` before merge.
As per coding guidelines (`**/*.go`): Use gofmt to keep Go code clean; run `make fmt` before pushing to format code with gci, gofumports, and shfmt, plus log-style checks.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/sink/failpointrecord/record.go` around lines 27 - 36, The file fails the
repository formatting checks (gofumports/gci) — update formatting for the import
block and entire file by running the repository formatter: execute make fmt
(which runs gci, gofumports, shfmt) and then re-run make check; ensure the
import block in record.go is reordered/normalized and commit the resulting
changes so the CI formatting check passes.
func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
	var cols []pkColInfo
	for i, col := range tableInfo.GetColumns() {
		if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) {
			cols = append(cols, pkColInfo{index: i, name: col.Name.O})
		}
	}
	return cols
}
🧩 Analysis chain
🏁 Script executed:
# First, let's examine the actual file content around the lines mentioned
cat -n pkg/sink/mysql/mysql_writer_dml.go | sed -n '180,195p'
Repository: pingcap/ticdc
Length of output: 650
🏁 Script executed:
# Check if the file exists and get its total size
wc -l pkg/sink/mysql/mysql_writer_dml.go
Repository: pingcap/ticdc
Length of output: 95
🏁 Script executed:
# Search for the findPKColumns function and surrounding context
rg "findPKColumns" -A 10 -B 5Repository: pingcap/ticdc
Length of output: 2129
🏁 Script executed:
# Search for the suggested methods to verify they exist
rg "GetOrderedHandleKeyColumnIDs|GetColumnInfo|GetColumnOffsetByName" --max-count 20Repository: pingcap/ticdc
Length of output: 4308
🏁 Script executed:
# Look for how handle keys are used elsewhere in the codebase
rg "GetHandleKeyColumnIDs|HandleKeyColumnIDs" -A 3 -B 3Repository: pingcap/ticdc
Length of output: 5062
🏁 Script executed:
# Check the common.TableInfo definition to understand available methods
fd "common" --type f -name "*.go" | head -20Repository: pingcap/ticdc
Length of output: 287
🏁 Script executed:
# Search for pkColInfo definition to understand its usage
rg "pkColInfo" -B 5 -A 10Repository: pingcap/ticdc
Length of output: 1885
🏁 Script executed:
# Check how dmlEventsToRowRecords uses the pkCols result
rg "dmlEventsToRowRecords" -A 30 | head -60Repository: pingcap/ticdc
Length of output: 4400
🏁 Script executed:
# Check if PrimaryKeys field in failpointrecord.RowRecord can be empty
rg "type RowRecord" -A 10 pkg/Repository: pingcap/ticdc
Length of output: 813
🏁 Script executed:
# See the full dmlEventsToRowRecords function to understand the impact of empty pkCols
cat -n pkg/sink/mysql/mysql_writer_dml.go | sed -n '194,230p'
Repository: pingcap/ticdc
Length of output: 1584
🏁 Script executed:
# Check if there are tests for findPKColumns or dmlEventsToRowRecords
rg "TestFindPKColumns|TestDmlEventsToRowRecords|findPKColumns|dmlEventsToRowRecords" -l pkg/sink/mysql/Repository: pingcap/ticdc
Length of output: 91
findPKColumns must use GetOrderedHandleKeyColumnIDs() to include UK handle keys.
This helper only checks HasPriKeyFlag, which excludes not-null unique key handle keys. For UK-only tables, PrimaryKeys in the failpoint record becomes empty, breaking downstream violation analysis that relies on row identity matching.
Use GetOrderedHandleKeyColumnIDs() instead, which correctly includes both PK and UK handle keys, matching the pattern already used elsewhere in this file and mysql_writer_helper.go.
Proposed fix
func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
- var cols []pkColInfo
- for i, col := range tableInfo.GetColumns() {
- if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) {
- cols = append(cols, pkColInfo{index: i, name: col.Name.O})
- }
- }
+ handleColIDs := tableInfo.GetOrderedHandleKeyColumnIDs()
+ cols := make([]pkColInfo, 0, len(handleColIDs))
+ for _, colID := range handleColIDs {
+ col, exist := tableInfo.GetColumnInfo(colID)
+ if !exist || col == nil {
+ continue
+ }
+ offset, ok := tableInfo.GetColumnOffsetByName(col.Name.O)
+ if !ok {
+ continue
+ }
+ cols = append(cols, pkColInfo{index: offset, name: col.Name.O})
+ }
return cols
 }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
	handleColIDs := tableInfo.GetOrderedHandleKeyColumnIDs()
	cols := make([]pkColInfo, 0, len(handleColIDs))
	for _, colID := range handleColIDs {
		col, exist := tableInfo.GetColumnInfo(colID)
		if !exist || col == nil {
			continue
		}
		offset, ok := tableInfo.GetColumnOffsetByName(col.Name.O)
		if !ok {
			continue
		}
		cols = append(cols, pkColInfo{index: offset, name: col.Name.O})
	}
	return cols
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 184 - 191, findPKColumns
currently only checks HasPriKeyFlag and so misses not-null unique-key handle
columns; change it to call tableInfo.GetOrderedHandleKeyColumnIDs() and iterate
those column IDs to build the []pkColInfo (using the existing
tableInfo.GetColumns() slice to map each column ID to its index and Name.O).
Ensure you preserve the pkColInfo fields (index and name) and return the columns
in the ordered handle-key order (matching the pattern used in
mysql_writer_helper.go and elsewhere) so UK-only tables are included in
PrimaryKeys.
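To see concretely why a PriKeyFlag-only filter loses row identity, here is a toy reproduction with hypothetical flag constants; the names and values are illustrative only, and real code should keep using the parser's mysql flag helpers.

package main

import "fmt"

// Hypothetical flag constants, loosely modeled on the parser's mysql flags.
const (
	notNullFlag   uint = 1 << 0
	priKeyFlag    uint = 1 << 1
	uniqueKeyFlag uint = 1 << 2
)

type col struct {
	name string
	flag uint
}

// pkOnly mirrors the flawed helper: it recognizes explicit primary keys only.
func pkOnly(cols []col) []string {
	var out []string
	for _, c := range cols {
		if c.flag&priKeyFlag != 0 {
			out = append(out, c.name)
		}
	}
	return out
}

func main() {
	// A UK-only table: no PK, but a NOT NULL UNIQUE column serves as the
	// handle key, so it is the row identity the checker must record.
	ukOnly := []col{{name: "id", flag: notNullFlag | uniqueKeyFlag}, {name: "v"}}
	fmt.Println(pkOnly(ukOnly)) // [] — row identity is lost for this table
}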
What problem does this PR solve?
Issue Number: ref #4244
What is changed and how it works?
add active-active failpoints to construct and record data loss, data inconsistency, data redundancy, and LWW violations
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Tests