Support multi-clusters active-active data consistent checker #4246
Leavrth wants to merge 6 commits into pingcap:master
Conversation
This PR introduces a consistency validation tool for multi-cluster active-active TiCDC deployments. It connects to each cluster’s PD(etcd) and S3 sink, verifies critical changefeed settings (for example canal-json, date-separator=day, and expected file index width), then continuously watches checkpoints and sink files to advance a unified time window across clusters. Within each window, the checker compares local writes and replicated records by primary key and timestamp to detect anomalies such as missing data, redundant data, out-of-order updates, and LWW violations. The recorder persists checkpoints and reports atomically, so the process is resumable after failures and suitable for long-running, auditable consistency monitoring.
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull request has not yet been approved. The full list of commands accepted by this bot can be found here. Details: needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings. Use the commands to manage reviews, or the checkboxes below for quick actions.
📝 Walkthrough

Adds a new multi-cluster active-active consistency checker: CLI, config, advancer, S3 consumer & decoder, checkpoint & S3 watchers, core checker, durable recorder, task orchestration, mocks/integration tests, and many unit/integration tests.
Sequence Diagram(s)

sequenceDiagram
participant CLI as CLI Main
participant Config as Config Loader
participant Init as Client Init
participant PD as PD/Etcd Clients
participant Adv as TimeWindow Advancer
participant S3 as S3 Watcher/Consumer
participant Checker as Data Checker
participant Recorder as Recorder
CLI->>Config: LoadConfig(path)
CLI->>Init: initClients(config)
Init->>PD: create PD/etcd clients & watchers
loop Each Round
Adv->>PD: Advance/checkpoint & GetPDTS()
Adv->>S3: AdvanceS3CheckpointTs / ConsumeNewFiles()
S3-->>Adv: IncrementalData + MaxVersion
Adv-->>Checker: TimeWindowData
Checker->>Checker: decode & detect (loss, redundancy, inconsistency, LWW)
Checker-->>Recorder: Report & time-window
Recorder->>Recorder: persist checkpoint & reports
end
sequenceDiagram
participant Adv as TimeWindow Advancer
participant CW as CheckpointWatcher
participant S3 as S3Consumer
participant PD as PD Client
participant State as Internal State
Adv->>CW: AdvanceCheckpointTs(minTs)
CW-->>Adv: latest checkpoint when >= minTs
Adv->>S3: ConsumeNewFiles()
S3-->>Adv: incremental files + max versions
Adv->>PD: getPDTsFromCluster / getPDTsFromOtherClusters
PD-->>Adv: pd timestamps
Adv->>State: update triplets, boundaries, PDTimestampAfterTimeWindow
Adv-->>Adv: compose TimeWindowData
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~95 minutes
🚥 Pre-merge checks: 4 passed ✅, 1 failed ❌ (1 warning)
Summary of Changes

Hello @Leavrth, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a robust multi-cluster consistency checker for TiCDC's active-active replication. The tool systematically monitors data flow across clusters by analyzing S3 sink data and changefeed checkpoints. It's designed to proactively identify and report various data inconsistencies, ensuring data integrity and providing a reliable mechanism for continuous validation in complex distributed environments.
Code Review
This pull request introduces a comprehensive data consistency checker for multi-cluster active-active deployments. The implementation is robust, with thorough handling of configuration, state persistence via checkpoints, and graceful shutdown. The testing strategy is excellent, covering unit, integration, and various failure scenarios.
I've identified a few areas for improvement, mainly related to performance and code clarity. Specifically, there are opportunities to reduce redundant RPC calls to PD and to optimize S3 directory listings. I've also suggested a small refactoring to simplify some of the checking logic.
Overall, this is a high-quality contribution that adds a valuable tool for ensuring data consistency in complex deployments.
opt := &storage.WalkOption{
	SubDir:    metaSubDir,
	ObjPrefix: "schema_",
	// TODO: StartAfter: startVersionKey,
The WalkDir calls in this file do not use the StartAfter option in storage.WalkOption, which can lead to re-listing files that have already been processed in previous rounds. The author has noted this with a TODO.
Using StartAfter would be a good performance optimization. For example, in downloadSchemaFilesWithScanRange, you can set opt.StartAfter = startVersionKey. This applies to other WalkDir calls in this file as well, such as in getNewFilesForSchemaPathKeyWithEndPath and discoverAndDownloadNewTableVersions.
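The semantics this suggestion relies on can be sketched stand-alone: S3 (and the WalkOption wrapping it) lists keys in lexicographic order, and StartAfter resumes listing strictly after the given key, so files already processed in earlier rounds are never re-listed. The key names and the listAfter helper below are hypothetical illustrations, not the repository's API:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// listAfter mimics S3 ListObjectsV2 semantics: keys come back in
// lexicographic order, and startAfter skips everything up to and
// including that key.
func listAfter(keys []string, prefix, startAfter string) []string {
	sort.Strings(keys)
	var out []string
	for _, k := range keys {
		if !strings.HasPrefix(k, prefix) {
			continue
		}
		if k <= startAfter {
			continue // already processed in a previous round
		}
		out = append(out, k)
	}
	return out
}

func main() {
	keys := []string{
		"meta/schema_100_001.json",
		"meta/schema_101_002.json",
		"meta/schema_102_003.json",
	}
	// Without StartAfter every round re-lists all three files.
	fmt.Println(len(listAfter(keys, "meta/schema_", ""))) // 3
	// With StartAfter set to the last key seen, only new files are listed.
	newFiles := listAfter(keys, "meta/schema_", "meta/schema_101_002.json")
	fmt.Println(newFiles[0]) // meta/schema_102_003.json
}
```

The cost of omitting StartAfter grows with bucket size: every round pays for a full listing of the prefix even when only a handful of new files exist.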
Actionable comments posted: 11
🧹 Nitpick comments (11)
cmd/multi-cluster-consistency-checker/recorder/types_test.go (1)
247-351: MarshalReport tests will become flaky if any multi-cluster scenario is added.

r.ClusterReports is a map[string]*ClusterReport, so iterating it in MarshalReport produces non-deterministic cluster ordering. All current tests are safe only because they include at most one non-empty cluster per report. Adding a second non-empty cluster to any test (or to the production code's output) will cause intermittent failures. The production MarshalReport should sort cluster IDs before building the output string, e.g.:

// In MarshalReport (types.go), replace map range with sorted keys:
clusterIDs := make([]string, 0, len(r.ClusterReports))
for id := range r.ClusterReports {
	clusterIDs = append(clusterIDs, id)
}
slices.Sort(clusterIDs)
for _, id := range clusterIDs {
	cr := r.ClusterReports[id]
	// ...
}

Once the production method is stable, future tests with multiple non-empty clusters will be reliable. Based on learnings: favor deterministic tests.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go` around lines 247 - 351, MarshalReport iterates r.ClusterReports (a map) which yields non-deterministic cluster ordering; make iteration deterministic by collecting the map keys, sorting them (e.g., using slices.Sort or sort.Strings), and then iterating over the sorted clusterIDs to fetch each ClusterReport (reference: MarshalReport, r.ClusterReports, ClusterReport); update the loop in MarshalReport to use the sorted keys so tests and output are stable across runs.

Makefile (1)
177-179: multi-cluster-consistency-checker missing from .PHONY.

Similar build targets cdc, kafka_consumer, storage_consumer, pulsar_consumer, and filter_helper are all declared .PHONY. The new target and the existing config-converter/oauth2_server targets are inconsistently absent. While functionally harmless (no such file exists in the root), it breaks the convention set by the other binary targets.

🔧 Suggested fix

-.PHONY: clean fmt check tidy \
+.PHONY: clean fmt check tidy multi-cluster-consistency-checker \
 	generate-protobuf generate_mock \
 	cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Makefile` around lines 177 - 179, Add the new make target name multi-cluster-consistency-checker to the .PHONY declaration so it follows the existing convention; update the .PHONY list that currently includes cdc, kafka_consumer, storage_consumer, pulsar_consumer, filter_helper (and note config-converter/oauth2_server are also missing) to include multi-cluster-consistency-checker to maintain consistency across binary build targets.

cmd/multi-cluster-consistency-checker/config/config_test.go (1)
251-274: Missing test coverage: single cluster with no peers should arguably be rejected.

The LoadConfig code only rejects zero clusters; a configuration with exactly one cluster passes validation even though the checker is meaningless in that scenario. The test for "missing pd-addrs" happens to use a single cluster, but only because pd-addrs fires first — it doesn't exercise the "single cluster accepted" path. Consider adding a test case and a minimum-cluster count check (≥ 2) in LoadConfig.

📋 Suggested test case

t.Run("single cluster is rejected", func(t *testing.T) {
	t.Parallel()
	tmpDir := t.TempDir()
	configPath := filepath.Join(tmpDir, "config.toml")
	configContent := `
[global]
data-dir = "/tmp/data"
[global.tables]
schema1 = ["table1"]
[clusters]
[clusters.cluster1]
pd-addrs = ["127.0.0.1:2379"]
s3-sink-uri = "s3://bucket/cluster1/"
s3-changefeed-id = "s3-cf-1"
`
	require.NoError(t, os.WriteFile(configPath, []byte(configContent), 0644))
	cfg, err := LoadConfig(configPath)
	require.Error(t, err)
	require.Nil(t, cfg)
	require.Contains(t, err.Error(), "at least two clusters must be configured")
})
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/config/config_test.go` around lines 251 - 274, The config loader currently allows a single cluster which makes the checker meaningless; update LoadConfig to validate that the parsed clusters map has length >= 2 and return an error like "at least two clusters must be configured" when fewer are present, then add a unit test in config_test.go (e.g., a new t.Run "single cluster is rejected") that writes a config with one cluster (use pd-addrs, s3-sink-uri, s3-changefeed-id) and asserts LoadConfig returns an error, nil cfg, and that the error message contains the new "at least two clusters must be configured" text to cover the single-cluster rejection path.

cmd/multi-cluster-consistency-checker/decoder/decoder_test.go (1)
81-88: Reversed require.Equal argument order.

The testify require.Equal convention is (t, expected, actual). The assertions here pass actualRecord as the first (expected) argument and expectedRecord as the second (actual). This won't affect pass/fail correctness, but failure messages will swap "expected" and "got", making debugging confusing. The same pattern appears in TestCanalJSONDecoder2 (lines 121–127) and partially in TestCanalJSONDecoderWithInvalidMessage (lines 143–146).

Example fix for TestCanalJSONDecoder1

 for i, actualRecord := range records {
 	expectedRecord := ExpectedRecords1[i]
-	require.Equal(t, actualRecord.Pk, expectedRecord.Pk)
-	require.Equal(t, actualRecord.PkStr, expectedRecord.PkStr)
-	require.Equal(t, actualRecord.ColumnValues, expectedRecord.ColumnValues)
-	require.Equal(t, actualRecord.CdcVersion.CommitTs, expectedRecord.CdcVersion.CommitTs)
-	require.Equal(t, actualRecord.CdcVersion.OriginTs, expectedRecord.CdcVersion.OriginTs)
+	require.Equal(t, expectedRecord.Pk, actualRecord.Pk)
+	require.Equal(t, expectedRecord.PkStr, actualRecord.PkStr)
+	require.Equal(t, expectedRecord.ColumnValues, actualRecord.ColumnValues)
+	require.Equal(t, expectedRecord.CdcVersion.CommitTs, actualRecord.CdcVersion.CommitTs)
+	require.Equal(t, expectedRecord.CdcVersion.OriginTs, actualRecord.CdcVersion.OriginTs)
 }
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/decoder/decoder_test.go` around lines 81 - 88, The require.Equal calls in the decoder tests pass actual then expected which flips failure messages; update assertions in TestCanalJSONDecoder1 (the loop using records and ExpectedRecords1), TestCanalJSONDecoder2 (the analogous loop using ExpectedRecords2), and the checks in TestCanalJSONDecoderWithInvalidMessage to call require.Equal(t, expected, actual) — e.g., swap arguments so ExpectedRecordsX.* (ExpectedRecords1/2, expectedRecord.Pk, expectedRecord.PkStr, expectedRecord.ColumnValues, expectedRecord.CdcVersion.CommitTs, expectedRecord.CdcVersion.OriginTs) are the second-to-last (expected) parameter and actualRecord.* are the last (actual) parameter in each require.Equal invocation.

cmd/multi-cluster-consistency-checker/config/config.go (1)
78-81: Consider wrapping the TOML decode error with errors.Trace.

Per coding guidelines, errors from third-party/library calls should be wrapped with errors.Trace(err) to attach a stack trace. The toml.DecodeFile error on line 78 is wrapped with fmt.Errorf + %w instead. This is minor since the caller (main.go) will print the error directly, but for consistency with the rest of the codebase it's worth aligning.

As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/config/config.go` around lines 78 - 81, The toml.DecodeFile call returns a third-party error that should be wrapped with errors.Trace to attach a stack trace; change the error return in the block handling toml.DecodeFile (currently using fmt.Errorf with %w) to wrap the original err via errors.Trace(err) (e.g. return nil, fmt.Errorf("failed to decode config file: %w", errors.Trace(err))) so the DecodeFile error is traced per project conventions; update the import if errors is not already imported.

cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go (2)
60-61: time.Sleep-based synchronization is inherently flaky.

There are ~16 time.Sleep calls used to synchronize with goroutines. While common in test code, these are non-deterministic and can cause flaky failures under CI load. Consider introducing a polling/retry helper or condition variable so tests wait for the precise state rather than guessing a duration.
Also applies to: 98-99, 110-111, 165-166, 178-179, 219-220, 230-231, 285-286, 304-305, 324-325, 394-395, 440-441, 446-447, 488-489, 524-525, 536-537
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go` around lines 60 - 61, The tests use time.Sleep calls to wait for the watcher to initialize and other goroutine-driven states, which is flaky; replace these sleeps with a deterministic polling/retry helper (e.g., waitForCondition or waitUntil) that accepts a timeout and periodically checks a predicate until true, and then use it wherever the file currently calls time.Sleep (instances in checkpoint_watcher_test.go around watcher initialization and subsequent synchronization points). Locate the places referencing the watcher initialization and goroutine coordination in checkpoint_watcher_test.go and update them to call the helper with a short tick interval and a reasonable timeout so tests wait for the exact condition (e.g., watcher started, checkpoint processed) instead of sleeping.
23-23: Deprecated gomock import path — migrate to maintained fork.

github.com/golang/mock was archived (June 27, 2023) and is no longer maintained. The actively maintained fork is go.uber.org/mock, maintained by Uber. Update the import from github.com/golang/mock/gomock to go.uber.org/mock/gomock, and regenerate mocks using go install go.uber.org/mock/mockgen@latest.
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go` at line 23, The test imports the archived gomock path; update the import in checkpoint_watcher_test.go from "github.com/golang/mock/gomock" to "go.uber.org/mock/gomock" and update any references to gomock.NewController or gomock.Expect* accordingly, then regenerate mocks with "go install go.uber.org/mock/mockgen@latest" and re-run mock generation (mockgen) so generated mocks reference the new module path; ensure CI/go.mod/go.sum are updated and tests compile.

cmd/multi-cluster-consistency-checker/watcher/s3_watcher.go (1)
64-65: Resolve the pending S3 index update TODO.The TODO suggests missing functionality for index updates from S3; please either implement it or track it with an issue so the watcher doesn’t fall behind.
If you want, I can draft the implementation or open a follow-up issue.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/watcher/s3_watcher.go` around lines 64 - 65, The TODO indicates we never apply the S3-derived index updates after calling sw.consumer.ConsumeNewFiles; implement logic immediately after the ConsumeNewFiles call to merge newData into the watcher's index and advance per-key versions using maxVersionMap (e.g., iterate newData and upsert into the internal index structure, then set per-key versions from maxVersionMap), handle and return errors from the index update, and add retries/logging as needed; if you cannot implement now, replace the TODO with a created-issue reference and a clear TODO note so the watcher does not silently fall behind.

cmd/multi-cluster-consistency-checker/checker/checker.go (1)
402-418: Zap field keys contain spaces — prefer camelCase.

Fields like zap.String("local cluster ID", ...) and zap.String("replicated cluster ID", ...) use spaces. The zap convention and structured logging best practice is to use camelCase keys (e.g., "localClusterID"), which are easier to query in log aggregation systems.

Example diff

-	zap.String("local cluster ID", cd.clusterID),
-	zap.String("replicated cluster ID", replicatedClusterID),
+	zap.String("localClusterID", cd.clusterID),
+	zap.String("replicatedClusterID", replicatedClusterID),

As per coding guidelines, "Use structured logs via github.com/pingcap/log with zap fields in Go; log message strings should not include function names and should avoid hyphens (use spaces instead)".
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/checker/checker.go` around lines 402 - 418, The zap log field keys in the log.Debug call inside the loop (involving cd.timeWindowDataCaches, cd.checkedRecordsCount, checker.FindClusterReplicatedData and cd.lwwSkippedRecordsCount) use spaces—replace them with camelCase keys (e.g., change "local cluster ID" to "localClusterID" and "replicated cluster ID" to "replicatedClusterID") so structured logs follow conventions; update the zap.String field keys in that log.Debug invocation to camelCase (keeping other keys like "schemaKey", "pk", and "commitTs" consistent) and run a quick grep to ensure no other nearby log usages use spaced keys.

cmd/multi-cluster-consistency-checker/recorder/types.go (1)
374-409: ToScanRange assumes tables present in older checkpoints are always in the newest.

Lines 392–396 and 402–407 return an error if a table from CheckpointItems[1] or [0] is not found in the result (seeded from [2]). This is correct under normal operation — tables shouldn't disappear from newer checkpoints. However, if a DDL drops a table between rounds, this would surface as an error on recovery. If table drops are expected, consider gracefully skipping missing keys.
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/recorder/types.go` around lines 374 - 409, ToScanRange currently seeds result from c.CheckpointItems[2] and then treats missing SchemaTableKey entries from c.CheckpointItems[1] and [0] as fatal errors; change the logic in the loops that iterate over c.CheckpointItems[1].ClusterInfo[clusterID].MaxVersion and c.CheckpointItems[0].ClusterInfo[clusterID].MaxVersion to skip entries that are not found in result (instead of returning errors.Errorf), so dropped tables do not cause recovery to fail; keep the initial seeding from CheckpointItems[2] and the updates to StartVersionKey/StartDataPath when present, and also ensure you check for nil ClusterInfo[clusterID] before iterating to avoid panics.

cmd/multi-cluster-consistency-checker/consumer/consumer.go (1)
306-377: TODO comments for StartAfter optimization.

Lines 317, 429, 499, 550 contain // TODO: StartAfter: hints. These are important for performance in production S3 buckets with many objects — without StartAfter, every WalkDir scans from the beginning of the prefix.
StartAfteroptimization across theseWalkDircalls?🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go` around lines 306 - 377, The WalkDir scans start from the prefix every time because WalkOption.StartAfter is not set; update the WalkOption in downloadSchemaFilesWithScanRange (and the other TODO'd WalkDir usages in consumer.go) to set StartAfter to the last-known object key (use startVersionKey or the previous highest schema key as appropriate) so S3 listing resumes after that key; pass this StartAfter in the storage.WalkOption used by c.s3Storage.WalkDir and ensure the logic still handles ErrWalkDirEnd and edge cases where StartAfter may be empty.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/multi-cluster-consistency-checker/checker/checker.go`:
- Around line 547-559: The call to clusterDataChecker.InitializeFromCheckpoint
inside DataChecker.initializeFromCheckpoint currently ignores the returned
error, so parsing failures (e.g., from decoder.Decode) are swallowed; change
DataChecker.initializeFromCheckpoint to return an error and propagate any error
returned by clusterDataChecker.InitializeFromCheckpoint (referencing
DataChecker.initializeFromCheckpoint and
clusterDataChecker.InitializeFromCheckpoint), ensuring you preserve existing nil
checks on checkpoint and CheckpointItems[2]; then update NewDataChecker to
handle and return this error from initializeFromCheckpoint so callers can react
to corrupted checkpoints.
- Around line 344-377: The direct interface{} comparisons in diffColumns (and in
EqualReplicatedRecord) are unsafe because JSON may produce non-comparable types
(maps/slices); replace the != checks with a deep-equality check (e.g., use
reflect.DeepEqual) or otherwise normalize/serialize JSON values before
comparing; update the comparisons that build recorder.InconsistentColumn for
local.ColumnValues and replicated.ColumnValues to use reflect.DeepEqual (or an
equivalent canonical-JSON comparison) and ensure the reflect package is imported
and used consistently in both diffColumns and EqualReplicatedRecord to avoid
runtime panics when comparing slices/maps.
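The panic this comment warns about is real Go semantics: comparing two interface values with == or != compiles fine but panics at runtime ("comparing uncomparable type") when the dynamic types are maps or slices, which is exactly what encoding/json produces for JSON objects and arrays. A self-contained sketch of the safe comparison; equalJSON is a hypothetical helper, not the checker's actual function:

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// equalJSON decodes two JSON documents and compares them structurally.
// Decoded objects become map[string]interface{} and arrays become
// []interface{}, so a direct == or != on the decoded interface values
// would panic at runtime; reflect.DeepEqual walks them safely.
func equalJSON(a, b []byte) bool {
	var va, vb interface{}
	if json.Unmarshal(a, &va) != nil || json.Unmarshal(b, &vb) != nil {
		return false
	}
	return reflect.DeepEqual(va, vb)
}

func main() {
	fmt.Println(equalJSON([]byte(`{"tags":[1,2]}`), []byte(`{"tags":[1,2]}`))) // true
	fmt.Println(equalJSON([]byte(`{"tags":[1,2]}`), []byte(`{"tags":[2,1]}`))) // false
}
```

Note that DeepEqual compares the decoded forms, so both sides see numbers as float64 and the comparison is insensitive to key order but sensitive to array order, which matches column-value semantics.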
In `@cmd/multi-cluster-consistency-checker/consumer/consumer_test.go`:
- Around line 231-232: The test setup passes nil to SetSchemaDefinition in the
"remove with condition" subtests which risks panics if SetSchemaDefinition
dereferences td; update those calls to pass an empty table definition instead by
replacing nil with &cloudstorage.TableDefinition{} for both occurrences (the
SetSchemaDefinition invocations used alongside key1/key2 in the "remove with
condition" subtests) so the tests use a safe, consistent empty struct like the
existing "set and get empty table definition" subtest.
- Around line 395-397: The mockS3Storage.ReadFile implementation incorrectly
uses the zero map value and returns m.sortedFiles[0] for unknown names; update
ReadFile to first check for existence with the map lookup pattern (val, ok :=
m.fileOffset[name]) and if !ok return an explicit error (e.g., fmt.Errorf or
errors.New) indicating missing file name, otherwise return
m.sortedFiles[val].content, nil; reference the mockS3Storage.ReadFile method,
m.fileOffset map and m.sortedFiles slice when making the change.
In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go`:
- Around line 627-651: The loop spawns an unbounded goroutine per task (using
errgroup eg and egCtx) which can exhaust resources; call eg.SetLimit(N) before
starting goroutines to cap concurrent downloads (choose N from a new or existing
configurable field on the consumer, e.g., c.downloadConcurrency or a constant),
and adjust the fileContents channel buffer to be min(len(tasks), N) to avoid
blocking; apply the same pattern to the other download helpers
(downloadSchemaFiles, downloadDataFilesWithScanRange,
downloadNewFilesWithVersions), and keep using c.s3Storage.ReadFile/egCtx inside
the limited goroutines.
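eg.SetLimit is the direct fix; for illustration, the same bound can be reproduced with only the standard library via a counting-semaphore channel. runBounded below is a hypothetical stand-in for the download loop, not the consumer's code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded executes n tasks with at most `limit` running concurrently,
// using a counting-semaphore channel; it returns the observed peak
// concurrency. eg.SetLimit(limit) on an errgroup enforces the same bound.
func runBounded(n, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var cur, peak atomic.Int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` workers are in flight
		go func() {
			defer wg.Done()
			c := cur.Add(1)
			for { // record peak concurrency
				p := peak.Load()
				if c <= p || peak.CompareAndSwap(p, c) {
					break
				}
			}
			// a real worker would call s3Storage.ReadFile(egCtx, ...) here
			cur.Add(-1)
			<-sem
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(runBounded(50, 4) <= 4) // true: never more than 4 in flight
}
```

The channel capacity plays the role of SetLimit's argument: acquiring before spawning means goroutine creation itself is throttled, so memory stays bounded even for very large task lists.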
In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go`:
- Around line 190-193: The log message in the json.Unmarshal error path uses a
hyphenated string and includes a function name; update the log.Error calls (the
one around json.Unmarshal in decoder.go that sets d.msg = nil and the similar
call at the later occurrence around lines 328–329) to remove hyphens and any
function names—e.g., change "canal-json decoder unmarshal data failed" to a
plain descriptive message like "canal json decoder unmarshal failed" (preserve
the zap.Error(err) and zap.ByteString("data", encodedData) fields) so the
message follows the logging guidelines.
In `@cmd/multi-cluster-consistency-checker/integration/mock_cluster.go`:
- Around line 135-141: The calls to the external storage WriteFile (inside the
loop over mc.S3Storages and mc.Tables where path := fmt.Sprintf(... ) and the
later similar block at 156-159) return raw errors from storage.ExternalStorage;
wrap those errors immediately before returning (e.g., replace `return err` with
`return errors.Trace(err)` or `return errors.WrapError(err, "...")`) so
s3.WriteFile errors are traceable and include context; apply the same wrapping
to the other s3.WriteFile/Read/Delete call sites referenced in the same
function.
In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go`:
- Around line 106-129: The code currently ignores non-NotExist errors from
os.Stat for checkpointFile and bakFile and treats them as "missing"; update both
checks to explicitly handle errors: after calling os.Stat(checkpointFile) and
os.Stat(bakFile) if err != nil then if !os.IsNotExist(err) return
errors.Trace(err) so permission/I/O errors are propagated instead of falling
back or proceeding, preserving the existing ReadFile/json.Unmarshal logic when
os.IsNotExist(err) is true; adjust the blocks around checkpointFile and bakFile
checks (the variables checkpointFile, bakFile and the logic that
reads/unmarshals into r.checkpoint) accordingly.
In `@cmd/multi-cluster-consistency-checker/task.go`:
- Around line 282-328: The cleanupClients function currently closes PD and etcd
clients before closing checkpoint and S3 watchers; change it so
checkpointWatchers and s3Watchers are closed first (iterate checkpointWatchers
and call watcher.Close() with a debug log per watcher, and iterate s3Watchers
and call Close() with a debug log), then close etcd clients (as in the existing
etcdClients loop with error handling/logging), and finally close PD clients;
keep the existing nil checks and logging behavior but reorder the blocks inside
cleanupClients to ensure watchers are shut down before their underlying clients.
In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go`:
- Around line 175-180: When detecting the non-recoverable error (errors.Is(err,
errChangefeedKeyDeleted)) set cw.watchErr under cw.mu and then wake any
goroutines waiting in AdvanceCheckpointTs so they can observe the error and
return instead of hanging; specifically, after setting cw.watchErr in
checkpoint_watcher.go call the watcher’s wait/notify primitive (e.g.
cw.cond.Broadcast if a sync.Cond is used, or close a done channel if the watcher
uses a channel) while holding the lock so AdvanceCheckpointTs can see
cw.watchErr and unblock immediately.
In `@Makefile`:
- Around line 177-179: CI shows gofumports/gci formatting failures; run the
repository formatter and commit the results: execute make fmt (which runs gci,
gofumports and shfmt) to reformat all modified Go files introduced in this PR,
verify the Makefile target multi-cluster-consistency-checker is unchanged, then
add and commit the formatted files and push the commit so the formatting check
passes.
---
Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/checker/checker.go`:
- Around line 402-418: The zap log field keys in the log.Debug call inside the
loop (involving cd.timeWindowDataCaches, cd.checkedRecordsCount,
checker.FindClusterReplicatedData and cd.lwwSkippedRecordsCount) use
spaces—replace them with camelCase keys (e.g., change "local cluster ID" to
"localClusterID" and "replicated cluster ID" to "replicatedClusterID") so
structured logs follow conventions; update the zap.String field keys in that
log.Debug invocation to camelCase (keeping other keys like "schemaKey", "pk",
and "commitTs" consistent) and run a quick grep to ensure no other nearby log
usages use spaced keys.
In `@cmd/multi-cluster-consistency-checker/config/config_test.go`:
- Around line 251-274: The config loader currently allows a single cluster which
makes the checker meaningless; update LoadConfig to validate that the parsed
clusters map has length >= 2 and return an error like "at least two clusters
must be configured" when fewer are present, then add a unit test in
config_test.go (e.g., a new t.Run "single cluster is rejected") that writes a
config with one cluster (use pd-addrs, s3-sink-uri, s3-changefeed-id) and
asserts LoadConfig returns an error, nil cfg, and that the error message
contains the new "at least two clusters must be configured" text to cover the
single-cluster rejection path.
In `@cmd/multi-cluster-consistency-checker/config/config.go`:
- Around line 78-81: The toml.DecodeFile call returns a third-party error that
should be wrapped with errors.Trace to attach a stack trace; change the error
return in the block handling toml.DecodeFile (currently using fmt.Errorf with
%w) to wrap the original err via errors.Trace(err) (e.g. return nil,
fmt.Errorf("failed to decode config file: %w", errors.Trace(err))) so the
DecodeFile error is traced per project conventions; update the import if errors
is not already imported.
In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go`:
- Around line 306-377: The WalkDir scans start from the prefix every time
because WalkOption.StartAfter is not set; update the WalkOption in
downloadSchemaFilesWithScanRange (and the other TODO'd WalkDir usages in
consumer.go) to set StartAfter to the last-known object key (use startVersionKey
or the previous highest schema key as appropriate) so S3 listing resumes after
that key; pass this StartAfter in the storage.WalkOption used by
c.s3Storage.WalkDir and ensure the logic still handles ErrWalkDirEnd and edge
cases where StartAfter may be empty.
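The effect of `StartAfter` can be illustrated without S3: object listings return keys in lexicographic order, and resuming means returning only keys strictly after the last-known one. `listAfter` below is a hypothetical stand-in for the real `storage.WalkOption` behavior, including the empty-`StartAfter` edge case:

```go
package main

import (
	"fmt"
	"sort"
)

// listAfter mimics the effect of an S3 listing StartAfter option: given
// lexicographically sorted object keys, it returns only the keys strictly
// greater than startAfter. An empty startAfter returns everything.
func listAfter(keys []string, startAfter string) []string {
	i := sort.SearchStrings(keys, startAfter)
	// SearchStrings returns the first index >= startAfter; skip an exact
	// match so the listing resumes *after* the last-known key.
	if i < len(keys) && keys[i] == startAfter {
		i++
	}
	return keys[i:]
}

func main() {
	keys := []string{
		"prefix/meta/CDC.index",
		"prefix/schema_v1.json",
		"prefix/schema_v2.json",
	}
	fmt.Println(listAfter(keys, "prefix/schema_v1.json")) // [prefix/schema_v2.json]
	fmt.Println(len(listAfter(keys, "")))                 // 3 (empty StartAfter scans from the prefix)
}
```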
In `@cmd/multi-cluster-consistency-checker/decoder/decoder_test.go`:
- Around line 81-88: The require.Equal calls in the decoder tests pass actual
then expected which flips failure messages; update assertions in
TestCanalJSONDecoder1 (the loop using records and ExpectedRecords1),
TestCanalJSONDecoder2 (the analogous loop using ExpectedRecords2), and the
checks in TestCanalJSONDecoderWithInvalidMessage to call require.Equal(t,
expected, actual) — e.g., swap arguments so ExpectedRecordsX.*
(ExpectedRecords1/2, expectedRecord.Pk, expectedRecord.PkStr,
expectedRecord.ColumnValues, expectedRecord.CdcVersion.CommitTs,
expectedRecord.CdcVersion.OriginTs) are the second-to-last (expected) parameter
and actualRecord.* are the last (actual) parameter in each require.Equal
invocation.
In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go`:
- Around line 247-351: MarshalReport iterates r.ClusterReports (a map) which
yields non-deterministic cluster ordering; make iteration deterministic by
collecting the map keys, sorting them (e.g., using slices.Sort or sort.Strings),
and then iterating over the sorted clusterIDs to fetch each ClusterReport
(reference: MarshalReport, r.ClusterReports, ClusterReport); update the loop in
MarshalReport to use the sorted keys so tests and output are stable across runs.
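The deterministic-iteration fix above follows the standard Go idiom: collect the map keys, sort them, then index the map in that order. A minimal sketch with a hypothetical `marshalReport` over a `map[string]string` (the real `ClusterReports` holds structs):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// marshalReport renders per-cluster report lines in a stable order by
// sorting the map keys first; ranging over the map directly would produce
// a different ordering from run to run.
func marshalReport(clusterReports map[string]string) string {
	ids := make([]string, 0, len(clusterReports))
	for id := range clusterReports {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	var b strings.Builder
	for _, id := range ids {
		fmt.Fprintf(&b, "%s: %s\n", id, clusterReports[id])
	}
	return b.String()
}

func main() {
	reports := map[string]string{"cluster-b": "ok", "cluster-a": "ok", "cluster-c": "1 anomaly"}
	fmt.Print(marshalReport(reports))
	// cluster-a: ok
	// cluster-b: ok
	// cluster-c: 1 anomaly
}
```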
In `@cmd/multi-cluster-consistency-checker/recorder/types.go`:
- Around line 374-409: ToScanRange currently seeds result from
c.CheckpointItems[2] and then treats missing SchemaTableKey entries from
c.CheckpointItems[1] and [0] as fatal errors; change the logic in the loops that
iterate over c.CheckpointItems[1].ClusterInfo[clusterID].MaxVersion and
c.CheckpointItems[0].ClusterInfo[clusterID].MaxVersion to skip entries that are
not found in result (instead of returning errors.Errorf), so dropped tables do
not cause recovery to fail; keep the initial seeding from CheckpointItems[2] and
the updates to StartVersionKey/StartDataPath when present, and also ensure you
check for nil ClusterInfo[clusterID] before iterating to avoid panics.
In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go`:
- Around line 60-61: The tests use time.Sleep calls to wait for the watcher to
initialize and other goroutine-driven states, which is flaky; replace these
sleeps with a deterministic polling/retry helper (e.g., waitForCondition or
waitUntil) that accepts a timeout and periodically checks a predicate until
true, and then use it wherever the file currently calls time.Sleep (instances in
checkpoint_watcher_test.go around watcher initialization and subsequent
synchronization points). Locate the places referencing the watcher
initialization and goroutine coordination in checkpoint_watcher_test.go and
update them to call the helper with a short tick interval and a reasonable
timeout so tests wait for the exact condition (e.g., watcher started, checkpoint
processed) instead of sleeping.
- Line 23: The test imports the archived gomock path; update the import in
checkpoint_watcher_test.go from "github.com/golang/mock/gomock" to
"go.uber.org/mock/gomock" and update any references to gomock.NewController or
gomock.Expect* accordingly, then regenerate mocks with "go install
go.uber.org/mock/mockgen@latest" and re-run mock generation (mockgen) so
generated mocks reference the new module path; ensure CI/go.mod/go.sum are
updated and tests compile.
In `@cmd/multi-cluster-consistency-checker/watcher/s3_watcher.go`:
- Around line 64-65: The TODO indicates we never apply the S3-derived index
updates after calling sw.consumer.ConsumeNewFiles; implement logic immediately
after the ConsumeNewFiles call to merge newData into the watcher's index and
advance per-key versions using maxVersionMap (e.g., iterate newData and upsert
into the internal index structure, then set per-key versions from
maxVersionMap), handle and return errors from the index update, and add
retries/logging as needed; if you cannot implement now, replace the TODO with a
created-issue reference and a clear TODO note so the watcher does not silently
fall behind.
In `@Makefile`:
- Around line 177-179: Add the new make target name
multi-cluster-consistency-checker to the .PHONY declaration so it follows the
existing convention; update the .PHONY list that currently includes cdc,
kafka_consumer, storage_consumer, pulsar_consumer, filter_helper (and note
config-converter/oauth2_server are also missing) to include
multi-cluster-consistency-checker to maintain consistency across binary build
targets.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (28)
Makefile
cmd/multi-cluster-consistency-checker/advancer/time_window_advancer.go
cmd/multi-cluster-consistency-checker/advancer/time_window_advancer_test.go
cmd/multi-cluster-consistency-checker/checker/checker.go
cmd/multi-cluster-consistency-checker/checker/checker_test.go
cmd/multi-cluster-consistency-checker/config/config.example.toml
cmd/multi-cluster-consistency-checker/config/config.go
cmd/multi-cluster-consistency-checker/config/config_test.go
cmd/multi-cluster-consistency-checker/consumer/consumer.go
cmd/multi-cluster-consistency-checker/consumer/consumer_test.go
cmd/multi-cluster-consistency-checker/decoder/decoder.go
cmd/multi-cluster-consistency-checker/decoder/decoder_test.go
cmd/multi-cluster-consistency-checker/decoder/value_to_datum_test.go
cmd/multi-cluster-consistency-checker/integration/integration_test.go
cmd/multi-cluster-consistency-checker/integration/mock_cluster.go
cmd/multi-cluster-consistency-checker/main.go
cmd/multi-cluster-consistency-checker/main_test.go
cmd/multi-cluster-consistency-checker/recorder/recorder.go
cmd/multi-cluster-consistency-checker/recorder/recorder_test.go
cmd/multi-cluster-consistency-checker/recorder/types.go
cmd/multi-cluster-consistency-checker/recorder/types_test.go
cmd/multi-cluster-consistency-checker/task.go
cmd/multi-cluster-consistency-checker/types/types.go
cmd/multi-cluster-consistency-checker/types/types_test.go
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go
cmd/multi-cluster-consistency-checker/watcher/s3_watcher.go
cmd/multi-cluster-consistency-checker/watcher/s3_watcher_test.go
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Actionable comments posted: 5
♻️ Duplicate comments (4)
cmd/multi-cluster-consistency-checker/recorder/recorder.go (1)
106-135: ⚠️ Potential issue | 🟠 Major

`os.Stat` errors other than `IsNotExist` are silently swallowed. If `os.Stat` fails due to a permission or I/O error (not "file not found"), the code falls through to the "fresh start" path at line 138, masking real failures and potentially losing an existing checkpoint.

Suggested fix

```diff
 // If checkpoint.json exists, use it directly.
-if _, err := os.Stat(checkpointFile); err == nil {
+if _, statErr := os.Stat(checkpointFile); statErr == nil {
 	data, err := os.ReadFile(checkpointFile)
 	if err != nil {
 		return errors.Trace(err) // transient I/O error
 	}
 	if err := json.Unmarshal(data, r.checkpoint); err != nil {
 		return errors.Annotatef(ErrCheckpointCorruption, "failed to unmarshal checkpoint.json: %v", err)
 	}
 	return nil
+} else if !os.IsNotExist(statErr) {
+	return errors.Trace(statErr)
 }

 // checkpoint.json is missing — try recovering from the backup.
-if _, err := os.Stat(bakFile); err == nil {
+if _, statErr := os.Stat(bakFile); statErr == nil {
 	log.Warn("checkpoint.json not found, recovering from checkpoint.json.bak")
 	...
 	return nil
+} else if !os.IsNotExist(statErr) {
+	return errors.Trace(statErr)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go` around lines 106 - 135, The os.Stat checks for checkpointFile and bakFile swallow non-NotExist errors; update the logic in the block handling checkpointFile and the block handling bakFile so that after calling os.Stat(...) you explicitly check the error: if err == nil proceed as now, else if os.IsNotExist(err) continue to the next recovery/fresh-start path, else return/propagate the err (wrap/trace/annotate consistently with existing error handling) to avoid masking permission or I/O errors; apply this change around the checkpoint.json and checkpoint.json.bak checks that populate r.checkpoint and perform os.Rename.

cmd/multi-cluster-consistency-checker/task.go (1)
285-331: ⚠️ Potential issue | 🟡 Minor

Cleanup order: watchers are closed after their underlying clients. `cleanupClients` still closes PD and etcd clients (lines 299–318) before closing checkpoint watchers and S3 watchers (lines 320–330). Since watchers hold active references to the etcd client for their watch loops, tearing down the client first forces in-flight operations to fail noisily before the watcher's `Close()` can shut them down gracefully.

Close watchers first, then clients:

Suggested fix

```diff
 func cleanupClients(
 	pdClients map[string]pd.Client,
 	etcdClients map[string]*etcd.CDCEtcdClientImpl,
 	checkpointWatchers map[string]map[string]watcher.Watcher,
 	s3Watchers map[string]*watcher.S3Watcher,
 ) {
 	...
-	// Close PD clients
-	for clusterID, pdClient := range pdClients {
-		...
-	}
-
-	// Close etcd clients
-	for clusterID, etcdClient := range etcdClients {
-		...
-	}
-
 	// Close checkpoint watchers
 	for _, clusterWatchers := range checkpointWatchers {
 		for _, watcher := range clusterWatchers {
 			watcher.Close()
 		}
 	}

 	// Close s3 watchers
 	for _, s3Watcher := range s3Watchers {
 		s3Watcher.Close()
 	}
+
+	// Close etcd clients
+	for clusterID, etcdClient := range etcdClients {
+		...
+	}
+
+	// Close PD clients
+	for clusterID, pdClient := range pdClients {
+		...
+	}

 	log.Info("Client cleanup completed")
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/task.go` around lines 285 - 331, In cleanupClients, reverse the teardown order so watchers are closed before their underlying clients: iterate and call Close() on all checkpointWatchers and s3Watchers first (use the existing checkpointWatchers and s3Watchers maps and the watcher.Close/S3Watcher.Close methods), wait/ensure watchers stop if needed, and only after that close PD clients (pd.Client.Close) and etcd clients (etcd.CDCEtcdClientImpl.Close) with the existing logging and error handling; this prevents watchers from observing closed etcd clients mid-shutdown.

cmd/multi-cluster-consistency-checker/checker/checker.go (1)
345-378: ⚠️ Potential issue | 🔴 Critical

`interface{}` comparison at line 357 can panic on non-comparable types. The comment claims `ColumnValues` only holds comparable types, but JSON unmarshaling can produce `map[string]interface{}` or `[]interface{}` for nested JSON values, which are non-comparable and will cause a runtime panic with `!=`. Use `reflect.DeepEqual` or normalize values before comparison.

Suggested fix

```diff
+import "reflect"
+
 func diffColumns(local, replicated *decoder.Record) []recorder.InconsistentColumn {
 	var result []recorder.InconsistentColumn
 	for colName, localVal := range local.ColumnValues {
 		replicatedVal, ok := replicated.ColumnValues[colName]
 		if !ok {
 			result = append(result, recorder.InconsistentColumn{
 				Column:     colName,
 				Local:      localVal,
 				Replicated: nil,
 			})
-		} else if localVal != replicatedVal { // safe: ColumnValues only holds comparable types (see decoder.go)
+		} else if !reflect.DeepEqual(localVal, replicatedVal) {
 			result = append(result, recorder.InconsistentColumn{
 				Column:     colName,
 				Local:      localVal,
 				Replicated: replicatedVal,
 			})
 		}
 	}
```

```bash
#!/bin/bash
# Check what types ColumnValues can hold by examining the decoder
fd -t f decoder.go --exec grep -n "ColumnValues\|columnValues\|valueToDatum\|interface{}" {} \;
echo "---"
# Look for the specific function that populates ColumnValues
ast-grep --pattern 'func $_($$) { $$$ ColumnValues[$_] = $_ $$$ }'
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/checker/checker.go` around lines 345 - 378, The diffColumns function currently uses the non-comparable != operator on values from decoder.Record.ColumnValues which can be maps/slices from JSON and will panic; replace the direct != check with a safe deep comparison (use reflect.DeepEqual) when comparing localVal and replicatedVal, and ensure you still treat missing keys as inconsistencies; update the comment above diffColumns to reflect that DeepEqual is used to compare possibly non-comparable nested values and keep the rest of the logic (the two-loops that append recorder.InconsistentColumn for missing keys) unchanged so only the equality check is altered.

cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go (1)
70-77: Past review concern addressed: pending tasks are now unblocked on terminal errors.
`failPendingTasksLocked` (lines 72–77) is invoked when `errChangefeedKeyDeleted` is encountered (line 198), properly unblocking any goroutines waiting in `AdvanceCheckpointTs`. This resolves the prior concern.

Also applies to: 194-201
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go` around lines 70 - 77, The duplicate review comment indicates the same approval was posted twice; remove the redundant comment and ensure the behavior remains: when errChangefeedKeyDeleted triggers, Call failPendingTasksLocked to close each task.respCh (in CheckpointWatcher.failPendingTasksLocked) so goroutines blocked in AdvanceCheckpointTs are unblocked; verify the other instance referencing the same logic (the occurrence around errChangefeedKeyDeleted / the block that calls failPendingTasksLocked) is consistent and not left with outdated comments.
🧹 Nitpick comments (5)
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go (1)
60-62: `time.Sleep`-based synchronization is inherently flaky.

Multiple tests rely on `time.Sleep(50 * time.Millisecond)` to wait for the background goroutine to initialize. Under CI load or slow environments, this can intermittently fail. Consider using a polling/retry helper (e.g., `require.Eventually`) or exposing a ready signal from the watcher for tests.

That said, the timeouts are generous enough for practical use and this is a common Go test pattern, so this is a non-blocking suggestion.
Also applies to: 98-100, 165-167, 219-221, 285-287, 394-396, 440-442, 447-448, 524-526
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go` around lines 60 - 62, Tests in checkpoint_watcher_test.go use time.Sleep(50 * time.Millisecond) to wait for the watcher goroutine to initialize which is flaky; replace these sleeps (occurrences around the watcher initialization and other spots) with a deterministic wait such as require.Eventually or a short polling loop that checks a ready condition on the watcher (e.g., expose a Ready() bool or channel from the watcher created by NewCheckpointWatcher/Start) and wait until that returns true (with a timeout) so tests no longer rely on fixed sleeps.

cmd/multi-cluster-consistency-checker/checker/failpoint.go (1)
43-67: File handle is never closed.

The `file` opened by `ensureFile` is never closed. While this is a single FD for the process lifetime, it means the final JSONL entries may not be flushed on abrupt exit. Consider adding a `Close()` or using `defer file.Sync()` at the end of `Write`, or registering an `atexit` handler.

Also, the file is opened with `0644` permissions. Since the failpoint output may contain primary key data, `0600` would be more restrictive.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 43 - 67, The opened file in ensureFile (variable file) is never closed and is created with too-permissive mode; modify ensureFile to open the file with mode 0600 instead of 0644 and add a safe close/sync path: ensure the Write function (the method that appends JSONL entries) calls file.Sync() after successful writes (or flushes) and register a one-time cleanup/Close for file (e.g., via a package-level CloseFailpoint or an init/interrupt handler that calls file.Sync() and file.Close(), using initOnce/mu to guard concurrent access), and ensure disabled/ensureFile semantics remain intact when file is nil.

cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go (1)
245-251: Log messages start with uppercase.

Per coding guidelines, log message strings should follow a consistent style. These messages (`"Starting to watch checkpoint"`, `"Checkpoint updated"`) start with uppercase, which is atypical for structured log messages in Go.
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go` around lines 245 - 251, Update the structured log message strings in checkpoint_watcher.go to use sentence case starting with lowercase (e.g. change "Starting to watch checkpoint" -> "starting to watch checkpoint" and "Checkpoint updated" -> "checkpoint updated") so they follow project logging style; locate the zap log.Debug calls that reference cw.changefeedID, statusKey, cw.localClusterID, cw.replicatedClusterID, status.CheckpointTs and modRev (and any other similar log.Debug invocations in the same file/method) and replace the leading uppercase log message text with the lowercase variants while keeping all existing structured fields unchanged.

cmd/multi-cluster-consistency-checker/task.go (1)
168-170: Use `errors.Errorf` instead of `fmt.Errorf` for consistency and stack traces.

Multiple error returns in this file use `fmt.Errorf` (lines 169, 177, 184, 198, 205, 227, 232, 247) instead of the project's `errors.Errorf`. The latter attaches stack traces, which is consistent with the rest of the codebase and the coding guidelines about wrapping errors with the `errors` package.
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/task.go` around lines 168 - 170, Replace uses of fmt.Errorf in the s3 changefeed sink validation and the other error-return sites in this file with the project's errors.Errorf so stack traces are preserved; specifically update the error creation in the block that checks cfInfo.Config and cfInfo.Config.Sink (where clusterID and s3ChangefeedID are used) and the other analogous returns in this file (the other fmt.Errorf occurrences referenced in the review) to call errors.Errorf with the same format string and arguments, ensuring the errors package is imported if not already.

cmd/multi-cluster-consistency-checker/recorder/recorder.go (1)
162-180: Non-atomic two-file report flush.

`flushReport` writes `report-N.report` and `report-N.json` sequentially. If the process crashes between the two writes, one file exists without the other. For a monitoring/auditing tool, this means a partially flushed round. Consider treating the pair atomically (e.g., write both to temp files first, then rename both) or documenting this as an accepted trade-off.
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go` around lines 162 - 180, The flushReport function currently writes reportName and jsonName sequentially which can leave a partial pair on crash; modify Recorder.flushReport to persist the pair atomically: marshal both outputs, write them to temporary files (e.g., reportName.tmp and jsonName.tmp) inside r.reportDir using atomicWriteFile (or reuse its temp-then-rename logic), then rename/move both temp files to their final names (reportName and jsonName) so either both appear or none; update r.reportFiles only after successful renames of both files (use the existing reportName, jsonName, r.reportDir, atomicWriteFile, and r.reportFiles symbols to locate and change the logic).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 1-98: Run the formatter to fix import ordering and gofumports/gci
issues and commit the changes: run `make fmt` (or `gofumports`/`gci` directly)
to reformat the file containing ensureFile, Write, and the import block so the
imports are grouped/ordered correctly and any spacing or unused-import fixes are
applied; then re-run CI and push the formatted changes.
In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go`:
- Around line 176-184: When run() observes cw.ctx.Done() it sets cw.watchErr but
doesn't drain pending tasks, which can leave callers that were still acquiring
cw.mu stuck; after setting cw.watchErr you should call failPendingTasksLocked()
while holding cw.mu to drain and notify all pending tasks just like the
key-deletion path does. Update the ctx.Done() branch in CheckpointWatcher.run()
to acquire cw.mu, set cw.watchErr (errors.Annotate(cw.ctx.Err(), "context
canceled")), call failPendingTasksLocked(), release the lock, and then return so
no pending AdvanceCheckpointTs tasks remain enqueued without a notifier.
In `@pkg/sink/cloudstorage/path_key.go`:
- Around line 187-189: The regex in the path building code (where str is
appended with `CDC(?:_(\w+)_)?(\d+).\w+` and compiled into pathRE) uses an
unescaped dot after (\d+), which allows '/' to match; change that segment to
require a literal dot by replacing `(\d+).\w+` with `(\d+)\.\w+` (i.e., update
the string passed to regexp.Compile that produces pathRE).
- Around line 189-207: The errors returned from regexp.Compile and
strconv.Parse* calls are returned raw; wrap them with errors.Trace so stack
context is preserved: when calling regexp.Compile(str) (variable pathRE) return
errors.Trace(err) instead of err, and do the same for
strconv.ParseUint(matches[3], 10, 64) and strconv.ParseInt(matches[4], 10, 64)
(used to set version and partitionNum); keep the same error messages/flow
(including the fmt.Errorf when matches length mismatches) but replace direct err
returns with errors.Trace(err) to preserve stack traces.
- Around line 211-214: The code trims leading zeros from matches[matchesFileIdx]
then calls strconv.ParseUint, which yields an "invalid syntax" error when the
trimmed result is empty (i.e., the original was all zeros). Before calling
strconv.ParseUint in the section around fileIdx and matches[matchesFileIdx],
check the trimmed value and if it is an empty string set it to "0", then call
strconv.ParseUint; this ensures zero-index files parse correctly while keeping
the existing use of strconv.ParseUint and matches[matchesFileIdx].
---
Duplicate comments:
In `@cmd/multi-cluster-consistency-checker/checker/checker.go`:
- Around line 345-378: The diffColumns function currently uses the
non-comparable != operator on values from decoder.Record.ColumnValues which can
be maps/slices from JSON and will panic; replace the direct != check with a safe
deep comparison (use reflect.DeepEqual) when comparing localVal and
replicatedVal, and ensure you still treat missing keys as inconsistencies;
update the comment above diffColumns to reflect that DeepEqual is used to
compare possibly non-comparable nested values and keep the rest of the logic
(the two-loops that append recorder.InconsistentColumn for missing keys)
unchanged so only the equality check is altered.
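The panic risk described above is worth seeing concretely: values decoded from JSON into `interface{}` may hold maps or slices, which `==`/`!=` cannot compare at runtime. A minimal sketch using `reflect.DeepEqual` (the `columnsEqual` helper is hypothetical, standing in for the comparison inside `diffColumns`):

```go
package main

import (
	"fmt"
	"reflect"
)

// columnsEqual compares two decoded column values. JSON unmarshaling can put
// map[string]interface{} or []interface{} into an interface{} slot, and
// comparing those with == or != panics at runtime ("comparing uncomparable
// type"), so reflect.DeepEqual is used instead.
func columnsEqual(local, replicated interface{}) bool {
	return reflect.DeepEqual(local, replicated)
}

func main() {
	a := map[string]interface{}{"nested": []interface{}{1.0, "x"}}
	b := map[string]interface{}{"nested": []interface{}{1.0, "x"}}
	c := map[string]interface{}{"nested": []interface{}{2.0, "x"}}
	fmt.Println(columnsEqual(a, b))       // true
	fmt.Println(columnsEqual(a, c))       // false
	fmt.Println(columnsEqual("v1", "v1")) // scalars still compare fine: true
}
```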
In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go`:
- Around line 106-135: The os.Stat checks for checkpointFile and bakFile swallow
non-NotExist errors; update the logic in the block handling checkpointFile and
the block handling bakFile so that after calling os.Stat(...) you explicitly
check the error: if err == nil proceed as now, else if os.IsNotExist(err)
continue to the next recovery/fresh-start path, else return/propagate the err
(wrap/trace/annotate consistently with existing error handling) to avoid masking
permission or I/O errors; apply this change around the checkpoint.json and
checkpoint.json.bak checks that populate r.checkpoint and perform os.Rename.
In `@cmd/multi-cluster-consistency-checker/task.go`:
- Around line 285-331: In cleanupClients, reverse the teardown order so watchers
are closed before their underlying clients: iterate and call Close() on all
checkpointWatchers and s3Watchers first (use the existing checkpointWatchers and
s3Watchers maps and the watcher.Close/S3Watcher.Close methods), wait/ensure
watchers stop if needed, and only after that close PD clients (pd.Client.Close)
and etcd clients (etcd.CDCEtcdClientImpl.Close) with the existing logging and
error handling; this prevents watchers from observing closed etcd clients
mid-shutdown.
In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go`:
- Around line 70-77: The duplicate review comment indicates the same approval
was posted twice; remove the redundant comment and ensure the behavior remains:
when errChangefeedKeyDeleted triggers, Call failPendingTasksLocked to close each
task.respCh (in CheckpointWatcher.failPendingTasksLocked) so goroutines blocked
in AdvanceCheckpointTs are unblocked; verify the other instance referencing the
same logic (the occurrence around errChangefeedKeyDeleted / the block that calls
failPendingTasksLocked) is consistent and not left with outdated comments.
---
Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 43-67: The opened file in ensureFile (variable file) is never
closed and is created with too-permissive mode; modify ensureFile to open the
file with mode 0600 instead of 0644 and add a safe close/sync path: ensure the
Write function (the method that appends JSONL entries) calls file.Sync() after
successful writes (or flushes) and register a one-time cleanup/Close for file
(e.g., via a package-level CloseFailpoint or an init/interrupt handler that
calls file.Sync() and file.Close(), using initOnce/mu to guard concurrent
access), and ensure disabled/ensureFile semantics remain intact when file is
nil.
In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go`:
- Around line 162-180: The flushReport function currently writes reportName and
jsonName sequentially which can leave a partial pair on crash; modify
Recorder.flushReport to persist the pair atomically: marshal both outputs, write
them to temporary files (e.g., reportName.tmp and jsonName.tmp) inside
r.reportDir using atomicWriteFile (or reuse its temp-then-rename logic), then
rename/move both temp files to their final names (reportName and jsonName) so
either both appear or none; update r.reportFiles only after successful renames
of both files (use the existing reportName, jsonName, r.reportDir,
atomicWriteFile, and r.reportFiles symbols to locate and change the logic).
In `@cmd/multi-cluster-consistency-checker/task.go`:
- Around line 168-170: Replace uses of fmt.Errorf in the s3 changefeed sink
validation and the other error-return sites in this file with the project's
errors.Errorf so stack traces are preserved; specifically update the error
creation in the block that checks cfInfo.Config and cfInfo.Config.Sink (where
clusterID and s3ChangefeedID are used) and the other analogous returns in this
file (the other fmt.Errorf occurrences referenced in the review) to call
errors.Errorf with the same format string and arguments, ensuring the errors
package is imported if not already.
In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go`:
- Around line 60-62: Tests in checkpoint_watcher_test.go use time.Sleep(50 *
time.Millisecond) to wait for the watcher goroutine to initialize which is
flaky; replace these sleeps (occurrences around the watcher initialization and
other spots) with a deterministic wait such as require.Eventually or a short
polling loop that checks a ready condition on the watcher (e.g., expose a
Ready() bool or channel from the watcher created by NewCheckpointWatcher/Start)
and wait until that returns true (with a timeout) so tests no longer rely on
fixed sleeps.
In `@cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go`:
- Around line 245-251: Update the structured log message strings in
checkpoint_watcher.go to use sentence case starting with lowercase (e.g. change
"Starting to watch checkpoint" -> "starting to watch checkpoint" and "Checkpoint
updated" -> "checkpoint updated") so they follow project logging style; locate
the zap log.Debug calls that reference cw.changefeedID, statusKey,
cw.localClusterID, cw.replicatedClusterID, status.CheckpointTs and modRev (and
any other similar log.Debug invocations in the same file/method) and replace the
leading uppercase log message text with the lowercase variants while keeping all
existing structured fields unchanged.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
cmd/multi-cluster-consistency-checker/checker/checker.go
cmd/multi-cluster-consistency-checker/checker/checker_test.go
cmd/multi-cluster-consistency-checker/checker/failpoint.go
cmd/multi-cluster-consistency-checker/config/config_test.go
cmd/multi-cluster-consistency-checker/integration/integration_test.go
cmd/multi-cluster-consistency-checker/recorder/recorder.go
cmd/multi-cluster-consistency-checker/recorder/recorder_test.go
cmd/multi-cluster-consistency-checker/task.go
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher.go
cmd/multi-cluster-consistency-checker/watcher/checkpoint_watcher_test.go
pkg/sink/cloudstorage/path_key.go
pkg/sink/cloudstorage/path_key_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- cmd/multi-cluster-consistency-checker/recorder/recorder_test.go
- cmd/multi-cluster-consistency-checker/config/config_test.go
Actionable comments posted: 1
♻️ Duplicate comments (1)
cmd/multi-cluster-consistency-checker/checker/failpoint.go (1)
1-98: Formatting issue already flagged.

The `gofumports`/`gci` formatting failure noted in a previous review has not been resolved. Run `make fmt` before merging.
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 1 - 98, The file has gofumports/gci formatting failures; run the formatter and reorder/import-grouping tool and re-run gofmt: run `make fmt` (or run gofumports/gci directly) and commit the resulting changes so imports and formatting in failpoint.go (notably around the package imports and functions ensureFile and Write) comply with the repository style; no functional changes required—just apply the automatic formatters and re-run tests/lint before pushing.
🧹 Nitpick comments (1)
cmd/multi-cluster-consistency-checker/checker/failpoint.go (1)
43-48: Package-level `sync.Once` prevents state reset between tests.

These package-level globals are initialized exactly once per process lifetime. Any test that needs to exercise a different `TICDC_MULTI_CLUSTER_CONSISTENCY_CHECKER_FAILPOINT_RECORD_FILE` value (e.g., testing both the enabled and disabled paths) cannot reset `initOnce`, `file`, or `disabled`, forcing per-test-case process isolation.

Consider wrapping the state in a struct and exposing a package-level default instance, which keeps the call sites identical while making the component resettable and injectable in tests.
♻️ Sketch of a struct-based approach
-var (
-	initOnce sync.Once
-	mu       sync.Mutex
-	file     *os.File
-	disabled atomic.Bool
-)
-
-func ensureFile() { ... }
-
-// Write persists one failpoint event to the JSONL file.
-func Write(failpoint string, rows []RowRecord) { ... }
+// Recorder serializes failpoint events to a JSONL file.
+type Recorder struct {
+	initOnce sync.Once
+	mu       sync.Mutex
+	file     *os.File
+	disabled atomic.Bool
+}
+
+func (r *Recorder) ensureFile() { ... }
+
+// Write persists one failpoint event to the JSONL file.
+func (r *Recorder) Write(failpoint string, rows []RowRecord) { ... }
+
+// defaultRecorder is the package-level singleton driven by the env var.
+var defaultRecorder Recorder
+
+// Write is a convenience wrapper around the default recorder.
+func Write(failpoint string, rows []RowRecord) {
+	defaultRecorder.Write(failpoint, rows)
+}
Tests can then construct a `Recorder` with a temp file path without touching the global.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 43 - 48, The package-level sync.Once and globals (initOnce, mu, file, disabled) make the recorder state unresettable in tests; refactor by creating a Recorder struct encapsulating these fields (e.g., type Recorder struct { initOnce sync.Once; mu sync.Mutex; file *os.File; disabled atomic.Bool; path string }), replace package-level vars with a package-level default Recorder instance (e.g., var defaultRecorder = NewRecorderFromEnv()), and change public functions that reference initOnce/mu/file/disabled to be methods on *Recorder while keeping wrappers that call defaultRecorder.Method(...) so call sites remain the same but tests can instantiate/reset a Recorder with different TICDC_MULTI_CLUSTER_CONSISTENCY_CHECKER_FAILPOINT_RECORD_FILE values.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 93-97: The write path that acquires mu and calls file.Write(data)
should also call file.Sync() after a successful write to flush kernel buffers to
stable storage; update the failpoint record write block (the mu.Lock()/defer
mu.Unlock() section where file.Write(data) is called) to call file.Sync() when
Write returns no error and log any sync errors with log.Warn/zap.Error so
failures are visible; ensure Sync is performed while still holding mu (or
otherwise ensure no concurrent writes) so the flush corresponds to the written
record.
---
Duplicate comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 1-98: The file has gofumports/gci formatting failures; run the
formatter and reorder/import-grouping tool and re-run gofmt: run `make fmt` (or
run gofumports/gci directly) and commit the resulting changes so imports and
formatting in failpoint.go (notably around the package imports and functions
ensureFile and Write) comply with the repository style; no functional changes
required—just apply the automatic formatters and re-run tests/lint before
pushing.
---
Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 43-48: The package-level sync.Once and globals (initOnce, mu,
file, disabled) make the recorder state unresettable in tests; refactor by
creating a Recorder struct encapsulating these fields (e.g., type Recorder
struct { initOnce sync.Once; mu sync.Mutex; file *os.File; disabled atomic.Bool;
path string }), replace package-level vars with a package-level default Recorder
instance (e.g., var defaultRecorder = NewRecorderFromEnv()), and change public
functions that reference initOnce/mu/file/disabled to be methods on *Recorder
while keeping wrappers that call defaultRecorder.Method(...) so call sites
remain the same but tests can instantiate/reset a Recorder with different
TICDC_MULTI_CLUSTER_CONSISTENCY_CHECKER_FAILPOINT_RECORD_FILE values.
mu.Lock()
defer mu.Unlock()
if _, err := file.Write(data); err != nil {
	log.Warn("failed to write failpoint record", zap.Error(err))
}
Missing file.Sync() risks losing records on unclean process exit.
os.File.Write() writes into the OS kernel buffer. On SIGKILL or an OOM kill, unflushed kernel pages for the file can be lost. For a tool whose purpose is capturing data anomalies, silently losing the last batch of failpoint records on crash significantly reduces its reliability.
🛡️ Proposed fix
mu.Lock()
defer mu.Unlock()
if _, err := file.Write(data); err != nil {
log.Warn("failed to write failpoint record", zap.Error(err))
+ return
}
+ if err := file.Sync(); err != nil {
+ log.Warn("failed to sync failpoint record file", zap.Error(err))
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
mu.Lock()
defer mu.Unlock()
if _, err := file.Write(data); err != nil {
	log.Warn("failed to write failpoint record", zap.Error(err))
	return
}
if err := file.Sync(); err != nil {
	log.Warn("failed to sync failpoint record file", zap.Error(err))
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 93 -
97, The write path that acquires mu and calls file.Write(data) should also call
file.Sync() after a successful write to flush kernel buffers to stable storage;
update the failpoint record write block (the mu.Lock()/defer mu.Unlock() section
where file.Write(data) is called) to call file.Sync() when Write returns no
error and log any sync errors with log.Warn/zap.Error so failures are visible;
ensure Sync is performed while still holding mu (or otherwise ensure no
concurrent writes) so the flush corresponds to the written record.
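The pattern this comment asks for can be sketched end to end in a stand-alone program. All names below (`appendRecorder`, `demo`) are illustrative stand-ins, not the checker's actual API:

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

// appendRecorder is a hypothetical stand-in for the failpoint recorder:
// one mutex serializes writers, and Sync is called while the lock is
// still held so the fsync covers exactly the record just appended.
type appendRecorder struct {
	mu   sync.Mutex
	file *os.File
}

func (r *appendRecorder) write(data []byte) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, err := r.file.Write(data); err != nil {
		return err
	}
	// Flush kernel buffers so the record survives a SIGKILL or OOM kill.
	return r.file.Sync()
}

// demo appends one JSONL record to a fresh temp file and returns the
// file content read back from disk.
func demo() (string, error) {
	f, err := os.CreateTemp("", "failpoint-demo-*.jsonl") // created with 0o600
	if err != nil {
		return "", err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	r := &appendRecorder{file: f}
	if err := r.write([]byte(`{"failpoint":"demo"}` + "\n")); err != nil {
		return "", err
	}
	content, err := os.ReadFile(f.Name())
	return string(content), err
}

func main() {
	s, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Print(s)
}
```

As a side note, `os.CreateTemp` creates the file with owner-only (0o600) permissions, which lines up with the separate review comment about restricting access to failpoint records.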
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Actionable comments posted: 5
♻️ Duplicate comments (3)
cmd/multi-cluster-consistency-checker/checker/failpoint.go (1)
94-98: ⚠️ Potential issue | 🟠 Major
Persist writes durably before releasing the lock.
Line 96 writes to the file but never calls `file.Sync()`. On abrupt exit, buffered data can be lost, which weakens auditability/resumability for checker failpoint records.
💾 Suggested change
 mu.Lock()
 defer mu.Unlock()
 if _, err := file.Write(data); err != nil {
 	log.Warn("failed to write failpoint record", zap.Error(err))
+	return
+}
+if err := file.Sync(); err != nil {
+	log.Warn("failed to sync failpoint record file", zap.Error(err))
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` around lines 94 - 98, The write to the failpoint file must be durably persisted before releasing mu: after calling file.Write(data) in the same critical section (where mu is held), call file.Sync() and check its error, logging via log.Warn (similar to the Write error) if Sync fails; ensure both write and sync errors are handled before allowing the deferred mu.Unlock() to run so failpoint records are durable across abrupt exits.
cmd/multi-cluster-consistency-checker/decoder/decoder.go (1)
328-329: ⚠️ Potential issue | 🟡 Minor
Remove hyphen from log message text.
Line 328 still logs `"canal-json ..."`. This violates the log-string style rule in this repo.
As per coding guidelines, log message strings should avoid hyphens (use spaces instead).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go` around lines 328 - 329, Update the log message passed to log.Panic so it removes the hyphen in "canal-json" (replace with a space: "canal json") to comply with the repo's log-string style rule; locate the log.Panic call that currently reads "canal-json encoded message should have type in `string`" in decoder.go and update that string literal accordingly.
cmd/multi-cluster-consistency-checker/consumer/consumer.go (1)
271-299: ⚠️ Potential issue | 🟠 Major
Bound all remaining errgroup fan-outs, not only DML file reads.
`downloadDMLFiles` is limited, but these paths still fan out unbounded goroutines. Under large table/version counts this can still exhaust resources and overload S3.
#!/bin/bash
# Verify errgroup usage vs SetLimit in this file.
rg -n 'errgroup\.WithContext|SetLimit\(' cmd/multi-cluster-consistency-checker/consumer/consumer.go
Also applies to: 392-412, 464-483, 687-711, 729-762
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go` around lines 271 - 299, The outer errgroup (eg, created via errgroup.WithContext in consumer.go) spawns unbounded goroutines for each schemaTableKey; bound those fan-outs by applying a concurrency limit (eg.SetLimit(N)) on that errgroup before the for-loop or by using a semaphore/channel to limit concurrent eg.Go calls; ensure downloadSchemaFilesWithScanRange and downloadDataFilesWithScanRange calls are executed under that bounded group, and if downloadDataFilesWithScanRange itself spawns goroutines (or calls downloadDMLFiles), add a local bounded errgroup or semaphore inside downloadDataFilesWithScanRange/downloadDMLFiles to cap S3/read concurrency as well.
🧹 Nitpick comments (2)
cmd/multi-cluster-consistency-checker/checker/checker.go (1)
411-456: Stabilize detection iteration order to keep report item ordering reproducible. These detection passes iterate maps directly, so failure-item append order can vary between runs. Sorting schema keys / PKs / timestamps before iteration would make report outputs reliably stable.
Also applies to: 462-477, 481-517
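The deterministic-iteration fix boils down to snapshotting and sorting keys before ranging. A minimal stdlib sketch, where the generic helper and the cache layout are illustrative rather than the checker's real types:

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns a map's string keys in lexical order so report items
// are appended in a reproducible order, instead of Go's randomized map
// iteration order.
func sortedKeys[V any](m map[string]V) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	// Hypothetical per-table caches keyed by "schema.table".
	caches := map[string][]string{
		"db2.t1": {"pk3"},
		"db1.t2": {"pk1", "pk2"},
		"db1.t1": {"pk9"},
	}
	// Deterministic pass: db1.t1, db1.t2, db2.t1 — every run.
	for _, key := range sortedKeys(caches) {
		fmt.Println(key, caches[key])
	}
}
```

The same helper applies to PK sets and checkpoint-ts maps; sorting once per detection pass is cheap relative to the S3 reads that feed these caches.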
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/checker/checker.go` around lines 411 - 456, The iteration over maps in the detection pass (starting from cd.timeWindowDataCaches[timeWindowIdx].tableDataCaches and iterating schemaKey, localDataCache, and cd.timeWindowDataCaches[timeWindowIdx].checkpointTs) causes non-deterministic report ordering; update the loops to iterate deterministic, sorted slices instead: extract and sort schema keys from cd.timeWindowDataCaches[timeWindowIdx].tableDataCaches before looping, extract and sort primary-key identifiers (e.g., record.PkStr or the map keys used to build localDataCache) before iterating localDataCache, and extract and sort replicatedClusterID/checkpointTs pairs (by cluster ID or by timestamp) from cd.timeWindowDataCaches[timeWindowIdx].checkpointTs before iterating; apply the same sorting approach to the other affected blocks (around lines 462–477 and 481–517) so report append order is reproducible.
cmd/multi-cluster-consistency-checker/recorder/recorder.go (1)
57-69: Sort retention candidates by round number, not filename lexicographic order.
`sort.Strings(reportFiles)` on startup can misorder rounds (`report-10.*` before `report-2.*`), so `cleanupOldReports` may delete newer files first after restart.
Also applies to: 225-237
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go` around lines 57 - 69, The startup code collects report file names into reportFiles and currently uses sort.Strings(reportFiles), which sorts lexicographically and misorders rounds (e.g., report-10 before report-2); change the sorting to numeric round order by parsing the round number from each filename (e.g., extract the integer after "report-" and before the next separator) and use sort.Slice with a comparator that compares those integers so older rounds come first; apply the same change to the other occurrence referenced by cleanupOldReports so both places sort by parsed round number rather than filename string.
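The numeric-sort fix can be shown in isolation. The `report-<round>.json` naming below is an illustrative layout; the real recorder's file names may differ:

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// roundOf extracts the integer round number from names like
// "report-12.json"; unparsable names sort first.
func roundOf(name string) int {
	rest := strings.TrimPrefix(name, "report-")
	numStr, _, _ := strings.Cut(rest, ".")
	n, err := strconv.Atoi(numStr)
	if err != nil {
		return -1
	}
	return n
}

// sortByRound orders report files numerically, so "report-2" precedes
// "report-10" (lexical sort.Strings would reverse them).
func sortByRound(files []string) {
	sort.Slice(files, func(i, j int) bool {
		return roundOf(files[i]) < roundOf(files[j])
	})
}

func main() {
	files := []string{"report-10.json", "report-2.json", "report-1.json"}
	sortByRound(files)
	fmt.Println(files) // oldest round first, so retention deletes the right files
}
```

With this ordering, a retention pass that trims from the front of the slice always removes the oldest rounds, even after a restart re-lists the directory.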
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Line 59: The file is currently created with permissions set to 0o644, which
allows read access to other local users and exposes sensitive PrimaryKeys. To
fix this, change the file permission argument in the os.OpenFile call within the
failpoint.go file (at the line opening the file for append) from 0o644 to 0o600,
ensuring only the owner has read and write access, following the least privilege
principle and matching the pattern used in recorder.go.
In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go`:
- Around line 255-256: The code dereferences
d.msg.TiDBCommitTsExtension.CommitTs without checking that d.msg and its
TiDBCommitTsExtension (the `_tidb` extension) are present, which can panic on
malformed payloads; update the decoder method that contains the lines setting
commitTs := d.msg.TiDBCommitTsExtension.CommitTs and d.msg = nil to first
validate d.msg != nil and d.msg.TiDBCommitTsExtension != nil (or the equivalent
presence flag), and if absent return a wrapped error (e.g., fmt.Errorf("missing
_tidb extension: %w", err)) instead of dereferencing, preserving existing
cleanup (d.msg = nil) only after successful validation.
- Around line 177-181: The current decoding uses bytes.IndexAny(d.data,
d.config.Terminator) which treats d.config.Terminator ("\\r\\n") as a set of
runes and matches either '\r' or '\n', causing incorrect splits; replace that
call with bytes.Index(d.data, []byte(d.config.Terminator)) (or equivalent) to
search for the full CRLF sequence, then keep the existing logic that sets
encodedData = d.data[:idx] and advances d.data =
d.data[idx+len(d.config.Terminator):] so records are only split on the full
terminator.
In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go`:
- Around line 261-275: Update the test expectations to match the current
formatter that uses "local commit ts" instead of "commit ts": in the test case
using NewReport(1) and NewClusterReport("c1", tw) where AddDataLossItem("d1",
testSchemaKey, ...) is called and s := r.MarshalReport() is asserted, replace
the expected substring `commit ts` with `local commit ts` for the DataLossItem
lines (the assertions referencing the peer cluster line produced by
MarshalReport). Ensure the same change is applied to the other test cases in
this file that use AddDataLossItem and assert MarshalReport output.
---
Duplicate comments:
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go`:
- Around line 94-98: The write to the failpoint file must be durably persisted
before releasing mu: after calling file.Write(data) in the same critical section
(where mu is held), call file.Sync() and check its error, logging via log.Warn
(similar to the Write error) if Sync fails; ensure both write and sync errors
are handled before allowing the deferred mu.Unlock() to run so failpoint records
are durable across abrupt exits.
In `@cmd/multi-cluster-consistency-checker/consumer/consumer.go`:
- Around line 271-299: The outer errgroup (eg, created via errgroup.WithContext
in consumer.go) spawns unbounded goroutines for each schemaTableKey; bound those
fan-outs by applying a concurrency limit (eg.SetLimit(N)) on that errgroup
before the for-loop or by using a semaphore/channel to limit concurrent eg.Go
calls; ensure downloadSchemaFilesWithScanRange and
downloadDataFilesWithScanRange calls are executed under that bounded group, and
if downloadDataFilesWithScanRange itself spawns goroutines (or calls
downloadDMLFiles), add a local bounded errgroup or semaphore inside
downloadDataFilesWithScanRange/downloadDMLFiles to cap S3/read concurrency as
well.
In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go`:
- Around line 328-329: Update the log message passed to log.Panic so it removes
the hyphen in "canal-json" (replace with a space: "canal json") to comply with
the repo's log-string style rule; locate the log.Panic call that currently reads
"canal-json encoded message should have type in `string`" in decoder.go and
update that string literal accordingly.
---
Nitpick comments:
In `@cmd/multi-cluster-consistency-checker/checker/checker.go`:
- Around line 411-456: The iteration over maps in the detection pass (starting
from cd.timeWindowDataCaches[timeWindowIdx].tableDataCaches and iterating
schemaKey, localDataCache, and
cd.timeWindowDataCaches[timeWindowIdx].checkpointTs) causes non-deterministic
report ordering; update the loops to iterate deterministic, sorted slices
instead: extract and sort schema keys from
cd.timeWindowDataCaches[timeWindowIdx].tableDataCaches before looping, extract
and sort primary-key identifiers (e.g., record.PkStr or the map keys used to
build localDataCache) before iterating localDataCache, and extract and sort
replicatedClusterID/checkpointTs pairs (by cluster ID or by timestamp) from
cd.timeWindowDataCaches[timeWindowIdx].checkpointTs before iterating; apply the
same sorting approach to the other affected blocks (around lines 462–477 and
481–517) so report append order is reproducible.
In `@cmd/multi-cluster-consistency-checker/recorder/recorder.go`:
- Around line 57-69: The startup code collects report file names into
reportFiles and currently uses sort.Strings(reportFiles), which sorts
lexicographically and misorders rounds (e.g., report-10 before report-2); change
the sorting to numeric round order by parsing the round number from each
filename (e.g., extract the integer after "report-" and before the next
separator) and use sort.Slice with a comparator that compares those integers so
older rounds come first; apply the same change to the other occurrence
referenced by cleanupOldReports so both places sort by parsed round number
rather than filename string.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- cmd/multi-cluster-consistency-checker/checker/checker.go
- cmd/multi-cluster-consistency-checker/checker/checker_test.go
- cmd/multi-cluster-consistency-checker/checker/failpoint.go
- cmd/multi-cluster-consistency-checker/consumer/consumer.go
- cmd/multi-cluster-consistency-checker/decoder/decoder.go
- cmd/multi-cluster-consistency-checker/recorder/recorder.go
- cmd/multi-cluster-consistency-checker/recorder/types.go
- cmd/multi-cluster-consistency-checker/recorder/types_test.go
	return
}
var err error
file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
🧩 Analysis chain
🏁 Script executed:
# First, let's locate and examine the failpoint.go file
find . -name "failpoint.go" -type f
Repository: pingcap/ticdc
Length of output: 139
🏁 Script executed:
# Once found, let's look at the context around line 59
head -80 cmd/multi-cluster-consistency-checker/checker/failpoint.go | tail -40
Repository: pingcap/ticdc
Length of output: 941
🏁 Script executed:
# Also check the full context of the file to understand data being written
wc -l cmd/multi-cluster-consistency-checker/checker/failpoint.go
Repository: pingcap/ticdc
Length of output: 118
🏁 Script executed:
# Let's read the full failpoint.go file to understand data structure and context
cat -n cmd/multi-cluster-consistency-checker/checker/failpoint.go
Repository: pingcap/ticdc
Length of output: 3236
🏁 Script executed:
# Let's also check if there are other instances of os.OpenFile in the codebase to see permission patterns
rg "os\.OpenFile" --type go -B2 -A2
Repository: pingcap/ticdc
Length of output: 3344
🏁 Script executed:
# Let's check what DefaultFileMode is set to in the redo package
rg "DefaultFileMode" --type go -B2 -A2
Repository: pingcap/ticdc
Length of output: 1687
🏁 Script executed:
# Let's also verify if there are any tests or comments explaining the 0o644 choice
rg "0o644|0644" --type go
Repository: pingcap/ticdc
Length of output: 3379
Change file permissions from 0o644 to 0o600 to restrict access to sensitive failpoint records.
Line 59 creates the file with world-readable permissions (0o644), exposing failpoint records containing PrimaryKeys (sensitive database identifiers) to other local users. Use owner-only permissions (0o600) to follow the principle of least privilege. This pattern is already established in the same module at cmd/multi-cluster-consistency-checker/recorder/recorder.go, which uses 0o600 for writing similar data.
🔐 Suggested change
- file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
+ file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
+ file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/multi-cluster-consistency-checker/checker/failpoint.go` at line 59, The
file is currently created with permissions set to 0o644, which allows read
access to other local users and exposes sensitive PrimaryKeys. To fix this,
change the file permission argument in the os.OpenFile call within the
failpoint.go file (at the line opening the file for append) from 0o644 to 0o600,
ensuring only the owner has read and write access, following the least privilege
principle and matching the pattern used in recorder.go.
idx := bytes.IndexAny(d.data, d.config.Terminator)
if idx >= 0 {
	encodedData = d.data[:idx]
	d.data = d.data[idx+len(d.config.Terminator):]
} else {
🧩 Analysis chain
🌐 Web query:
Go bytes.IndexAny function behavior does it match individual runes or full string sequence
💡 Result:
bytes.IndexAny(s, chars) matches individual Unicode code points (runes), not a full multi-rune string sequence. It treats chars as a set of runes and returns the byte index in s of the first occurrence of any one of those runes. If none match (or chars is empty), it returns -1. [1]
If you need to find a full substring/byte sequence, use bytes.Index(s, sep) (with sep as a []byte). [1]
Example (sequence vs “any of”):
- `bytes.IndexAny([]byte("abcXYZ"), "XY")` matches `'X'` (or `'Y'`) individually.
- `bytes.Index([]byte("abcXYZ"), []byte("XY"))` matches the sequence `"XY"`.
Sources
- Go `bytes.IndexAny` / `bytes.Index` docs (pkg.go.dev) [1]
🏁 Script executed:
cat -n cmd/multi-cluster-consistency-checker/decoder/decoder.go | head -200
Repository: pingcap/ticdc
Length of output: 6810
🏁 Script executed:
cat -n cmd/multi-cluster-consistency-checker/decoder/decoder.go | sed -n '200,340p'
Repository: pingcap/ticdc
Length of output: 5602
🏁 Script executed:
rg -n "config.CRLF\|CRLF\s*=" --type go | head -20
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
rg -n "CRLF" --type go pkg/config/ | head -30
Repository: pingcap/ticdc
Length of output: 305
Use bytes.Index to match full CRLF sequence, not individual runes.
Line 177 uses bytes.IndexAny(d.data, d.config.Terminator) where Terminator is "\r\n". The bytes.IndexAny function treats its second argument as a set of individual runes and matches either \r OR \n—not the full CRLF sequence. This causes data corruption when a lone newline appears in the payload, splitting records at the wrong boundary.
Fix
- idx := bytes.IndexAny(d.data, d.config.Terminator)
+ idx := bytes.Index(d.data, []byte(d.config.Terminator))
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
idx := bytes.Index(d.data, []byte(d.config.Terminator))
if idx >= 0 {
	encodedData = d.data[:idx]
	d.data = d.data[idx+len(d.config.Terminator):]
} else {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go` around lines 177 -
181, The current decoding uses bytes.IndexAny(d.data, d.config.Terminator) which
treats d.config.Terminator ("\\r\\n") as a set of runes and matches either '\r'
or '\n', causing incorrect splits; replace that call with bytes.Index(d.data,
[]byte(d.config.Terminator)) (or equivalent) to search for the full CRLF
sequence, then keep the existing logic that sets encodedData = d.data[:idx] and
advances d.data = d.data[idx+len(d.config.Terminator):] so records are only
split on the full terminator.
commitTs := d.msg.TiDBCommitTsExtension.CommitTs
d.msg = nil
Guard _tidb extension before dereferencing commitTs.
Line 255 can panic on malformed row payloads if _tidb is absent/null. Return a wrapped error instead of dereferencing unconditionally.
🔧 Proposed fix
- commitTs := d.msg.TiDBCommitTsExtension.CommitTs
+ if d.msg.TiDBCommitTsExtension == nil {
+ return nil, errors.New("missing _tidb commit ts extension")
+ }
+ commitTs := d.msg.TiDBCommitTsExtension.CommitTs
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if d.msg.TiDBCommitTsExtension == nil {
	return nil, errors.New("missing _tidb commit ts extension")
}
commitTs := d.msg.TiDBCommitTsExtension.CommitTs
d.msg = nil
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/multi-cluster-consistency-checker/decoder/decoder.go` around lines 255 -
256, The code dereferences d.msg.TiDBCommitTsExtension.CommitTs without checking
that d.msg and its TiDBCommitTsExtension (the `_tidb` extension) are present,
which can panic on malformed payloads; update the decoder method that contains
the lines setting commitTs := d.msg.TiDBCommitTsExtension.CommitTs and d.msg =
nil to first validate d.msg != nil and d.msg.TiDBCommitTsExtension != nil (or
the equivalent presence flag), and if absent return a wrapped error (e.g.,
fmt.Errorf("missing _tidb extension: %w", err)) instead of dereferencing,
preserving existing cleanup (d.msg = nil) only after successful validation.
} else if !os.IsNotExist(err) {
	return errors.Annotatef(ErrCheckpointCorruption, "failed to stat checkpoint.json: %v", err)
}
Do not classify os.Stat operational failures as checkpoint corruption.
Line 117 and Line 138 annotate stat failures with ErrCheckpointCorruption, which can mislead operators (e.g., permission/IO/transient failures are not corruption).
🔧 Proposed fix
- } else if !os.IsNotExist(err) {
- return errors.Annotatef(ErrCheckpointCorruption, "failed to stat checkpoint.json: %v", err)
+ } else if !os.IsNotExist(err) {
+ return errors.Trace(err)
}
...
- } else if !os.IsNotExist(err) {
- return errors.Annotatef(ErrCheckpointCorruption, "failed to stat checkpoint.json.bak: %v", err)
+ } else if !os.IsNotExist(err) {
+ return errors.Trace(err)
}
Also applies to: 137-139
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@Leavrth: The following test failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
Actionable comments posted: 4
🧹 Nitpick comments (3)
cmd/multi-cluster-consistency-checker/recorder/types_test.go (3)
248-353: Add a multi-cluster/multi-table marshal test to lock deterministic output behavior. Current `MarshalReport` assertions are strong, but they mostly cover one emitted cluster/table. Add a case with multiple non-empty clusters and multiple tables to ensure output order remains deterministic and audit-friendly.
As per coding guidelines, "`**/*_test.go`: Use unit test files named `*_test.go` in Go; favor deterministic tests and use `testify/require`".
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go` around lines 248 - 353, Add a new subtest inside TestReport_MarshalReport that builds a NewReport and adds multiple non-empty ClusterReports (via NewClusterReport) each containing multiple tables and a mix of items (use AddDataLossItem, AddDataRedundantItem, AddDataInconsistentItem, AddLWWViolationItem across clusters/tables), call MarshalReport and assert the full string equals the expected multi-cluster/multi-table output to lock deterministic ordering; ensure the test uses testify/require for equality and parallel t.Parallel() like other subtests and references MarshalReport, NewReport and NewClusterReport so the failure pinpoints ordering issues.
377-385: Strengthen the "multiple entries" assertion beyond length-only checks.
The test currently verifies only `Len == 2`. Consider asserting expected key/value membership (`Schema`, `Table`, `Version`, paths) using `require.ElementsMatch` to catch mapping regressions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go` around lines 377 - 385, Update the "multiple entries" subtest to assert actual entries instead of only checking length: after calling NewSchemaTableVersionKeyFromVersionKeyMap(m) verify that the returned slice contains the expected SchemaTableVersionKey objects (matching Schema, Table, Version and any path fields) for inputs {Schema:"db1",Table:"t1",Version:1} and {Schema:"db2",Table:"t2",Version:2} using require.ElementsMatch (or equivalent) so the test fails on mapping regressions; locate the test in the t.Run("multiple entries") block and replace the single require.Len assertion with an ElementsMatch against the expected slice of types.SchemaTableKey/VersionKey-derived results produced by NewSchemaTableVersionKeyFromVersionKeyMap.
24-24: Rename test functions to camelCase (remove underscores).
Several test function names use underscores (for example `TestDataLossItem_String`), which violates the Go naming rule used in this repo. Please rename them to camelCase forms like `TestDataLossItemString`.
As per coding guidelines, "Use camelCase for function names and do not include underscores (e.g., `getPartitionNum`, not `get_partition_num`) in Go".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/multi-cluster-consistency-checker/recorder/types_test.go` at line 24, Several test functions use underscores in their names (e.g., TestDataLossItem_String) which violates the repo's camelCase rule; rename each test to camelCase (for example change TestDataLossItem_String to TestDataLossItemString) and similarly convert the other underscore-separated test names referenced in the file to camelCase; update any references (calls, table-driven test names, or go test filters) to use the new names and run `go test` to verify no symbol breakages.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
cmd/multi-cluster-consistency-checker/integration/validation_test.go
cmd/multi-cluster-consistency-checker/recorder/types_test.go
| file, err := os.Open(path) | ||
| if err != nil { | ||
| return nil, err | ||
| } |
🧩 Analysis chain

🏁 Scripts executed:

```shell
# View the actual code at lines 41-44 and 68-71
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '35,75p'

# Check error wrapping patterns in the codebase
rg -n 'errors\.(Trace|WrapError)' cmd/multi-cluster-consistency-checker --type=go -A 1 -B 1 | head -50

# Check for os.Open usage patterns in the repository
rg -n 'os\.Open' cmd/multi-cluster-consistency-checker --type=go -A 2 -B 1 | head -40
```
Wrap library-call errors using errors.Trace() for consistent stack trace handling.
Lines 41-44: the os.Open(path) error is not wrapped at all. Wrap it with errors.Trace(err).
Lines 68-71: scanner.Err() is wrapped with fmt.Errorf instead of errors.Trace(err). Use errors.Trace() for consistency with the repository's error-wrapping convention (see cmd/multi-cluster-consistency-checker/recorder/recorder.go:205 for the same os.OpenFile pattern).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go` around
lines 41 - 44, Wrap the raw errors returned by library calls with errors.Trace
to preserve stack traces: replace the direct return of err from os.Open(path)
with errors.Trace(err), and change the fmt.Errorf wrapping around scanner.Err()
to return errors.Trace(err) instead; ensure both replacements use the
errors.Trace symbol so the function that opens and scans the file returns traced
errors consistent with the repository convention (matching the os.OpenFile
pattern used elsewhere).
| var record checker.Record | ||
| err := json.Unmarshal([]byte(line), &record) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to unmarshal jsonl record in %s line %d: %w", path, lineNo, err) |
🧩 Analysis chain

🏁 Scripts executed:

```shell
# Inspect the Record type and its primary-key fields
rg -n -A5 'type\s+Record\s+struct' cmd/multi-cluster-consistency-checker
rg -n -A10 'type\s+Row\s+struct' cmd/multi-cluster-consistency-checker
rg -n 'PrimaryKeys' cmd/multi-cluster-consistency-checker --type=go -A2 -B2

# View the decoding logic in the test and the decoder package
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | head -70
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '130,190p'
cat -n cmd/multi-cluster-consistency-checker/decoder/decoder.go | sed -n '1,100p'

# Check whether json.Decoder with UseNumber is used anywhere in the codebase
rg -n 'json\.Decoder|UseNumber|NewDecoder' cmd/multi-cluster-consistency-checker --type=go

# Look for test data or fixtures that show what numeric PKs look like
grep -r "PrimaryKeys\|primaryKeys" cmd/multi-cluster-consistency-checker/integration/validation_test.go
fd -e json cmd/multi-cluster-consistency-checker | head -20
```
Use json.Decoder with UseNumber() to preserve numeric precision for large integer primary keys.
JSON unmarshaling without UseNumber() coerces numeric values to float64, causing precision loss for PKs ≥ 2^53. The json.Number case in normalizePKValue (lines 158–159) is unreachable with standard Unmarshal. Replace the scanner pattern in lines 60–63 with:
```go
decoder := json.NewDecoder(bytes.NewReader([]byte(line)))
decoder.UseNumber()
err := decoder.Decode(&record)
```

This applies to all JSON unmarshaling in the validation flow, including lines 135–186 where pkMapToKey depends on correctly encoded numeric PKs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go` around
lines 60 - 63, Replace the current json.Unmarshal usage that reads each JSONL
line into the local variable `record` with a json.Decoder configured via
`UseNumber()` so numeric values are preserved as `json.Number`; this ensures
`normalizePKValue` can see the `json.Number` branch and prevents precision loss
for large integer PKs used by `pkMapToKey`. Concretely, where `record` is
populated (the scanner loop that currently calls `json.Unmarshal([]byte(line),
&record)`), create a `json.NewDecoder` for the line, call `UseNumber()` on it,
then `Decode(&record)`; apply the same decoder+UseNumber() pattern to any other
places in the validation flow that decode JSON into types consumed by
`normalizePKValue` and `pkMapToKey`.
| lwwViolationItems := make(map[lwwViolationKey]struct{}) | ||
| for _, report := range reports { |
LWW violations are collected but never reconciled before assertion.
lwwViolationItems is populated from reports, but no CDC/checker path removes matched keys. Line 283 then requires it to be empty, which will fail as soon as reports contain real LWW items.
Also applies to: 221-227, 283-283
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go` around
lines 196 - 197, The test builds lwwViolationItems
(map[lwwViolationKey]struct{}) from expected violations but never removes
entries when iterating actual reports, so the final assertion at the end of Test
(around the reports loop) will always fail; modify the loop that processes
reports (the for _, report := range reports block) to detect matching
lwwViolationKey values and delete them from lwwViolationItems (use
delete(lwwViolationItems, key)) when a report reconciles that key, and/or adjust
the final assertion to only require the map to be empty if
reconciliation/removal occurred; ensure you reference lwwViolationItems,
lwwViolationKey and the reports loop when making the change.
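A minimal sketch of the reconciliation pattern the comment suggests: delete each matched key from the expectation set while iterating reports, so the final emptiness assertion becomes meaningful. The key's field layout here is assumed for illustration, not the test's actual struct.

```go
package main

import "fmt"

// lwwViolationKey is an assumed shape; the real test defines its own fields.
type lwwViolationKey struct {
	Schema, Table, PK string
}

func main() {
	// Expected violations collected up front.
	expected := map[lwwViolationKey]struct{}{
		{Schema: "db1", Table: "t1", PK: "1"}: {},
		{Schema: "db1", Table: "t1", PK: "2"}: {},
	}

	// Keys actually observed while iterating the reports.
	observed := []lwwViolationKey{
		{Schema: "db1", Table: "t1", PK: "1"},
		{Schema: "db1", Table: "t1", PK: "2"},
	}

	// Reconcile: remove each matched key so leftovers indicate real gaps.
	for _, k := range observed {
		delete(expected, k)
	}

	fmt.Println(len(expected)) // 0 → a require.Empty-style assertion would pass
}
```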
| require.Empty(t, dataInconsistentItems) | ||
| require.Empty(t, dataRedundantItems) | ||
| require.Empty(t, lwwViolationItems) | ||
| require.Fail(t, "success") |
🧩 Analysis chain

🏁 Scripts executed:

```shell
# View the validation logic around the final assertions
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '270,300p'
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '240,285p'
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '200,245p'
cat -n cmd/multi-cluster-consistency-checker/integration/validation_test.go | sed -n '150,205p'
```
Remove the unconditional failure at the end of the validation logic.
Line 284 contains require.Fail(t, "success") which causes the test to fail unconditionally after all validation checks pass. This should be removed or replaced with logging.
Suggested fix

```diff
- require.Fail(t, "success")
+ t.Log("validation succeeded")
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

| require.Fail(t, "success") | |
| t.Log("validation succeeded") | |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/multi-cluster-consistency-checker/integration/validation_test.go` at line
284, Remove the unconditional failure call require.Fail(t, "success") at the end
of the validation logic in validation_test.go; either delete that line or
replace it with a non-failing confirmation (e.g., t.Log("validation succeeded")
or a no-op assertion) so the test does not always fail after passing all checks
and the rest of the validation functions/assertions can report true results.
What problem does this PR solve?
Issue Number: close #4244
What is changed and how it works?
This PR introduces a consistency validation tool for multi-cluster active-active TiCDC deployments. It connects to each cluster’s PD(etcd) and S3 sink, verifies critical changefeed settings (for example canal-json, date-separator=day, and expected file index width), then continuously watches checkpoints and sink files to advance a unified time window across clusters. Within each window, the checker compares local writes and replicated records by primary key and timestamp to detect anomalies such as missing data, redundant data, out-of-order updates, and LWW violations. The recorder persists checkpoints and reports atomically, so the process is resumable after failures and suitable for long-running, auditable consistency monitoring.
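The per-window comparison described above can be sketched roughly as follows. This is a simplified illustration of the idea only — the record shape, function names, and classification rules are assumptions, not the PR's actual checker API:

```go
package main

import "fmt"

// record is an assumed, simplified shape: one row version per primary key.
type record struct {
	PK string
	Ts uint64 // commit timestamp used for last-write-wins comparison
}

// compareWindow contrasts local writes with replicated records inside one
// time window, classifying the kinds of anomalies the checker reports.
func compareWindow(local, replicated map[string]record) (missing, redundant, stale []string) {
	for pk, l := range local {
		r, ok := replicated[pk]
		switch {
		case !ok:
			missing = append(missing, pk) // written locally, never replicated
		case r.Ts < l.Ts:
			stale = append(stale, pk) // replica kept an older version: possible LWW violation
		}
	}
	for pk := range replicated {
		if _, ok := local[pk]; !ok {
			redundant = append(redundant, pk) // replicated, but no matching local write
		}
	}
	return
}

func main() {
	local := map[string]record{
		"1": {PK: "1", Ts: 100},
		"2": {PK: "2", Ts: 200},
	}
	replicated := map[string]record{
		"2": {PK: "2", Ts: 150}, // older than the local winner
		"3": {PK: "3", Ts: 300}, // never written locally
	}
	missing, redundant, stale := compareWindow(local, replicated)
	fmt.Println(missing, redundant, stale) // [1] [3] [2]
}
```

The real checker additionally advances the window only once every cluster's checkpoint and sink files have caught up, so a record is only counted as missing after replication has had a fair chance to deliver it.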
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Durability & Config
Tests