
Support multi-clusters active-active data consistent checker#4246

Open
Leavrth wants to merge 6 commits into pingcap:master from Leavrth:multi-cluster-consistency-checker

Conversation


@Leavrth Leavrth commented Feb 24, 2026

What problem does this PR solve?

Issue Number: close #4244

What is changed and how it works?

This PR introduces a consistency validation tool for multi-cluster active-active TiCDC deployments. It connects to each cluster’s PD (etcd) and S3 sink, verifies critical changefeed settings (for example canal-json, date-separator=day, and the expected file index width), then continuously watches checkpoints and sink files to advance a unified time window across clusters. Within each window, the checker compares local writes and replicated records by primary key and timestamp to detect anomalies such as missing data, redundant data, out-of-order updates, and LWW violations. The recorder persists checkpoints and reports atomically, so the process is resumable after failures and suitable for long-running, auditable consistency monitoring.
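The per-key comparison the description mentions can be sketched as follows. This is a simplified, hypothetical illustration of last-writer-wins checking; the `Record` fields and the `checkLWW` function are illustrative names, not the PR's actual identifiers.

```go
package main

import "fmt"

// Record is a simplified stand-in for a decoded row-change event.
type Record struct {
	PkStr    string // encoded primary key
	CommitTs uint64 // commit TSO on the cluster where the row now lives
	OriginTs uint64 // TSO of the original write on the source cluster
}

// checkLWW compares, for one primary key, the row a cluster ended up with
// (survivor) against a concurrent write from a peer cluster. Under
// last-writer-wins, the surviving row must carry the larger origin
// timestamp; if an older write won, that is an LWW violation.
func checkLWW(survivor, peer Record) bool {
	if survivor.PkStr != peer.PkStr {
		return false // different rows, nothing to compare
	}
	return survivor.OriginTs < peer.OriginTs
}

func main() {
	ok := checkLWW(
		Record{PkStr: "id=1", CommitTs: 105, OriginTs: 100},
		Record{PkStr: "id=1", CommitTs: 103, OriginTs: 90},
	)
	fmt.Println(ok) // false: the survivor has the newer origin ts
	bad := checkLWW(
		Record{PkStr: "id=1", CommitTs: 105, OriginTs: 90},
		Record{PkStr: "id=1", CommitTs: 103, OriginTs: 100},
	)
	fmt.Println(bad) // true: an older write won, LWW violation
}
```

Missing and redundant data fall out of the same per-key bookkeeping: a key seen locally but absent from the peer's replicated stream within the window is a loss candidate, and a replicated key with no matching local write is redundant.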

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

None

Summary by CodeRabbit

  • New Features

    • Multi-cluster consistency checker CLI for active-active setups (dry-run supported).
    • Time-window coordination across clusters with S3-backed incremental consumption and canal‑JSON decoding.
    • Comprehensive checks: data loss, inconsistency, redundancy, and Last-Writer-Wins violations with per-round reports.
  • Durability & Config

    • Durable checkpointing and atomic report rotation with corruption recovery.
    • New config loader and example TOML for multi-cluster setups.
  • Tests

    • Extensive unit, integration, and end-to-end test suites across checker, consumer, decoder, watcher, recorder, config, and integration harness.

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-triage-completed release-note-none Denotes a PR that doesn't merit a release note. first-time-contributor Indicates that the PR was contributed by an external member and is a first-time contributor. labels Feb 24, 2026

ti-chi-bot bot commented Feb 24, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign hicqu for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Feb 24, 2026
@coderabbitai

coderabbitai bot commented Feb 24, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Adds a new multi-cluster active-active consistency checker: CLI, config, advancer, S3 consumer & decoder, checkpoint & S3 watchers, core checker, durable recorder, task orchestration, mocks/integration tests, and many unit/integration tests.

Changes

Cohort / File(s) Summary
Build
Makefile
Adds multi-cluster-consistency-checker build target producing bin/multi-cluster-consistency-checker.
CLI & Entry
cmd/multi-cluster-consistency-checker/main.go, cmd/.../main_test.go
New CLI entrypoint, exit codes, signal handling, error mapping and unit tests.
Configuration
cmd/.../config/config.go, cmd/.../config/config_test.go, cmd/.../config/config.example.toml
New TOML config structs, loader, validation (global, clusters, peer mappings), tests and example config.
Core Types
cmd/.../types/types.go, cmd/.../types/types_test.go
Domain types: PK, versioning, time-windows, incremental data and small unit tests.
Time-window Advancer
cmd/.../advancer/time_window_advancer.go, cmd/.../advancer/time_window_advancer_test.go
New TimeWindowAdvancer coordinating checkpoint watchers, PD clients, and S3 watchers to produce per-cluster TimeWindowData; concurrency and tests.
S3 Consumer
cmd/.../consumer/*, consumer_test.go
S3Consumer: schema & DML discovery, version/index tracking, parallel downloads, incremental data assembly, checkpoint init and tests.
Decoder
cmd/.../decoder/decoder.go, decoder_test.go, value_to_datum_test.go
Canal-JSON decoder producing Record objects, type conversions and extensive tests.
Watchers
cmd/.../watcher/checkpoint_watcher.go, cmd/.../watcher/s3_watcher.go, *_test.go
etcd-based CheckpointWatcher and S3Watcher adapter integrating checkpoint + S3 consumer; includes tests and error handling.
Checker (core)
cmd/.../checker/checker.go, checker_test.go, failpoint.go
DataChecker and per-cluster caches to detect data loss, redundancy, inconsistency, and LWW violations; failpoint recorder and tests.
Recorder & Reporting
cmd/.../recorder/*, recorder/types.go, *_test.go
Recorder for durable atomic checkpoint/report persistence, corruption detection, deterministic report marshaling and tests.
Task Orchestration
cmd/.../task.go
Client/watchers initialization, etcd-based S3 sink validation, lifecycle orchestration and cleanup.
Integration & Mocks
cmd/.../integration/*, cmd/.../integration/mock_cluster.go
End-to-end integration tests and MockMultiCluster scaffolding (mock PD, watchers, in-memory S3).
pkg/sink cloudstorage
pkg/sink/cloudstorage/path_key.go, pkg/sink/cloudstorage/path_key_test.go
Adds ParseDMLFilePath for DML file path parsing; adjusts related tests.

Sequence Diagram(s)

sequenceDiagram
    participant CLI as CLI Main
    participant Config as Config Loader
    participant Init as Client Init
    participant PD as PD/Etcd Clients
    participant Adv as TimeWindow Advancer
    participant S3 as S3 Watcher/Consumer
    participant Checker as Data Checker
    participant Recorder as Recorder

    CLI->>Config: LoadConfig(path)
    CLI->>Init: initClients(config)
    Init->>PD: create PD/etcd clients & watchers
    loop Each Round
        Adv->>PD: Advance/checkpoint & GetPDTS()
        Adv->>S3: AdvanceS3CheckpointTs / ConsumeNewFiles()
        S3-->>Adv: IncrementalData + MaxVersion
        Adv-->>Checker: TimeWindowData
        Checker->>Checker: decode & detect (loss, redundancy, inconsistency, LWW)
        Checker-->>Recorder: Report & time-window
        Recorder->>Recorder: persist checkpoint & reports
    end
sequenceDiagram
    participant Adv as TimeWindow Advancer
    participant CW as CheckpointWatcher
    participant S3 as S3Consumer
    participant PD as PD Client
    participant State as Internal State

    Adv->>CW: AdvanceCheckpointTs(minTs)
    CW-->>Adv: latest checkpoint when >= minTs
    Adv->>S3: ConsumeNewFiles()
    S3-->>Adv: incremental files + max versions
    Adv->>PD: getPDTsFromCluster / getPDTsFromOtherClusters
    PD-->>Adv: pd timestamps
    Adv->>State: update triplets, boundaries, PDTimestampAfterTimeWindow
    Adv-->>Adv: compose TimeWindowData

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~95 minutes

Suggested labels

lgtm, approved

Suggested reviewers

  • wk989898
  • hongyunyan
  • asddongmen
  • flowbehappy

Poem

🐰 I hopped through code where clusters align,

Windows advanced as S3 files combined,
Checkpoints sealed tight, PD timestamps in tow,
Reports tucked away where round-robin logs grow,
A nibble of carrot for checks that just flow 🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 20.67%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Title check ✅ Passed: The title 'Support multi-clusters active-active data consistent checker' clearly and accurately summarizes the main feature addition in the changeset.
  • Description check ✅ Passed: The PR description provides a clear explanation of the problem, the solution, and includes the required 'Issue Number: close #4244' statement, though it uses the template structure somewhat minimally.
  • Linked Issues check ✅ Passed: The PR implements all core requirements from issue #4244: multi-cluster consistency checking, LWW violation detection, changefeed validation, time-window-based comparison, checkpoint/report persistence, and resumable operation.
  • Out of Scope Changes check ✅ Passed: All changes are directly related to implementing the multi-cluster consistency checker. The only tangential addition is a method in pkg/sink/cloudstorage/path_key.go for DML file path parsing, which is necessary infrastructure for the checker's file consumption.




Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist

Summary of Changes

Hello @Leavrth, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a robust multi-cluster consistency checker for TiCDC's active-active replication. The tool systematically monitors data flow across clusters by analyzing S3 sink data and changefeed checkpoints. It's designed to proactively identify and report various data inconsistencies, ensuring data integrity and providing a reliable mechanism for continuous validation in complex distributed environments.

Highlights

  • New Consistency Checker Tool: Introduced a dedicated tool for validating data consistency in multi-cluster active-active TiCDC deployments.
  • Comprehensive Data Validation: The checker connects to PD and S3 sinks, verifies critical changefeed settings, and continuously monitors checkpoints and sink files to detect anomalies.
  • Anomaly Detection: Identifies various data discrepancies including missing data, redundant data, out-of-order updates, and Last Write Wins (LWW) violations by comparing local writes and replicated records.
  • Resumable and Auditable Monitoring: The tool features atomic persistence of checkpoints and reports, ensuring that the consistency monitoring process is resumable after failures and provides auditable results.
  • Robust Configuration and Client Management: Includes a flexible TOML-based configuration system with validation, and manages connections to PD, etcd, and S3 with graceful shutdown and error handling.


Changelog
  • Makefile
    • Added a new build target for the multi-cluster-consistency-checker.
  • cmd/multi-cluster-consistency-checker/advancer/time_window_advancer.go
    • Implemented the TimeWindowAdvancer to manage and advance time windows across multiple clusters, handling checkpoint and S3 data progression.
  • cmd/multi-cluster-consistency-checker/advancer/time_window_advancer_test.go
    • Added unit tests for TimeWindowAdvancer, including mocks for PD clients and watchers, and a comprehensive test for advancing multiple rounds.
  • cmd/multi-cluster-consistency-checker/checker/checker.go
    • Implemented the core DataChecker logic, including clusterViolationChecker for LWW, tableDataCache, and timeWindowDataCache for detecting data loss, inconsistency, and redundancy.
  • cmd/multi-cluster-consistency-checker/checker/checker_test.go
    • Added unit tests for the DataChecker, covering various scenarios like data loss, inconsistency, redundancy, and LWW violations across multiple rounds.
  • cmd/multi-cluster-consistency-checker/config/config.example.toml
    • Added an example TOML configuration file for the consistency checker, detailing global and cluster-specific settings.
  • cmd/multi-cluster-consistency-checker/config/config.go
    • Defined the configuration structures (Config, GlobalConfig, ClusterConfig) and implemented the LoadConfig function with validation logic.
  • cmd/multi-cluster-consistency-checker/config/config_test.go
    • Added unit tests for the configuration loading and validation logic, covering valid and invalid configurations.
  • cmd/multi-cluster-consistency-checker/consumer/consumer.go
    • Implemented the S3Consumer to read and process DML and schema files from S3, including logic for managing schema definitions and DML file indices.
  • cmd/multi-cluster-consistency-checker/consumer/consumer_test.go
    • Added unit tests for the S3Consumer, covering DML index mapping, schema definition handling, and checkpoint initialization.
  • cmd/multi-cluster-consistency-checker/decoder/decoder.go
    • Implemented a Canal-JSON decoder to parse change data capture events from S3 files, including logic for extracting primary keys and column values.
  • cmd/multi-cluster-consistency-checker/decoder/decoder_test.go
    • Added unit tests for the Canal-JSON decoder, including tests for parsing various data types and handling malformed messages.
  • cmd/multi-cluster-consistency-checker/decoder/value_to_datum_test.go
    • Added comprehensive unit tests for the valueToDatum utility function, covering conversion of various MySQL data types.
  • cmd/multi-cluster-consistency-checker/integration/integration_test.go
    • Added integration tests for the multi-cluster consistency checker, simulating various scenarios including consistent data flow, data loss, inconsistency, redundancy, and LWW violations.
  • cmd/multi-cluster-consistency-checker/integration/mock_cluster.go
    • Implemented mock components for multi-cluster testing, including mock PD clients, checkpoint watchers, S3 watchers, and in-memory S3 storage.
  • cmd/multi-cluster-consistency-checker/main.go
    • Implemented the main entry point for the consistency checker, including command-line argument parsing, logging setup, and graceful shutdown handling.
  • cmd/multi-cluster-consistency-checker/main_test.go
    • Added unit tests for the main package, including tests for ExitError handling and S3 bucket prefix validation.
  • cmd/multi-cluster-consistency-checker/recorder/recorder.go
    • Implemented the Recorder to manage and persist reports and checkpoints, ensuring atomic writes and cleanup of old reports.
  • cmd/multi-cluster-consistency-checker/recorder/recorder_test.go
    • Added unit tests for the Recorder, covering checkpoint persistence, report flushing, and error handling for corrupted checkpoints.
  • cmd/multi-cluster-consistency-checker/recorder/types.go
    • Defined data structures for reporting consistency issues (DataLossItem, DataInconsistentItem, DataRedundantItem, LWWViolationItem) and for checkpointing progress.
  • cmd/multi-cluster-consistency-checker/recorder/types_test.go
    • Added unit tests for the recorder's types, focusing on string representation and data structure integrity.
  • cmd/multi-cluster-consistency-checker/task.go
    • Implemented the main task logic for the consistency checker, including client initialization, configuration validation, and the main loop for advancing time windows and checking data.
  • cmd/multi-cluster-consistency-checker/types/types.go
    • Defined core types used across the consistency checker, such as PkType, CdcVersion, SchemaTableKey, VersionKey, TimeWindow, TimeWindowData, and IncrementalData.
  • cmd/multi-cluster-consistency-checker/types/types_test.go
    • Added unit tests for the core types, specifically CdcVersion.GetCompareTs.
  • cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go
    • Implemented the CheckpointWatcher to monitor changefeed checkpoint timestamps from etcd, supporting retries and task notification.
  • cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go
    • Added unit tests for the CheckpointWatcher, covering various scenarios like waiting for updates, context cancellation, and error handling.
  • cmd/multi-cluster-consistency-checker/watcher/s3_watcher.go
    • Implemented the S3Watcher to integrate with the S3Consumer and CheckpointWatcher for monitoring S3 sink data.
  • cmd/multi-cluster-consistency-checker/watcher/s3_watcher_test.go
    • Added unit tests for the S3Watcher, covering its initialization, closing, and delegation to underlying components.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist bot left a comment
Code Review

This pull request introduces a comprehensive data consistency checker for multi-cluster active-active deployments. The implementation is robust, with thorough handling of configuration, state persistence via checkpoints, and graceful shutdown. The testing strategy is excellent, covering unit, integration, and various failure scenarios.

I've identified a few areas for improvement, mainly related to performance and code clarity. Specifically, there are opportunities to reduce redundant RPC calls to PD and to optimize S3 directory listings. I've also suggested a small refactoring to simplify some of the checking logic.

Overall, this is a high-quality contribution that adds a valuable tool for ensuring data consistency in complex deployments.

opt := &storage.WalkOption{
	SubDir:    metaSubDir,
	ObjPrefix: "schema_",
	// TODO: StartAfter: startVersionKey,
}

medium

The WalkDir calls in this file do not use the StartAfter option in storage.WalkOption, which can lead to re-listing files that have already been processed in previous rounds. The author has noted this with a TODO.

Using StartAfter would be a good performance optimization. For example, in downloadSchemaFilesWithScanRange, you can set opt.StartAfter = startVersionKey. This applies to other WalkDir calls in this file as well, such as in getNewFilesForSchemaPathKeyWithEndPath and discoverAndDownloadNewTableVersions.
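The effect of `StartAfter` can be illustrated with a stand-alone sketch that mimics S3's ListObjectsV2 `start-after` semantics over a sorted key list; `listAfter` is an illustrative helper, not TiCDC code, but it shows why passing the last processed key avoids re-listing files from earlier rounds.

```go
package main

import (
	"fmt"
	"sort"
)

// listAfter returns only the keys strictly greater (lexicographically)
// than startAfter, the way an object store resumes a listing. Walking
// with this cursor skips everything already processed in prior rounds.
func listAfter(keys []string, startAfter string) []string {
	sort.Strings(keys)
	i := sort.SearchStrings(keys, startAfter)
	// SearchStrings returns the first index >= startAfter; skip an exact match
	// so the listing is strictly after the cursor.
	if i < len(keys) && keys[i] == startAfter {
		i++
	}
	return keys[i:]
}

func main() {
	keys := []string{"schema_100.json", "schema_101.json", "schema_102.json"}
	fmt.Println(listAfter(keys, "schema_100.json")) // [schema_101.json schema_102.json]
}
```

Without the cursor, every round pays for a full listing of the prefix, so the cost of each round grows with the total number of files ever written rather than with the number of new files.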

@coderabbitai coderabbitai bot left a comment
Actionable comments posted: 11

🧹 Nitpick comments (11)
cmd/multi-cluster-consistency-checker/recorder/types_test.go (1)

247-351: MarshalReport tests will become flaky if any multi-cluster scenario is added.

r.ClusterReports is a map[string]*ClusterReport, so iterating it in MarshalReport produces non-deterministic cluster ordering. All current tests are safe only because they include at most one non-empty cluster per report. Adding a second non-empty cluster to any test (or to the production code's output) will cause intermittent failures.

The production MarshalReport should sort cluster IDs before building the output string, e.g.:

// In MarshalReport (types.go), replace map range with sorted keys:
clusterIDs := make([]string, 0, len(r.ClusterReports))
for id := range r.ClusterReports {
    clusterIDs = append(clusterIDs, id)
}
slices.Sort(clusterIDs)
for _, id := range clusterIDs {
    cr := r.ClusterReports[id]
    // ...
}

Once the production method is stable, future tests with multiple non-empty clusters will be reliable. Based on learnings: favor deterministic tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go` around lines
247 - 351, MarshalReport iterates r.ClusterReports (a map) which yields
non-deterministic cluster ordering; make iteration deterministic by collecting
the map keys, sorting them (e.g., using slices.Sort or sort.Strings), and then
iterating over the sorted clusterIDs to fetch each ClusterReport (reference:
MarshalReport, r.ClusterReports, ClusterReport); update the loop in
MarshalReport to use the sorted keys so tests and output are stable across runs.
Makefile (1)

177-179: multi-cluster-consistency-checker missing from .PHONY.

Similar build targets cdc, kafka_consumer, storage_consumer, pulsar_consumer, and filter_helper are all declared .PHONY. The new target and the existing config-converter/oauth2_server targets are inconsistently absent. While functionally harmless (no such file exists in the root), it breaks the convention set by the other binary targets.

🔧 Suggested fix
-.PHONY: clean fmt check tidy \
+.PHONY: clean fmt check tidy multi-cluster-consistency-checker \
 	generate-protobuf generate_mock \
 	cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@Makefile` around lines 177 - 179, Add the new make target name
multi-cluster-consistency-checker to the .PHONY declaration so it follows the
existing convention; update the .PHONY list that currently includes cdc,
kafka_consumer, storage_consumer, pulsar_consumer, filter_helper (and note
config-converter/oauth2_server are also missing) to include
multi-cluster-consistency-checker to maintain consistency across binary build
targets.
cmd/multi-cluster-consistency-checker/config/config_test.go (1)

251-274: Missing test coverage: single cluster with no peers should arguably be rejected.

The LoadConfig code only rejects zero clusters; a configuration with exactly one cluster passes validation even though the checker is meaningless in that scenario. The test for "missing pd-addrs" happens to use a single cluster, but only because pd-addrs fires first — it doesn't exercise the "single cluster accepted" path. Consider adding a test case and a minimum-cluster count check (≥ 2) in LoadConfig.

📋 Suggested test case
t.Run("single cluster is rejected", func(t *testing.T) {
    t.Parallel()
    tmpDir := t.TempDir()
    configPath := filepath.Join(tmpDir, "config.toml")
    configContent := `
[global]
data-dir = "/tmp/data"
  [global.tables]
  schema1 = ["table1"]

[clusters]
  [clusters.cluster1]
  pd-addrs = ["127.0.0.1:2379"]
  s3-sink-uri = "s3://bucket/cluster1/"
  s3-changefeed-id = "s3-cf-1"
`
    require.NoError(t, os.WriteFile(configPath, []byte(configContent), 0644))
    cfg, err := LoadConfig(configPath)
    require.Error(t, err)
    require.Nil(t, cfg)
    require.Contains(t, err.Error(), "at least two clusters must be configured")
})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/config/config_test.go` around lines 251
- 274, The config loader currently allows a single cluster which makes the
checker meaningless; update LoadConfig to validate that the parsed clusters map
has length >= 2 and return an error like "at least two clusters must be
configured" when fewer are present, then add a unit test in config_test.go
(e.g., a new t.Run "single cluster is rejected") that writes a config with one
cluster (use pd-addrs, s3-sink-uri, s3-changefeed-id) and asserts LoadConfig
returns an error, nil cfg, and that the error message contains the new "at least
two clusters must be configured" text to cover the single-cluster rejection
path.
cmd/multi-cluster-consistency-checker/decoder/decoder_test.go (1)

81-88: Reversed require.Equal argument order.

testify/require.Equal convention is (t, expected, actual). The assertions here pass actualRecord as the first (expected) argument and expectedRecord as the second (actual). This won't affect pass/fail correctness, but failure messages will swap "expected" and "got", making debugging confusing.

The same pattern appears in TestCanalJSONDecoder2 (lines 121–127) and partially in TestCanalJSONDecoderWithInvalidMessage (lines 143–146).

Example fix for TestCanalJSONDecoder1
 	for i, actualRecord := range records {
 		expectedRecord := ExpectedRecords1[i]
-		require.Equal(t, actualRecord.Pk, expectedRecord.Pk)
-		require.Equal(t, actualRecord.PkStr, expectedRecord.PkStr)
-		require.Equal(t, actualRecord.ColumnValues, expectedRecord.ColumnValues)
-		require.Equal(t, actualRecord.CdcVersion.CommitTs, expectedRecord.CdcVersion.CommitTs)
-		require.Equal(t, actualRecord.CdcVersion.OriginTs, expectedRecord.CdcVersion.OriginTs)
+		require.Equal(t, expectedRecord.Pk, actualRecord.Pk)
+		require.Equal(t, expectedRecord.PkStr, actualRecord.PkStr)
+		require.Equal(t, expectedRecord.ColumnValues, actualRecord.ColumnValues)
+		require.Equal(t, expectedRecord.CdcVersion.CommitTs, actualRecord.CdcVersion.CommitTs)
+		require.Equal(t, expectedRecord.CdcVersion.OriginTs, actualRecord.CdcVersion.OriginTs)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/decoder/decoder_test.go` around lines
81 - 88, The require.Equal calls in the decoder tests pass actual then expected
which flips failure messages; update assertions in TestCanalJSONDecoder1 (the
loop using records and ExpectedRecords1), TestCanalJSONDecoder2 (the analogous
loop using ExpectedRecords2), and the checks in
TestCanalJSONDecoderWithInvalidMessage to call require.Equal(t, expected,
actual) — e.g., swap arguments so ExpectedRecordsX.* (ExpectedRecords1/2,
expectedRecord.Pk, expectedRecord.PkStr, expectedRecord.ColumnValues,
expectedRecord.CdcVersion.CommitTs, expectedRecord.CdcVersion.OriginTs) are the
second-to-last (expected) parameter and actualRecord.* are the last (actual)
parameter in each require.Equal invocation.
cmd/multi-cluster-consistency-checker/config/config.go (1)

78-81: Consider wrapping the TOML decode error with errors.Trace.

Per coding guidelines, errors from third-party/library calls should be wrapped with errors.Trace(err) to attach a stack trace. The toml.DecodeFile error on line 78 is wrapped with fmt.Errorf + %w instead.

This is minor since the caller (main.go) will print the error directly, but for consistency with the rest of the codebase it's worth aligning.

As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/config/config.go` around lines 78 - 81,
The toml.DecodeFile call returns a third-party error that should be wrapped with
errors.Trace to attach a stack trace; change the error return in the block
handling toml.DecodeFile (currently using fmt.Errorf with %w) to wrap the
original err via errors.Trace(err) (e.g. return nil, fmt.Errorf("failed to
decode config file: %w", errors.Trace(err))) so the DecodeFile error is traced
per project conventions; update the import if errors is not already imported.
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go (2)

60-61: time.Sleep-based synchronization is inherently flaky.

There are ~16 time.Sleep calls used to synchronize with goroutines. While common in test code, these are non-deterministic and can cause flaky failures under CI load. Consider introducing a polling/retry helper or condition variable so tests wait for the precise state rather than guessing a duration.

This is not blocking, but worth addressing over time to reduce CI noise.

Also applies to: 98-99, 110-111, 165-166, 178-179, 219-220, 230-231, 285-286, 304-305, 324-325, 394-395, 440-441, 446-447, 488-489, 524-525, 536-537

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go`
around lines 60 - 61, The tests use time.Sleep calls to wait for the watcher to
initialize and other goroutine-driven states, which is flaky; replace these
sleeps with a deterministic polling/retry helper (e.g., waitForCondition or
waitUntil) that accepts a timeout and periodically checks a predicate until
true, and then use it wherever the file currently calls time.Sleep (instances in
checkpoint_watcher_test.go around watcher initialization and subsequent
synchronization points). Locate the places referencing the watcher
initialization and goroutine coordination in checkpoint_watcher_test.go and
update them to call the helper with a short tick interval and a reasonable
timeout so tests wait for the exact condition (e.g., watcher started, checkpoint
processed) instead of sleeping.

23-23: Deprecated gomock import path — migrate to maintained fork.

github.com/golang/mock was archived (June 27, 2023) and is no longer maintained. The actively maintained fork is go.uber.org/mock, maintained by Uber. Update the import from github.com/golang/mock/gomock to go.uber.org/mock/gomock, and regenerate mocks using go install go.uber.org/mock/mockgen@latest.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go` at
line 23, The test imports the archived gomock path; update the import in
checkpoint_watcher_test.go from "github.com/golang/mock/gomock" to
"go.uber.org/mock/gomock" and update any references to gomock.NewController or
gomock.Expect* accordingly, then regenerate mocks with "go install
go.uber.org/mock/mockgen@latest" and re-run mock generation (mockgen) so
generated mocks reference the new module path; ensure CI/go.mod/go.sum are
updated and tests compile.
cmd/multi-cluster-consistency-checker/watcher/s3_watcher.go (1)

64-65: Resolve the pending S3 index update TODO.

The TODO suggests missing functionality for index updates from S3; please either implement it or track it with an issue so the watcher doesn’t fall behind.

If you want, I can draft the implementation or open a follow-up issue.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/watcher/s3_watcher.go` around lines 64
- 65, The TODO indicates we never apply the S3-derived index updates after
calling sw.consumer.ConsumeNewFiles; implement logic immediately after the
ConsumeNewFiles call to merge newData into the watcher's index and advance
per-key versions using maxVersionMap (e.g., iterate newData and upsert into the
internal index structure, then set per-key versions from maxVersionMap), handle
and return errors from the index update, and add retries/logging as needed; if
you cannot implement now, replace the TODO with a created-issue reference and a
clear TODO note so the watcher does not silently fall behind.
cmd/multi-cluster-consistency-checker/checker/checker.go (1)

402-418: Zap field keys contain spaces — prefer camelCase.

Fields like zap.String("local cluster ID", ...) and zap.String("replicated cluster ID", ...) use spaces. The zap convention and structured logging best practice is to use camelCase keys (e.g., "localClusterID"), which are easier to query in log aggregation systems.

Example diff
-						zap.String("local cluster ID", cd.clusterID),
-						zap.String("replicated cluster ID", replicatedClusterID),
+						zap.String("localClusterID", cd.clusterID),
+						zap.String("replicatedClusterID", replicatedClusterID),

As per coding guidelines, "Use structured logs via github.com/pingcap/log with zap fields in Go; log message strings should not include function names and should avoid hyphens (use spaces instead)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/checker.go` around lines 402 -
418, The zap log field keys in the log.Debug call inside the loop (involving
cd.timeWindowDataCaches, cd.checkedRecordsCount,
checker.FindClusterReplicatedData and cd.lwwSkippedRecordsCount) use
spaces—replace them with camelCase keys (e.g., change "local cluster ID" to
"localClusterID" and "replicated cluster ID" to "replicatedClusterID") so
structured logs follow conventions; update the zap.String field keys in that
log.Debug invocation to camelCase (keeping other keys like "schemaKey", "pk",
and "commitTs" consistent) and run a quick grep to ensure no other nearby log
usages use spaced keys.
cmd/multi-cluster-consistency-checker/recorder/types.go (1)

374-409: ToScanRange assumes tables present in older checkpoints are always in the newest.

Lines 392–396 and 402–407 return an error if a table from CheckpointItems[1] or [0] is not found in the result (seeded from [2]). This is correct under normal operation — tables shouldn't disappear from newer checkpoints. However, if a DDL drops a table between rounds, this would surface as an error on recovery. If table drops are expected, consider gracefully skipping missing keys.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/recorder/types.go` around lines 374 -
409, ToScanRange currently seeds result from c.CheckpointItems[2] and then
treats missing SchemaTableKey entries from c.CheckpointItems[1] and [0] as fatal
errors; change the logic in the loops that iterate over
c.CheckpointItems[1].ClusterInfo[clusterID].MaxVersion and
c.CheckpointItems[0].ClusterInfo[clusterID].MaxVersion to skip entries that are
not found in result (instead of returning errors.Errorf), so dropped tables do
not cause recovery to fail; keep the initial seeding from CheckpointItems[2] and
the updates to StartVersionKey/StartDataPath when present, and also ensure you
check for nil ClusterInfo[clusterID] before iterating to avoid panics.
cmd/multi-cluster-consistency-checker/consumer/consumer.go (1)

306-377: TODO comments for StartAfter optimization.

Lines 317, 429, 499, 550 contain // TODO: StartAfter: hints. These are important for performance in production S3 buckets with many objects — without StartAfter, every WalkDir scans from the beginning of the prefix.

Would you like me to open an issue to track implementing the StartAfter optimization across these WalkDir calls?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go` around lines 306
- 377, The WalkDir scans start from the prefix every time because
WalkOption.StartAfter is not set; update the WalkOption in
downloadSchemaFilesWithScanRange (and the other TODO'd WalkDir usages in
consumer.go) to set StartAfter to the last-known object key (use startVersionKey
or the previous highest schema key as appropriate) so S3 listing resumes after
that key; pass this StartAfter in the storage.WalkOption used by
c.s3Storage.WalkDir and ensure the logic still handles ErrWalkDirEnd and edge
cases where StartAfter may be empty.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/multi-cluster-consistency-checker/checker/checker.go`:
- Around line 547-559: The call to clusterDataChecker.InitializeFromCheckpoint
inside DataChecker.initializeFromCheckpoint currently ignores the returned
error, so parsing failures (e.g., from decoder.Decode) are swallowed; change
DataChecker.initializeFromCheckpoint to return an error and propagate any error
returned by clusterDataChecker.InitializeFromCheckpoint (referencing
DataChecker.initializeFromCheckpoint and
clusterDataChecker.InitializeFromCheckpoint), ensuring you preserve existing nil
checks on checkpoint and CheckpointItems[2]; then update NewDataChecker to
handle and return this error from initializeFromCheckpoint so callers can react
to corrupted checkpoints.
- Around line 344-377: The direct interface{} comparisons in diffColumns (and in
EqualReplicatedRecord) are unsafe because JSON may produce non-comparable types
(maps/slices); replace the != checks with a deep-equality check (e.g., use
reflect.DeepEqual) or otherwise normalize/serialize JSON values before
comparing; update the comparisons that build recorder.InconsistentColumn for
local.ColumnValues and replicated.ColumnValues to use reflect.DeepEqual (or an
equivalent canonical-JSON comparison) and ensure the reflect package is imported
and used consistently in both diffColumns and EqualReplicatedRecord to avoid
runtime panics when comparing slices/maps.

In `@cmd/multi-cluster-consistency-checker/consumer/consumer_test.go`:
- Around line 231-232: The test setup passes nil to SetSchemaDefinition in the
"remove with condition" subtests which risks panics if SetSchemaDefinition
dereferences td; update those calls to pass an empty table definition instead by
replacing nil with &cloudstorage.TableDefinition{} for both occurrences (the
SetSchemaDefinition invocations used alongside key1/key2 in the "remove with
condition" subtests) so the tests use a safe, consistent empty struct like the
existing "set and get empty table definition" subtest.
- Around line 395-397: The mockS3Storage.ReadFile implementation incorrectly
uses the zero map value and returns m.sortedFiles[0] for unknown names; update
ReadFile to first check for existence with the map lookup pattern (val, ok :=
m.fileOffset[name]) and if !ok return an explicit error (e.g., fmt.Errorf or
errors.New) indicating missing file name, otherwise return
m.sortedFiles[val].content, nil; reference the mockS3Storage.ReadFile method,
m.fileOffset map and m.sortedFiles slice when making the change.

In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go`:
- Around line 627-651: The loop spawns an unbounded goroutine per task (using
errgroup eg and egCtx) which can exhaust resources; call eg.SetLimit(N) before
starting goroutines to cap concurrent downloads (choose N from a new or existing
configurable field on the consumer, e.g., c.downloadConcurrency or a constant),
and adjust the fileContents channel buffer to be min(len(tasks), N) to avoid
blocking; apply the same pattern to the other download helpers
(downloadSchemaFiles, downloadDataFilesWithScanRange,
downloadNewFilesWithVersions), and keep using c.s3Storage.ReadFile/egCtx inside
the limited goroutines.

In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go`:
- Around line 190-193: The log message in the json.Unmarshal error path uses a
hyphenated string and includes a function name; update the log.Error calls (the
one around json.Unmarshal in decoder.go that sets d.msg = nil and the similar
call at the later occurrence around lines 328–329) to remove hyphens and any
function names—e.g., change "canal-json decoder unmarshal data failed" to a
plain descriptive message like "canal json decoder unmarshal failed" (preserve
the zap.Error(err) and zap.ByteString("data", encodedData) fields) so the
message follows the logging guidelines.

In `@cmd/multi-cluster-consistency-checker/integration/mock_cluster.go`:
- Around line 135-141: The calls to the external storage WriteFile (inside the
loop over mc.S3Storages and mc.Tables where path := fmt.Sprintf(... ) and the
later similar block at 156-159) return raw errors from storage.ExternalStorage;
wrap those errors immediately before returning (e.g., replace `return err` with
`return errors.Trace(err)` or `return errors.WrapError(err, "...")`) so
s3.WriteFile errors are traceable and include context; apply the same wrapping
to the other s3.WriteFile/Read/Delete call sites referenced in the same
function.

In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go`:
- Around line 106-129: The code currently ignores non-NotExist errors from
os.Stat for checkpointFile and bakFile and treats them as "missing"; update both
checks to explicitly handle errors: after calling os.Stat(checkpointFile) and
os.Stat(bakFile) if err != nil then if !os.IsNotExist(err) return
errors.Trace(err) so permission/I/O errors are propagated instead of falling
back or proceeding, preserving the existing ReadFile/json.Unmarshal logic when
os.IsNotExist(err) is true; adjust the blocks around checkpointFile and bakFile
checks (the variables checkpointFile, bakFile and the logic that
reads/unmarshals into r.checkpoint) accordingly.

In `@cmd/multi-cluster-consistency-checker/task.go`:
- Around line 282-328: The cleanupClients function currently closes PD and etcd
clients before closing checkpoint and S3 watchers; change it so
checkpointWatchers and s3Watchers are closed first (iterate checkpointWatchers
and call watcher.Close() with a debug log per watcher, and iterate s3Watchers
and call Close() with a debug log), then close etcd clients (as in the existing
etcdClients loop with error handling/logging), and finally close PD clients;
keep the existing nil checks and logging behavior but reorder the blocks inside
cleanupClients to ensure watchers are shut down before their underlying clients.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go`:
- Around line 175-180: When detecting the non-recoverable error (errors.Is(err,
errChangefeedKeyDeleted)) set cw.watchErr under cw.mu and then wake any
goroutines waiting in AdvanceCheckpointTs so they can observe the error and
return instead of hanging; specifically, after setting cw.watchErr in
checkpoint_watcher.go call the watcher’s wait/notify primitive (e.g.
cw.cond.Broadcast if a sync.Cond is used, or close a done channel if the watcher
uses a channel) while holding the lock so AdvanceCheckpointTs can see
cw.watchErr and unblock immediately.

In `@Makefile`:
- Around line 177-179: CI shows gofumports/gci formatting failures; run the
repository formatter and commit the results: execute make fmt (which runs gci,
gofumports and shfmt) to reformat all modified Go files introduced in this PR,
verify the Makefile target multi-cluster-consistency-checker is unchanged, then
add and commit the formatted files and push the commit so the formatting check
passes.

---

Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/config/config_test.go`:
- Around line 251-274: The config loader currently allows a single cluster which
makes the checker meaningless; update LoadConfig to validate that the parsed
clusters map has length >= 2 and return an error like "at least two clusters
must be configured" when fewer are present, then add a unit test in
config_test.go (e.g., a new t.Run "single cluster is rejected") that writes a
config with one cluster (use pd-addrs, s3-sink-uri, s3-changefeed-id) and
asserts LoadConfig returns an error, nil cfg, and that the error message
contains the new "at least two clusters must be configured" text to cover the
single-cluster rejection path.

In `@cmd/multi-cluster-consistency-checker/decoder/decoder_test.go`:
- Around line 81-88: The require.Equal calls in the decoder tests pass actual
then expected which flips failure messages; update assertions in
TestCanalJSONDecoder1 (the loop using records and ExpectedRecords1),
TestCanalJSONDecoder2 (the analogous loop using ExpectedRecords2), and the
checks in TestCanalJSONDecoderWithInvalidMessage to call require.Equal(t,
expected, actual) — e.g., swap arguments so ExpectedRecordsX.*
(ExpectedRecords1/2, expectedRecord.Pk, expectedRecord.PkStr,
expectedRecord.ColumnValues, expectedRecord.CdcVersion.CommitTs,
expectedRecord.CdcVersion.OriginTs) are the second-to-last (expected) parameter
and actualRecord.* are the last (actual) parameter in each require.Equal
invocation.

In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go`:
- Around line 247-351: MarshalReport iterates r.ClusterReports (a map) which
yields non-deterministic cluster ordering; make iteration deterministic by
collecting the map keys, sorting them (e.g., using slices.Sort or sort.Strings),
and then iterating over the sorted clusterIDs to fetch each ClusterReport
(reference: MarshalReport, r.ClusterReports, ClusterReport); update the loop in
MarshalReport to use the sorted keys so tests and output are stable across runs.

In `@Makefile`:
- Around line 177-179: Add the new make target name
multi-cluster-consistency-checker to the .PHONY declaration so it follows the
existing convention; update the .PHONY list that currently includes cdc,
kafka_consumer, storage_consumer, pulsar_consumer, filter_helper (and note
config-converter/oauth2_server are also missing) to include
multi-cluster-consistency-checker to maintain consistency across binary build
targets.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9f8e3ee and f334d70.

📒 Files selected for processing (28)
  • Makefile
  • cmd/multi-cluster-consistency-checker/advancer/time_window_advancer.go
  • cmd/multi-cluster-consistency-checker/advancer/time_window_advancer_test.go
  • cmd/multi-cluster-consistency-checker/checker/checker.go
  • cmd/multi-cluster-consistency-checker/checker/checker_test.go
  • cmd/multi-cluster-consistency-checker/config/config.example.toml
  • cmd/multi-cluster-consistency-checker/config/config.go
  • cmd/multi-cluster-consistency-checker/config/config_test.go
  • cmd/multi-cluster-consistency-checker/consumer/consumer.go
  • cmd/multi-cluster-consistency-checker/consumer/consumer_test.go
  • cmd/multi-cluster-consistency-checker/decoder/decoder.go
  • cmd/multi-cluster-consistency-checker/decoder/decoder_test.go
  • cmd/multi-cluster-consistency-checker/decoder/value_to_datum_test.go
  • cmd/multi-cluster-consistency-checker/integration/integration_test.go
  • cmd/multi-cluster-consistency-checker/integration/mock_cluster.go
  • cmd/multi-cluster-consistency-checker/main.go
  • cmd/multi-cluster-consistency-checker/main_test.go
  • cmd/multi-cluster-consistency-checker/recorder/recorder.go
  • cmd/multi-cluster-consistency-checker/recorder/recorder_test.go
  • cmd/multi-cluster-consistency-checker/recorder/types.go
  • cmd/multi-cluster-consistency-checker/recorder/types_test.go
  • cmd/multi-cluster-consistency-checker/task.go
  • cmd/multi-cluster-consistency-checker/types/types.go
  • cmd/multi-cluster-consistency-checker/types/types_test.go
  • cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go
  • cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go
  • cmd/multi-cluster-consistency-checker/watcher/s3_watcher.go
  • cmd/multi-cluster-consistency-checker/watcher/s3_watcher_test.go

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (4)
cmd/multi-cluster-consistency-checker/recorder/recorder.go (1)

106-135: ⚠️ Potential issue | 🟠 Major

os.Stat errors other than IsNotExist are silently swallowed.

If os.Stat fails due to a permission or I/O error (not "file not found"), the code falls through to the "fresh start" path at line 138, masking real failures and potentially losing an existing checkpoint.

Suggested fix
 	// If checkpoint.json exists, use it directly.
-	if _, err := os.Stat(checkpointFile); err == nil {
+	if _, statErr := os.Stat(checkpointFile); statErr == nil {
 		data, err := os.ReadFile(checkpointFile)
 		if err != nil {
 			return errors.Trace(err) // transient I/O error
 		}
 		if err := json.Unmarshal(data, r.checkpoint); err != nil {
 			return errors.Annotatef(ErrCheckpointCorruption, "failed to unmarshal checkpoint.json: %v", err)
 		}
 		return nil
+	} else if !os.IsNotExist(statErr) {
+		return errors.Trace(statErr)
 	}
 
 	// checkpoint.json is missing — try recovering from the backup.
-	if _, err := os.Stat(bakFile); err == nil {
+	if _, statErr := os.Stat(bakFile); statErr == nil {
 		log.Warn("checkpoint.json not found, recovering from checkpoint.json.bak")
 		...
 		return nil
+	} else if !os.IsNotExist(statErr) {
+		return errors.Trace(statErr)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go` around lines 106
- 135, The os.Stat checks for checkpointFile and bakFile swallow non-NotExist
errors; update the logic in the block handling checkpointFile and the block
handling bakFile so that after calling os.Stat(...) you explicitly check the
error: if err == nil proceed as now, else if os.IsNotExist(err) continue to the
next recovery/fresh-start path, else return/propagate the err
(wrap/trace/annotate consistently with existing error handling) to avoid masking
permission or I/O errors; apply this change around the checkpoint.json and
checkpoint.json.bak checks that populate r.checkpoint and perform os.Rename.
cmd/multi-cluster-consistency-checker/task.go (1)

285-331: ⚠️ Potential issue | 🟡 Minor

Cleanup order: watchers are closed after their underlying clients.

cleanupClients still closes PD and etcd clients (lines 299–318) before closing checkpoint watchers and S3 watchers (lines 320–330). Since watchers hold active references to the etcd client for their watch loops, tearing down the client first forces in-flight operations to fail noisily before the watcher's Close() can shut them down gracefully.

Close watchers first, then clients:

Suggested fix
 func cleanupClients(
 	pdClients map[string]pd.Client,
 	etcdClients map[string]*etcd.CDCEtcdClientImpl,
 	checkpointWatchers map[string]map[string]watcher.Watcher,
 	s3Watchers map[string]*watcher.S3Watcher,
 ) {
 	...
-	// Close PD clients
-	for clusterID, pdClient := range pdClients {
-		...
-	}
-
-	// Close etcd clients
-	for clusterID, etcdClient := range etcdClients {
-		...
-	}
-
 	// Close checkpoint watchers
 	for _, clusterWatchers := range checkpointWatchers {
 		for _, watcher := range clusterWatchers {
 			watcher.Close()
 		}
 	}
 
 	// Close s3 watchers
 	for _, s3Watcher := range s3Watchers {
 		s3Watcher.Close()
 	}
+
+	// Close etcd clients
+	for clusterID, etcdClient := range etcdClients {
+		...
+	}
+
+	// Close PD clients
+	for clusterID, pdClient := range pdClients {
+		...
+	}
 
 	log.Info("Client cleanup completed")
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/task.go` around lines 285 - 331, In
cleanupClients, reverse the teardown order so watchers are closed before their
underlying clients: iterate and call Close() on all checkpointWatchers and
s3Watchers first (use the existing checkpointWatchers and s3Watchers maps and
the watcher.Close/S3Watcher.Close methods), wait/ensure watchers stop if needed,
and only after that close PD clients (pd.Client.Close) and etcd clients
(etcd.CDCEtcdClientImpl.Close) with the existing logging and error handling;
this prevents watchers from observing closed etcd clients mid-shutdown.
cmd/multi-cluster-consistency-checker/checker/checker.go (1)

345-378: ⚠️ Potential issue | 🔴 Critical

interface{} comparison at line 357 can panic on non-comparable types.

The comment claims ColumnValues only holds comparable types, but JSON unmarshaling can produce map[string]interface{} or []interface{} for nested JSON values, which are non-comparable and will cause a runtime panic with !=. Use reflect.DeepEqual or normalize values before comparison.

Suggested fix
+import "reflect"
+
 func diffColumns(local, replicated *decoder.Record) []recorder.InconsistentColumn {
 	var result []recorder.InconsistentColumn
 	for colName, localVal := range local.ColumnValues {
 		replicatedVal, ok := replicated.ColumnValues[colName]
 		if !ok {
 			result = append(result, recorder.InconsistentColumn{
 				Column:     colName,
 				Local:      localVal,
 				Replicated: nil,
 			})
-		} else if localVal != replicatedVal { // safe: ColumnValues only holds comparable types (see decoder.go)
+		} else if !reflect.DeepEqual(localVal, replicatedVal) {
 			result = append(result, recorder.InconsistentColumn{
 				Column:     colName,
 				Local:      localVal,
 				Replicated: replicatedVal,
 			})
 		}
 	}
#!/bin/bash
# Check what types ColumnValues can hold by examining the decoder
fd -t f decoder.go --exec grep -n "ColumnValues\|columnValues\|valueToDatum\|interface{}" {} \;
echo "---"
# Look for the specific function that populates ColumnValues
ast-grep --pattern 'func $_($$) {
  $$$
  ColumnValues[$_] = $_
  $$$
}'
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/checker.go` around lines 345 -
378, The diffColumns function currently uses the non-comparable != operator on
values from decoder.Record.ColumnValues which can be maps/slices from JSON and
will panic; replace the direct != check with a safe deep comparison (use
reflect.DeepEqual) when comparing localVal and replicatedVal, and ensure you
still treat missing keys as inconsistencies; update the comment above
diffColumns to reflect that DeepEqual is used to compare possibly non-comparable
nested values and keep the rest of the logic (the two-loops that append
recorder.InconsistentColumn for missing keys) unchanged so only the equality
check is altered.
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go (1)

70-77: Past review concern addressed: pending tasks are now unblocked on terminal errors.

failPendingTasksLocked (lines 72–77) is invoked when errChangefeedKeyDeleted is encountered (line 198), properly unblocking any goroutines waiting in AdvanceCheckpointTs. This resolves the prior concern.

Also applies to: 194-201

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go` around
lines 70 - 77, The duplicate review comment indicates the same approval was
posted twice; remove the redundant comment and ensure the behavior remains: when
errChangefeedKeyDeleted triggers, Call failPendingTasksLocked to close each
task.respCh (in CheckpointWatcher.failPendingTasksLocked) so goroutines blocked
in AdvanceCheckpointTs are unblocked; verify the other instance referencing the
same logic (the occurrence around errChangefeedKeyDeleted / the block that calls
failPendingTasksLocked) is consistent and not left with outdated comments.
🧹 Nitpick comments (5)
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go (1)

60-62: time.Sleep-based synchronization is inherently flaky.

Multiple tests rely on time.Sleep(50 * time.Millisecond) to wait for the background goroutine to initialize. Under CI load or slow environments, this can intermittently fail. Consider using a polling/retry helper (e.g., require.Eventually) or exposing a ready signal from the watcher for tests.

That said, the timeouts are generous enough for practical use and this is a common Go test pattern, so this is a non-blocking suggestion.

Also applies to: 98-100, 165-167, 219-221, 285-287, 394-396, 440-442, 447-448, 524-526

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go`
around lines 60 - 62, Tests in checkpoint_watcher_test.go use time.Sleep(50 *
time.Millisecond) to wait for the watcher goroutine to initialize which is
flaky; replace these sleeps (occurrences around the watcher initialization and
other spots) with a deterministic wait such as require.Eventually or a short
polling loop that checks a ready condition on the watcher (e.g., expose a
Ready() bool or channel from the watcher created by NewCheckpointWatcher/Start)
and wait until that returns true (with a timeout) so tests no longer rely on
fixed sleeps.
cmd/multi-cluster-consistency-checker/checker/failpoint.go (1)

43-67: File handle is never closed.

The file opened by ensureFile is never closed. While this is a single FD for the process lifetime, it means the final JSONL entries may not be flushed on abrupt exit. Consider adding a Close() method, calling file.Sync() at the end of Write, or registering an atexit handler.

Also, the file is opened with 0644 permissions. Since the failpoint output may contain primary key data, 0600 would be more restrictive.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 43 -
67, The opened file in ensureFile (variable file) is never closed and is created
with too-permissive mode; modify ensureFile to open the file with mode 0600
instead of 0644 and add a safe close/sync path: ensure the Write function (the
method that appends JSONL entries) calls file.Sync() after successful writes (or
flushes) and register a one-time cleanup/Close for file (e.g., via a
package-level CloseFailpoint or an init/interrupt handler that calls file.Sync()
and file.Close(), using initOnce/mu to guard concurrent access), and ensure
disabled/ensureFile semantics remain intact when file is nil.
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go (1)

245-251: Log messages start with uppercase.

Per coding guidelines, log message strings should follow a consistent style. These messages ("Starting to watch checkpoint", "Checkpoint updated") start with uppercase, which is atypical for structured log messages in Go.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go` around
lines 245 - 251, Update the structured log message strings in
checkpoint_watcher.go to use sentence case starting with lowercase (e.g. change
"Starting to watch checkpoint" -> "starting to watch checkpoint" and "Checkpoint
updated" -> "checkpoint updated") so they follow project logging style; locate
the zap log.Debug calls that reference cw.changefeedID, statusKey,
cw.localClusterID, cw.replicatedClusterID, status.CheckpointTs and modRev (and
any other similar log.Debug invocations in the same file/method) and replace the
leading uppercase log message text with the lowercase variants while keeping all
existing structured fields unchanged.
cmd/multi-cluster-consistency-checker/task.go (1)

168-170: Use errors.Errorf instead of fmt.Errorf for consistency and stack traces.

Multiple error returns in this file use fmt.Errorf (lines 169, 177, 184, 198, 205, 227, 232, 247) instead of the project's errors.Errorf. The latter attaches stack traces, which is consistent with the rest of the codebase and the coding guidelines about wrapping errors with the errors package.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/task.go` around lines 168 - 170,
Replace uses of fmt.Errorf in the s3 changefeed sink validation and the other
error-return sites in this file with the project's errors.Errorf so stack traces
are preserved; specifically update the error creation in the block that checks
cfInfo.Config and cfInfo.Config.Sink (where clusterID and s3ChangefeedID are
used) and the other analogous returns in this file (the other fmt.Errorf
occurrences referenced in the review) to call errors.Errorf with the same format
string and arguments, ensuring the errors package is imported if not already.
cmd/multi-cluster-consistency-checker/recorder/recorder.go (1)

162-180: Non-atomic two-file report flush.

flushReport writes report-N.report and report-N.json sequentially. If the process crashes between the two writes, one file exists without the other. For a monitoring/auditing tool, this means a partially flushed round. Consider treating the pair atomically (e.g., write both to temp files first, then rename both) or documenting this as an accepted trade-off.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go` around lines 162
- 180, The flushReport function currently writes reportName and jsonName
sequentially which can leave a partial pair on crash; modify
Recorder.flushReport to persist the pair atomically: marshal both outputs, write
them to temporary files (e.g., reportName.tmp and jsonName.tmp) inside
r.reportDir using atomicWriteFile (or reuse its temp-then-rename logic), then
rename/move both temp files to their final names (reportName and jsonName) so
either both appear or none; update r.reportFiles only after successful renames
of both files (use the existing reportName, jsonName, r.reportDir,
atomicWriteFile, and r.reportFiles symbols to locate and change the logic).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 1-98: Run the formatter to fix import ordering and gofumports/gci
issues and commit the changes: run `make fmt` (or `gofumports`/`gci` directly)
to reformat the file containing ensureFile, Write, and the import block so the
imports are grouped/ordered correctly and any spacing or unused-import fixes are
applied; then re-run CI and push the formatted changes.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go`:
- Around line 176-184: When run() observes cw.ctx.Done() it sets cw.watchErr but
doesn't drain pending tasks, which can leave callers that were still acquiring
cw.mu stuck; after setting cw.watchErr you should call failPendingTasksLocked()
while holding cw.mu to drain and notify all pending tasks just like the
key-deletion path does. Update the ctx.Done() branch in CheckpointWatcher.run()
to acquire cw.mu, set cw.watchErr (errors.Annotate(cw.ctx.Err(), "context
canceled")), call failPendingTasksLocked(), release the lock, and then return so
no pending AdvanceCheckpointTs tasks remain enqueued without a notifier.

In `@pkg/sink/cloudstorage/path_key.go`:
- Around line 187-189: The regex in the path building code (where str is
appended with `CDC(?:_(\w+)_)?(\d+).\w+` and compiled into pathRE) uses an
unescaped dot after (\d+), which allows '/' to match; change that segment to
require a literal dot by replacing `(\d+).\w+` with `(\d+)\.\w+` (i.e., update
the string passed to regexp.Compile that produces pathRE).
- Around line 189-207: The errors returned from regexp.Compile and
strconv.Parse* calls are returned raw; wrap them with errors.Trace so stack
context is preserved: when calling regexp.Compile(str) (variable pathRE) return
errors.Trace(err) instead of err, and do the same for
strconv.ParseUint(matches[3], 10, 64) and strconv.ParseInt(matches[4], 10, 64)
(used to set version and partitionNum); keep the same error messages/flow
(including the fmt.Errorf when matches length mismatches) but replace direct err
returns with errors.Trace(err) to preserve stack traces.
- Around line 211-214: The code trims leading zeros from matches[matchesFileIdx]
then calls strconv.ParseUint, which yields an "invalid syntax" error when the
trimmed result is empty (i.e., the original was all zeros). Before calling
strconv.ParseUint in the section around fileIdx and matches[matchesFileIdx],
check the trimmed value and if it is an empty string set it to "0", then call
strconv.ParseUint; this ensures zero-index files parse correctly while keeping
the existing use of strconv.ParseUint and matches[matchesFileIdx].

---

Duplicate comments:
In `@cmd/multi-cluster-consistency-checker/checker/checker.go`:
- Around line 345-378: The diffColumns function currently uses the
non-comparable != operator on values from decoder.Record.ColumnValues which can
be maps/slices from JSON and will panic; replace the direct != check with a safe
deep comparison (use reflect.DeepEqual) when comparing localVal and
replicatedVal, and ensure you still treat missing keys as inconsistencies;
update the comment above diffColumns to reflect that DeepEqual is used to
compare possibly non-comparable nested values and keep the rest of the logic
(the two-loops that append recorder.InconsistentColumn for missing keys)
unchanged so only the equality check is altered.

In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go`:
- Around line 106-135: The os.Stat checks for checkpointFile and bakFile swallow
non-NotExist errors; update the logic in the block handling checkpointFile and
the block handling bakFile so that after calling os.Stat(...) you explicitly
check the error: if err == nil proceed as now, else if os.IsNotExist(err)
continue to the next recovery/fresh-start path, else return/propagate the err
(wrap/trace/annotate consistently with existing error handling) to avoid masking
permission or I/O errors; apply this change around the checkpoint.json and
checkpoint.json.bak checks that populate r.checkpoint and perform os.Rename.

In `@cmd/multi-cluster-consistency-checker/task.go`:
- Around line 285-331: In cleanupClients, reverse the teardown order so watchers
are closed before their underlying clients: iterate and call Close() on all
checkpointWatchers and s3Watchers first (use the existing checkpointWatchers and
s3Watchers maps and the watcher.Close/S3Watcher.Close methods), wait/ensure
watchers stop if needed, and only after that close PD clients (pd.Client.Close)
and etcd clients (etcd.CDCEtcdClientImpl.Close) with the existing logging and
error handling; this prevents watchers from observing closed etcd clients
mid-shutdown.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go`:
- Around line 70-77: The duplicate review comment indicates the same approval
was posted twice; remove the redundant comment and ensure the behavior remains:
when errChangefeedKeyDeleted triggers, call failPendingTasksLocked to close each
task.respCh (in CheckpointWatcher.failPendingTasksLocked) so goroutines blocked
in AdvanceCheckpointTs are unblocked; verify the other instance referencing the
same logic (the occurrence around errChangefeedKeyDeleted / the block that calls
failPendingTasksLocked) is consistent and not left with outdated comments.

---

Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 43-67: The opened file in ensureFile (variable file) is never
closed and is created with too-permissive mode; modify ensureFile to open the
file with mode 0600 instead of 0644 and add a safe close/sync path: ensure the
Write function (the method that appends JSONL entries) calls file.Sync() after
successful writes (or flushes) and register a one-time cleanup/Close for file
(e.g., via a package-level CloseFailpoint or an init/interrupt handler that
calls file.Sync() and file.Close(), using initOnce/mu to guard concurrent
access), and ensure disabled/ensureFile semantics remain intact when file is
nil.

In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go`:
- Around line 162-180: The flushReport function currently writes reportName and
jsonName sequentially which can leave a partial pair on crash; modify
Recorder.flushReport to persist the pair atomically: marshal both outputs, write
them to temporary files (e.g., reportName.tmp and jsonName.tmp) inside
r.reportDir using atomicWriteFile (or reuse its temp-then-rename logic), then
rename/move both temp files to their final names (reportName and jsonName) so
either both appear or none; update r.reportFiles only after successful renames
of both files (use the existing reportName, jsonName, r.reportDir,
atomicWriteFile, and r.reportFiles symbols to locate and change the logic).

In `@cmd/multi-cluster-consistency-checker/task.go`:
- Around line 168-170: Replace uses of fmt.Errorf in the s3 changefeed sink
validation and the other error-return sites in this file with the project's
errors.Errorf so stack traces are preserved; specifically update the error
creation in the block that checks cfInfo.Config and cfInfo.Config.Sink (where
clusterID and s3ChangefeedID are used) and the other analogous returns in this
file (the other fmt.Errorf occurrences referenced in the review) to call
errors.Errorf with the same format string and arguments, ensuring the errors
package is imported if not already.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go`:
- Around line 60-62: Tests in checkpoint_watcher_test.go use time.Sleep(50 *
time.Millisecond) to wait for the watcher goroutine to initialize which is
flaky; replace these sleeps (occurrences around the watcher initialization and
other spots) with a deterministic wait such as require.Eventually or a short
polling loop that checks a ready condition on the watcher (e.g., expose a
Ready() bool or channel from the watcher created by NewCheckpointWatcher/Start)
and wait until that returns true (with a timeout) so tests no longer rely on
fixed sleeps.

In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go`:
- Around line 245-251: Update the structured log message strings in
checkpoint_watcher.go to use sentence case starting with lowercase (e.g. change
"Starting to watch checkpoint" -> "starting to watch checkpoint" and "Checkpoint
updated" -> "checkpoint updated") so they follow project logging style; locate
the zap log.Debug calls that reference cw.changefeedID, statusKey,
cw.localClusterID, cw.replicatedClusterID, status.CheckpointTs and modRev (and
any other similar log.Debug invocations in the same file/method) and replace the
leading uppercase log message text with the lowercase variants while keeping all
existing structured fields unchanged.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f334d70 and 079cfde.

📒 Files selected for processing (12)
  • cmd/multi-cluster-consistency-checker/checker/checker.go
  • cmd/multi-cluster-consistency-checker/checker/checker_test.go
  • cmd/multi-cluster-consistency-checker/checker/failpoint.go
  • cmd/multi-cluster-consistency-checker/config/config_test.go
  • cmd/multi-cluster-consistency-checker/integration/integration_test.go
  • cmd/multi-cluster-consistency-checker/recorder/recorder.go
  • cmd/multi-cluster-consistency-checker/recorder/recorder_test.go
  • cmd/multi-cluster-consistency-checker/task.go
  • cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go
  • cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go
  • pkg/sink/cloudstorage/path_key.go
  • pkg/sink/cloudstorage/path_key_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • cmd/multi-cluster-consistency-checker/recorder/recorder_test.go
  • cmd/multi-cluster-consistency-checker/config/config_test.go

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
cmd/multi-cluster-consistency-checker/checker/failpoint.go (1)

1-98: Formatting issue already flagged.

The gofumports/gci formatting failure noted in a previous review has not been resolved. Run make fmt before merging.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 1 -
98, The file has gofumports/gci formatting failures; run the formatter and
reorder/import-grouping tool and re-run gofmt: run `make fmt` (or run
gofumports/gci directly) and commit the resulting changes so imports and
formatting in failpoint.go (notably around the package imports and functions
ensureFile and Write) comply with the repository style; no functional changes
required—just apply the automatic formatters and re-run tests/lint before
pushing.
🧹 Nitpick comments (1)
cmd/multi-cluster-consistency-checker/checker/failpoint.go (1)

43-48: Package-level sync.Once prevents state reset between tests.

These package-level globals are initialized exactly once per process lifetime. Any test that needs to exercise a different TICDC_MULTI_CLUSTER_CONSISTENCY_CHECKER_FAILPOINT_RECORD_FILE value (e.g., testing both the enabled and disabled paths) cannot reset initOnce, file, or disabled, forcing per-test-case process isolation.

Consider wrapping the state in a struct and exposing a package-level default instance, which keeps the call sites identical while making the component resettable and injectable in tests.

♻️ Sketch of a struct-based approach
-var (
-	initOnce sync.Once
-	mu       sync.Mutex
-	file     *os.File
-	disabled atomic.Bool
-)
-
-func ensureFile() { ... }
-
-// Write persists one failpoint event to the JSONL file.
-func Write(failpoint string, rows []RowRecord) { ... }

+// Recorder serializes failpoint events to a JSONL file.
+type Recorder struct {
+	initOnce sync.Once
+	mu       sync.Mutex
+	file     *os.File
+	disabled atomic.Bool
+}
+
+func (r *Recorder) ensureFile() { ... }
+
+// Write persists one failpoint event to the JSONL file.
+func (r *Recorder) Write(failpoint string, rows []RowRecord) { ... }
+
+// defaultRecorder is the package-level singleton driven by the env var.
+var defaultRecorder Recorder
+
+// Write is a convenience wrapper around the default recorder.
+func Write(failpoint string, rows []RowRecord) {
+	defaultRecorder.Write(failpoint, rows)
+}

Tests can then construct a Recorder with a temp file path without touching the global.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 43 -
48, The package-level sync.Once and globals (initOnce, mu, file, disabled) make
the recorder state unresettable in tests; refactor by creating a Recorder struct
encapsulating these fields (e.g., type Recorder struct { initOnce sync.Once; mu
sync.Mutex; file *os.File; disabled atomic.Bool; path string }), replace
package-level vars with a package-level default Recorder instance (e.g., var
defaultRecorder = NewRecorderFromEnv()), and change public functions that
reference initOnce/mu/file/disabled to be methods on *Recorder while keeping
wrappers that call defaultRecorder.Method(...) so call sites remain the same but
tests can instantiate/reset a Recorder with different
TICDC_MULTI_CLUSTER_CONSISTENCY_CHECKER_FAILPOINT_RECORD_FILE values.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 93-97: The write path that acquires mu and calls file.Write(data)
should also call file.Sync() after a successful write to flush kernel buffers to
stable storage; update the failpoint record write block (the mu.Lock()/defer
mu.Unlock() section where file.Write(data) is called) to call file.Sync() when
Write returns no error and log any sync errors with log.Warn/zap.Error so
failures are visible; ensure Sync is performed while still holding mu (or
otherwise ensure no concurrent writes) so the flush corresponds to the written
record.

---

Duplicate comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 1-98: The file has gofumports/gci formatting failures; run the
formatter and reorder/import-grouping tool and re-run gofmt: run `make fmt` (or
run gofumports/gci directly) and commit the resulting changes so imports and
formatting in failpoint.go (notably around the package imports and functions
ensureFile and Write) comply with the repository style; no functional changes
required—just apply the automatic formatters and re-run tests/lint before
pushing.

---

Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 43-48: The package-level sync.Once and globals (initOnce, mu,
file, disabled) make the recorder state unresettable in tests; refactor by
creating a Recorder struct encapsulating these fields (e.g., type Recorder
struct { initOnce sync.Once; mu sync.Mutex; file *os.File; disabled atomic.Bool;
path string }), replace package-level vars with a package-level default Recorder
instance (e.g., var defaultRecorder = NewRecorderFromEnv()), and change public
functions that reference initOnce/mu/file/disabled to be methods on *Recorder
while keeping wrappers that call defaultRecorder.Method(...) so call sites
remain the same but tests can instantiate/reset a Recorder with different
TICDC_MULTI_CLUSTER_CONSISTENCY_CHECKER_FAILPOINT_RECORD_FILE values.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 079cfde and f7392af.

📒 Files selected for processing (1)
  • cmd/multi-cluster-consistency-checker/checker/failpoint.go

Comment on lines +93 to +97
mu.Lock()
defer mu.Unlock()
if _, err := file.Write(data); err != nil {
log.Warn("failed to write failpoint record", zap.Error(err))
}
⚠️ Potential issue | 🟡 Minor

Missing file.Sync() risks losing records on unclean process exit.

os.File.Write() writes into the OS kernel buffer. On SIGKILL or an OOM kill, unflushed kernel pages for the file can be lost. For a tool whose purpose is capturing data anomalies, silently losing the last batch of failpoint records on crash significantly reduces its reliability.

🛡️ Proposed fix
 	mu.Lock()
 	defer mu.Unlock()
 	if _, err := file.Write(data); err != nil {
 		log.Warn("failed to write failpoint record", zap.Error(err))
+		return
 	}
+	if err := file.Sync(); err != nil {
+		log.Warn("failed to sync failpoint record file", zap.Error(err))
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
mu.Lock()
defer mu.Unlock()
if _, err := file.Write(data); err != nil {
log.Warn("failed to write failpoint record", zap.Error(err))
}
mu.Lock()
defer mu.Unlock()
if _, err := file.Write(data); err != nil {
log.Warn("failed to write failpoint record", zap.Error(err))
return
}
if err := file.Sync(); err != nil {
log.Warn("failed to sync failpoint record file", zap.Error(err))
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 93 -
97, The write path that acquires mu and calls file.Write(data) should also call
file.Sync() after a successful write to flush kernel buffers to stable storage;
update the failpoint record write block (the mu.Lock()/defer mu.Unlock() section
where file.Write(data) is called) to call file.Sync() when Write returns no
error and log any sync errors with log.Warn/zap.Error so failures are visible;
ensure Sync is performed while still holding mu (or otherwise ensure no
concurrent writes) so the flush corresponds to the written record.

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (3)
cmd/multi-cluster-consistency-checker/checker/failpoint.go (1)

94-98: ⚠️ Potential issue | 🟠 Major

Persist writes durably before releasing the lock.

Line 96 writes to the file but never calls file.Sync(). On abrupt exit, buffered data can be lost, which weakens auditability/resumability for checker failpoint records.

💾 Suggested change
 	mu.Lock()
 	defer mu.Unlock()
 	if _, err := file.Write(data); err != nil {
 		log.Warn("failed to write failpoint record", zap.Error(err))
+		return
+	}
+	if err := file.Sync(); err != nil {
+		log.Warn("failed to sync failpoint record file", zap.Error(err))
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 94 -
98, The write to the failpoint file must be durably persisted before releasing
mu: after calling file.Write(data) in the same critical section (where mu is
held), call file.Sync() and check its error, logging via log.Warn (similar to
the Write error) if Sync fails; ensure both write and sync errors are handled
before allowing the deferred mu.Unlock() to run so failpoint records are durable
across abrupt exits.
cmd/multi-cluster-consistency-checker/decoder/decoder.go (1)

328-329: ⚠️ Potential issue | 🟡 Minor

Remove hyphen from log message text.

Line 328 still logs "canal-json ...". This violates the log-string style rule in this repo.

As per coding guidelines, log message strings should avoid hyphens (use spaces instead).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go` around lines 328 -
329, Update the log message passed to log.Panic so it removes the hyphen in
"canal-json" (replace with a space: "canal json") to comply with the repo's
log-string style rule; locate the log.Panic call that currently reads
"canal-json encoded message should have type in `string`" in decoder.go and
update that string literal accordingly.
cmd/multi-cluster-consistency-checker/consumer/consumer.go (1)

271-299: ⚠️ Potential issue | 🟠 Major

Bound all remaining errgroup fan-outs, not only DML file reads.

downloadDMLFiles is limited, but these paths still fan out unbounded goroutines. Under large table/version counts this can still exhaust resources and overload S3.

#!/bin/bash
# Verify errgroup usage vs SetLimit in this file.
rg -n 'errgroup\.WithContext|SetLimit\(' cmd/multi-cluster-consistency-checker/consumer/consumer.go

Also applies to: 392-412, 464-483, 687-711, 729-762

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go` around lines 271
- 299, The outer errgroup (the variable `eg`, created via errgroup.WithContext in consumer.go)
spawns unbounded goroutines for each schemaTableKey; bound those fan-outs by
applying a concurrency limit (eg.SetLimit(N)) on that errgroup before the
for-loop or by using a semaphore/channel to limit concurrent eg.Go calls; ensure
downloadSchemaFilesWithScanRange and downloadDataFilesWithScanRange calls are
executed under that bounded group, and if downloadDataFilesWithScanRange itself
spawns goroutines (or calls downloadDMLFiles), add a local bounded errgroup or
semaphore inside downloadDataFilesWithScanRange/downloadDMLFiles to cap S3/read
concurrency as well.
🧹 Nitpick comments (2)
cmd/multi-cluster-consistency-checker/checker/checker.go (1)

411-456: Stabilize detection iteration order to keep report item ordering reproducible.

These detection passes iterate maps directly, so failure-item append order can vary between runs. Sorting schema keys / PKs / timestamps before iteration would make report outputs reliably stable.

Also applies to: 462-477, 481-517

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/checker.go` around lines 411 -
456, The iteration over maps in the detection pass (starting from
cd.timeWindowDataCaches[timeWindowIdx].tableDataCaches and iterating schemaKey,
localDataCache, and cd.timeWindowDataCaches[timeWindowIdx].checkpointTs) causes
non-deterministic report ordering; update the loops to iterate deterministic,
sorted slices instead: extract and sort schema keys from
cd.timeWindowDataCaches[timeWindowIdx].tableDataCaches before looping, extract
and sort primary-key identifiers (e.g., record.PkStr or the map keys used to
build localDataCache) before iterating localDataCache, and extract and sort
replicatedClusterID/checkpointTs pairs (by cluster ID or by timestamp) from
cd.timeWindowDataCaches[timeWindowIdx].checkpointTs before iterating; apply the
same sorting approach to the other affected blocks (around lines 462–477 and
481–517) so report append order is reproducible.
cmd/multi-cluster-consistency-checker/recorder/recorder.go (1)

57-69: Sort retention candidates by round number, not filename lexicographic order.

sort.Strings(reportFiles) on startup can misorder rounds (report-10.* before report-2.*), so cleanupOldReports may delete newer files first after restart.

Also applies to: 225-237

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go` around lines 57 -
69, The startup code collects report file names into reportFiles and currently
uses sort.Strings(reportFiles), which sorts lexicographically and misorders
rounds (e.g., report-10 before report-2); change the sorting to numeric round
order by parsing the round number from each filename (e.g., extract the integer
after "report-" and before the next separator) and use sort.Slice with a
comparator that compares those integers so older rounds come first; apply the
same change to the other occurrence referenced by cleanupOldReports so both
places sort by parsed round number rather than filename string.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Line 59: The file is currently created with permissions set to 0o644, which
allows read access to other local users and exposes sensitive PrimaryKeys. To
fix this, change the file permission argument in the os.OpenFile call within the
failpoint.go file (at the line opening the file for append) from 0o644 to 0o600,
ensuring only the owner has read and write access, following the least privilege
principle and matching the pattern used in recorder.go.

In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go`:
- Around line 255-256: The code dereferences
d.msg.TiDBCommitTsExtension.CommitTs without checking that d.msg and its
TiDBCommitTsExtension (the `_tidb` extension) are present, which can panic on
malformed payloads; update the decoder method that contains the lines setting
commitTs := d.msg.TiDBCommitTsExtension.CommitTs and d.msg = nil to first
validate d.msg != nil and d.msg.TiDBCommitTsExtension != nil (or the equivalent
presence flag), and if absent return a wrapped error (e.g., fmt.Errorf("missing
_tidb extension: %w", err)) instead of dereferencing, preserving existing
cleanup (d.msg = nil) only after successful validation.
- Around line 177-181: The current decoding uses bytes.IndexAny(d.data,
d.config.Terminator) which treats d.config.Terminator ("\\r\\n") as a set of
runes and matches either '\r' or '\n', causing incorrect splits; replace that
call with bytes.Index(d.data, []byte(d.config.Terminator)) (or equivalent) to
search for the full CRLF sequence, then keep the existing logic that sets
encodedData = d.data[:idx] and advances d.data =
d.data[idx+len(d.config.Terminator):] so records are only split on the full
terminator.

In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go`:
- Around line 261-275: Update the test expectations to match the current
formatter that uses "local commit ts" instead of "commit ts": in the test case
using NewReport(1) and NewClusterReport("c1", tw) where AddDataLossItem("d1",
testSchemaKey, ...) is called and s := r.MarshalReport() is asserted, replace
the expected substring `commit ts` with `local commit ts` for the DataLossItem
lines (the assertions referencing the peer cluster line produced by
MarshalReport). Ensure the same change is applied to the other test cases in
this file that use AddDataLossItem and assert MarshalReport output.

---

Duplicate comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 94-98: The write to the failpoint file must be durably persisted
before releasing mu: after calling file.Write(data) in the same critical section
(where mu is held), call file.Sync() and check its error, logging via log.Warn
(similar to the Write error) if Sync fails; ensure both write and sync errors
are handled before allowing the deferred mu.Unlock() to run so failpoint records
are durable across abrupt exits.

In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go`:
- Around line 271-299: The outer errgroup (eg, created via errgroup.WithContext
in consumer.go) spawns unbounded goroutines for each schemaTableKey; bound those
fan-outs by applying a concurrency limit (eg.SetLimit(N)) on that errgroup
before the for-loop or by using a semaphore/channel to limit concurrent eg.Go
calls; ensure downloadSchemaFilesWithScanRange and
downloadDataFilesWithScanRange calls are executed under that bounded group, and
if downloadDataFilesWithScanRange itself spawns goroutines (or calls
downloadDMLFiles), add a local bounded errgroup or semaphore inside
downloadDataFilesWithScanRange/downloadDMLFiles to cap S3/read concurrency as
well.
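The bounding this comment asks for can come either from errgroup's SetLimit or from a plain counting semaphore. A minimal stdlib-only sketch of the semaphore approach, under the assumption that each task stands in for one S3 download (boundedRun and its parameters are illustrative, not consumer.go's real API):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// boundedRun launches `tasks` goroutines but lets at most `limit`
// of them run concurrently; it returns the observed peak concurrency.
func boundedRun(limit, tasks int) int64 {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var inFlight, peak int64
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks once `limit` workers are active
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			cur := atomic.AddInt64(&inFlight, 1)
			defer atomic.AddInt64(&inFlight, -1)
			// Record the high-water mark with a CAS loop.
			for {
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak in-flight:", boundedRun(4, 32))
}
```

With errgroup the same effect is a single `eg.SetLimit(4)` call before the `eg.Go` loop.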

In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go`:
- Around line 328-329: Update the log message passed to log.Panic so it removes
the hyphen in "canal-json" (replace with a space: "canal json") to comply with
the repo's log-string style rule; locate the log.Panic call that currently reads
"canal-json encoded message should have type in `string`" in decoder.go and
update that string literal accordingly.

---

Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/checker/checker.go`:
- Around line 411-456: The iteration over maps in the detection pass (starting
from cd.timeWindowDataCaches[timeWindowIdx].tableDataCaches and iterating
schemaKey, localDataCache, and
cd.timeWindowDataCaches[timeWindowIdx].checkpointTs) causes non-deterministic
report ordering; update the loops to iterate deterministic, sorted slices
instead: extract and sort schema keys from
cd.timeWindowDataCaches[timeWindowIdx].tableDataCaches before looping, extract
and sort primary-key identifiers (e.g., record.PkStr or the map keys used to
build localDataCache) before iterating localDataCache, and extract and sort
replicatedClusterID/checkpointTs pairs (by cluster ID or by timestamp) from
cd.timeWindowDataCaches[timeWindowIdx].checkpointTs before iterating; apply the
same sorting approach to the other affected blocks (around lines 462–477 and
481–517) so report append order is reproducible.
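Go map iteration order is randomized, so deterministic report output requires materializing and sorting the keys before each loop. A generic helper along these lines would work (sortedKeys is an illustrative name, not the checker's API):

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// sortedKeys returns the map's keys in ascending order so callers
// can iterate the map deterministically.
func sortedKeys[K cmp.Ordered, V any](m map[K]V) []K {
	keys := make([]K, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	slices.Sort(keys)
	return keys
}

func main() {
	// Stand-in for a tableDataCaches-style map keyed by schema.table.
	caches := map[string]int{"db2.t1": 3, "db1.t2": 1, "db1.t1": 2}
	for _, k := range sortedKeys(caches) {
		fmt.Println(k, caches[k]) // always db1.t1, db1.t2, db2.t1
	}
}
```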

In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go`:
- Around line 57-69: The startup code collects report file names into
reportFiles and currently uses sort.Strings(reportFiles), which sorts
lexicographically and misorders rounds (e.g., report-10 before report-2); change
the sorting to numeric round order by parsing the round number from each
filename (e.g., extract the integer after "report-" and before the next
separator) and use sort.Slice with a comparator that compares those integers so
older rounds come first; apply the same change to the other occurrence
referenced by cleanupOldReports so both places sort by parsed round number
rather than filename string.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f7392af and 1b51f1d.

📒 Files selected for processing (8)
  • cmd/multi-cluster-consistency-checker/checker/checker.go
  • cmd/multi-cluster-consistency-checker/checker/checker_test.go
  • cmd/multi-cluster-consistency-checker/checker/failpoint.go
  • cmd/multi-cluster-consistency-checker/consumer/consumer.go
  • cmd/multi-cluster-consistency-checker/decoder/decoder.go
  • cmd/multi-cluster-consistency-checker/recorder/recorder.go
  • cmd/multi-cluster-consistency-checker/recorder/types.go
  • cmd/multi-cluster-consistency-checker/recorder/types_test.go

return
}
var err error
file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's locate and examine the failpoint.go file
find . -name "failpoint.go" -type f

Repository: pingcap/ticdc

Length of output: 139


🏁 Script executed:

# Once found, let's look at the context around line 59
head -80 cmd/multi-cluster-consistency-checker/checker/failpoint.go | tail -40

Repository: pingcap/ticdc

Length of output: 941


🏁 Script executed:

# Also check the full context of the file to understand data being written
wc -l cmd/multi-cluster-consistency-checker/checker/failpoint.go

Repository: pingcap/ticdc

Length of output: 118


🏁 Script executed:

# Let's read the full failpoint.go file to understand data structure and context
cat -n cmd/multi-cluster-consistency-checker/checker/failpoint.go

Repository: pingcap/ticdc

Length of output: 3236


🏁 Script executed:

# Let's also check if there are other instances of os.OpenFile in the codebase to see permission patterns
rg "os\.OpenFile" --type go -B2 -A2

Repository: pingcap/ticdc

Length of output: 3344


🏁 Script executed:

# Let's check what DefaultFileMode is set to in the redo package
rg "DefaultFileMode" --type go -B2 -A2

Repository: pingcap/ticdc

Length of output: 1687


🏁 Script executed:

# Let's also verify if there are any tests or comments explaining the 0o644 choice
rg "0o644|0644" --type go

Repository: pingcap/ticdc

Length of output: 3379


Change file permissions from 0o644 to 0o600 to restrict access to sensitive failpoint records.

Line 59 creates the file with world-readable permissions (0o644), exposing failpoint records containing PrimaryKeys (sensitive database identifiers) to other local users. Use owner-only permissions (0o600) to follow the principle of least privilege. This pattern is already established in the same module at cmd/multi-cluster-consistency-checker/recorder/recorder.go, which uses 0o600 for writing similar data.

🔐 Suggested change
-		file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
+		file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` at line 59, The
file is currently created with permissions set to 0o644, which allows read
access to other local users and exposes sensitive PrimaryKeys. To fix this,
change the file permission argument in the os.OpenFile call within the
failpoint.go file (at the line opening the file for append) from 0o644 to 0o600,
ensuring only the owner has read and write access, following the least privilege
principle and matching the pattern used in recorder.go.

Comment on lines +177 to +181
idx := bytes.IndexAny(d.data, d.config.Terminator)
if idx >= 0 {
encodedData = d.data[:idx]
d.data = d.data[idx+len(d.config.Terminator):]
} else {

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

Go bytes.IndexAny function behavior does it match individual runes or full string sequence

💡 Result:

bytes.IndexAny(s, chars) matches individual Unicode code points (runes), not a full multi-rune string sequence. It treats chars as a set of runes and returns the byte index in s of the first occurrence of any one of those runes. If none match (or chars is empty), it returns -1. [1]

If you need to find a full substring/byte sequence, use bytes.Index(s, sep) (with sep as a []byte). [1]

Example (sequence vs “any of”):

  • bytes.IndexAny([]byte("abcXYZ"), "XY") matches 'X' (or 'Y') individually.
  • bytes.Index([]byte("abcXYZ"), []byte("XY")) matches the sequence "XY".

Sources

  1. Go bytes.IndexAny / bytes.Index docs (pkg.go.dev) [1]

🏁 Script executed:

cat -n cmd/multi-cluster-consistency-checker/decoder/decoder.go | head -200

Repository: pingcap/ticdc

Length of output: 6810


🏁 Script executed:

cat -n cmd/multi-cluster-consistency-checker/decoder/decoder.go | sed -n '200,340p'

Repository: pingcap/ticdc

Length of output: 5602


🏁 Script executed:

rg -n "config.CRLF\|CRLF\s*=" --type go | head -20

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

rg -n "CRLF" --type go pkg/config/ | head -30

Repository: pingcap/ticdc

Length of output: 305


Use bytes.Index to match full CRLF sequence, not individual runes.

Line 177 uses bytes.IndexAny(d.data, d.config.Terminator) where Terminator is "\r\n". The bytes.IndexAny function treats its second argument as a set of individual runes and matches either \r OR \n—not the full CRLF sequence. This causes data corruption when a lone newline appears in the payload, splitting records at the wrong boundary.

Fix
-	idx := bytes.IndexAny(d.data, d.config.Terminator)
+	idx := bytes.Index(d.data, []byte(d.config.Terminator))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go` around lines 177 -
181, The current decoding uses bytes.IndexAny(d.data, d.config.Terminator) which
treats d.config.Terminator ("\\r\\n") as a set of runes and matches either '\r'
or '\n', causing incorrect splits; replace that call with bytes.Index(d.data,
[]byte(d.config.Terminator)) (or equivalent) to search for the full CRLF
sequence, then keep the existing logic that sets encodedData = d.data[:idx] and
advances d.data = d.data[idx+len(d.config.Terminator):] so records are only
split on the full terminator.

Comment on lines +255 to +256
commitTs := d.msg.TiDBCommitTsExtension.CommitTs
d.msg = nil

⚠️ Potential issue | 🟠 Major

Guard _tidb extension before dereferencing commitTs.

Line 255 can panic on malformed row payloads if _tidb is absent/null. Return a wrapped error instead of dereferencing unconditionally.

🔧 Proposed fix
-	commitTs := d.msg.TiDBCommitTsExtension.CommitTs
+	if d.msg.TiDBCommitTsExtension == nil {
+		return nil, errors.New("missing _tidb commit ts extension")
+	}
+	commitTs := d.msg.TiDBCommitTsExtension.CommitTs
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go` around lines 255 -
256, The code dereferences d.msg.TiDBCommitTsExtension.CommitTs without checking
that d.msg and its TiDBCommitTsExtension (the `_tidb` extension) are present,
which can panic on malformed payloads; update the decoder method that contains
the lines setting commitTs := d.msg.TiDBCommitTsExtension.CommitTs and d.msg =
nil to first validate d.msg != nil and d.msg.TiDBCommitTsExtension != nil (or
the equivalent presence flag), and if absent return a wrapped error (e.g.,
fmt.Errorf("missing _tidb extension: %w", err)) instead of dereferencing,
preserving existing cleanup (d.msg = nil) only after successful validation.

Comment on lines +116 to +118
} else if !os.IsNotExist(err) {
return errors.Annotatef(ErrCheckpointCorruption, "failed to stat checkpoint.json: %v", err)
}

⚠️ Potential issue | 🟠 Major

Do not classify os.Stat operational failures as checkpoint corruption.

Lines 117 and 138 annotate stat failures with ErrCheckpointCorruption, which can mislead operators: permission, I/O, and other transient failures are not corruption.

🔧 Proposed fix
-	} else if !os.IsNotExist(err) {
-		return errors.Annotatef(ErrCheckpointCorruption, "failed to stat checkpoint.json: %v", err)
+	} else if !os.IsNotExist(err) {
+		return errors.Trace(err)
 	}
...
-	} else if !os.IsNotExist(err) {
-		return errors.Annotatef(ErrCheckpointCorruption, "failed to stat checkpoint.json.bak: %v", err)
+	} else if !os.IsNotExist(err) {
+		return errors.Trace(err)
 	}

Also applies to: 137-139

Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@ti-chi-bot

ti-chi-bot bot commented Feb 25, 2026

@Leavrth: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-error-log-review 7e86f69 link true /test pull-error-log-review

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (3)
cmd/multi-cluster-consistency-checker/recorder/types_test.go (3)

248-353: Add a multi-cluster/multi-table marshal test to lock deterministic output behavior.

Current MarshalReport assertions are strong, but they mostly cover one emitted cluster/table. Add a case with multiple non-empty clusters and multiple tables to ensure output order remains deterministic and audit-friendly.

As per coding guidelines, "**/*_test.go: Use unit test files named *_test.go in Go; favor deterministic tests and use testify/require".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go` around lines
248 - 353, Add a new subtest inside TestReport_MarshalReport that builds a
NewReport and adds multiple non-empty ClusterReports (via NewClusterReport) each
containing multiple tables and a mix of items (use AddDataLossItem,
AddDataRedundantItem, AddDataInconsistentItem, AddLWWViolationItem across
clusters/tables), call MarshalReport and assert the full string equals the
expected multi-cluster/multi-table output to lock deterministic ordering; ensure
the test uses testify/require for equality and parallel t.Parallel() like other
subtests and references MarshalReport, NewReport and NewClusterReport so the
failure pinpoints ordering issues.

377-385: Strengthen the “multiple entries” assertion beyond length-only checks.

The test currently verifies only Len==2. Consider asserting expected key/value membership (Schema, Table, Version, paths) using require.ElementsMatch to catch mapping regressions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go` around lines
377 - 385, Update the "multiple entries" subtest to assert actual entries
instead of only checking length: after calling
NewSchemaTableVersionKeyFromVersionKeyMap(m) verify that the returned slice
contains the expected SchemaTableVersionKey objects (matching Schema, Table,
Version and any path fields) for inputs {Schema:"db1",Table:"t1",Version:1} and
{Schema:"db2",Table:"t2",Version:2} using require.ElementsMatch (or equivalent)
so the test fails on mapping regressions; locate the test in the t.Run("multiple
entries") block and replace the single require.Len assertion with an
ElementsMatch against the expected slice of
types.SchemaTableKey/VersionKey-derived results produced by
NewSchemaTableVersionKeyFromVersionKeyMap.

24-24: Rename test functions to camelCase (remove underscores).

Several test function names use underscores (for example TestDataLossItem_String), which violates the Go naming rule used in this repo. Please rename them to camelCase forms like TestDataLossItemString.

As per coding guidelines, "Use camelCase for function names and do not include underscores (e.g., getPartitionNum, not get_partition_num) in Go".

Also applies to: 36-36, 92-92, 104-104, 248-248, 355-355, 388-388, 467-467

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go` at line 24,
Several test functions use underscores in their names (e.g.,
TestDataLossItem_String) which violates the repo's camelCase rule; rename each
test to camelCase (for example change TestDataLossItem_String to
TestDataLossItemString) and similarly convert the other underscore-separated
test names referenced in the file to camelCase; update any references (calls,
table-driven test names, or go test filters) to use the new names and run `go
test` to verify no symbol breakages.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go`:
- Around line 196-197: The test builds lwwViolationItems
(map[lwwViolationKey]struct{}) from expected violations but never removes
entries when iterating actual reports, so the final assertion at the end of Test
(around the reports loop) will always fail; modify the loop that processes
reports (the for _, report := range reports block) to detect matching
lwwViolationKey values and delete them from lwwViolationItems (use
delete(lwwViolationItems, key)) when a report reconciles that key, and/or adjust
the final assertion to only require the map to be empty if
reconciliation/removal occurred; ensure you reference lwwViolationItems,
lwwViolationKey and the reports loop when making the change.
- Around line 41-44: Wrap the raw errors returned by library calls with
errors.Trace to preserve stack traces: replace the direct return of err from
os.Open(path) with errors.Trace(err), and change the fmt.Errorf wrapping around
scanner.Err() to return errors.Trace(err) instead; ensure both replacements use
the errors.Trace symbol so the function that opens and scans the file returns
traced errors consistent with the repository convention (matching the
os.OpenFile pattern used elsewhere).
- Line 284: Remove the unconditional failure call require.Fail(t, "success") at
the end of the validation logic in validation_test.go; either delete that line
or replace it with a non-failing confirmation (e.g., t.Log("validation
succeeded") or a no-op assertion) so the test does not always fail after passing
all checks and the rest of the validation functions/assertions can report true
results.
- Around line 60-63: Replace the current json.Unmarshal usage that reads each
JSONL line into the local variable `record` with a json.Decoder configured via
`UseNumber()` so numeric values are preserved as `json.Number`; this ensures
`normalizePKValue` can see the `json.Number` branch and prevents precision loss
for large integer PKs used by `pkMapToKey`. Concretely, where `record` is
populated (the scanner loop that currently calls `json.Unmarshal([]byte(line),
&record)`), create a `json.NewDecoder` for the line, call `UseNumber()` on it,
then `Decode(&record)`; apply the same decoder+UseNumber() pattern to any other
places in the validation flow that decode JSON into types consumed by
`normalizePKValue` and `pkMapToKey`.

---

Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go`:
- Around line 248-353: Add a new subtest inside TestReport_MarshalReport that
builds a NewReport and adds multiple non-empty ClusterReports (via
NewClusterReport) each containing multiple tables and a mix of items (use
AddDataLossItem, AddDataRedundantItem, AddDataInconsistentItem,
AddLWWViolationItem across clusters/tables), call MarshalReport and assert the
full string equals the expected multi-cluster/multi-table output to lock
deterministic ordering; ensure the test uses testify/require for equality and
parallel t.Parallel() like other subtests and references MarshalReport,
NewReport and NewClusterReport so the failure pinpoints ordering issues.
- Around line 377-385: Update the "multiple entries" subtest to assert actual
entries instead of only checking length: after calling
NewSchemaTableVersionKeyFromVersionKeyMap(m) verify that the returned slice
contains the expected SchemaTableVersionKey objects (matching Schema, Table,
Version and any path fields) for inputs {Schema:"db1",Table:"t1",Version:1} and
{Schema:"db2",Table:"t2",Version:2} using require.ElementsMatch (or equivalent)
so the test fails on mapping regressions; locate the test in the t.Run("multiple
entries") block and replace the single require.Len assertion with an
ElementsMatch against the expected slice of
types.SchemaTableKey/VersionKey-derived results produced by
NewSchemaTableVersionKeyFromVersionKeyMap.
- Line 24: Several test functions use underscores in their names (e.g.,
TestDataLossItem_String) which violates the repo's camelCase rule; rename each
test to camelCase (for example change TestDataLossItem_String to
TestDataLossItemString) and similarly convert the other underscore-separated
test names referenced in the file to camelCase; update any references (calls,
table-driven test names, or go test filters) to use the new names and run `go
test` to verify no symbol breakages.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1b51f1d and 7e86f69.

📒 Files selected for processing (2)
  • cmd/multi-cluster-consistency-checker/integration/validation_test.go
  • cmd/multi-cluster-consistency-checker/recorder/types_test.go

Comment on lines +41 to +44
file, err := os.Open(path)
if err != nil {
return nil, err
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's view the file and see the actual code at lines 41-44 and 68-71
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '35,75p'

Repository: pingcap/ticdc

Length of output: 1424


🏁 Script executed:

# Check error wrapping patterns in the codebase
rg -n 'errors\.(Trace|WrapError)' cmd/multi-cluster-consistency-checker --type=go -A 1 -B 1 | head -50

Repository: pingcap/ticdc

Length of output: 3300


🏁 Script executed:

# Check for os.Open usage patterns in the repository
rg -n 'os\.Open' cmd/multi-cluster-consistency-checker --type=go -A 2 -B 1 | head -40

Repository: pingcap/ticdc

Length of output: 1326


Wrap library-call errors using errors.Trace() for consistent stack trace handling.

Line 41-44: os.Open(path) error is not wrapped at all. Wrap it with errors.Trace(err).

Line 68-71: scanner.Err() is wrapped with fmt.Errorf instead of errors.Trace(err). Use errors.Trace() for consistency with the repository's error-wrapping convention (see cmd/multi-cluster-consistency-checker/recorder/recorder.go:205 for the same os.OpenFile pattern).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go` around
lines 41 - 44, Wrap the raw errors returned by library calls with errors.Trace
to preserve stack traces: replace the direct return of err from os.Open(path)
with errors.Trace(err), and change the fmt.Errorf wrapping around scanner.Err()
to return errors.Trace(err) instead; ensure both replacements use the
errors.Trace symbol so the function that opens and scans the file returns traced
errors consistent with the repository convention (matching the os.OpenFile
pattern used elsewhere).

Comment on lines +60 to +63
var record checker.Record
err := json.Unmarshal([]byte(line), &record)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal jsonl record in %s line %d: %w", path, lineNo, err)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

rg -n -A5 'type\s+Record\s+struct' cmd/multi-cluster-consistency-checker

Repository: pingcap/ticdc

Length of output: 1076


🏁 Script executed:

rg -n -A10 'type\s+Row\s+struct' cmd/multi-cluster-consistency-checker

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

rg -n -B2 -A5 'PrimaryKeys' cmd/multi-cluster-consistency-checker

Repository: pingcap/ticdc

Length of output: 6749


🏁 Script executed:

cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | head -70

Repository: pingcap/ticdc

Length of output: 2425


🏁 Script executed:

cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '130,190p'

Repository: pingcap/ticdc

Length of output: 2062


🏁 Script executed:

rg -n 'UseNumber|json\.Decoder' cmd/multi-cluster-consistency-checker

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

rg -n 'PrimaryKeys.*:=' cmd/multi-cluster-consistency-checker/checker --type=go -B3

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

rg -n 'json\.Decoder|UseNumber' cmd/multi-cluster-consistency-checker --type=go

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

rg -n 'PrimaryKeys' cmd/multi-cluster-consistency-checker --type=go -A2 -B2

Repository: pingcap/ticdc

Length of output: 4655


🏁 Script executed:

# Check if there are any tests that show what numeric PKs are being used
grep -r "PrimaryKeys\|primaryKeys" cmd/multi-cluster-consistency-checker/integration/validation_test.go

Repository: pingcap/ticdc

Length of output: 704


🏁 Script executed:

# Check if json.Decoder with UseNumber is used anywhere in the codebase
rg -n 'NewDecoder' cmd/multi-cluster-consistency-checker --type=go -A5

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Check the exact method of reading records in checker/decoder.go
cat -n cmd/multi-cluster-consistency-checker/decoder/decoder.go | sed -n '1,100p'

Repository: pingcap/ticdc

Length of output: 3477


🏁 Script executed:

# Look for any test data or fixtures that might show what numeric PKs look like
fd -e json cmd/multi-cluster-consistency-checker | head -20

Repository: pingcap/ticdc

Length of output: 510


Use json.Decoder with UseNumber() to preserve numeric precision for large integer primary keys.

JSON unmarshaling without UseNumber() coerces numeric values to float64, causing precision loss for PKs ≥ 2^53. The json.Number case in normalizePKValue (lines 158–159) is unreachable with standard Unmarshal. Replace the scanner pattern in lines 60–63 with:

decoder := json.NewDecoder(bytes.NewReader([]byte(line)))
decoder.UseNumber()
err := decoder.Decode(&record)

This applies to all JSON unmarshaling in the validation flow, including lines 135–186 where pkMapToKey depends on correctly encoded numeric PKs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go` around
lines 60 - 63, Replace the current json.Unmarshal usage that reads each JSONL
line into the local variable `record` with a json.Decoder configured via
`UseNumber()` so numeric values are preserved as `json.Number`; this ensures
`normalizePKValue` can see the `json.Number` branch and prevents precision loss
for large integer PKs used by `pkMapToKey`. Concretely, where `record` is
populated (the scanner loop that currently calls `json.Unmarshal([]byte(line),
&record)`), create a `json.NewDecoder` for the line, call `UseNumber()` on it,
then `Decode(&record)`; apply the same decoder+UseNumber() pattern to any other
places in the validation flow that decode JSON into types consumed by
`normalizePKValue` and `pkMapToKey`.

Comment on lines +196 to +197
lwwViolationItems := make(map[lwwViolationKey]struct{})
for _, report := range reports {

⚠️ Potential issue | 🟠 Major

LWW violations are collected but never reconciled before assertion.

lwwViolationItems is populated from reports, but no CDC/checker path removes matched keys. Line 283 then requires it to be empty, which will fail as soon as reports contain real LWW items.

Also applies to: 221-227, 283-283

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go` around
lines 196 - 197, The test builds lwwViolationItems
(map[lwwViolationKey]struct{}) from expected violations but never removes
entries when iterating actual reports, so the final assertion at the end of Test
(around the reports loop) will always fail; modify the loop that processes
reports (the for _, report := range reports block) to detect matching
lwwViolationKey values and delete them from lwwViolationItems (use
delete(lwwViolationItems, key)) when a report reconciles that key, and/or adjust
the final assertion to only require the map to be empty if
reconciliation/removal occurred; ensure you reference lwwViolationItems,
lwwViolationKey and the reports loop when making the change.
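The reconcile-by-deletion pattern suggested here can be sketched as below; the key type and field names are hypothetical stand-ins for the test's real lwwViolationKey, and the observed slice stands in for keys extracted from the reports loop:

```go
package main

import "fmt"

// Hypothetical shape of the test's violation key; the real
// lwwViolationKey in validation_test.go may differ.
type lwwViolationKey struct {
	Schema, Table, PK string
}

func main() {
	// Expected violations injected by the test scenario.
	expected := map[lwwViolationKey]struct{}{
		{"db", "t1", "1"}: {},
		{"db", "t1", "2"}: {},
	}
	// Keys actually observed while iterating the checker reports.
	observed := []lwwViolationKey{
		{"db", "t1", "1"},
		{"db", "t1", "2"},
	}
	// Reconcile: delete each observed key so leftover entries are
	// exactly the violations the checker failed to report.
	for _, k := range observed {
		delete(expected, k)
	}
	fmt.Println("unreconciled:", len(expected)) // 0 when all expected violations were reported
}
```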

require.Empty(t, dataInconsistentItems)
require.Empty(t, dataRedundantItems)
require.Empty(t, lwwViolationItems)
require.Fail(t, "success")

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Scripts executed against pingcap/ticdc (inspecting overlapping ranges of the test file):

cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '270,300p'
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '240,285p'
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '200,245p'
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '150,205p'

Remove the unconditional failure at the end of the validation logic.

Line 284 contains require.Fail(t, "success"), which fails the test unconditionally even after all validation checks pass. It should be removed or replaced with a log statement.

Suggested fix
-	require.Fail(t, "success")
+	t.Log("validation succeeded")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
require.Fail(t, "success")
t.Log("validation succeeded")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go` at line
284: remove the unconditional require.Fail(t, "success") at the end of the
validation logic; either delete that line or replace it with a non-failing
confirmation (e.g., t.Log("validation succeeded")) so the test does not always
fail after passing all checks and the preceding assertions can report true
results.


Labels

do-not-merge/needs-triage-completed
first-time-contributor: Indicates that the PR was contributed by an external member and is a first-time contributor.
release-note-none: Denotes a PR that doesn't merit a release note.
size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

support multi-clusters active-active data consistent checker

1 participant