
*: improve memory control #4030

Open

asddongmen wants to merge 42 commits into pingcap:master from asddongmen:0119-refine-dispatcher-scan-priority-v2

Conversation

@asddongmen
Collaborator

@asddongmen asddongmen commented Jan 20, 2026

What problem does this PR solve?

Issue Number: close #4172

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • New Features

    • Versioned congestion control with per-changefeed memory usage ratio reporting.
    • Memory-pressure aware scan window scaling with adaptive interval adjustments.
    • DDL workload generator: configurable modes, per-DDL rates, worker pool, and executor.
  • Improvements

    • More detailed memory reporting and adjusted release thresholds.
    • New scan-window metrics for base timestamp and interval.
  • Tests

    • Expanded tests for congestion control v2, scan window behavior, and DDL config/flows.
  • Documentation

    • Added DDL workload usage examples and config templates.

Signed-off-by: dongmen <414110582@qq.com>
@ti-chi-bot

ti-chi-bot bot commented Jan 20, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Jan 20, 2026
@gemini-code-assist

Summary of Changes

Hello @asddongmen, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant enhancement to the system's memory control by upgrading the congestion control messaging to include detailed memory usage and maximum memory limits. A new dynamic scan interval adjustment mechanism has been implemented, allowing changefeeds to adapt their scanning rate based on real-time memory consumption. This adaptive approach is designed to improve system stability and performance by proactively managing memory pressure.

Highlights

  • Versioned Congestion Control: Introduced CongestionControlVersion2 to include Used and Max memory information for changefeeds in congestion control messages, allowing for more granular memory management.
  • Dynamic Scan Interval Adjustment: Implemented a mechanism to dynamically adjust the scan interval for changefeeds based on their observed memory usage, aiming to prevent memory overruns and improve stability.
  • Memory Usage Tracking: Added a new memoryUsageWindow component to sample and average memory usage ratios over time, which informs the dynamic scan interval adjustments.
  • Enhanced Event Collector: The EventCollector now gathers and reports detailed memory usage metrics (Used and Max) from memory quotas, sending them via the new CongestionControlVersion2 messages.
  • Event Broker Integration: The EventBroker was updated to process the new versioned congestion control messages, extract memory usage data, and apply the dynamic scan interval adjustments to changefeeds.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request enhances memory control by introducing a version 2 for congestion control messages, which now include used and max memory details. This new information is leveraged to dynamically adjust the event scanning interval, making memory management more adaptive. The implementation is solid and includes relevant tests for the new functionality. My feedback primarily focuses on opportunities to refactor duplicated code in the serialization logic to improve long-term maintainability.

Comment on lines 650 to 659
if existing, exists := changefeedUsedMemory[cfID]; exists {
changefeedUsedMemory[cfID] = min(existing, uint64(quota.MemoryUsage()))
} else {
changefeedUsedMemory[cfID] = uint64(quota.MemoryUsage())
}
if existing, exists := changefeedMaxMemory[cfID]; exists {
changefeedMaxMemory[cfID] = min(existing, uint64(quota.MaxMemory()))
} else {
changefeedMaxMemory[cfID] = uint64(quota.MaxMemory())
}


medium

The logic for updating changefeedUsedMemory and changefeedMaxMemory is duplicated. To improve maintainability and reduce redundancy, consider extracting this logic into a helper function.
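One way to act on this suggestion is a small helper that folds a new observation into the map with a caller-supplied combiner. updateWith is a hypothetical name; in the PR the keys are changefeed IDs and the values come from quota.MemoryUsage() / quota.MaxMemory():

```go
package main

import "fmt"

// updateWith stores value for key, combining it with any existing entry
// via combine. Hypothetical helper sketched from the review suggestion.
func updateWith(m map[string]uint64, key string, value uint64, combine func(a, b uint64) uint64) {
	if existing, exists := m[key]; exists {
		m[key] = combine(existing, value)
	} else {
		m[key] = value
	}
}

func main() {
	used := map[string]uint64{"cf-1": 100}
	keepMin := func(a, b uint64) uint64 { return min(a, b) }
	updateWith(used, "cf-1", 250, keepMin) // existing entry: combiner applies
	updateWith(used, "cf-2", 42, keepMin)  // new entry: stored as-is
	fmt.Println(used["cf-1"], used["cf-2"])
}
```

The same helper then serves both maps, with the combiner (min or max) chosen per map instead of duplicating the branch.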

@ti-chi-bot ti-chi-bot bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Jan 21, 2026
@asddongmen
Collaborator Author

/test all

@asddongmen asddongmen removed do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. do-not-merge/needs-triage-completed labels Feb 10, 2026
@asddongmen asddongmen self-assigned this Feb 10, 2026
@ti-chi-bot

ti-chi-bot bot commented Feb 10, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please ask for approval from asddongmen. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@asddongmen asddongmen marked this pull request as ready for review February 10, 2026 11:27
@asddongmen
Collaborator Author

/test all

@asddongmen asddongmen removed the release-note Denotes a PR that will be considered when it comes time to generate release notes. label Feb 11, 2026
@ti-chi-bot ti-chi-bot bot added the release-note Denotes a PR that will be considered when it comes time to generate release notes. label Feb 13, 2026
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/eventservice/event_broker.go (1)

1218-1239: ⚠️ Potential issue | 🟡 Minor

changedChangefeeds is populated but never consumed — dead code.

The map is built at line 1218 and entries are inserted at line 1239, but it is never read after the loop. If this was meant to trigger a per-changefeed action (e.g., eagerly refreshing minSentResolvedTs), the follow-up logic is missing. Otherwise, remove it to avoid confusion.

🤖 Fix all issues with AI agents
In `@downstreamadapter/eventcollector/event_collector.go`:
- Around line 650-659: The current update uses min(...) for changefeedUsedMemory
which can underreport memory pressure; changefeedUsedMemory[cfID] should use the
larger of the two sources so replace the min(...) logic with max(...) when
computing used memory (keep the existing branching that checks if existing,
exists := changefeedUsedMemory[cfID] and use uint64(quota.MemoryUsage()) and
cfID identifiers). Leave the changefeedMaxMemory logic as-is (or continue using
min for capacity) and ensure you import/define max if not already available.
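A minimal sketch of the suggested change, using max for used memory so the larger observation wins (recordUsedMemory is an illustrative wrapper, not a function in the PR):

```go
package main

import "fmt"

// recordUsedMemory aggregates a used-memory observation for a changefeed,
// keeping the larger of the two sources so pressure is not underreported.
func recordUsedMemory(changefeedUsedMemory map[string]uint64, cfID string, usage uint64) {
	if existing, exists := changefeedUsedMemory[cfID]; exists {
		changefeedUsedMemory[cfID] = max(existing, usage)
	} else {
		changefeedUsedMemory[cfID] = usage
	}
}

func main() {
	used := map[string]uint64{}
	recordUsedMemory(used, "cf-1", 100)
	recordUsedMemory(used, "cf-1", 250)
	fmt.Println(used["cf-1"]) // the larger observation is kept
}
```

Note that max (and min) are builtins since Go 1.21, so no extra import or helper definition is needed on a current toolchain.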

In `@pkg/eventservice/dispatcher_stat.go`:
- Around line 430-438: Remove the unused field lastRatio atomic.Float64 from the
struct where it's declared (the field named lastRatio); update the struct
definition in dispatcher_stat.go to delete that field declaration and run
tests/linters to ensure no references remain—no other code changes required
since lastRatio is not read or written anywhere.

In `@pkg/eventservice/scan_window.go`:
- Around line 192-206: Update the top-of-function doc comment for
adjustScanInterval (and the two inline comments near the trend checks) so the
threshold percentages match the actual constants used:
memoryUsageCriticalThreshold = 90% (critical), memoryUsageHighThreshold = 70%
(high), increasingTrendStartRatio = 30% (trend detection),
memoryUsageLowThreshold = 20% (low), and memoryUsageVeryLowThreshold = 10% (very
low); keep the described actions (1/4 aggressive reduction, 1/2 reduction, 10%
trend damping, +25% low increase, +50% very-low increase) but replace the
mismatched percent values in the doc and inline comments to reference these
constant names/percentages for clarity.

In `@pkg/sink/mysql/mysql_writer_ddl.go`:
- Line 38: Replace the hardcoded time.Sleep(time.Second * 5) with the
configurable dry-run delay: locate the time.Sleep call in mysql_writer_ddl.go
(the block using DryRun behavior) and use the DryRunDelay configuration value
(or a named constant if the config field was intentionally removed) to compute
the sleep duration; e.g., derive a time.Duration from the DryRunDelay field on
the writer/config struct and pass that into time.Sleep so tests can control the
delay (or define and use a descriptive constant like DefaultDryRunDelay if
config is absent).

In `@tools/workload/ddl_config_test.go`:
- Around line 1-7: This file is missing the standard Apache 2.0 copyright header
which causes CI failure; insert the same Apache 2.0 header used by the other new
files in this PR at the very top of tools/workload/ddl_config_test.go (above the
package main declaration) so the file's header matches the project's copyright
template.

In `@tools/workload/ddl_executor.go`:
- Around line 53-58: The initial connection attempt using getConnWithTimeout in
the worker should not log, sleep, and return (which causes wg.Done and
permanently loses the worker); instead implement a retry loop similar to the
later reconnection logic: repeatedly call getConnWithTimeout(db.DB,
10*time.Second) with exponential/backoff or fixed delay (e.g., 5s) until
success, logging errors via plog.Info/zap.Error each attempt, and only proceed
once conn is established so the worker thread (in ddl_executor.go) doesn't exit
prematurely; update the block around getConnWithTimeout, plog.Info, and the
surrounding startup sequence to retry rather than return.
- Around line 67-82: Currently a connection error path closes conn and then
continues, which lets the outer loop consume the next task from r.taskCh and
lose work; change the logic so that after conn.Close() (inside the error branch
where r.app.isConnectionError(err) is true) you retry reconnecting (using
getConnWithTimeout and the same backoff/delay strategy) in a loop without
returning to the outer receive (do not read from r.taskCh while reconnecting),
assign conn only on successful reconnect, and then re-run r.executeTask(conn,
task) for the same task (i.e., do not drop the current task); reference
r.taskCh, r.executeTask, conn, getConnWithTimeout and r.app.isConnectionError
when locating and updating the code.
🧹 Nitpick comments (9)
utils/dynstream/memory_control_test.go (1)

275-292: Helper mirrors production sort order implicitly through argument ordering.

calcExpectedReleasedPaths iterates paths in the caller-supplied order, while releaseMemory sorts by lastHandleEventTs descending before iterating. The test calls pass paths pre-sorted (300, 200, 100), so results happen to match. If a future edit changes the argument order or the path timestamps, this helper will silently produce wrong expectations.

Consider sorting inside the helper the same way releaseMemory does:

♻️ Suggested improvement
 	calcExpectedReleasedPaths := func(
 		as *areaMemStat[int, string, *mockEvent, any, *mockHandler],
 		paths ...*pathInfo[int, string, *mockEvent, any, *mockHandler],
 	) []string {
+		sort.Slice(paths, func(i, j int) bool {
+			return paths[i].lastHandleEventTs.Load() > paths[j].lastHandleEventTs.Load()
+		})
 		sizeToRelease := int64(float64(as.totalPendingSize.Load()) * defaultReleaseMemoryRatio)

(You'd also need to add "sort" to the imports.)

tools/workload/ddl_config.go (1)

68-86: Minor: mode defaulting uses pre-trimmed table list.

normalize() checks len(c.Tables) > 0 before filtering empty/whitespace entries. If all entries are whitespace-only, Mode defaults to ddlModeFixed, then the subsequent validate() correctly rejects it. Not a bug, but reordering the trim-and-filter block before the mode-defaulting logic would make the intent clearer.

♻️ Suggested reorder
 func (c *DDLConfig) normalize() {
 	c.Mode = strings.ToLower(strings.TrimSpace(c.Mode))
-	if c.Mode == "" {
-		if len(c.Tables) > 0 {
-			c.Mode = ddlModeFixed
-		} else {
-			c.Mode = ddlModeRandom
-		}
-	}
 
 	// Trim and drop empty entries.
 	tables := make([]string, 0, len(c.Tables))
 	for _, t := range c.Tables {
 		t = strings.TrimSpace(t)
 		if t != "" {
 			tables = append(tables, t)
 		}
 	}
 	c.Tables = tables
+
+	if c.Mode == "" {
+		if len(c.Tables) > 0 {
+			c.Mode = ddlModeFixed
+		} else {
+			c.Mode = ddlModeRandom
+		}
+	}
 }
tools/workload/ddl_config_test.go (1)

9-45: Use testify/require instead of manual if err != nil { t.Fatalf(...) } patterns.

The coding guidelines mandate testify/require for test assertions. This applies to all subtests in both TestParseTableName and TestLoadDDLConfig. For example:

// Before
table, err := ParseTableName("test.sbtest1", "ignored")
if err != nil {
    t.Fatalf("unexpected error: %v", err)
}
if table.Schema != "test" || table.Name != "sbtest1" {
    t.Fatalf("unexpected table: %+v", table)
}

// After
table, err := ParseTableName("test.sbtest1", "ignored")
require.NoError(t, err)
require.Equal(t, "test", table.Schema)
require.Equal(t, "sbtest1", table.Name)

Similarly, error-expecting tests can use require.Error(t, err). As per coding guidelines, **/*_test.go: favor deterministic tests and use testify/require.

tools/workload/ddl_runner.go (2)

115-131: Scheduler goroutines have no shutdown path and can block indefinitely on taskCh.

startTypeScheduler spawns a goroutine with no context.Context or stop channel. If workers are slow or dead, r.taskCh <- DDLTask{...} blocks forever with no way to cancel. Consider accepting a context.Context and using a select with ctx.Done() for both the ticker wait and the channel send.

♻️ Sketch with context-based cancellation
-func (r *DDLRunner) startTypeScheduler(ddlType DDLType, perMinute int) {
+func (r *DDLRunner) startTypeScheduler(ctx context.Context, ddlType DDLType, perMinute int) {
 	if perMinute <= 0 {
 		return
 	}
 
 	go func() {
 		ticker := time.NewTicker(time.Minute)
 		defer ticker.Stop()
 
 		for {
 			for i := 0; i < perMinute; i++ {
 				table, ok := r.selector.Next()
 				if !ok {
 					r.app.Stats.DDLSkipped.Add(1)
 					continue
 				}
-				r.taskCh <- DDLTask{Type: ddlType, Table: table}
+				select {
+				case r.taskCh <- DDLTask{Type: ddlType, Table: table}:
+				case <-ctx.Done():
+					return
+				}
 			}
-			<-ticker.C
+			select {
+			case <-ticker.C:
+			case <-ctx.Done():
+				return
+			}
 		}
 	}()
 }

133-145: Refresh error logged at Info level; consider Warn.

Line 141 logs a table-refresh failure at Info level. Since this is an error condition (database query failure), Warn would be more appropriate for operational visibility.

♻️ Proposed fix
-			plog.Info("refresh random tables failed", zap.Error(err))
+			plog.Warn("refresh random tables failed", zap.Error(err))
pkg/eventservice/scan_window.go (2)

151-160: Sliding window reslice may retain underlying array, causing a slow memory leak under sustained load.

w.samples = w.samples[idx:] keeps a reference to the original backing array. Over time, if samples are continuously added and pruned, the old portion of the slice is never GC'd. For a 30s window this is unlikely to be a practical issue, but worth noting.

♻️ Optional fix to copy remaining samples
 func (w *memoryUsageWindow) pruneLocked(now time.Time) {
 	cutoff := now.Add(-w.window)
 	idx := 0
 	for idx < len(w.samples) && w.samples[idx].ts.Before(cutoff) {
 		idx++
 	}
 	if idx > 0 {
-		w.samples = w.samples[idx:]
+		remaining := make([]memoryUsageSample, len(w.samples)-idx)
+		copy(remaining, w.samples[idx:])
+		w.samples = remaining
 	}
 }

313-354: Stale dispatcher log at Info level may be noisy.

refreshMinSentResolvedTs is called every second (from event_broker.go line 262). If a dispatcher stays stale for an extended period, this log.Info fires once per second per stale dispatcher. Consider downgrading to log.Debug or rate-limiting.

pkg/eventservice/event_broker.go (1)

1185-1190: Resetting scan interval for the entire changefeed on a single dispatcher reset.

When one dispatcher resets (epoch > 1), the scan interval is reset to defaultScanInterval for the whole changefeed, potentially disrupting all other dispatchers' scan windows. This is safe (the interval will adapt back quickly), but worth documenting the intent — is it intentional to "fast-brake" the entire changefeed when any dispatcher resets?

pkg/common/event/congestion_control.go (1)

112-134: Size calculation allocates DispatcherID objects unnecessarily in a loop.

Both sizeV1 and sizeV2 allocate a new common.DispatcherID per iteration just to call GetSize(), which returns a constant. This can be simplified.

♻️ Suggested simplification
 func (m AvailableMemory) sizeV1() int {
 	size := m.Gid.GetSize() + 8
 	size += 4 // dispatcher count
-	for range m.DispatcherCount {
-		dispatcherID := &common.DispatcherID{}
-		size += dispatcherID.GetSize() + 8
-	}
+	dispatcherIDSize := (&common.DispatcherID{}).GetSize()
+	size += int(m.DispatcherCount) * (dispatcherIDSize + 8)
 	return size
 }

 func (m AvailableMemory) sizeV2() int {
 	size := m.Gid.GetSize() + 8 + 8 + 8
 	size += 4 // dispatcher count
-	for range m.DispatcherCount {
-		dispatcherID := &common.DispatcherID{}
-		size += dispatcherID.GetSize() + 8
-	}
+	dispatcherIDSize := (&common.DispatcherID{}).GetSize()
+	size += int(m.DispatcherCount) * (dispatcherIDSize + 8)
 	return size
 }

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/eventservice/event_broker.go (1)

1219-1241: ⚠️ Potential issue | 🟠 Major

Remove unused changedChangefeeds map or complete its implementation.

The changedChangefeeds map is created and populated during heartbeat processing (lines 1219, 1240) but is never read before the function returns. This appears to be dead code or an incomplete implementation—the map was likely intended to trigger scan window parameter updates for affected changefeeds but the follow-up logic is missing.

🤖 Fix all issues with AI agents
In `@pkg/eventservice/event_broker.go`:
- Around line 1259-1271: The counters usageCount and zeroMaxCount are
incremented inside the loop over availables but never read; remove the dead code
by deleting the declarations "usageCount := 0" and "zeroMaxCount := 0" and
removing the increments "usageCount++" and "zeroMaxCount++" from the loop that
populates holder and usage (symbols: availables, holder, usage, usageInfo); if
the counts were intended for observability instead, instead of removing them,
wire them into a log/metric export after the loop (e.g., processLogger.Infof or
metrics.Inc/Set) so the values are actually used.
🧹 Nitpick comments (3)
pkg/eventservice/event_broker.go (3)

261-262: Nit: Simplify time.Second * 1 to time.Second.

The * 1 multiplier is redundant.

♻️ Suggested fix
-	ticker := time.NewTicker(time.Second * 1)
+	ticker := time.NewTicker(time.Second)

1010-1015: Consider extracting the changefeed cleanup into a helper to reduce duplication.

The pattern of deleting the changefeed map entry + three metric gauge labels is repeated in four locations (addDispatcher ×2, removeDispatcher, and potentially more). A small helper like cleanupChangefeedMetrics(changefeedID) would centralize this.

♻️ Example helper
func (c *eventBroker) cleanupChangefeedStatus(changefeedID common.ChangeFeedID) {
	c.changefeedMap.Delete(changefeedID)
	metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeedID.String())
	metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeedID.String())
	metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeedID.String())
}

Then replace each occurrence with c.cleanupChangefeedStatus(changefeedID).


1205-1214: Use LoadOrStore to prevent race condition in getOrSetChangefeedStatus.

The current Load + Store pattern is not atomic. Two concurrent callers can both see !ok from Load and each create a separate changefeedStatus, with one silently overwriting the other. The new metric initialization (lines 1211-1212) could emit duplicate registrations in the race window. A similar pattern in downstreamadapter/eventcollector/event_collector.go correctly uses LoadOrStore.

♻️ Proposed fix
 func (c *eventBroker) getOrSetChangefeedStatus(changefeedID common.ChangeFeedID) *changefeedStatus {
-	stat, ok := c.changefeedMap.Load(changefeedID)
-	if !ok {
-		stat = newChangefeedStatus(changefeedID)
-		log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID))
-		c.changefeedMap.Store(changefeedID, stat)
+	newStat := newChangefeedStatus(changefeedID)
+	stat, loaded := c.changefeedMap.LoadOrStore(changefeedID, newStat)
+	if !loaded {
+		log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID))
 		metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeedID.String()).Set(0)
 		metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(changefeedID.String()).Set(defaultScanInterval.Seconds())
 	}
 	return stat.(*changefeedStatus)
 }

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/eventservice/event_broker.go (1)

1219-1241: ⚠️ Potential issue | 🟠 Major

changedChangefeeds is written but never consumed — dead code.

The map is allocated, populated with dispatcher.changefeedStat entries at line 1240, and then completely ignored. No code downstream reads it after the loop. Remove both the declaration and the write, analogous to the usageCount/zeroMaxCount cleanup in d4ec73c.

🧹 Proposed cleanup
 	responseMap := make(map[string]*event.DispatcherHeartbeatResponse)
-	changedChangefeeds := make(map[*changefeedStatus]struct{})
 	now := time.Now().Unix()
 	for _, dp := range heartbeat.heartbeat.DispatcherProgresses {
 		...
 		dispatcher := dispatcherPtr.Load()
 		if dispatcher.checkpointTs.Load() < dp.CheckpointTs {
 			dispatcher.checkpointTs.Store(dp.CheckpointTs)
 		}
 		dispatcher.lastReceivedHeartbeatTime.Store(now)
-		changedChangefeeds[dispatcher.changefeedStat] = struct{}{}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/event_broker.go` around lines 1219 - 1241, Remove the dead
map changedChangefeeds and its writes: delete the declaration
"changedChangefeeds := make(map[*changefeedStatus]struct{})" and the line that
assigns "changedChangefeeds[dispatcher.changefeedStat] = struct{}{}" inside the
loop (in the section iterating heartbeat.heartbeat.DispatcherProgresses). Ensure
no other code references changedChangefeeds or relies on it; if references exist
elsewhere, replace them with the intended logic, otherwise simply remove both
lines to match the cleanup style used for usageCount/zeroMaxCount.
♻️ Duplicate comments (1)
pkg/eventservice/scan_window.go (1)

202-206: ⚠️ Potential issue | 🟡 Minor

Threshold percentages in doc/inline comments still mismatch the actual constants.

The partial fix in d4ec73c changed <40% to <30% in the doc comment (line 205), but memoryUsageLowThreshold = 0.2 (20%). The inline comments (lines 262, 266) were not fixed. All three should reflect the correct constants:

  • Line 205 (doc): "Low (<30% max AND avg)" → "Low (<20% max AND avg)"
  • Line 262 (inline): "Very low pressure (<20%)" → "Very low pressure (<10%)"
  • Line 266 (inline): "Low pressure (<40%)" → "Low pressure (<20%)"

Also applies to: 262-266

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/scan_window.go` around lines 202 - 206, The inline/doc
comments in scan_window.go don't match the actual threshold constants: update
the doc comment and inline comments to reflect memoryUsageLowThreshold = 0.2 and
memoryUsageVeryLowThreshold = 0.1; specifically change the doc string "Low (<30%
max AND avg)" to "Low (<20% max AND avg)", change the inline comment "Very low
pressure (<20%)" to "Very low pressure (<10%)", and change "Low pressure (<40%)"
to "Low pressure (<20%)" so they match the constants referenced by the memory
pressure logic (look for symbols memoryUsageLowThreshold and
memoryUsageVeryLowThreshold in the file).
🧹 Nitpick comments (1)
pkg/eventservice/scan_window.go (1)

151-160: pruneLocked retains the original backing array after pruning.

w.samples = w.samples[idx:] keeps the full backing array alive in memory indefinitely. Over time, with frequent sampling, the GC cannot reclaim discarded entries. Copy to a fresh slice when pruning a significant portion.

♻️ Proposed fix
 func (w *memoryUsageWindow) pruneLocked(now time.Time) {
 	cutoff := now.Add(-w.window)
 	idx := 0
 	for idx < len(w.samples) && w.samples[idx].ts.Before(cutoff) {
 		idx++
 	}
 	if idx > 0 {
-		w.samples = w.samples[idx:]
+		w.samples = append([]memoryUsageSample(nil), w.samples[idx:]...)
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/scan_window.go` around lines 151 - 160, pruneLocked in
memoryUsageWindow currently sets w.samples = w.samples[idx:] which retains the
original backing array and prevents GC; update pruneLocked to allocate a new
slice and copy the kept entries into it when idx > 0 (e.g., create a new slice
of length len(w.samples)-idx, copy from w.samples[idx:] into it, then assign
that new slice to w.samples) so the old backing array can be reclaimed; keep the
logic inside pruneLocked and reference w.samples and memoryUsageWindow.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/eventservice/scan_window.go`:
- Line 334: The Info log inside refreshMinSentResolvedTs is emitted every second
and should be downgraded to Debug to avoid log spam; update the call from
log.Info(...) to log.Debug(...) and fix the message grammar by changing "it's"
to "its" while keeping the structured fields (zap.Stringer("changefeedID",
c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id)) intact so the
dispatcher stale message remains but at Debug level.

---

Outside diff comments:
In `@pkg/eventservice/event_broker.go`:
- Around line 1219-1241: Remove the dead map changedChangefeeds and its writes:
delete the declaration "changedChangefeeds :=
make(map[*changefeedStatus]struct{})" and the line that assigns
"changedChangefeeds[dispatcher.changefeedStat] = struct{}{}" inside the loop (in
the section iterating heartbeat.heartbeat.DispatcherProgresses). Ensure no other
code references changedChangefeeds or relies on it; if references exist
elsewhere, replace them with the intended logic, otherwise simply remove both
lines to match the cleanup style used for usageCount/zeroMaxCount.

---

Duplicate comments:
In `@pkg/eventservice/scan_window.go`:
- Around line 202-206: The inline/doc comments in scan_window.go don't match the
actual threshold constants: update the doc comment and inline comments to
reflect memoryUsageLowThreshold = 0.2 and memoryUsageVeryLowThreshold = 0.1;
specifically change the doc string "Low (<30% max AND avg)" to "Low (<20% max
AND avg)", change the inline comment "Very low pressure (<20%)" to "Very low
pressure (<10%)", and change "Low pressure (<40%)" to "Low pressure (<20%)" so
they match the constants referenced by the memory pressure logic (look for
symbols memoryUsageLowThreshold and memoryUsageVeryLowThreshold in the file).

---

Nitpick comments:
In `@pkg/eventservice/scan_window.go`:
- Around line 151-160: pruneLocked in memoryUsageWindow currently sets w.samples
= w.samples[idx:] which retains the original backing array and prevents GC;
update pruneLocked to allocate a new slice and copy the kept entries into it
when idx > 0 (e.g., create a new slice of length len(w.samples)-idx, copy from
w.samples[idx:] into it, then assign that new slice to w.samples) so the old
backing array can be reclaimed; keep the logic inside pruneLocked and reference
w.samples and memoryUsageWindow.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4f82641 and d4ec73c.

📒 Files selected for processing (4)
  • pkg/eventservice/dispatcher_stat.go
  • pkg/eventservice/event_broker.go
  • pkg/eventservice/scan_window.go
  • pkg/sink/mysql/mysql_writer_ddl.go
💤 Files with no reviewable changes (1)
  • pkg/sink/mysql/mysql_writer_ddl.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/eventservice/dispatcher_stat.go

lastHeartbeatTime := dispatcher.lastReceivedHeartbeatTime.Load()
if lastHeartbeatTime > 0 &&
	now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold {
	log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))

⚠️ Potential issue | 🟠 Major

log.Info in a 1-second hot path will spam logs for stale dispatchers; also has a grammatical error.

refreshMinSentResolvedTs is called every second. Any stale dispatcher will produce an Info log on every tick, which floods production logs. Downgrade to log.Debug. Fix "it's" → "its".

📝 Proposed fix
-		log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
+		log.Debug("dispatcher is stale, skip its sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
log.Debug("dispatcher is stale, skip its sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/scan_window.go` at line 334, The Info log inside
refreshMinSentResolvedTs is emitted every second and should be downgraded to
Debug to avoid log spam; update the call from log.Info(...) to log.Debug(...)
and fix the message grammar by changing "it's" to "its" while keeping the
structured fields (zap.Stringer("changefeedID", c.changefeedID),
zap.Stringer("dispatcherID", dispatcher.id)) intact so the dispatcher stale
message remains but at Debug level.

Signed-off-by: dongmen <414110582@qq.com>
@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
downstreamadapter/eventcollector/event_collector.go (1)

645-647: ⚠️ Potential issue | 🟠 Major

Taking min of used memory between main and redo still underreports pressure.

Line 646 merges changefeedUsedMemory with min, which picks the lower usage of the two streams. For a conservative (safe) estimate of memory pressure, you want the higher usage. changefeedTotalMemory (available) and changefeedMaxMemory (capacity) are correct with min, but used memory should use max.

Consider either:

  1. A separate updateMaxUint64MapValue helper for used memory, or
  2. Making the helper accept a merge function.
Proposed fix (option 1)
 	updateMinUint64MapValue(changefeedTotalMemory, cfID, uint64(quota.AvailableMemory()))
-	updateMinUint64MapValue(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage()))
+	updateMaxUint64MapValue(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage()))
 	updateMinUint64MapValue(changefeedMaxMemory, cfID, uint64(quota.MaxMemory()))

And add a corresponding helper:

func updateMaxUint64MapValue(m map[common.ChangeFeedID]uint64, key common.ChangeFeedID, value uint64) {
	if existing, exists := m[key]; exists {
		m[key] = max(existing, value)
	} else {
		m[key] = value
	}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/eventcollector/event_collector.go` around lines 645 - 647,
The code incorrectly uses updateMinUint64MapValue for changefeedUsedMemory,
underreporting memory pressure; replace that call with a new
updateMaxUint64MapValue helper (or extend updateMinUint64MapValue to accept a
merge function) so changefeedUsedMemory records the maximum of existing and new
values; implement updateMaxUint64MapValue mirroring updateMinUint64MapValue but
using max(existing, value) and call
updateMaxUint64MapValue(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage()))
instead of the current min call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@downstreamadapter/eventcollector/event_collector.go`:
- Around line 645-647: The code incorrectly uses updateMinUint64MapValue for
changefeedUsedMemory, underreporting memory pressure; replace that call with a
new updateMaxUint64MapValue helper (or extend updateMinUint64MapValue to accept
a merge function) so changefeedUsedMemory records the maximum of existing and
new values; implement updateMaxUint64MapValue mirroring updateMinUint64MapValue
but using max(existing, value) and call
updateMaxUint64MapValue(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage()))
instead of the current min call.
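A minimal, runnable sketch of the min/max merge helpers the comment proposes. String keys stand in for `common.ChangeFeedID` here as an assumption for the example; the point is that merging with `max` keeps the conservative (higher) usage figure when both the main and redo streams report for the same changefeed:

```go
package main

import "fmt"

// Sketch only: the real helpers key by common.ChangeFeedID.
func updateMinUint64MapValue(m map[string]uint64, key string, value uint64) {
	if existing, ok := m[key]; ok {
		m[key] = min(existing, value) // Go 1.21+ builtin
	} else {
		m[key] = value
	}
}

func updateMaxUint64MapValue(m map[string]uint64, key string, value uint64) {
	if existing, ok := m[key]; ok {
		m[key] = max(existing, value)
	} else {
		m[key] = value
	}
}

func main() {
	used := map[string]uint64{}
	// Two streams (main and redo) report used memory for the same changefeed;
	// max keeps the higher value, so pressure is never underreported.
	updateMaxUint64MapValue(used, "cf-1", 100)
	updateMaxUint64MapValue(used, "cf-1", 40)
	fmt.Println(used["cf-1"])
}
```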

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d4ec73c and f6ddfdc.

📒 Files selected for processing (1)
  • downstreamadapter/eventcollector/event_collector.go

@asddongmen
Copy link
Collaborator Author

/test all

Signed-off-by: dongmen <414110582@qq.com>
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
pkg/eventservice/scan_window.go (2)

321-321: ⚠️ Potential issue | 🟠 Major

Downgrade stale-dispatcher log from Info to Debug in this 1s hot path.

Line 321 can spam logs continuously for stale dispatchers; also fix "it's" → "its" in the message text.

🛠️ Suggested fix
-			log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
+			log.Debug("dispatcher is stale, skip its sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/scan_window.go` at line 321, Change the log level from Info
to Debug and fix the typo in the message: replace the call log.Info("dispatcher
is stale, skip it's sent resolved ts", zap.Stringer("changefeedID",
c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id)) with
log.Debug("dispatcher is stale, skip its sent resolved ts",
zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID",
dispatcher.id)) so the hot 1s path doesn't spam Info and the message uses "its"
correctly.

188-194: ⚠️ Potential issue | 🟡 Minor

Update threshold comments to match actual constants.

The docs/comments still state percentages that differ from the implemented thresholds (memoryUsageLowThreshold=20%, memoryUsageVeryLowThreshold=10%), which is misleading.

📝 Suggested comment fix
-//   - Low (<30% max AND avg): Increase interval by 25%
+//   - Low (<20% max AND avg): Increase interval by 25%
 //   - Very low (<10% max AND avg): Increase interval by 50%, may exceed normal cap
@@
-		// Very low pressure (<20%): increase by 50%, allowed to exceed sync point cap.
+		// Very low pressure (<10%): increase by 50%, allowed to exceed sync point cap.
@@
-		// Low pressure (<40%): increase by 25%, capped by sync point interval.
+		// Low pressure (<20%): increase by 25%, capped by sync point interval.

Also applies to: 248-254

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/scan_window.go` around lines 188 - 194, Update the comment
block above the adjustScanInterval function to reflect the actual threshold
constants used (memoryUsageLowThreshold = 20% and memoryUsageVeryLowThreshold =
10%) and ensure the listed behaviors and percentage values match the implemented
logic; specifically, change "Low (<30% max AND avg)" to "Low (<20% max AND avg)"
and "Very low (<10% max AND avg)" remains 10% but ensure wording matches the
memoryUsageVeryLowThreshold constant, and apply the same corrected wording in
the second comment block later in the same function (the block around the other
threshold descriptions).
🧹 Nitpick comments (1)
pkg/eventservice/event_broker.go (1)

1219-1241: Remove unused heartbeat-side changefeed tracking map.

changedChangefeeds is populated but never consumed, so it adds per-heartbeat allocation/work without effect.

♻️ Suggested cleanup
-	changedChangefeeds := make(map[*changefeedStatus]struct{})
 	now := time.Now().Unix()
@@
-		changedChangefeeds[dispatcher.changefeedStat] = struct{}{}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/eventservice/event_broker.go` around lines 1219 - 1241, The map
changedChangefeeds (type map[*changefeedStatus]struct{}) is never used; remove
its allocation and the only population site to avoid unnecessary per-heartbeat
work: delete the declaration changedChangefeeds :=
make(map[*changefeedStatus]struct{}) and remove the line
changedChangefeeds[dispatcher.changefeedStat] = struct{}{} in the heartbeat loop
that processes DispatcherProgresses (referencing dispatcher.changefeedStat and
changefeedStatus to locate the code). Ensure no other code depends on
changedChangefeeds; if it was meant to be used later, either implement that
usage or keep and document it.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/common/event/congestion_control.go`:
- Around line 97-107: AvailableMemory.unmarshalV2 currently reads fixed-width
fields from buf without validating the buffer length, which can panic on
truncated payloads; before each buf.Next(...) call (including the Gid.Unmarshal
input, the 8-byte reads for Available and UsageRatio, the 4-byte read for
DispatcherCount, and each dispatcher entry read) verify buf.Len() >=
expectedBytes, and when insufficient either return an error or abort decoding
safely; also fix the dispatcher loop to iterate for i := 0; i <
int(m.DispatcherCount); i++ (instead of for range m.DispatcherCount) and read
each dispatcherID and its 8-byte value only after confirming the buffer has
dispatcherID.GetSize()+8 bytes available. Ensure the same guards are applied to
the other unmarshalV2 block at the later location (lines ~249-257) as well.

---

Duplicate comments:
In `@pkg/eventservice/scan_window.go`:
- Line 321: Change the log level from Info to Debug and fix the typo in the
message: replace the call log.Info("dispatcher is stale, skip it's sent resolved
ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID",
dispatcher.id)) with log.Debug("dispatcher is stale, skip its sent resolved ts",
zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID",
dispatcher.id)) so the hot 1s path doesn't spam Info and the message uses "its"
correctly.
- Around line 188-194: Update the comment block above the adjustScanInterval
function to reflect the actual threshold constants used (memoryUsageLowThreshold
= 20% and memoryUsageVeryLowThreshold = 10%) and ensure the listed behaviors and
percentage values match the implemented logic; specifically, change "Low (<30%
max AND avg)" to "Low (<20% max AND avg)" and "Very low (<10% max AND avg)"
remains 10% but ensure wording matches the memoryUsageVeryLowThreshold constant,
and apply the same corrected wording in the second comment block later in the
same function (the block around the other threshold descriptions).

---

Nitpick comments:
In `@pkg/eventservice/event_broker.go`:
- Around line 1219-1241: The map changedChangefeeds (type
map[*changefeedStatus]struct{}) is never used; remove its allocation and the
only population site to avoid unnecessary per-heartbeat work: delete the
declaration changedChangefeeds := make(map[*changefeedStatus]struct{}) and
remove the line changedChangefeeds[dispatcher.changefeedStat] = struct{}{} in
the heartbeat loop that processes DispatcherProgresses (referencing
dispatcher.changefeedStat and changefeedStatus to locate the code). Ensure no
other code depends on changedChangefeeds; if it was meant to be used later,
either implement that usage or keep and document it.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f6ddfdc and c6f9f05.

📒 Files selected for processing (7)
  • downstreamadapter/eventcollector/event_collector.go
  • pkg/common/event/congestion_control.go
  • pkg/common/event/congestion_control_test.go
  • pkg/eventservice/event_broker.go
  • pkg/eventservice/event_broker_test.go
  • pkg/eventservice/scan_window.go
  • pkg/eventservice/scan_window_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/common/event/congestion_control_test.go

Comment on lines +97 to +107
func (m *AvailableMemory) unmarshalV2(buf *bytes.Buffer) {
	m.Gid.Unmarshal(buf.Next(m.Gid.GetSize()))
	m.Available = binary.BigEndian.Uint64(buf.Next(8))
	m.UsageRatio = math.Float64frombits(binary.BigEndian.Uint64(buf.Next(8)))
	m.DispatcherCount = binary.BigEndian.Uint32(buf.Next(4))
	m.DispatcherAvailable = make(map[common.DispatcherID]uint64)
	for range m.DispatcherCount {
		dispatcherID := common.DispatcherID{}
		dispatcherID.Unmarshal(buf.Next(dispatcherID.GetSize()))
		m.DispatcherAvailable[dispatcherID] = binary.BigEndian.Uint64(buf.Next(8))
	}

⚠️ Potential issue | 🟠 Major

Guard V2 decoding against truncated payloads to avoid panics.

Line 100 / Line 101 / Line 106 read fixed-width fields from buf.Next(...) without checking length first. A malformed payload can panic during Uint64/Uint32 reads.

🛠️ Suggested hardening patch
-func (m *AvailableMemory) unmarshalV2(buf *bytes.Buffer) {
+func (m *AvailableMemory) unmarshalV2(buf *bytes.Buffer) error {
+	if buf.Len() < m.Gid.GetSize()+8+8+4 {
+		return fmt.Errorf("invalid AvailableMemory V2 payload length: %d", buf.Len())
+	}
 	m.Gid.Unmarshal(buf.Next(m.Gid.GetSize()))
 	m.Available = binary.BigEndian.Uint64(buf.Next(8))
 	m.UsageRatio = math.Float64frombits(binary.BigEndian.Uint64(buf.Next(8)))
 	m.DispatcherCount = binary.BigEndian.Uint32(buf.Next(4))
 	m.DispatcherAvailable = make(map[common.DispatcherID]uint64)
-	for range m.DispatcherCount {
+	for i := uint32(0); i < m.DispatcherCount; i++ {
 		dispatcherID := common.DispatcherID{}
+		if buf.Len() < dispatcherID.GetSize()+8 {
+			return fmt.Errorf("invalid dispatcher payload length: %d", buf.Len())
+		}
 		dispatcherID.Unmarshal(buf.Next(dispatcherID.GetSize()))
 		m.DispatcherAvailable[dispatcherID] = binary.BigEndian.Uint64(buf.Next(8))
 	}
+	return nil
 }
@@
 func (c *CongestionControl) decodeV2(data []byte) error {
 	buf := bytes.NewBuffer(data)
+	if buf.Len() < 8+4 {
+		return fmt.Errorf("invalid CongestionControl V2 payload length: %d", buf.Len())
+	}
 	c.clusterID = binary.BigEndian.Uint64(buf.Next(8))
 	c.changefeedCount = binary.BigEndian.Uint32(buf.Next(4))
 	c.availables = make([]AvailableMemory, 0, c.changefeedCount)
 	for i := uint32(0); i < c.changefeedCount; i++ {
 		var item AvailableMemory
-		item.unmarshalV2(buf)
+		if err := item.unmarshalV2(buf); err != nil {
+			return err
+		}
 		c.availables = append(c.availables, item)
 	}
 	return nil
 }

Also applies to: 249-257

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/common/event/congestion_control.go` around lines 97 - 107,
AvailableMemory.unmarshalV2 currently reads fixed-width fields from buf without
validating the buffer length, which can panic on truncated payloads; before each
buf.Next(...) call (including the Gid.Unmarshal input, the 8-byte reads for
Available and UsageRatio, the 4-byte read for DispatcherCount, and each
dispatcher entry read) verify buf.Len() >= expectedBytes, and when insufficient
either return an error or abort decoding safely; also fix the dispatcher loop to
iterate for i := 0; i < int(m.DispatcherCount); i++ (instead of for range
m.DispatcherCount) and read each dispatcherID and its 8-byte value only after
confirming the buffer has dispatcherID.GetSize()+8 bytes available. Ensure the
same guards are applied to the other unmarshalV2 block at the later location
(lines ~249-257) as well.

Signed-off-by: dongmen <414110582@qq.com>
@asddongmen
Copy link
Collaborator Author

/test all


Labels

release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Improve eventBroker Scan Strategy with Memory-Aware Scan Window Algorithm

1 participant