
cdc: add active-active failpoint to construct and record data loss, data inconsistency, data redundancy and LWW violation #4245

Open

Leavrth wants to merge 8 commits into pingcap:master from Leavrth:active-active-failpoint-master

Conversation


@Leavrth Leavrth commented Feb 24, 2026

What problem does this PR solve?

Issue Number: ref #4244

What is changed and how it works?

Add an active-active failpoint to construct and record data loss, data inconsistency, data redundancy, and LWW violation.

Check List

Tests

  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

None

Summary by CodeRabbit

  • New Features

    • Added a configurable failpoint recording system that logs structured failpoint events (time, failpoint name, rows with commit/origin timestamps and primary keys) to a JSONL file.
    • Added failpoint-driven behaviors for cloud storage sinks to drop or mutate individual messages and record affected rows.
    • Added failpoint-driven recording for an Active-Active MySQL path to capture affected rows.
  • Tests

    • Added tests validating cloud storage failpoint mutation/drop behavior and failpoint recording classification.

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@ti-chi-bot ti-chi-bot bot added first-time-contributor Indicates that the PR was contributed by an external member and is a first-time contributor. do-not-merge/needs-triage-completed release-note-none Denotes a PR that doesn't merit a release note. labels Feb 24, 2026

ti-chi-bot bot commented Feb 24, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign hongyunyan for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files.

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Feb 24, 2026

coderabbitai bot commented Feb 24, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds per-message failpoints for the CloudStorage sink (drop/mutate), helpers to split/convert DML events to row records, a failpoint recorder that writes JSONL row records, canal-json mutation logic for tests, and MySQL sink integration to emit row records under an LWW bypass failpoint.

Changes

Cohort / File(s) Summary
CloudStorage sink encoding & writer
downstreamadapter/sink/cloudstorage/encoding_group.go, downstreamadapter/sink/cloudstorage/writer.go, downstreamadapter/sink/cloudstorage/writer_test.go
Adds global failpoint switch and applyFailpointsOnEncodedMessages to split DMLEvent -> per-message RowRecords and apply per-message failpoints (drop or mutate). Implements canal‑json message mutation, helpers to select/mutate columns, splitting/conversion helpers, and tests verifying mutation/classification.
Failpoint recording infrastructure
pkg/sink/failpointrecord/record.go
New package that defines RowRecord/Record and writes per-failpoint JSONL records to a file controlled by TICDC_FAILPOINT_RECORD_FILE with lazy init, mutexed writes, and structured logging on errors.
MySQL sink failpoint integration
pkg/sink/mysql/mysql_writer_dml.go
Adds mysqlSinkBypassLWW failpoint path that converts DMLEvents to RowRecords and writes them via failpointrecord; introduces PK discovery and DML->RowRecord helpers while keeping normal SQL generation when inactive.

Sequence Diagram

sequenceDiagram
    participant Enc as CloudStorageEncoder
    participant FP as FailpointSwitch
    participant Ext as RowExtractor
    participant Mut as MessageMutator
    participant Rec as FailpointRecorder
    participant File as JSONLFile

    Enc->>FP: check cloudStorageSinkMessageFailpointSwitch
    alt failpoint active
        FP->>Ext: dmlEventToRowRecords / splitRowRecordsByMessages
        Ext-->>FP: per-message RowRecord slices
        FP->>Mut: applyFailpointsOnEncodedMessages (drop/mutate)
        Mut-->>FP: mutated/dropped messages + RowRecords
        FP->>Rec: Write(failpoint, rows)
        Rec->>File: ensureFile() and append JSONL
        File-->>Rec: write ACK
        Rec-->>FP: done
    else inactive
        FP-->>Enc: continue normal flow
    end
    Enc-->>Downstream: send message(s)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • hongyunyan
  • wk989898
  • lidezhu
  • flowbehappy

Poem

🐰 I nibble bytes beneath the moon,
I split the rows and hum a tune,
I drop a line, I tweak a field,
A JSON hop, a tiny schoom—
Tests pass, and carrots are revealed.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 31.25%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Description check (❓ Inconclusive): the description provides an issue reference and a brief explanation but lacks detail about what is changed and how it works, and the 'No code' test selection is inconsistent with the substantial code additions. Resolution: expand the 'What is changed and how it works' section with details about the new failpoint infrastructure, and clarify which test types apply to these changes.

✅ Passed checks (1 passed)

  • Title check (✅ Passed): the title accurately describes the primary change: adding active-active failpoint functionality to construct and record various data inconsistency scenarios.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@gemini-code-assist

Summary of Changes

Hello @Leavrth, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the testing capabilities for active-active replication scenarios by introducing several failpoints across the TiCDC sink components. These failpoints are designed to programmatically inject conditions that simulate various data integrity issues, such as data loss, inconsistency, redundancy, and Last-Write-Wins violations. The changes also include a new utility to record these simulated events, facilitating the development and validation of multi-cluster consistency checkers.

Highlights

  • Failpoint for Cloud Storage Sink: Introduced a new failpoint (cloudStorageSinkMessageFailpointSwitch) in the cloud storage sink to enable or disable message manipulation for testing data loss and inconsistency scenarios. This includes functions to drop messages or mutate their values.
  • Failpoint for MySQL Sink LWW Bypass: Added a failpoint (mysqlSinkBypassLWW) in the MySQL sink to bypass the Last-Write-Wins (LWW) UPSERT logic. This allows for simulating LWW violations by falling back to normal SQL (REPLACE INTO or INSERT) without the LWW condition.
  • Failpoint Event Recording Utility: Implemented a new failpointrecord package that provides a lightweight utility to record failpoint-triggered events (e.g., primary keys, commit timestamps) to a JSONL file. This enables external tools, like consistency checkers, to consume and verify these events.
  • Message Value Mutation for Inconsistency: Developed a mutateMessageValueForFailpoint function to rewrite non-primary-key column values in Canal-JSON encoded messages. This specifically helps in simulating data inconsistency where the original row appears 'lost' and the mutated row appears 'redundant' to a consistency checker.
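The mutation highlight above can be illustrated with a self-contained sketch: parse the canal-json payload, read pkNames, and null out one non-PK column. The function name and JSON shape follow the descriptions in this thread, while the details (deterministic sorted-key selection, erroring on malformed pkNames) fold in suggestions raised later in the review; all of it is an assumption-based sketch, not the PR's exact code:

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// mutateMessageValue rewrites one non-primary-key column value to nil
// so a consistency checker sees the original row as "lost" and the
// mutated row as "redundant" under the same primary key.
func mutateMessageValue(value []byte) ([]byte, error) {
	var m map[string]json.RawMessage
	if err := json.Unmarshal(value, &m); err != nil {
		return nil, err
	}
	var pkNames []string
	if raw, ok := m["pkNames"]; ok {
		if err := json.Unmarshal(raw, &pkNames); err != nil {
			return nil, err // malformed pkNames: do not risk mutating a PK
		}
	}
	pkSet := make(map[string]bool, len(pkNames))
	for _, n := range pkNames {
		pkSet[n] = true
	}
	var data []map[string]any
	if err := json.Unmarshal(m["data"], &data); err != nil {
		return nil, err
	}
	// Iterate keys in sorted order so the mutated column is the same
	// across runs (plain map iteration order is randomized in Go).
	mutated := false
	for _, row := range data {
		if mutated {
			break
		}
		keys := make([]string, 0, len(row))
		for col := range row {
			keys = append(keys, col)
		}
		sort.Strings(keys)
		for _, col := range keys {
			if !pkSet[col] {
				row[col] = nil
				mutated = true
				break
			}
		}
	}
	raw, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}
	m["data"] = raw
	return json.Marshal(m)
}

func main() {
	msg := []byte(`{"pkNames":["id"],"data":[{"id":1,"name":"a"}]}`)
	out, err := mutateMessageValue(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints {"data":[{"id":1,"name":null}],"pkNames":["id"]}
}
```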


Changelog
  • downstreamadapter/sink/cloudstorage/encoding_group.go
    • Added imports for failpoint, log, commonEvent, failpointrecord, and zap.
    • Injected a cloudStorageSinkMessageFailpointSwitch to conditionally apply failpoints on encoded messages.
    • Implemented applyFailpointsOnEncodedMessages to handle message dropping and value mutation failpoints.
    • Added splitRowRecordsByMessages to distribute row records across messages.
    • Included dmlEventToRowRecords to convert DML events into a format suitable for failpoint recording.
  • downstreamadapter/sink/cloudstorage/writer.go
    • Imported the encoding/json package.
    • Added the mutateMessageValueForFailpoint function to modify non-primary-key column values in Canal-JSON encoded messages for inconsistency testing.
  • pkg/sink/failpointrecord/record.go
    • Created a new package failpointrecord.
    • Defined RowRecord and Record structs for capturing and structuring failpoint event data.
    • Implemented ensureFile to manage the opening and initialization of the failpoint record file, controlled by an environment variable.
    • Provided a Write function to persist failpoint events to a JSONL file, with concurrency safety and no-op behavior if disabled.
  • pkg/sink/mysql/mysql_writer_dml.go
    • Added imports for failpoint, failpointrecord, and mysql.
    • Injected a mysqlSinkBypassLWW failpoint within genActiveActiveSQL to bypass LWW UPSERTs and record the event.
    • Defined pkColInfo struct to hold primary key column information.
    • Implemented findPKColumns to extract primary key column details from table information.
    • Added dmlEventsToRowRecords to convert DML events into failpointrecord.RowRecord format for recording.
Activity
  • The pull request introduces new failpoints and a recording utility to simulate and track data integrity issues in active-active replication scenarios. No specific review comments or discussions have been recorded yet.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces several failpoints to simulate various data consistency issues in active-active replication scenarios, such as data loss, inconsistency, and LWW violations. A new package failpointrecord is added to log these injected failures, which is a good approach for external verification. The changes are well-contained and focused on improving testability.

I've found a couple of areas for improvement:

  1. A data race in the new failpointrecord package that should be fixed.
  2. A potential issue in the mutateMessageValueForFailpoint logic where an ignored error could lead to incorrect test behavior.

Details are in the specific comments. Overall, this is a valuable addition for testing the robustness of the system.

Comment on lines +82 to +89
func Write(failpoint string, rows []RowRecord) {
if disabled {
return
}
ensureFile()
if file == nil {
return
}


Severity: high

There's a data race on the disabled package-level variable. It's read on line 83 without synchronization, while it might be written concurrently within ensureFile (lines 64, 72). This can lead to unpredictable behavior.

You can fix this race and simplify the code by removing the disabled variable entirely. The check if file == nil is sufficient to guard against writing when the file isn't opened, as os.OpenFile returns nil on failure.

To fix this, please apply the suggestion below, and also remove the disabled variable declaration on line 57 and the assignments to it on lines 64 and 72.

func Write(failpoint string, rows []RowRecord) {
	ensureFile()
	if file == nil {
		return
	}

Comment on lines +247 to +249
if raw, ok := m["pkNames"]; ok {
_ = json.Unmarshal(raw, &pkNames)
}


Severity: medium

The error from json.Unmarshal is ignored here. If pkNames is present in the JSON but malformed, unmarshaling will fail, pkNames will be empty, and pkSet will be empty. This will cause the logic below to potentially mutate a primary key column, which contradicts the goal of this failpoint (mutating a non-PK column to cause a 'lost' and 'redundant' row with the same PK).

To ensure the failpoint works as intended, you should handle this error, for example by skipping this message part.

		if raw, ok := m["pkNames"]; ok {
			if err := json.Unmarshal(raw, &pkNames); err != nil {
				continue
			}
		}


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
downstreamadapter/sink/cloudstorage/writer.go (1)

265-279: Non-deterministic column selection due to Go map iteration order.

for col := range row iterates map keys in random order, so the "first non-PK column" mutated will differ across runs. This is acceptable for a failpoint (the goal is to introduce some inconsistency), but if reproducibility matters for debugging, consider sorting the keys first.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 265 - 279, The
loop that picks "the first non-PK column" uses Go map iteration (for col :=
range row) which is intentionally non-deterministic; make selection
deterministic by listing the keys of each row into a slice, sorting that slice,
then iterating the sorted keys to find the first key not in pkSet and set
row[col] = nil (update the existing mutated flag logic). Locate the block using
variables/data names "data", "row", "pkSet", and "mutated" and replace the
map-range with a sorted-key approach so the same column is chosen consistently
across runs.
pkg/sink/mysql/mysql_writer_dml.go (1)

194-222: Duplicated row-record conversion logic across packages.

dmlEventsToRowRecords here and dmlEventToRowRecords in encoding_group.go do essentially the same thing — extract PK values from DML events into failpointrecord.RowRecord slices. They differ only in how PK columns are discovered (findPKColumns vs PrimaryKeyColumn()) and singular vs plural event input.

Consider extracting a shared helper (e.g., into the failpointrecord package or a common utility) to keep these in sync and avoid divergence.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 194 - 222, Both
dmlEventsToRowRecords and dmlEventToRowRecords duplicate logic for extracting
primary key values into failpointrecord.RowRecord; extract a single helper
(e.g., failpointrecord.FromDMLEvents or failpointrecord.FromDMLEvent) that
accepts either a single *commonEvent.DMLEvent or a slice and a PK-discovery
function, move the loop and PK extraction into that helper, and then update
dmlEventsToRowRecords and dmlEventToRowRecords to call the new helper; unify PK
column discovery by using a single accessor (wrap findPKColumns/PrimaryKeyColumn
behind a small adapter if needed) so both callers pass the same PK column format
to the helper.
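The deduplication suggested here could look like a single helper parameterized by the PK columns each call site discovers. Everything below (the row stand-in, the field names) is illustrative only; the real code would operate on commonEvent.DMLEvent with PK discovery via PrimaryKeyColumn() or findPKColumns:

```go
package main

import "fmt"

// RowRecord mirrors the failpointrecord type described in the PR
// summary; the fields here are assumptions.
type RowRecord struct {
	CommitTs uint64
	PKs      []string
}

// row is a hypothetical stand-in for one DML row.
type row struct {
	commitTs uint64
	cols     map[string]string
}

// toRowRecords is the shared helper the review suggests: both the
// cloud storage and MySQL call sites would pass the PK column names
// they discovered through their own accessor.
func toRowRecords(rows []row, pkCols []string) []RowRecord {
	out := make([]RowRecord, 0, len(rows))
	for _, r := range rows {
		pks := make([]string, 0, len(pkCols))
		for _, c := range pkCols {
			pks = append(pks, r.cols[c])
		}
		out = append(out, RowRecord{CommitTs: r.commitTs, PKs: pks})
	}
	return out
}

func main() {
	recs := toRowRecords(
		[]row{{commitTs: 7, cols: map[string]string{"id": "1", "name": "a"}}},
		[]string{"id"},
	)
	fmt.Println(recs) // prints [{7 [1]}]
}
```

With a helper like this, dmlEventsToRowRecords and dmlEventToRowRecords shrink to thin adapters, so the two conversion paths cannot drift apart.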
downstreamadapter/sink/cloudstorage/encoding_group.go (1)

128-146: Both cloudStorageSinkDropMessage and cloudStorageSinkMutateValue fire on every message.

When both failpoints are enabled, every message is first dropped (Key/Value set to nil, row count zeroed) and then mutated. Mutating a nil Value is a no-op in mutateMessageValueForFailpoint (it returns early on empty Value), so the second failpoint becomes dead code when the first is active. This is likely fine for independent use, but worth documenting that these two failpoints are mutually exclusive in practice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 128 -
146, Both failpoints cloudStorageSinkDropMessage and cloudStorageSinkMutateValue
currently run for every message, causing cloudStorageSinkMutateValue to be a
no-op when cloudStorageSinkDropMessage has already nulled the message; fix this
by making the mutate failpoint conditional so it only runs when the message
still has a payload (e.g., check msg.Value != nil and msg.RowsCount()>0 before
calling mutateMessageValueForFailpoint), or alternatively swap/chain the
failpoints so mutate runs before drop and document the mutual exclusivity in
comments next to cloudStorageSinkDropMessage/cloudStorageSinkMutateValue.
pkg/sink/failpointrecord/record.go (1)

53-77: No Close function — file handle leaks until process exit.

The file opened by ensureFile is never closed. While for a long-running daemon this is mostly benign (the OS reclaims on exit), adding a Close() function would be good hygiene and helpful for tests that want to verify file contents after Write calls.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/failpointrecord/record.go` around lines 53 - 77, Add a safe Close
function that releases the opened file handle to avoid leaks: implement Close()
which acquires the package mutex (mu), checks if the package-level file is
non-nil, calls file.Close(), sets file = nil (and optionally disabled = true if
you want to prevent further writes), and returns any close error; make Close
idempotent so repeated calls are safe. Locate the package-level vars (initOnce,
mu, file, disabled) and the ensureFile function to place Close in the same file;
if you need ensureFile to be able to reopen after Close, reset initOnce
(sync.Once) accordingly before reusing.
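A Close along the lines suggested here can be sketched with a mutex-guarded flag instead of sync.Once, so reopening after Close stays simple; that substitution is a deviation made for this sketch only, and the file name is a placeholder:

```go
package main

import (
	"os"
	"sync"
)

var (
	mu     sync.Mutex
	opened bool
	file   *os.File
)

// ensureFile opens the record file at most once per open/close cycle;
// file stays nil when the open fails.
func ensureFile() {
	mu.Lock()
	defer mu.Unlock()
	if opened {
		return
	}
	opened = true
	f, err := os.OpenFile("fp_close_demo.jsonl", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return
	}
	file = f
}

// Close releases the handle; safe to call repeatedly because the nil
// check turns later calls into no-ops, and resetting the flag lets
// ensureFile reopen afterwards.
func Close() error {
	mu.Lock()
	defer mu.Unlock()
	if file == nil {
		return nil
	}
	err := file.Close()
	file = nil
	opened = false
	return err
}

func main() {
	ensureFile()
	if err := Close(); err != nil {
		panic(err)
	}
	if err := Close(); err != nil { // idempotent: second call is a no-op
		panic(err)
	}
	println("closed twice without error")
}
```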

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9f8e3ee and bb8ce03.

📒 Files selected for processing (4)
  • downstreamadapter/sink/cloudstorage/encoding_group.go
  • downstreamadapter/sink/cloudstorage/writer.go
  • pkg/sink/failpointrecord/record.go
  • pkg/sink/mysql/mysql_writer_dml.go

Comment on lines +53 to +58
var (
initOnce sync.Once
mu sync.Mutex
file *os.File
disabled bool
)


⚠️ Potential issue | 🟠 Major

Data race on disabled: unsynchronized read before ensureFile().

disabled is written inside initOnce.Do (line 64/72) but read on line 83 outside any synchronization. The Go memory model only guarantees visibility for subsequent once.Do callers, not for bare reads of disabled. The -race detector will flag this.

The simplest fix: always call ensureFile() first, then check disabled.

🐛 Proposed fix
-var (
-	initOnce sync.Once
-	mu       sync.Mutex
-	file     *os.File
-	disabled bool
-)
+var (
+	initOnce sync.Once
+	mu       sync.Mutex
+	file     *os.File
+	disabled atomic.Bool
+)

 func ensureFile() {
 	initOnce.Do(func() {
 		path := os.Getenv(envKey)
 		if path == "" {
-			disabled = true
+			disabled.Store(true)
 			return
 		}
 		var err error
 		file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
 		if err != nil {
 			log.Warn("failed to open failpoint record file, recording disabled",
 				zap.String("path", path), zap.Error(err))
-			disabled = true
+			disabled.Store(true)
 			return
 		}
 		log.Info("failpoint record file opened", zap.String("path", path))
 	})
 }

 func Write(failpoint string, rows []RowRecord) {
-	if disabled {
+	if disabled.Load() {
 		return
 	}
 	ensureFile()
-	if file == nil {
+	if disabled.Load() || file == nil {
 		return
 	}

This requires adding "sync/atomic" to imports (or using go.uber.org/atomic which is already a transitive dep in this repo).

Alternatively, the simpler approach: just move ensureFile() before the disabled check and drop the early-return shortcut, relying on sync.Once for the fast path:

♻️ Simpler alternative
 func Write(failpoint string, rows []RowRecord) {
-	if disabled {
-		return
-	}
 	ensureFile()
-	if file == nil {
+	if disabled || file == nil {
 		return
 	}

After ensureFile() returns, disabled is safely visible because sync.Once guarantees the happens-before relationship for subsequent Do calls (which are no-ops but still synchronize).

Also applies to: 82-88

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/failpointrecord/record.go` around lines 53 - 58, The read of the
package-level variable `disabled` is unsynchronized and can race with its write
inside the `initOnce.Do` used by `ensureFile()`; to fix this move the call to
`ensureFile()` before any bare reads of `disabled` (or use atomic loads/stores
on `disabled`), e.g., ensure callers of `ensureFile()` (the code reading
`disabled` around the current checks at lines referencing `disabled`) always
call `ensureFile()` first so the `sync.Once` establishes the happens‑before
relationship and the subsequent read of `disabled` is safe; update all affected
spots (the checks around `disabled` and the `ensureFile()`/`initOnce.Do` usage)
accordingly.

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
downstreamadapter/sink/cloudstorage/encoding_group.go (1)

178-188: ⚠️ Potential issue | 🔴 Critical

Rewind must happen before row iteration.

dmlEventToRowRecords still starts reading from the current cursor. If event was already consumed before this call, the first GetNextRow() returns ok == false, and you return no records for failpoint writing.

🐛 Proposed fix
 func dmlEventToRowRecords(event *commonEvent.DMLEvent) []failpointrecord.RowRecord {
 	if event == nil || event.TableInfo == nil {
 		return nil
 	}
+	event.Rewind()
 	indexes, columns := (&commonEvent.RowEvent{TableInfo: event.TableInfo}).PrimaryKeyColumn()
 	rowRecords := make([]failpointrecord.RowRecord, 0, event.Len())
 	for {
 		row, ok := event.GetNextRow()
 		if !ok {
 			event.Rewind()
 			break
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 178 -
188, The function dmlEventToRowRecords reads rows starting from the event's
current cursor, but Rewind() is called only after iteration, so if the event was
already consumed it yields no records; call event.Rewind() before the loop
(before the first GetNextRow()) to reset the cursor, then iterate with
GetNextRow() and keep the existing event.Rewind() placement if you still need to
reset after reading; update dmlEventToRowRecords accordingly (references:
dmlEventToRowRecords, event.GetNextRow, event.Rewind).
🧹 Nitpick comments (2)
downstreamadapter/sink/cloudstorage/writer.go (1)

226-228: Align function comment with actual mutation strategy.

The comment says the code mutates the first non-PK column in data[0], but implementation selects a non-PK column via selectColumnToMutate and can mutate a later row if needed.

📝 Proposed comment fix
-//  2. Pick the first non-PK column in data[0] and replace its value with nil.
+//  2. Find the first row containing a non-PK column, choose one non-PK column,
+//     and replace its value with nil.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 226 - 228, The
function comment is inaccurate: instead of always mutating the first non-PK
column in data[0], the code uses selectColumnToMutate to choose a column which
may be found in a later row and then mutates that selected row/column. Update
the comment in writer.go to state that the JSON is parsed to extract "pkNames"
and "data", selectColumnToMutate is used to pick a non-PK column (which may come
from a later row), and the chosen cell's value is set to nil before
re-marshaling the message, referencing selectColumnToMutate and the mutation
step so future readers understand the actual mutation strategy.
downstreamadapter/sink/cloudstorage/encoding_group.go (1)

129-144: Use neutral log messages and put failpoint identity in fields.

These warning messages currently embed failpoint/function names in the log text. Move that into a structured field and keep message text generic.

♻️ Proposed fix
-			log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss",
+			log.Warn("dropping message to simulate data loss",
+				zap.String("failpoint", "cloudStorageSinkDropMessage"),
 				zap.String("keyspace", eg.changeFeedID.Keyspace()),
 				zap.Stringer("changefeed", eg.changeFeedID.ID()),
 				zap.Any("rows", rowRecords))
@@
-			log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency",
+			log.Warn("mutating message value to simulate data inconsistency",
+				zap.String("failpoint", "cloudStorageSinkMutateValue"),
 				zap.String("keyspace", eg.changeFeedID.Keyspace()),
 				zap.Stringer("changefeed", eg.changeFeedID.ID()),
 				zap.Any("rows", rowRecords))
As per coding guidelines: "Use structured logs via `github.com/pingcap/log` with `zap` fields in Go; log message strings should not include function names and should avoid hyphens (use spaces instead)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 129 -
144, The log messages inside the failpoint handlers in encoding_group.go (the
failpoint.Inject blocks for "cloudStorageSinkDropMessage" and
"cloudStorageSinkMutateValue") embed the failpoint/function names in the message
text; change them to neutral, hyphen-free messages (e.g., "simulating data loss"
and "simulating data inconsistency") and move the failpoint identity into a
structured zap field such as
zap.String("failpoint","cloudStorageSinkDropMessage") or
zap.String("failpoint","cloudStorageSinkMutateValue"), keeping existing
structured fields like zap.String("keyspace", eg.changeFeedID.Keyspace()),
zap.Stringer("changefeed", eg.changeFeedID.ID()), and zap.Any("rows",
rowRecords) unchanged; ensure you update both log.Warn calls referenced above to
follow the same pattern.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bb8ce03 and 888a12f.

📒 Files selected for processing (3)
  • downstreamadapter/sink/cloudstorage/encoding_group.go
  • downstreamadapter/sink/cloudstorage/writer.go
  • downstreamadapter/sink/cloudstorage/writer_test.go

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (2)
downstreamadapter/sink/cloudstorage/writer_test.go (1)

136-169: Test only covers the string representation of _tidb_origin_ts; the numeric JSON case is untested

The _tidb_origin_ts value in the test is "100" (a quoted JSON string). A real canal-json payload may encode it as a JSON number (100, no quotes), which json.Unmarshal decodes as float64. The float64 branch of incrementOriginTSValue returns float64(101), which json.Marshal renders as 101, so bytes.Contains(msg.Value, []byte(`"_tidb_origin_ts":"101"`)) would fail for that input.

Consider adding a second sub-test (or a table-driven case) with "_tidb_origin_ts":100 (unquoted) to exercise that branch and pin the expected serialization format.
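To see why the two encodings diverge, here is a minimal sketch of how encoding/json types the same field; the decodedType helper is illustrative, not part of the PR:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodedType reports the dynamic Go type that json.Unmarshal into
// map[string]any produces for the "v" field of the given JSON object.
func decodedType(raw string) string {
	var m map[string]any
	if err := json.Unmarshal([]byte(raw), &m); err != nil {
		return "error"
	}
	return fmt.Sprintf("%T", m["v"])
}

func main() {
	fmt.Println(decodedType(`{"v":"100"}`)) // string
	fmt.Println(decodedType(`{"v":100}`))   // float64: numbers always decode as float64

	// Re-marshaling a float64 yields an unquoted JSON number, not a string.
	out, _ := json.Marshal(map[string]any{"v": float64(101)})
	fmt.Println(string(out)) // {"v":101}
}
```

This is why a table-driven test covering both the quoted and unquoted inputs would pin the serialized form for each branch.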

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer_test.go` around lines 136 - 169,
Add a second sub-test to TestMutateMessageValueForFailpointRecordClassification
that supplies an unquoted numeric _tidb_origin_ts (e.g.
..."_tidb_origin_ts":100...) to exercise the float64 branch in
incrementOriginTSValue and mutateMessageValueForFailpoint; assert mutatedRows
and originTsMutatedRows lengths and contents as in the existing assertions, and
change the bytes.Contains check for the numeric case to expect the serialized
numeric form (e.g. `"_tidb_origin_ts":101` without quotes) so the test verifies
both string and numeric JSON encodings.
downstreamadapter/sink/cloudstorage/writer.go (1)

332-349: crypto/rand is unnecessarily heavy for a failpoint-only path

crypto/rand is an OS-level entropy source intended for cryptographic use. Inside a failpoint block that deliberately corrupts data for testing, math/rand (or even a fixed index) is sufficient and avoids the overhead and potential error path of cryptorand.Int.
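A minimal sketch of the suggested math/rand approach; pickColumn is a hypothetical stand-in for selectColumnToMutate, not the PR's actual function:

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickColumn chooses a column index without crypto/rand. math/rand is
// automatically seeded since Go 1.20, so no init-time seeding is needed,
// and unlike cryptorand.Int there is no error path to handle.
func pickColumn(columns []string) (string, bool) {
	if len(columns) == 0 {
		return "", false
	}
	return columns[rand.Intn(len(columns))], true
}

func main() {
	col, ok := pickColumn([]string{"a", "b", "c"})
	fmt.Println(col, ok)
}
```

For a failpoint path that deliberately corrupts test data, this non-cryptographic source is sufficient.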

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 332 - 349, The
function selectColumnToMutate uses crypto/rand which is unnecessary for a
failpoint path; replace the cryptorand.Int call with math/rand's fast non-crypto
RNG (e.g., rand.Intn(len(columns))) or a deterministic/fixed index for tests,
remove the cryptorand error branch and simply pick an index from columns, and
update imports to use math/rand (and seed it at init if needed for variability);
refer to selectColumnToMutate and the cryptorand.Int usage when making the
change.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 888a12f and 0c86f95.

📒 Files selected for processing (2)
  • downstreamadapter/sink/cloudstorage/writer.go
  • downstreamadapter/sink/cloudstorage/writer_test.go

Comment on lines +251 to +273
	if err := json.Unmarshal(part, &m); err != nil {
		continue
	}

	// Extract pkNames so we can skip PK columns.
	var pkNames []string
	if raw, ok := m["pkNames"]; ok {
		_ = json.Unmarshal(raw, &pkNames)
	}
	pkSet := make(map[string]struct{}, len(pkNames))
	for _, pk := range pkNames {
		pkSet[pk] = struct{}{}
	}

	// Extract the "data" array.
	rawData, ok := m["data"]
	if !ok {
		continue
	}
	var data []map[string]any
	if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 {
		continue
	}
⚠️ Potential issue | 🟠 Major

rowOffset is not incremented when a part fails to parse, corrupting subsequent offset lookups

When json.Unmarshal(part, &m) fails (line 251), when "data" is absent (line 267), or when the data array fails to unmarshal or is empty (line 271), the code continues without adding any value to rowOffset. For every subsequent successfully-parsed part, rowOffset + mutatedRowOffset will be smaller than the actual row position, causing extractMutatedRowRecordsByOffset to return the wrong RowRecord.

Because rowRecords is built in parallel with parts by the caller, a failed parse implies the matching rowRecords entry still occupies a slot. The offset must always advance by the expected number of rows (or at least by 1 per failed part) to stay aligned.
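One way to keep the offset aligned is to make every part contribute a row count, even on parse failure. A sketch under the assumption that a malformed part corresponds to exactly one record slot; rowsInPart is an illustrative helper, not PR code:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rowsInPart returns how many rows a message part contributes to the
// running offset. On any parse failure it still counts one row, so the
// offset stays aligned with the parallel rowRecords slice.
func rowsInPart(part []byte) int {
	var m map[string]json.RawMessage
	if err := json.Unmarshal(part, &m); err != nil {
		return 1 // malformed part still occupies a slot
	}
	raw, ok := m["data"]
	if !ok {
		return 1
	}
	var data []map[string]any
	if err := json.Unmarshal(raw, &data); err != nil || len(data) == 0 {
		return 1
	}
	return len(data)
}

func main() {
	parts := [][]byte{
		[]byte(`not json`),
		[]byte(`{"data":[{"id":1},{"id":2}]}`),
	}
	offset := 0
	for _, p := range parts {
		offset += rowsInPart(p)
	}
	fmt.Println(offset) // 3
}
```

The key property is that the offset advances on every iteration, never only on success.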

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 251 - 273, The
code currently continues on parse/unmarshal/data-missing errors without
advancing rowOffset, which desynchronizes it from the parallel rowRecords slice;
update the error/continue paths so rowOffset is always advanced: when
json.Unmarshal(part, &m) fails (failed part), increment rowOffset by 1 before
continue; when "data" is missing or unmarshal of data fails, likewise increment
rowOffset by 1 before continue; when data is successfully unmarshaled, after
processing that part increment rowOffset by len(data) (or otherwise by the
number of rows that part represents). Apply these changes around the
json.Unmarshal(part, &m) block and the "data" extraction so mutatedRowOffset +
rowOffset stays aligned for extractMutatedRowRecordsByOffset and references to
rowRecords.

Comment on lines +293 to +298
	mutated = true
	mutatedRowOffset = rowIdx
	mutatedColumn = col
	if mutated {
		break
	}
⚠️ Potential issue | 🟡 Minor

Dead code: if mutated { break } is always true

mutated is unconditionally set to true on line 293; the if mutated guard is always satisfied. This is a no-op conditional that should be replaced with a plain break.

🐛 Proposed fix
         mutated = true
         mutatedRowOffset = rowIdx
         mutatedColumn = col
-        if mutated {
-            break
-        }
+        break
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 293 - 298, The
conditional "if mutated { break }" is dead because "mutated" is set just above;
replace that guarded break with a plain "break" (remove the redundant if) in the
loop where mutated, mutatedRowOffset, and mutatedColumn are assigned (the block
using rowIdx and col) so control exits immediately without the always-true
guard.

Comment on lines +368 to +407
func incrementOriginTSValue(v any) (any, bool) {
switch value := v.(type) {
case string:
originTS, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return nil, false
}
return strconv.FormatUint(originTS+1, 10), true
case float64:
return value + 1, true
case json.Number:
originTS, err := value.Int64()
if err != nil {
return nil, false
}
return json.Number(strconv.FormatInt(originTS+1, 10)), true
case int:
return value + 1, true
case int8:
return value + 1, true
case int16:
return value + 1, true
case int32:
return value + 1, true
case int64:
return value + 1, true
case uint:
return value + 1, true
case uint8:
return value + 1, true
case uint16:
return value + 1, true
case uint32:
return value + 1, true
case uint64:
return value + 1, true
default:
return nil, false
}
}
🛠️ Refactor suggestion | 🟠 Major

Most branches in incrementOriginTSValue are unreachable

json.Unmarshal into a map[string]any always decodes JSON numbers as float64; it never produces json.Number, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, or uint64. The json.Number type is only produced by json.Decoder when UseNumber() is set, which is not used here.

Consequently, only the string case (line 370) and the float64 case (line 376) are ever reached at runtime; the remaining eleven branches are dead code and add maintenance burden.

A secondary concern: the float64 path can silently lose precision for _tidb_origin_ts values larger than 2^53 (the float64 mantissa limit), and json.Marshal may render it with a decimal point (e.g., 101 vs 101.0) depending on the value.
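The precision concern is easy to demonstrate: above 2^53, float64 can no longer represent every integer, so the +1 can silently vanish. A minimal sketch (lostIncrement is an illustrative helper):

```go
package main

import "fmt"

// lostIncrement reports whether adding 1 to x is lost to float64 rounding.
func lostIncrement(x float64) bool {
	return x+1 == x
}

func main() {
	// 2^53 = 9007199254740992, the last point where every integer
	// is exactly representable as a float64.
	fmt.Println(lostIncrement(float64(1 << 53))) // true: the increment vanishes
	fmt.Println(lostIncrement(100))              // false: small values increment exactly
}
```

A TSO-style timestamp can exceed this limit, which is why the string-formatted path is safer.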

♻️ Proposed simplification
 func incrementOriginTSValue(v any) (any, bool) {
     switch value := v.(type) {
     case string:
         originTS, err := strconv.ParseUint(value, 10, 64)
         if err != nil {
             return nil, false
         }
         return strconv.FormatUint(originTS+1, 10), true
     case float64:
-        return value + 1, true
-    case json.Number:
-        originTS, err := value.Int64()
-        if err != nil {
-            return nil, false
-        }
-        return json.Number(strconv.FormatInt(originTS+1, 10)), true
-    case int:
-        return value + 1, true
-    case int8:
-        return value + 1, true
-    case int16:
-        return value + 1, true
-    case int32:
-        return value + 1, true
-    case int64:
-        return value + 1, true
-    case uint:
-        return value + 1, true
-    case uint8:
-        return value + 1, true
-    case uint16:
-        return value + 1, true
-    case uint32:
-        return value + 1, true
-    case uint64:
-        return value + 1, true
+        // json.Unmarshal into map[string]any always uses float64 for numbers.
+        // Format as integer string to avoid floating-point representation noise.
+        return strconv.FormatUint(uint64(value)+1, 10), true
     default:
         return nil, false
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 368 - 407, The
function incrementOriginTSValue contains many unreachable
integer/uint/json.Number branches; keep only the cases that json.Unmarshal can
produce (string and float64) and handle float64 safely: in
incrementOriginTSValue, remove the int*/uint*/json.Number cases, keep the string
case (parsing with strconv.ParseUint) and the float64 case but change it to
check that the float64 represents an exact non-negative integer within uint64
safe range (no fractional part and at most 1<<53 - 1, the float64 exact-integer limit), then
increment and return as a string (or json.Number) formatted without a decimal
point; if the float64 is not an exact integer or out of safe range, return
nil,false. Ensure you reference the incrementOriginTSValue function and update
only its switch handling.

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@ti-chi-bot ti-chi-bot bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Feb 25, 2026
@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (3)
downstreamadapter/sink/cloudstorage/writer.go (3)

294-299: if mutated { break } is dead code — existing issue still unresolved

mutated is unconditionally set to true at line 294 immediately before the guard. The if mutated branch is always taken.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 294 - 299, The
code currently sets the flag mutated = true unconditionally just before checking
if mutated to break, making the guard dead; update the logic so mutated is only
set when an actual mutation occurs (i.e., set mutated = true and assign
mutatedRowOffset/mutatedColumn inside the branch that detects/performs a
mutation) or, if the intent was to always break, remove the redundant if-check
and keep a direct break; locate the variables mutated, mutatedRowOffset, and
mutatedColumn in the writer loop and move the flag assignment into the mutation
branch (or simplify by removing the conditional) so the break behaves as
intended.

380-419: Unreachable branches in incrementOriginTSValue — existing issue still unresolved

json.Unmarshal into map[string]any always decodes JSON numbers as float64; branches for json.Number, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64 are never reachable through this call path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 380 - 419, The
function incrementOriginTSValue contains many integer and uint type branches
(json.Number, int, int8, int16, int32, int64, uint, uint8, uint16, uint32,
uint64) that are unreachable when JSON is unmarshaled into map[string]any
because numbers become float64; remove those dead branches and simplify the
implementation to handle string, float64 and json.Number only (or,
alternatively, if you intended to support json.Number, change the JSON decoding
path to use json.Decoder with UseNumber so json.Number values are produced).
Update incrementOriginTSValue to convert/increment float64 and string safely and
keep json.Number handling only if upstream decoding uses UseNumber.

252-274: rowOffset not incremented on parse/unmarshal failures — existing issue still unresolved

The json.Unmarshal failure path (line 253), missing "data" field path (lines 268–270), and data-unmarshal/empty-data path (lines 272–274) all continue without advancing rowOffset, desynchronizing it from the rowRecords slice for all subsequent successfully-parsed parts.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 252 - 274, The
loop skips malformed parts without advancing rowOffset, causing rowOffset to
fall out of sync with rowRecords; fix by ensuring rowOffset is advanced on every
early-continue and properly advanced after successfully parsing a part: in the
branches where json.Unmarshal(part, &m) fails, where m["data"] is missing, and
where unmarshalling/empty data causes a continue, increment rowOffset (e.g.,
rowOffset++ or rowOffset += expected count) before the continue; conversely,
when data is parsed successfully, advance rowOffset by len(data) after you
consume/process those rows so rowOffset and rowRecords remain aligned
(references: variables rowOffset, rowRecords, part, m, data, pkNames).
🧹 Nitpick comments (2)
downstreamadapter/sink/cloudstorage/writer_test.go (1)

136-169: TestMutateMessageValueForFailpointRecordClassification only exercises the string case of incrementOriginTSValue

The second message encodes _tidb_origin_ts as a JSON string ("100"), so json.Unmarshal decodes it as a Go string, hitting the string branch of incrementOriginTSValue and producing "101" (a JSON string). The float64 path — triggered when the source has a bare JSON number ("_tidb_origin_ts":100) — returns float64(101), which marshals as the JSON number 101 (not "101"). This distinct serialized form isn't tested and could silently break downstream consumers expecting a consistent type.

Consider adding a second sub-case where the _tidb_origin_ts value is a JSON number:

// Add a complementary message variant with a numeric _tidb_origin_ts:
// `{"pkNames":["id"],"data":[{"id":"3","_tidb_origin_ts":100}]}`
// and assert bytes.Contains(msg.Value, []byte(`"_tidb_origin_ts":101`)) (no quotes).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer_test.go` around lines 136 - 169,
The test TestMutateMessageValueForFailpointRecordClassification only covers the
string branch of incrementOriginTSValue; add a complementary sub-case in the
same test that supplies a message whose second data object encodes
_tidb_origin_ts as a JSON number (e.g.
{"pkNames":["id"],"data":[{"id":"3","_tidb_origin_ts":100}]}) and call
mutateMessageValueForFailpoint to assert that the mutated msg.Value now contains
the numeric form `_tidb_origin_ts`:101 (without quotes), and also verify the
mutatedRows/originTsMutatedRows expectations for that numeric case so both the
string and float64 branches of incrementOriginTSValue are exercised.
downstreamadapter/sink/cloudstorage/writer.go (1)

355-360: math/big is heavyweight for a simple bounded random index

cryptorand.Int(cryptorand.Reader, big.NewInt(int64(len(columns)))) imports math/big solely to pass a modulus. Consider using crypto/rand.Read with a simple rejection-sampling or using rand/v2 (Go 1.22+) which offers rand.IntN. Since this is failpoint-only code a best-effort fallback already exists, so the overhead is cosmetic — but the math/big dependency bloats the import graph.

♻️ Proposed simplification using `crypto/rand.Read`
-import (
-    cryptorand "crypto/rand"
-    "math/big"
-    ...
-)

-idx, err := cryptorand.Int(cryptorand.Reader, big.NewInt(int64(len(columns))))
-if err != nil {
-    return columns[0], true
-}
-return columns[idx.Int64()], true
+var b [8]byte
+if _, err := cryptorand.Read(b[:]); err != nil {
+    return columns[0], true
+}
+idx := int(binary.LittleEndian.Uint64(b[:]) % uint64(len(columns)))
+return columns[idx], true

(Add "encoding/binary" to imports and replace the aliased import cryptorand "crypto/rand" with a plain "crypto/rand" import used as rand.)
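A sketch of the rejection-sampling variant; randIndex is an illustrative helper, and accepting only values below a multiple of n keeps the modulo unbiased:

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
)

// randIndex returns an unbiased index in [0, n) using crypto/rand.Read
// with rejection sampling, avoiding the math/big dependency of rand.Int.
func randIndex(n int) (int, error) {
	if n <= 0 {
		return 0, fmt.Errorf("n must be positive")
	}
	// Largest multiple of n that fits in uint64; values at or above it
	// would bias the modulo and are redrawn (rejection is astronomically rare).
	limit := (^uint64(0) / uint64(n)) * uint64(n)
	var b [8]byte
	for {
		if _, err := rand.Read(b[:]); err != nil {
			return 0, err
		}
		v := binary.LittleEndian.Uint64(b[:])
		if v < limit {
			return int(v % uint64(n)), nil
		}
	}
}

func main() {
	idx, err := randIndex(3)
	fmt.Println(idx, err)
}
```

For this failpoint-only path math/rand would also be fine; this variant only matters if cryptographic randomness is deliberately kept.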

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 355 - 360,
Replace the heavy math/big usage in the selection of a random column (the
cryptorand.Int(cryptorand.Reader, big.NewInt(int64(len(columns)))) call) with a
bounded crypto/rand.Read-based approach: import encoding/binary and crypto/rand,
read a uint64, map it into [0,len(columns)) using rejection sampling (discard
values >= floor(2^64/len)*len to avoid bias), then return columns[idx] as
before; keep the existing fallback return columns[0], true in the error path.
This removes the math/big dependency while preserving cryptographic randomness
in the code that picks the index.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@downstreamadapter/sink/cloudstorage/writer.go`:
- Around line 294-299: The code currently sets the flag mutated = true
unconditionally just before checking if mutated to break, making the guard dead;
update the logic so mutated is only set when an actual mutation occurs (i.e.,
set mutated = true and assign mutatedRowOffset/mutatedColumn inside the branch
that detects/permforms a mutation) or, if the intent was to always break, remove
the redundant if-check and keep a direct break; locate the variables mutated,
mutatedRowOffset, and mutatedColumn in the writer loop and move the flag
assignment into the mutation branch (or simplify by removing the conditional) so
the break behaves as intended.
- Around line 380-419: The function incrementOriginTSValue contains many integer
and uint type branches (json.Number, int, int8, int16, int32, int64, uint,
uint8, uint16, uint32, uint64) that are unreachable when JSON is unmarshaled
into map[string]any because numbers become float64; remove those dead branches
and simplify the implementation to handle string, float64 and json.Number only
(or, alternatively, if you intended to support json.Number, change the JSON
decoding path to use json.Decoder with UseNumber so json.Number values are
produced). Update incrementOriginTSValue to convert/increment float64 and string
safely and keep json.Number handling only if upstream decoding uses UseNumber.
- Around line 252-274: The loop skips malformed parts without advancing
rowOffset, causing rowOffset to fall out of sync with rowRecords; fix by
ensuring rowOffset is advanced on every early-continue and properly advanced
after successfully parsing a part: in the branches where json.Unmarshal(part,
&m) fails, where m["data"] is missing, and where unmarshalling/empty data causes
a continue, increment rowOffset (e.g., rowOffset++ or rowOffset += expected
count) before the continue; conversely, when data is parsed successfully,
advance rowOffset by len(data) after you consume/process those rows so rowOffset
and rowRecords remain aligned (references: variables rowOffset, rowRecords,
part, m, data, pkNames).

---

Nitpick comments:
In `@downstreamadapter/sink/cloudstorage/writer_test.go`:
- Around line 136-169: The test
TestMutateMessageValueForFailpointRecordClassification only covers the string
branch of incrementOriginTSValue; add a complementary sub-case in the same test
that supplies a message whose second data object encodes _tidb_origin_ts as a
JSON number (e.g. {"pkNames":["id"],"data":[{"id":"3","_tidb_origin_ts":100}])
and call mutateMessageValueForFailpoint to assert that the mutated msg.Value now
contains the numeric form `_tidb_origin_ts`:101 (without quotes), and also
verify the mutatedRows/originTsMutatedRows expectations for that numeric case so
both the string and float64 branches of incrementOriginTSValue are exercised.

In `@downstreamadapter/sink/cloudstorage/writer.go`:
- Around line 355-360: Replace the heavy math/big usage in the selection of a
random column (the cryptorand.Int(cryptorand.Reader,
big.NewInt(int64(len(columns)))) call) with a bounded crypto/rand.Read-based
approach: import encoding/binary and crypto/rand, read a uint64, map it into
[0,len(columns)) using rejection sampling (discard values >= floor(2^64/len)*len
to avoid bias), then return columns[idx] as before; keep the existing fallback
return columns[0], true in the error path. This removes the math/big dependency
while preserving cryptographic randomness in the code that picks the index.
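The rejection-sampling approach described above can be sketched like this; `uniformIndex` is an illustrative helper name, not the PR's code:

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
)

// uniformIndex returns a uniformly distributed index in [0, n) using
// crypto/rand.Read, with rejection sampling to avoid modulo bias and
// no math/big dependency.
func uniformIndex(n int) (int, error) {
	if n <= 0 {
		return 0, fmt.Errorf("n must be positive")
	}
	max := uint64(n)
	// Largest multiple of n representable in a uint64; draws at or
	// above this threshold are rejected to keep the mapping unbiased.
	limit := (^uint64(0) / max) * max
	var buf [8]byte
	for {
		if _, err := rand.Read(buf[:]); err != nil {
			return 0, err
		}
		v := binary.BigEndian.Uint64(buf[:])
		if v < limit {
			return int(v % max), nil
		}
	}
}

func main() {
	idx, err := uniformIndex(5)
	if err != nil {
		panic(err)
	}
	fmt.Println(idx >= 0 && idx < 5) // true
}
```

The rejection loop almost never iterates more than once: the rejected region is at most n-1 values out of 2^64.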

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0c86f95 and ddb1a01.

📒 Files selected for processing (2)
  • downstreamadapter/sink/cloudstorage/writer.go
  • downstreamadapter/sink/cloudstorage/writer_test.go

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (2)
pkg/sink/failpointrecord/record.go (1)

148-153: ⚠️ Potential issue | 🔴 Critical

Race condition: disabled is read before ensureFile() synchronization.

Write does a bare read of disabled before ensureFile(), while ensureFile() may concurrently write it inside initOnce.Do.

🐛 Proposed fix
 func Write(failpoint string, rows []RowRecord) {
-	if disabled {
-		return
-	}
 	ensureFile()
-	if file == nil {
+	if disabled || file == nil {
 		return
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/failpointrecord/record.go` around lines 148 - 153, The Write path
reads the package-level variable `disabled` before calling `ensureFile()`,
causing a race because `ensureFile()` (via `initOnce.Do`) may set `disabled`; to
fix, move the `disabled` check to after `ensureFile()` and/or protect
reads/writes to `disabled` with the same synchronization used in `ensureFile()`
(e.g., perform `initOnce.Do` first inside `ensureFile()` and only then read
`disabled`), updating the `Write` function to call `ensureFile()` before
checking `disabled` (and ensure any assignment to `disabled` remains inside the
`initOnce.Do` path) so no unsynchronized read occurs.
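The safe ordering can be sketched minimally; `ready` stands in for the real `*os.File` handle and the init body is hypothetical, showing only that every read of `disabled` happens after `initOnce.Do` completes:

```go
package main

import (
	"fmt"
	"sync"
)

var (
	initOnce sync.Once
	disabled bool
	ready    bool // stand-in for the real file handle
)

// ensureFile performs all initialization inside sync.Once; Do gives a
// happens-before edge, so any state it writes (disabled, ready) is
// visible to callers that return from ensureFile.
func ensureFile() {
	initOnce.Do(func() {
		// A real implementation would open the record file here and
		// set disabled = true on failure.
		ready = true
	})
}

// write synchronizes first, then reads disabled: no bare read can race
// with the initialization in ensureFile.
func write(rows int) string {
	ensureFile()
	if disabled || !ready {
		return "skipped"
	}
	return fmt.Sprintf("wrote %d rows", rows)
}

func main() {
	fmt.Println(write(3)) // wrote 3 rows
}
```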
downstreamadapter/sink/cloudstorage/encoding_group.go (1)

178-191: ⚠️ Potential issue | 🔴 Critical

Missing pre-loop event.Rewind() can yield zero records.

If the event cursor was already consumed before this function runs, the loop exits immediately and records nothing.

🐛 Proposed fix
 func dmlEventToRowRecords(event *commonEvent.DMLEvent) []failpointrecord.RowRecord {
 	if event == nil || event.TableInfo == nil {
 		return nil
 	}
+	event.Rewind()
 	indexes, columns := (&commonEvent.RowEvent{TableInfo: event.TableInfo}).PrimaryKeyColumn()
 	originTsCol, hasOriginTsCol := event.TableInfo.GetColumnInfoByName(commonEvent.OriginTsColumn)
 	originTsOffset, hasOriginTsOffset := event.TableInfo.GetColumnOffsetByName(commonEvent.OriginTsColumn)
 	rowRecords := make([]failpointrecord.RowRecord, 0, event.Len())
 	for {
 		row, ok := event.GetNextRow()
 		if !ok {
-			event.Rewind()
 			break
 		}
@@
 	}
+	event.Rewind()
 	return rowRecords
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 178 -
191, In dmlEventToRowRecords, ensure the event cursor is reset before iterating:
call event.Rewind() (or otherwise reset the iterator) right after the nil checks
and before the loop so GetNextRow() always starts from the first row; this
prevents returning zero records when the event cursor was previously
consumed—update the function dmlEventToRowRecords to invoke event.Rewind() prior
to the for { row, ok := event.GetNextRow() ... } loop (retain the existing
Rewind() call after the loop).
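Why the missing pre-loop reset matters can be shown with a toy cursor-based event; the type below loosely mirrors the `DMLEvent` iteration API but is not the real implementation:

```go
package main

import "fmt"

// event is a toy cursor-based iterator: GetNextRow advances pos, and a
// fresh loop sees zero rows unless the cursor is rewound first.
type event struct {
	rows []int
	pos  int
}

func (e *event) GetNextRow() (int, bool) {
	if e.pos >= len(e.rows) {
		return 0, false
	}
	r := e.rows[e.pos]
	e.pos++
	return r, true
}

func (e *event) Rewind() { e.pos = 0 }

// count rewinds before iterating, so it is correct even if a previous
// consumer already advanced the cursor, and rewinds again afterwards
// for later consumers.
func count(e *event) int {
	e.Rewind()
	n := 0
	for {
		if _, ok := e.GetNextRow(); !ok {
			break
		}
		n++
	}
	e.Rewind()
	return n
}

func main() {
	e := &event{rows: []int{1, 2, 3}}
	e.GetNextRow() // simulate a prior consumer advancing the cursor
	fmt.Println(count(e)) // 3
}
```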
🧹 Nitpick comments (2)
pkg/sink/mysql/mysql_writer_dml.go (1)

194-231: Consider deduplicating row-record conversion logic across sinks.

dmlEventsToRowRecords here and dmlEventToRowRecords in downstreamadapter/sink/cloudstorage/encoding_group.go are very similar. A shared helper would reduce drift and future bug reintroduction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 194 - 231, Extract the
duplicated conversion logic in dmlEventsToRowRecords and dmlEventToRowRecords
into a single shared helper (e.g., ConvertDMLEventsToRowRecords) that accepts
the same inputs (tableInfo *common.TableInfo, events []*commonEvent.DMLEvent)
and returns []failpointrecord.RowRecord; move the core loop that extracts pkCols
(using findPKColumns), originTs (using
tableInfo.GetColumnInfoByName/GetColumnOffsetByName and
failpointrecord.NormalizeOriginTs), and builds RowRecord (CommitTs, OriginTs,
PrimaryKeys) into that helper, replace both dmlEventsToRowRecords and
dmlEventToRowRecords bodies with calls to the new helper, and ensure all callers
are updated and imports adjusted so behavior remains identical.
downstreamadapter/sink/cloudstorage/encoding_group.go (1)

156-176: Add mismatch handling for unassigned row records in splitter.

When sum(msg.GetRowsCount()) < len(rows), tail rows are dropped silently. A warning or explicit remainder handling would make failpoint evidence more reliable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 156 -
176, splitRowRecordsByMessages currently drops trailing rows when
sum(msg.GetRowsCount()) < len(rows); after the loop check if rowIdx < len(rows)
and handle the remainder explicitly: either append the leftover rows as an
additional entry in ret (so callers can see unassigned rows) or append them to
the last message slice, and emit a warning log that includes the counts
(len(messages), total requested rows sum, and len(rows)) so evidence isn't lost;
update splitRowRecordsByMessages to perform this remainder-check using the
existing variables (messages, rows, ret, rowIdx) and call the project's logger
or return a clear signal to the caller about the leftover rows.
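The explicit-remainder idea can be sketched with a simplified partitioner; `splitByCounts` and its string rows are illustrative, not the PR's `splitRowRecordsByMessages`:

```go
package main

import "fmt"

// splitByCounts partitions rows into one slice per message according to
// counts; any unassigned tail is returned separately so the caller can
// log or re-attach it instead of dropping it silently.
func splitByCounts(rows []string, counts []int) (assigned [][]string, leftover []string) {
	idx := 0
	for _, c := range counts {
		end := idx + c
		if end > len(rows) {
			end = len(rows)
		}
		assigned = append(assigned, rows[idx:end])
		idx = end
	}
	// Everything past idx was never claimed by any message.
	return assigned, rows[idx:]
}

func main() {
	parts, rest := splitByCounts([]string{"r1", "r2", "r3", "r4"}, []int{1, 2})
	fmt.Println(len(parts), len(rest)) // 2 1
}
```

Returning the leftover explicitly lets the caller decide between warning, appending to the last message, or failing loudly, so failpoint evidence is never lost silently.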
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/sink/failpointrecord/record.go`:
- Around line 27-36: The file fails the repository formatting checks
(gofumports/gci) — update formatting for the import block and entire file by
running the repository formatter: execute make fmt (which runs gci, gofumports,
shfmt) and then re-run make check; ensure the import block in record.go is
reordered/normalized and commit the resulting changes so the CI formatting check
passes.

In `@pkg/sink/mysql/mysql_writer_dml.go`:
- Around line 184-191: findPKColumns currently only checks HasPriKeyFlag and so
misses not-null unique-key handle columns; change it to call
tableInfo.GetOrderedHandleKeyColumnIDs() and iterate those column IDs to build
the []pkColInfo (using the existing tableInfo.GetColumns() slice to map each
column ID to its index and Name.O). Ensure you preserve the pkColInfo fields
(index and name) and return the columns in the ordered handle-key order
(matching the pattern used in mysql_writer_helper.go and elsewhere) so UK-only
tables are included in PrimaryKeys.

---

Duplicate comments:
In `@downstreamadapter/sink/cloudstorage/encoding_group.go`:
- Around line 178-191: In dmlEventToRowRecords, ensure the event cursor is reset
before iterating: call event.Rewind() (or otherwise reset the iterator) right
after the nil checks and before the loop so GetNextRow() always starts from the
first row; this prevents returning zero records when the event cursor was
previously consumed—update the function dmlEventToRowRecords to invoke
event.Rewind() prior to the for { row, ok := event.GetNextRow() ... } loop
(retain the existing Rewind() call after the loop).

In `@pkg/sink/failpointrecord/record.go`:
- Around line 148-153: The Write path reads the package-level variable
`disabled` before calling `ensureFile()`, causing a race because `ensureFile()`
(via `initOnce.Do`) may set `disabled`; to fix, move the `disabled` check to
after `ensureFile()` and/or protect reads/writes to `disabled` with the same
synchronization used in `ensureFile()` (e.g., perform `initOnce.Do` first inside
`ensureFile()` and only then read `disabled`), updating the `Write` function to
call `ensureFile()` before checking `disabled` (and ensure any assignment to
`disabled` remains inside the `initOnce.Do` path) so no unsynchronized read
occurs.

---

Nitpick comments:
In `@downstreamadapter/sink/cloudstorage/encoding_group.go`:
- Around line 156-176: splitRowRecordsByMessages currently drops trailing rows
when sum(msg.GetRowsCount()) < len(rows); after the loop check if rowIdx <
len(rows) and handle the remainder explicitly: either append the leftover rows
as an additional entry in ret (so callers can see unassigned rows) or append
them to the last message slice, and emit a warning log that includes the counts
(len(messages), total requested rows sum, and len(rows)) so evidence isn't lost;
update splitRowRecordsByMessages to perform this remainder-check using the
existing variables (messages, rows, ret, rowIdx) and call the project's logger
or return a clear signal to the caller about the leftover rows.

In `@pkg/sink/mysql/mysql_writer_dml.go`:
- Around line 194-231: Extract the duplicated conversion logic in
dmlEventsToRowRecords and dmlEventToRowRecords into a single shared helper
(e.g., ConvertDMLEventsToRowRecords) that accepts the same inputs (tableInfo
*common.TableInfo, events []*commonEvent.DMLEvent) and returns
[]failpointrecord.RowRecord; move the core loop that extracts pkCols (using
findPKColumns), originTs (using
tableInfo.GetColumnInfoByName/GetColumnOffsetByName and
failpointrecord.NormalizeOriginTs), and builds RowRecord (CommitTs, OriginTs,
PrimaryKeys) into that helper, replace both dmlEventsToRowRecords and
dmlEventToRowRecords bodies with calls to the new helper, and ensure all callers
are updated and imports adjusted so behavior remains identical.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ddb1a01 and b67f722.

📒 Files selected for processing (3)
  • downstreamadapter/sink/cloudstorage/encoding_group.go
  • pkg/sink/failpointrecord/record.go
  • pkg/sink/mysql/mysql_writer_dml.go

Comment on lines +27 to +36
import (
	"encoding/json"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)
Contributor

⚠️ Potential issue | 🟠 Major

CI is failing on formatting for this file.

make check already reports gofumports failure here; please run make fmt before merge.

As per coding guidelines **/*.go: Use gofmt to keep Go code clean; run make fmt before pushing to format code with gci, gofumports, and shfmt, plus log-style checks.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/failpointrecord/record.go` around lines 27 - 36, The file fails the
repository formatting checks (gofumports/gci) — update formatting for the import
block and entire file by running the repository formatter: execute make fmt
(which runs gci, gofumports, shfmt) and then re-run make check; ensure the
import block in record.go is reordered/normalized and commit the resulting
changes so the CI formatting check passes.

Comment on lines +184 to +191
func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
	var cols []pkColInfo
	for i, col := range tableInfo.GetColumns() {
		if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) {
			cols = append(cols, pkColInfo{index: i, name: col.Name.O})
		}
	}
	return cols
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the actual file content around the lines mentioned
cat -n pkg/sink/mysql/mysql_writer_dml.go | sed -n '180,195p'

Repository: pingcap/ticdc

Length of output: 650


🏁 Script executed:

# Check if the file exists and get its total size
wc -l pkg/sink/mysql/mysql_writer_dml.go

Repository: pingcap/ticdc

Length of output: 95


🏁 Script executed:

# Search for the findPKColumns function and surrounding context
rg "findPKColumns" -A 10 -B 5

Repository: pingcap/ticdc

Length of output: 2129


🏁 Script executed:

# Search for the suggested methods to verify they exist
rg "GetOrderedHandleKeyColumnIDs|GetColumnInfo|GetColumnOffsetByName" --max-count 20

Repository: pingcap/ticdc

Length of output: 4308


🏁 Script executed:

# Look for how handle keys are used elsewhere in the codebase
rg "GetHandleKeyColumnIDs|HandleKeyColumnIDs" -A 3 -B 3

Repository: pingcap/ticdc

Length of output: 5062


🏁 Script executed:

# Check the common.TableInfo definition to understand available methods
fd "common" --type f -name "*.go" | head -20

Repository: pingcap/ticdc

Length of output: 287


🏁 Script executed:

# Search for pkColInfo definition to understand its usage
rg "pkColInfo" -B 5 -A 10

Repository: pingcap/ticdc

Length of output: 1885


🏁 Script executed:

# Check how dmlEventsToRowRecords uses the pkCols result
rg "dmlEventsToRowRecords" -A 30 | head -60

Repository: pingcap/ticdc

Length of output: 4400


🏁 Script executed:

# Check if PrimaryKeys field in failpointrecord.RowRecord can be empty
rg "type RowRecord" -A 10 pkg/

Repository: pingcap/ticdc

Length of output: 813


🏁 Script executed:

# See the full dmlEventsToRowRecords function to understand the impact of empty pkCols
cat -n pkg/sink/mysql/mysql_writer_dml.go | sed -n '194,230p'

Repository: pingcap/ticdc

Length of output: 1584


🏁 Script executed:

# Check if there are tests for findPKColumns or dmlEventsToRowRecords
rg "TestFindPKColumns|TestDmlEventsToRowRecords|findPKColumns|dmlEventsToRowRecords" -l pkg/sink/mysql/

Repository: pingcap/ticdc

Length of output: 91


findPKColumns must use GetOrderedHandleKeyColumnIDs() to include UK handle keys.

This helper only checks HasPriKeyFlag, which excludes not-null unique key handle keys. For UK-only tables, PrimaryKeys in the failpoint record becomes empty, breaking downstream violation analysis that relies on row identity matching.

Use GetOrderedHandleKeyColumnIDs() instead, which correctly includes both PK and UK handle keys, matching the pattern already used elsewhere in this file and mysql_writer_helper.go.

Proposed fix
 func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
-	var cols []pkColInfo
-	for i, col := range tableInfo.GetColumns() {
-		if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) {
-			cols = append(cols, pkColInfo{index: i, name: col.Name.O})
-		}
-	}
+	handleColIDs := tableInfo.GetOrderedHandleKeyColumnIDs()
+	cols := make([]pkColInfo, 0, len(handleColIDs))
+	for _, colID := range handleColIDs {
+		col, exist := tableInfo.GetColumnInfo(colID)
+		if !exist || col == nil {
+			continue
+		}
+		offset, ok := tableInfo.GetColumnOffsetByName(col.Name.O)
+		if !ok {
+			continue
+		}
+		cols = append(cols, pkColInfo{index: offset, name: col.Name.O})
+	}
 	return cols
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
	var cols []pkColInfo
	for i, col := range tableInfo.GetColumns() {
		if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) {
			cols = append(cols, pkColInfo{index: i, name: col.Name.O})
		}
	}
	return cols
func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
	handleColIDs := tableInfo.GetOrderedHandleKeyColumnIDs()
	cols := make([]pkColInfo, 0, len(handleColIDs))
	for _, colID := range handleColIDs {
		col, exist := tableInfo.GetColumnInfo(colID)
		if !exist || col == nil {
			continue
		}
		offset, ok := tableInfo.GetColumnOffsetByName(col.Name.O)
		if !ok {
			continue
		}
		cols = append(cols, pkColInfo{index: offset, name: col.Name.O})
	}
	return cols
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 184 - 191, findPKColumns
currently only checks HasPriKeyFlag and so misses not-null unique-key handle
columns; change it to call tableInfo.GetOrderedHandleKeyColumnIDs() and iterate
those column IDs to build the []pkColInfo (using the existing
tableInfo.GetColumns() slice to map each column ID to its index and Name.O).
Ensure you preserve the pkColInfo fields (index and name) and return the columns
in the ordered handle-key order (matching the pattern used in
mysql_writer_helper.go and elsewhere) so UK-only tables are included in
PrimaryKeys.


Labels

do-not-merge/needs-triage-completed first-time-contributor Indicates that the PR was contributed by an external member and is a first-time contributor. release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
