Conversation
Signed-off-by: dongmen <414110582@qq.com>
Skipping CI for Draft Pull Request.
Summary of Changes

Hello @asddongmen, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the system's memory control by upgrading the congestion control messaging to include detailed memory usage and maximum memory limits. A new dynamic scan interval adjustment mechanism lets changefeeds adapt their scanning rate based on real-time memory consumption. This adaptive approach is designed to improve system stability and performance by proactively managing memory pressure.
Code Review
This pull request enhances memory control by introducing a version 2 for congestion control messages, which now include used and max memory details. This new information is leveraged to dynamically adjust the event scanning interval, making memory management more adaptive. The implementation is solid and includes relevant tests for the new functionality. My feedback primarily focuses on opportunities to refactor duplicated code in the serialization logic to improve long-term maintainability.
```go
if existing, exists := changefeedUsedMemory[cfID]; exists {
	changefeedUsedMemory[cfID] = min(existing, uint64(quota.MemoryUsage()))
} else {
	changefeedUsedMemory[cfID] = uint64(quota.MemoryUsage())
}
if existing, exists := changefeedMaxMemory[cfID]; exists {
	changefeedMaxMemory[cfID] = min(existing, uint64(quota.MaxMemory()))
} else {
	changefeedMaxMemory[cfID] = uint64(quota.MaxMemory())
}
```
/test all
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
/test all
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/eventservice/event_broker.go (1)
1218-1239: ⚠️ Potential issue | 🟡 Minor

`changedChangefeeds` is populated but never consumed — dead code.

The map is built at line 1218 and entries are inserted at line 1239, but it is never read after the loop. If this was meant to trigger a per-changefeed action (e.g., eagerly refreshing `minSentResolvedTs`), the follow-up logic is missing. Otherwise, remove it to avoid confusion.
🤖 Fix all issues with AI agents
In `@downstreamadapter/eventcollector/event_collector.go`:
- Around line 650-659: The current update uses min(...) for changefeedUsedMemory
which can underreport memory pressure; changefeedUsedMemory[cfID] should use the
larger of the two sources so replace the min(...) logic with max(...) when
computing used memory (keep the existing branching that checks if existing,
exists := changefeedUsedMemory[cfID] and use uint64(quota.MemoryUsage()) and
cfID identifiers). Leave the changefeedMaxMemory logic as-is (or continue using
min for capacity) and ensure you import/define max if not already available.
In `@pkg/eventservice/dispatcher_stat.go`:
- Around line 430-438: Remove the unused field lastRatio atomic.Float64 from the
struct where it's declared (the field named lastRatio); update the struct
definition in dispatcher_stat.go to delete that field declaration and run
tests/linters to ensure no references remain—no other code changes required
since lastRatio is not read or written anywhere.
In `@pkg/eventservice/scan_window.go`:
- Around line 192-206: Update the top-of-function doc comment for
adjustScanInterval (and the two inline comments near the trend checks) so the
threshold percentages match the actual constants used:
memoryUsageCriticalThreshold = 90% (critical), memoryUsageHighThreshold = 70%
(high), increasingTrendStartRatio = 30% (trend detection),
memoryUsageLowThreshold = 20% (low), and memoryUsageVeryLowThreshold = 10% (very
low); keep the described actions (1/4 aggressive reduction, 1/2 reduction, 10%
trend damping, +25% low increase, +50% very-low increase) but replace the
mismatched percent values in the doc and inline comments to reference these
constant names/percentages for clarity.
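For reference, the constants the comments should track would look like this — the names come from `scan_window.go` as cited in the review, and the values restate the percentages listed above, so treat this as a sketch rather than the file's literal contents:

```go
// Memory-pressure thresholds for adjustScanInterval, as described in the review.
const (
	memoryUsageCriticalThreshold = 0.90 // critical: shrink interval to 1/4
	memoryUsageHighThreshold     = 0.70 // high: shrink interval to 1/2
	increasingTrendStartRatio    = 0.30 // rising trend: apply 10% damping
	memoryUsageLowThreshold      = 0.20 // low: grow interval by 25%
	memoryUsageVeryLowThreshold  = 0.10 // very low: grow interval by 50%
)
```

Referencing these constant names directly in the doc comment (instead of repeating percentages) would keep the comments from drifting again.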
In `@pkg/sink/mysql/mysql_writer_ddl.go`:
- Line 38: Replace the hardcoded time.Sleep(time.Second * 5) with the
configurable dry-run delay: locate the time.Sleep call in mysql_writer_ddl.go
(the block using DryRun behavior) and use the DryRunDelay configuration value
(or a named constant if the config field was intentionally removed) to compute
the sleep duration; e.g., derive a time.Duration from the DryRunDelay field on
the writer/config struct and pass that into time.Sleep so tests can control the
delay (or define and use a descriptive constant like DefaultDryRunDelay if
config is absent).
In `@tools/workload/ddl_config_test.go`:
- Around line 1-7: This file is missing the standard Apache 2.0 copyright header
which causes CI failure; insert the same Apache 2.0 header used by the other new
files in this PR at the very top of tools/workload/ddl_config_test.go (above the
package main declaration) so the file's header matches the project's copyright
template.
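The standard Apache 2.0 file header reads as follows; the copyright year and owner line should be copied from the project's existing files rather than from this sketch:

```go
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main
```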
In `@tools/workload/ddl_executor.go`:
- Around line 53-58: The initial connection attempt using getConnWithTimeout in
the worker should not log, sleep, and return (which causes wg.Done and
permanently loses the worker); instead implement a retry loop similar to the
later reconnection logic: repeatedly call getConnWithTimeout(db.DB,
10*time.Second) with exponential/backoff or fixed delay (e.g., 5s) until
success, logging errors via plog.Info/zap.Error each attempt, and only proceed
once conn is established so the worker thread (in ddl_executor.go) doesn't exit
prematurely; update the block around getConnWithTimeout, plog.Info, and the
surrounding startup sequence to retry rather than return.
- Around line 67-82: Currently a connection error path closes conn and then
continues, which lets the outer loop consume the next task from r.taskCh and
lose work; change the logic so that after conn.Close() (inside the error branch
where r.app.isConnectionError(err) is true) you retry reconnecting (using
getConnWithTimeout and the same backoff/delay strategy) in a loop without
returning to the outer receive (do not read from r.taskCh while reconnecting),
assign conn only on successful reconnect, and then re-run r.executeTask(conn,
task) for the same task (i.e., do not drop the current task); reference
r.taskCh, r.executeTask, conn, getConnWithTimeout and r.app.isConnectionError
when locating and updating the code.
🧹 Nitpick comments (9)
utils/dynstream/memory_control_test.go (1)
275-292: Helper mirrors production sort order implicitly through argument ordering.
`calcExpectedReleasedPaths` iterates paths in the caller-supplied order, while `releaseMemory` sorts by `lastHandleEventTs` descending before iterating. The test calls pass paths pre-sorted (300, 200, 100), so results happen to match. If a future edit changes the argument order or the path timestamps, this helper will silently produce wrong expectations.

Consider sorting inside the helper the same way `releaseMemory` does:

♻️ Suggested improvement

```diff
 calcExpectedReleasedPaths := func(
 	as *areaMemStat[int, string, *mockEvent, any, *mockHandler],
 	paths ...*pathInfo[int, string, *mockEvent, any, *mockHandler],
 ) []string {
+	sort.Slice(paths, func(i, j int) bool {
+		return paths[i].lastHandleEventTs.Load() > paths[j].lastHandleEventTs.Load()
+	})
 	sizeToRelease := int64(float64(as.totalPendingSize.Load()) * defaultReleaseMemoryRatio)
```

(You'd also need to add `"sort"` to the imports.)

tools/workload/ddl_config.go (1)
68-86: Minor: mode defaulting uses pre-trimmed table list.
`normalize()` checks `len(c.Tables) > 0` before filtering empty/whitespace entries. If all entries are whitespace-only, `Mode` defaults to `ddlModeFixed`, then the subsequent `validate()` correctly rejects it. Not a bug, but reordering the trim-and-filter block before the mode-defaulting logic would make the intent clearer.

♻️ Suggested reorder

```diff
 func (c *DDLConfig) normalize() {
 	c.Mode = strings.ToLower(strings.TrimSpace(c.Mode))
-	if c.Mode == "" {
-		if len(c.Tables) > 0 {
-			c.Mode = ddlModeFixed
-		} else {
-			c.Mode = ddlModeRandom
-		}
-	}
 	// Trim and drop empty entries.
 	tables := make([]string, 0, len(c.Tables))
 	for _, t := range c.Tables {
 		t = strings.TrimSpace(t)
 		if t != "" {
 			tables = append(tables, t)
 		}
 	}
 	c.Tables = tables
+
+	if c.Mode == "" {
+		if len(c.Tables) > 0 {
+			c.Mode = ddlModeFixed
+		} else {
+			c.Mode = ddlModeRandom
+		}
+	}
 }
```

tools/workload/ddl_config_test.go (1)
9-45: Use `testify/require` instead of manual `if err != nil { t.Fatalf(...) }` patterns.

The coding guidelines mandate `testify/require` for test assertions. This applies to all subtests in both `TestParseTableName` and `TestLoadDDLConfig`. For example:

```go
// Before
table, err := ParseTableName("test.sbtest1", "ignored")
if err != nil {
	t.Fatalf("unexpected error: %v", err)
}
if table.Schema != "test" || table.Name != "sbtest1" {
	t.Fatalf("unexpected table: %+v", table)
}

// After
table, err := ParseTableName("test.sbtest1", "ignored")
require.NoError(t, err)
require.Equal(t, "test", table.Schema)
require.Equal(t, "sbtest1", table.Name)
```

Similarly, error-expecting tests can use `require.Error(t, err)`. As per coding guidelines, `**/*_test.go`: favor deterministic tests and use `testify/require`.

tools/workload/ddl_runner.go (2)
115-131: Scheduler goroutines have no shutdown path and can block indefinitely on `taskCh`.

`startTypeScheduler` spawns a goroutine with no `context.Context` or stop channel. If workers are slow or dead, `r.taskCh <- DDLTask{...}` blocks forever with no way to cancel. Consider accepting a `context.Context` and using a `select` with `ctx.Done()` for both the ticker wait and the channel send.

♻️ Sketch with context-based cancellation

```diff
-func (r *DDLRunner) startTypeScheduler(ddlType DDLType, perMinute int) {
+func (r *DDLRunner) startTypeScheduler(ctx context.Context, ddlType DDLType, perMinute int) {
 	if perMinute <= 0 {
 		return
 	}
 	go func() {
 		ticker := time.NewTicker(time.Minute)
 		defer ticker.Stop()
 		for {
 			for i := 0; i < perMinute; i++ {
 				table, ok := r.selector.Next()
 				if !ok {
 					r.app.Stats.DDLSkipped.Add(1)
 					continue
 				}
-				r.taskCh <- DDLTask{Type: ddlType, Table: table}
+				select {
+				case r.taskCh <- DDLTask{Type: ddlType, Table: table}:
+				case <-ctx.Done():
+					return
+				}
 			}
-			<-ticker.C
+			select {
+			case <-ticker.C:
+			case <-ctx.Done():
+				return
+			}
 		}
 	}()
 }
```
133-145: Refresh error logged at Info level; consider Warn.

Line 141 logs a table-refresh failure at `Info` level. Since this is an error condition (database query failure), `Warn` would be more appropriate for operational visibility.

♻️ Proposed fix

```diff
-	plog.Info("refresh random tables failed", zap.Error(err))
+	plog.Warn("refresh random tables failed", zap.Error(err))
```

pkg/eventservice/scan_window.go (2)
151-160: Sliding window reslice may retain underlying array, causing a slow memory leak under sustained load.
`w.samples = w.samples[idx:]` keeps a reference to the original backing array. Over time, if samples are continuously added and pruned, the old portion of the slice is never GC'd. For a 30s window this is unlikely to be a practical issue, but worth noting.

♻️ Optional fix to copy remaining samples

```diff
 func (w *memoryUsageWindow) pruneLocked(now time.Time) {
 	cutoff := now.Add(-w.window)
 	idx := 0
 	for idx < len(w.samples) && w.samples[idx].ts.Before(cutoff) {
 		idx++
 	}
 	if idx > 0 {
-		w.samples = w.samples[idx:]
+		remaining := make([]memoryUsageSample, len(w.samples)-idx)
+		copy(remaining, w.samples[idx:])
+		w.samples = remaining
 	}
 }
```
313-354: Stale dispatcher log at Info level may be noisy.
`refreshMinSentResolvedTs` is called every second (from `event_broker.go` line 262). If a dispatcher stays stale for an extended period, this `log.Info` fires once per second per stale dispatcher. Consider downgrading to `log.Debug` or rate-limiting.

pkg/eventservice/event_broker.go (1)
1185-1190: Resetting scan interval for the entire changefeed on a single dispatcher reset.

When one dispatcher resets (epoch > 1), the scan interval is reset to `defaultScanInterval` for the whole changefeed, potentially disrupting all other dispatchers' scan windows. This is safe (the interval will adapt back quickly), but worth documenting the intent — is it intentional to "fast-brake" the entire changefeed when any dispatcher resets?

pkg/common/event/congestion_control.go (1)
112-134: Size calculation allocates `DispatcherID` objects unnecessarily in a loop.

Both `sizeV1` and `sizeV2` allocate a new `common.DispatcherID` per iteration just to call `GetSize()`, which returns a constant. This can be simplified.

♻️ Suggested simplification

```diff
 func (m AvailableMemory) sizeV1() int {
 	size := m.Gid.GetSize() + 8
 	size += 4 // dispatcher count
-	for range m.DispatcherCount {
-		dispatcherID := &common.DispatcherID{}
-		size += dispatcherID.GetSize() + 8
-	}
+	dispatcherIDSize := (&common.DispatcherID{}).GetSize()
+	size += int(m.DispatcherCount) * (dispatcherIDSize + 8)
 	return size
 }

 func (m AvailableMemory) sizeV2() int {
 	size := m.Gid.GetSize() + 8 + 8 + 8
 	size += 4 // dispatcher count
-	for range m.DispatcherCount {
-		dispatcherID := &common.DispatcherID{}
-		size += dispatcherID.GetSize() + 8
-	}
+	dispatcherIDSize := (&common.DispatcherID{}).GetSize()
+	size += int(m.DispatcherCount) * (dispatcherIDSize + 8)
 	return size
 }
```
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/eventservice/event_broker.go (1)
1219-1241: ⚠️ Potential issue | 🟠 Major

Remove unused `changedChangefeeds` map or complete its implementation.

The `changedChangefeeds` map is created and populated during heartbeat processing (lines 1219, 1240) but is never read before the function returns. This appears to be dead code or an incomplete implementation — the map was likely intended to trigger scan window parameter updates for affected changefeeds but the follow-up logic is missing.
🤖 Fix all issues with AI agents
In `@pkg/eventservice/event_broker.go`:
- Around line 1259-1271: The counters usageCount and zeroMaxCount are
incremented inside the loop over availables but never read; remove the dead code
by deleting the declarations "usageCount := 0" and "zeroMaxCount := 0" and
removing the increments "usageCount++" and "zeroMaxCount++" from the loop that
populates holder and usage (symbols: availables, holder, usage, usageInfo); if
the counts were intended for observability instead, instead of removing them,
wire them into a log/metric export after the loop (e.g., processLogger.Infof or
metrics.Inc/Set) so the values are actually used.
🧹 Nitpick comments (3)
pkg/eventservice/event_broker.go (3)
261-262: Nit: Simplify `time.Second * 1` to `time.Second`.

The `* 1` multiplier is redundant.

♻️ Suggested fix

```diff
-	ticker := time.NewTicker(time.Second * 1)
+	ticker := time.NewTicker(time.Second)
```
1010-1015: Consider extracting the changefeed cleanup into a helper to reduce duplication.

The pattern of deleting the changefeed map entry plus three metric gauge labels is repeated in four locations (`addDispatcher` ×2, `removeDispatcher`, and potentially more). A small helper like `cleanupChangefeedMetrics(changefeedID)` would centralize this.

♻️ Example helper

```go
func (c *eventBroker) cleanupChangefeedStatus(changefeedID common.ChangeFeedID) {
	c.changefeedMap.Delete(changefeedID)
	metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeedID.String())
	metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeedID.String())
	metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeedID.String())
}
```

Then replace each occurrence with `c.cleanupChangefeedStatus(changefeedID)`.
1205-1214: Use `LoadOrStore` to prevent race condition in `getOrSetChangefeedStatus`.

The current `Load`+`Store` pattern is not atomic. Two concurrent callers can both see `!ok` from `Load` and each create a separate `changefeedStatus`, with one silently overwriting the other. The new metric initialization (lines 1211-1212) could emit duplicate registrations in the race window. A similar pattern in `downstreamadapter/eventcollector/event_collector.go` correctly uses `LoadOrStore`.

♻️ Proposed fix

```diff
 func (c *eventBroker) getOrSetChangefeedStatus(changefeedID common.ChangeFeedID) *changefeedStatus {
-	stat, ok := c.changefeedMap.Load(changefeedID)
-	if !ok {
-		stat = newChangefeedStatus(changefeedID)
-		log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID))
-		c.changefeedMap.Store(changefeedID, stat)
+	newStat := newChangefeedStatus(changefeedID)
+	stat, loaded := c.changefeedMap.LoadOrStore(changefeedID, newStat)
+	if !loaded {
+		log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID))
 		metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeedID.String()).Set(0)
 		metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(changefeedID.String()).Set(defaultScanInterval.Seconds())
 	}
 	return stat.(*changefeedStatus)
 }
```
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/eventservice/event_broker.go (1)
1219-1241: ⚠️ Potential issue | 🟠 Major

`changedChangefeeds` is written but never consumed — dead code.

The map is allocated, populated with `dispatcher.changefeedStat` entries at line 1240, and then completely ignored. No code downstream reads it after the loop. Remove both the declaration and the write, analogous to the `usageCount`/`zeroMaxCount` cleanup in d4ec73c.

🧹 Proposed cleanup

```diff
 responseMap := make(map[string]*event.DispatcherHeartbeatResponse)
-changedChangefeeds := make(map[*changefeedStatus]struct{})
 now := time.Now().Unix()
 for _, dp := range heartbeat.heartbeat.DispatcherProgresses {
 	...
 	dispatcher := dispatcherPtr.Load()
 	if dispatcher.checkpointTs.Load() < dp.CheckpointTs {
 		dispatcher.checkpointTs.Store(dp.CheckpointTs)
 	}
 	dispatcher.lastReceivedHeartbeatTime.Store(now)
-	changedChangefeeds[dispatcher.changefeedStat] = struct{}{}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/event_broker.go` around lines 1219 - 1241, Remove the dead map changedChangefeeds and its writes: delete the declaration "changedChangefeeds := make(map[*changefeedStatus]struct{})" and the line that assigns "changedChangefeeds[dispatcher.changefeedStat] = struct{}{}" inside the loop (in the section iterating heartbeat.heartbeat.DispatcherProgresses). Ensure no other code references changedChangefeeds or relies on it; if references exist elsewhere, replace them with the intended logic, otherwise simply remove both lines to match the cleanup style used for usageCount/zeroMaxCount.
♻️ Duplicate comments (1)
pkg/eventservice/scan_window.go (1)
202-206: ⚠️ Potential issue | 🟡 Minor

Threshold percentages in doc/inline comments still mismatch the actual constants.

The partial fix in d4ec73c changed `<40%` → `<30%` in the doc comment (line 205), but `memoryUsageLowThreshold = 0.2` (20%). The inline comments (lines 262, 266) were not fixed. All three should reflect the correct constants:

| Location | Current text | Correct text |
| --- | --- | --- |
| Line 205 (doc) | `Low (<30% max AND avg)` | `Low (<20% max AND avg)` |
| Line 262 (inline) | `Very low pressure (<20%)` | `Very low pressure (<10%)` |
| Line 266 (inline) | `Low pressure (<40%)` | `Low pressure (<20%)` |

Also applies to: 262-266
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/scan_window.go` around lines 202 - 206, The inline/doc comments in scan_window.go don't match the actual threshold constants: update the doc comment and inline comments to reflect memoryUsageLowThreshold = 0.2 and memoryUsageVeryLowThreshold = 0.1; specifically change the doc string "Low (<30% max AND avg)" to "Low (<20% max AND avg)", change the inline comment "Very low pressure (<20%)" to "Very low pressure (<10%)", and change "Low pressure (<40%)" to "Low pressure (<20%)" so they match the constants referenced by the memory pressure logic (look for symbols memoryUsageLowThreshold and memoryUsageVeryLowThreshold in the file).
🧹 Nitpick comments (1)
pkg/eventservice/scan_window.go (1)
151-160: `pruneLocked` retains the original backing array after pruning.

`w.samples = w.samples[idx:]` keeps the full backing array alive in memory indefinitely. Over time, with frequent sampling, the GC cannot reclaim discarded entries. Copy to a fresh slice when pruning a significant portion.

♻️ Proposed fix

```diff
 func (w *memoryUsageWindow) pruneLocked(now time.Time) {
 	cutoff := now.Add(-w.window)
 	idx := 0
 	for idx < len(w.samples) && w.samples[idx].ts.Before(cutoff) {
 		idx++
 	}
 	if idx > 0 {
-		w.samples = w.samples[idx:]
+		w.samples = append([]memoryUsageSample(nil), w.samples[idx:]...)
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/scan_window.go` around lines 151 - 160, pruneLocked in memoryUsageWindow currently sets w.samples = w.samples[idx:] which retains the original backing array and prevents GC; update pruneLocked to allocate a new slice and copy the kept entries into it when idx > 0 (e.g., create a new slice of length len(w.samples)-idx, copy from w.samples[idx:] into it, then assign that new slice to w.samples) so the old backing array can be reclaimed; keep the logic inside pruneLocked and reference w.samples and memoryUsageWindow.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/eventservice/scan_window.go`:
- Line 334: The Info log inside refreshMinSentResolvedTs is emitted every second
and should be downgraded to Debug to avoid log spam; update the call from
log.Info(...) to log.Debug(...) and fix the message grammar by changing "it's"
to "its" while keeping the structured fields (zap.Stringer("changefeedID",
c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id)) intact so the
dispatcher stale message remains but at Debug level.
---
Outside diff comments:
In `@pkg/eventservice/event_broker.go`:
- Around line 1219-1241: Remove the dead map changedChangefeeds and its writes:
delete the declaration "changedChangefeeds :=
make(map[*changefeedStatus]struct{})" and the line that assigns
"changedChangefeeds[dispatcher.changefeedStat] = struct{}{}" inside the loop (in
the section iterating heartbeat.heartbeat.DispatcherProgresses). Ensure no other
code references changedChangefeeds or relies on it; if references exist
elsewhere, replace them with the intended logic, otherwise simply remove both
lines to match the cleanup style used for usageCount/zeroMaxCount.
---
Duplicate comments:
In `@pkg/eventservice/scan_window.go`:
- Around line 202-206: The inline/doc comments in scan_window.go don't match the
actual threshold constants: update the doc comment and inline comments to
reflect memoryUsageLowThreshold = 0.2 and memoryUsageVeryLowThreshold = 0.1;
specifically change the doc string "Low (<30% max AND avg)" to "Low (<20% max
AND avg)", change the inline comment "Very low pressure (<20%)" to "Very low
pressure (<10%)", and change "Low pressure (<40%)" to "Low pressure (<20%)" so
they match the constants referenced by the memory pressure logic (look for
symbols memoryUsageLowThreshold and memoryUsageVeryLowThreshold in the file).
---
Nitpick comments:
In `@pkg/eventservice/scan_window.go`:
- Around line 151-160: pruneLocked in memoryUsageWindow currently sets w.samples
= w.samples[idx:] which retains the original backing array and prevents GC;
update pruneLocked to allocate a new slice and copy the kept entries into it
when idx > 0 (e.g., create a new slice of length len(w.samples)-idx, copy from
w.samples[idx:] into it, then assign that new slice to w.samples) so the old
backing array can be reclaimed; keep the logic inside pruneLocked and reference
w.samples and memoryUsageWindow.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- pkg/eventservice/dispatcher_stat.go
- pkg/eventservice/event_broker.go
- pkg/eventservice/scan_window.go
- pkg/sink/mysql/mysql_writer_ddl.go
💤 Files with no reviewable changes (1)
- pkg/sink/mysql/mysql_writer_ddl.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/eventservice/dispatcher_stat.go
```go
lastHeartbeatTime := dispatcher.lastReceivedHeartbeatTime.Load()
if lastHeartbeatTime > 0 &&
	now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold {
	log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
```
log.Info in a 1-second hot path will spam logs for stale dispatchers; also has a grammatical error.
refreshMinSentResolvedTs is called every second. Any stale dispatcher will produce an Info log on every tick, which floods production logs. Downgrade to log.Debug. Fix "it's" → "its".
📝 Proposed fix
```diff
-	log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
+	log.Debug("dispatcher is stale, skip its sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
+log.Debug("dispatcher is stale, skip its sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/eventservice/scan_window.go` at line 334, The Info log inside
refreshMinSentResolvedTs is emitted every second and should be downgraded to
Debug to avoid log spam; update the call from log.Info(...) to log.Debug(...)
and fix the message grammar by changing "it's" to "its" while keeping the
structured fields (zap.Stringer("changefeedID", c.changefeedID),
zap.Stringer("dispatcherID", dispatcher.id)) intact so the dispatcher stale
message remains but at Debug level.
♻️ Duplicate comments (1)
downstreamadapter/eventcollector/event_collector.go (1)
645-647: ⚠️ Potential issue | 🟠 Major

Taking `min` of used memory between main and redo still underreports pressure.

Line 646 merges `changefeedUsedMemory` with `min`, which picks the lower usage of the two streams. For a conservative (safe) estimate of memory pressure, you want the higher usage. `changefeedTotalMemory` (available) and `changefeedMaxMemory` (capacity) are correct with `min`, but used memory should use `max`.

Consider either:

- A separate `updateMaxUint64MapValue` helper for used memory, or
- Making the helper accept a merge function.

Proposed fix (option 1)

```diff
 updateMinUint64MapValue(changefeedTotalMemory, cfID, uint64(quota.AvailableMemory()))
-updateMinUint64MapValue(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage()))
+updateMaxUint64MapValue(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage()))
 updateMinUint64MapValue(changefeedMaxMemory, cfID, uint64(quota.MaxMemory()))
```

And add a corresponding helper:

```go
func updateMaxUint64MapValue(m map[common.ChangeFeedID]uint64, key common.ChangeFeedID, value uint64) {
	if existing, exists := m[key]; exists {
		m[key] = max(existing, value)
	} else {
		m[key] = value
	}
}
```
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/eventcollector/event_collector.go` around lines 645 - 647, The code incorrectly uses updateMinUint64MapValue for changefeedUsedMemory, underreporting memory pressure; replace that call with a new updateMaxUint64MapValue helper (or extend updateMinUint64MapValue to accept a merge function) so changefeedUsedMemory records the maximum of existing and new values; implement updateMaxUint64MapValue mirroring updateMinUint64MapValue but using max(existing, value) and call updateMaxUint64MapValue(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage())) instead of the current min call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@downstreamadapter/eventcollector/event_collector.go`:
- Around line 645-647: The code incorrectly uses updateMinUint64MapValue for
changefeedUsedMemory, underreporting memory pressure; replace that call with a
new updateMaxUint64MapValue helper (or extend updateMinUint64MapValue to accept
a merge function) so changefeedUsedMemory records the maximum of existing and
new values; implement updateMaxUint64MapValue mirroring updateMinUint64MapValue
but using max(existing, value) and call
updateMaxUint64MapValue(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage()))
instead of the current min call.
/test all
Actionable comments posted: 1
♻️ Duplicate comments (2)
pkg/eventservice/scan_window.go (2)
321-321: ⚠️ Potential issue | 🟠 Major

Downgrade stale-dispatcher log from Info to Debug in this 1s hot path.

Line 321 can spam logs continuously for stale dispatchers; also fix `"it's"` → `"its"` in the message text.

🛠️ Suggested fix

```diff
-	log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
+	log.Debug("dispatcher is stale, skip its sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/scan_window.go` at line 321, Change the log level from Info to Debug and fix the typo in the message: replace the call log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id)) with log.Debug("dispatcher is stale, skip its sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id)) so the hot 1s path doesn't spam Info and the message uses "its" correctly.
188-194: ⚠️ Potential issue | 🟡 Minor
Update threshold comments to match actual constants.
The docs/comments still state percentages that differ from the implemented thresholds (memoryUsageLowThreshold = 20%, memoryUsageVeryLowThreshold = 10%), which is misleading.
📝 Suggested comment fix
-// - Low (<30% max AND avg): Increase interval by 25%
+// - Low (<20% max AND avg): Increase interval by 25%
 // - Very low (<10% max AND avg): Increase interval by 50%, may exceed normal cap
@@
-	// Very low pressure (<20%): increase by 50%, allowed to exceed sync point cap.
+	// Very low pressure (<10%): increase by 50%, allowed to exceed sync point cap.
@@
-	// Low pressure (<40%): increase by 25%, capped by sync point interval.
+	// Low pressure (<20%): increase by 25%, capped by sync point interval.
Also applies to: 248-254
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/scan_window.go` around lines 188 - 194, Update the comment block above the adjustScanInterval function to reflect the actual threshold constants used (memoryUsageLowThreshold = 20% and memoryUsageVeryLowThreshold = 10%) and ensure the listed behaviors and percentage values match the implemented logic; specifically, change "Low (<30% max AND avg)" to "Low (<20% max AND avg)" and "Very low (<10% max AND avg)" remains 10% but ensure wording matches the memoryUsageVeryLowThreshold constant, and apply the same corrected wording in the second comment block later in the same function (the block around the other threshold descriptions).
🧹 Nitpick comments (1)
pkg/eventservice/event_broker.go (1)
1219-1241: Remove unused heartbeat-side changefeed tracking map.
changedChangefeeds is populated but never consumed, so it adds per-heartbeat allocation/work without effect.
♻️ Suggested cleanup
-	changedChangefeeds := make(map[*changefeedStatus]struct{})
 	now := time.Now().Unix()
@@
-	changedChangefeeds[dispatcher.changefeedStat] = struct{}{}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/eventservice/event_broker.go` around lines 1219 - 1241, The map changedChangefeeds (type map[*changefeedStatus]struct{}) is never used; remove its allocation and the only population site to avoid unnecessary per-heartbeat work: delete the declaration changedChangefeeds := make(map[*changefeedStatus]struct{}) and remove the line changedChangefeeds[dispatcher.changefeedStat] = struct{}{} in the heartbeat loop that processes DispatcherProgresses (referencing dispatcher.changefeedStat and changefeedStatus to locate the code). Ensure no other code depends on changedChangefeeds; if it was meant to be used later, either implement that usage or keep and document it.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/common/event/congestion_control.go`:
- Around line 97-107: AvailableMemory.unmarshalV2 currently reads fixed-width
fields from buf without validating the buffer length, which can panic on
truncated payloads; before each buf.Next(...) call (including the Gid.Unmarshal
input, the 8-byte reads for Available and UsageRatio, the 4-byte read for
DispatcherCount, and each dispatcher entry read) verify buf.Len() >=
expectedBytes, and when insufficient either return an error or abort decoding
safely; also fix the dispatcher loop to iterate for i := 0; i <
int(m.DispatcherCount); i++ (instead of for range m.DispatcherCount) and read
each dispatcherID and its 8-byte value only after confirming the buffer has
dispatcherID.GetSize()+8 bytes available. Ensure the same guards are applied to
the other unmarshalV2 block at the later location (lines ~249-257) as well.
---
Duplicate comments:
In `@pkg/eventservice/scan_window.go`:
- Line 321: Change the log level from Info to Debug and fix the typo in the
message: replace the call log.Info("dispatcher is stale, skip it's sent resolved
ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID",
dispatcher.id)) with log.Debug("dispatcher is stale, skip its sent resolved ts",
zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID",
dispatcher.id)) so the hot 1s path doesn't spam Info and the message uses "its"
correctly.
- Around line 188-194: Update the comment block above the adjustScanInterval
function to reflect the actual threshold constants used (memoryUsageLowThreshold
= 20% and memoryUsageVeryLowThreshold = 10%) and ensure the listed behaviors and
percentage values match the implemented logic; specifically, change "Low (<30%
max AND avg)" to "Low (<20% max AND avg)" and "Very low (<10% max AND avg)"
remains 10% but ensure wording matches the memoryUsageVeryLowThreshold constant,
and apply the same corrected wording in the second comment block later in the
same function (the block around the other threshold descriptions).
---
Nitpick comments:
In `@pkg/eventservice/event_broker.go`:
- Around line 1219-1241: The map changedChangefeeds (type
map[*changefeedStatus]struct{}) is never used; remove its allocation and the
only population site to avoid unnecessary per-heartbeat work: delete the
declaration changedChangefeeds := make(map[*changefeedStatus]struct{}) and
remove the line changedChangefeeds[dispatcher.changefeedStat] = struct{}{} in
the heartbeat loop that processes DispatcherProgresses (referencing
dispatcher.changefeedStat and changefeedStatus to locate the code). Ensure no
other code depends on changedChangefeeds; if it was meant to be used later,
either implement that usage or keep and document it.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- downstreamadapter/eventcollector/event_collector.go
- pkg/common/event/congestion_control.go
- pkg/common/event/congestion_control_test.go
- pkg/eventservice/event_broker.go
- pkg/eventservice/event_broker_test.go
- pkg/eventservice/scan_window.go
- pkg/eventservice/scan_window_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/common/event/congestion_control_test.go
func (m *AvailableMemory) unmarshalV2(buf *bytes.Buffer) {
	m.Gid.Unmarshal(buf.Next(m.Gid.GetSize()))
	m.Available = binary.BigEndian.Uint64(buf.Next(8))
	m.UsageRatio = math.Float64frombits(binary.BigEndian.Uint64(buf.Next(8)))
	m.DispatcherCount = binary.BigEndian.Uint32(buf.Next(4))
	m.DispatcherAvailable = make(map[common.DispatcherID]uint64)
	for range m.DispatcherCount {
		dispatcherID := common.DispatcherID{}
		dispatcherID.Unmarshal(buf.Next(dispatcherID.GetSize()))
		m.DispatcherAvailable[dispatcherID] = binary.BigEndian.Uint64(buf.Next(8))
	}
}
Guard V2 decoding against truncated payloads to avoid panics.
Line 100 / Line 101 / Line 106 read fixed-width fields from buf.Next(...) without checking length first. A malformed payload can panic during Uint64/Uint32 reads.
🛠️ Suggested hardening patch
-func (m *AvailableMemory) unmarshalV2(buf *bytes.Buffer) {
+func (m *AvailableMemory) unmarshalV2(buf *bytes.Buffer) error {
+ if buf.Len() < m.Gid.GetSize()+8+8+4 {
+ return fmt.Errorf("invalid AvailableMemory V2 payload length: %d", buf.Len())
+ }
m.Gid.Unmarshal(buf.Next(m.Gid.GetSize()))
m.Available = binary.BigEndian.Uint64(buf.Next(8))
m.UsageRatio = math.Float64frombits(binary.BigEndian.Uint64(buf.Next(8)))
m.DispatcherCount = binary.BigEndian.Uint32(buf.Next(4))
m.DispatcherAvailable = make(map[common.DispatcherID]uint64)
- for range m.DispatcherCount {
+ for i := uint32(0); i < m.DispatcherCount; i++ {
dispatcherID := common.DispatcherID{}
+ if buf.Len() < dispatcherID.GetSize()+8 {
+ return fmt.Errorf("invalid dispatcher payload length: %d", buf.Len())
+ }
dispatcherID.Unmarshal(buf.Next(dispatcherID.GetSize()))
m.DispatcherAvailable[dispatcherID] = binary.BigEndian.Uint64(buf.Next(8))
}
+ return nil
}
@@
func (c *CongestionControl) decodeV2(data []byte) error {
buf := bytes.NewBuffer(data)
+ if buf.Len() < 8+4 {
+ return fmt.Errorf("invalid CongestionControl V2 payload length: %d", buf.Len())
+ }
c.clusterID = binary.BigEndian.Uint64(buf.Next(8))
c.changefeedCount = binary.BigEndian.Uint32(buf.Next(4))
c.availables = make([]AvailableMemory, 0, c.changefeedCount)
for i := uint32(0); i < c.changefeedCount; i++ {
var item AvailableMemory
- item.unmarshalV2(buf)
+ if err := item.unmarshalV2(buf); err != nil {
+ return err
+ }
c.availables = append(c.availables, item)
}
return nil
}
Also applies to: 249-257
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/common/event/congestion_control.go` around lines 97 - 107,
AvailableMemory.unmarshalV2 currently reads fixed-width fields from buf without
validating the buffer length, which can panic on truncated payloads; before each
buf.Next(...) call (including the Gid.Unmarshal input, the 8-byte reads for
Available and UsageRatio, the 4-byte read for DispatcherCount, and each
dispatcher entry read) verify buf.Len() >= expectedBytes, and when insufficient
either return an error or abort decoding safely; also fix the dispatcher loop to
iterate for i := 0; i < int(m.DispatcherCount); i++ (instead of for range
m.DispatcherCount) and read each dispatcherID and its 8-byte value only after
confirming the buffer has dispatcherID.GetSize()+8 bytes available. Ensure the
same guards are applied to the other unmarshalV2 block at the later location
(lines ~249-257) as well.
Signed-off-by: dongmen <414110582@qq.com>
/test all
What problem does this PR solve?
Issue Number: close #4172
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Improvements
Tests
Documentation