
logpuller: fix region leak after remove subscription#4267

Open
lidezhu wants to merge 4 commits into master from ldz/fix-log-puller0224

Conversation


@lidezhu lidezhu commented Feb 25, 2026

What problem does this PR solve?

Issue Number: close #4217

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Refactor

    • Improved stop-request coordination across stores with explicit error propagation and retry/re-queue behavior when enqueueing fails.
    • Streamlined request lifecycle with atomic slot accounting and a lightweight notification to wake waiting enqueue operations, reducing redundant work and improving resource handling.
  • Tests

    • Added tests validating retry-to-enqueue behavior when per-store caches are full and verifying correct slot release when duplicate requests are replaced.

@ti-chi-bot ti-chi-bot bot added the release-note and do-not-merge/needs-triage-completed labels Feb 25, 2026

ti-chi-bot bot commented Feb 25, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign asddongmen for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/L label Feb 25, 2026
@gemini-code-assist

Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a region leak that occurred when subscriptions were removed in the logpuller component. It refactors the handling of stopped regions to ensure that pending request counts are correctly decremented and resources are released, preventing resource exhaustion and improving system stability during subscription lifecycle management.

Highlights

  • Resource Release Mechanism: Introduced a markDone function in the request cache to consistently decrement pending request counts and signal available capacity, crucial for preventing resource leaks when regions are stopped or requests fail. (A sketch of this pattern follows the list.)
  • Stopped Region Handling: Modified region request workers to properly process and clear stopped regions from the cache, ensuring they do not block new requests or consume resources unnecessarily.
  • Robust Stop Request Enqueueing: Refactored the subscription client to use a new helper function, enqueueRegionToAllStores, which includes a retry mechanism to reliably enqueue stop requests to all relevant workers, even if their internal caches are temporarily full.
  • Error Handling Consistency: Updated error reporting for cancelled region requests to use a more specific requestCancelledErr instead of a generic send error.
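A minimal sketch of the markDone pattern from the first highlight, assuming the cache tracks slots with an atomic pendingCount counter and a buffered spaceAvailable channel (the field names follow the review summaries below; the actual layout of region_req_cache.go may differ):

package logpuller

import "sync/atomic"

// requestCache is a pared-down stand-in for the real cache; only the two
// fields relevant to slot accounting appear here.
type requestCache struct {
	pendingCount   atomic.Int64
	spaceAvailable chan struct{} // buffered, capacity 1
}

// markDone releases one pending slot and wakes at most one waiting add() caller.
func (c *requestCache) markDone() {
	// Atomically decrement the pending counter, clamping at zero
	// (the walkthrough below notes the decrement is clamped >= 0).
	for {
		cur := c.pendingCount.Load()
		if cur <= 0 {
			break
		}
		if c.pendingCount.CompareAndSwap(cur, cur-1) {
			break
		}
	}
	// Non-blocking signal: if no enqueuer is currently waiting, drop the
	// notification rather than block the completion path.
	select {
	case c.spaceAvailable <- struct{}{}:
	default:
	}
}

The non-blocking send is what makes markDone safe to call from every completion and error path: a dropped notification only means nobody was waiting for a slot.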


Changelog
  • logservice/logpuller/region_req_cache.go
    • Simplified decPendingCount to use atomic decrement and added markDone to decrement count and notify for space.
  • logservice/logpuller/region_request_worker.go
    • Ensured markDone is called when a region is stopped or a request fails to send.
    • Updated the error type for failed region processing to requestCancelledErr.
    • Modified the region pop logic to skip stopped regions and mark them as done. (A sketch follows this changelog.)
  • logservice/logpuller/subscription_client.go
    • Extracted the logic for enqueuing stop requests to all stores into a new enqueueRegionToAllStores function.
    • Implemented a retry mechanism in handleRegions for stop requests if worker caches are full.
  • logservice/logpuller/subscription_client_test.go
    • Added TestEnqueueRegionToAllStoresRetryWhenCacheFull to verify the retry logic for stop requests.
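A hedged sketch of the pop-and-skip behavior referenced in the region_request_worker.go changelog entry above, extending the cache sketch earlier (regionReq, isStopped, and the queue parameter are illustrative names, not the actual API; it additionally assumes import "context"):

// regionReq is illustrative; the real request type lives in
// region_req_cache.go and may be shaped differently.
type regionReq struct{ stopped atomic.Bool }

func (r *regionReq) isStopped() bool { return r.stopped.Load() }

// popNext hands out the next live request, releasing the slots held by
// regions that were stopped while still queued.
func (c *requestCache) popNext(ctx context.Context, pending chan *regionReq) (*regionReq, error) {
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case req := <-pending:
			if req.isStopped() {
				// A stopped region must still release its slot; otherwise the
				// pending count never drains, which is the leak this PR fixes.
				c.markDone()
				continue
			}
			return req, nil
		}
	}
}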


coderabbitai bot commented Feb 25, 2026

📝 Walkthrough


Centralizes request-slot accounting by adding spaceAvailable and markDone() to the request cache; updates workers to call markDone() on completion/error; adds enqueueRegionToAllStores() to propagate stop enqueue errors and support retry; and adds tests covering cache-full retry and duplicate-slot release scenarios.

Changes

  • Request Cache (logservice/logpuller/region_req_cache.go)
    Adds spaceAvailable chan struct{} and markDone(); consolidates the pending-slot decrement path to use an atomic decrement and non-blocking space signaling; removes deprecated inc/dec helpers; updates clear/resolve/markStopped/markSent to use markDone().
  • Request Worker (logservice/logpuller/region_request_worker.go)
    Refactors waitForPreFetching to use the popped request item; calls markDone() on send-error, canceled, and success paths to ensure slot accounting is updated.
  • Subscription Client (logservice/logpuller/subscription_client.go)
    Introduces enqueueRegionToAllStores() to enqueue stop requests across stores/workers with error propagation and busy-worker handling; handleRegions now checks results and retries or returns errors accordingly.
  • Tests, subscription client (logservice/logpuller/subscription_client_test.go)
    Adds TestEnqueueRegionToAllStoresRetryWhenCacheFull to validate retry and enqueue behavior when the per-store request cache is full and then partially drained.
  • Tests, request cache (logservice/logpuller/region_req_cache_test.go)
    Adds TestRequestCacheMarkSent_DuplicateReleaseSlot to validate duplicate add/markSent behavior and correct pending-slot accounting.

Sequence Diagram

sequenceDiagram
    participant SC as SubscriptionClient
    participant Store as Store (store with workers)
    participant W as StoreWorker
    participant RC as RequestCache

    SC->>SC: enqueueRegionToAllStores(region)
    SC->>Store: for each store
    Store->>W: for each worker: add(ctx, region, stop=true)
    alt worker busy (cache full)
        W-->>Store: ok==false
        Note over Store: skip remaining workers in this store
    else enqueued
        W-->>Store: ok==true
    end
    Store-->>SC: (enqueued/busy/err)
    alt err encountered
        SC-->>SC: return (false, err)
    else all enqueued
        SC-->>SC: return (true, nil)
    end

    Note over W,RC: when a request completes or is removed
    W->>RC: markDone()
    RC->>RC: pendingCount atomic.Dec (clamped >=0)
    RC-->>W: non-blocking signal on spaceAvailable (if any waiter)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~30 minutes

Suggested labels

lgtm, approved, size/M

Suggested reviewers

  • hongyunyan
  • asddongmen
  • flowbehappy

Poem

🐰 I nibbled counters, gave a gentle chime,
A slot released — the queues now rhyme.
I hopped between stores, workers, and cache,
Sent stop requests in a tidy dash.
Hooray — no stale regions left to stash! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
  • Title check: ✅ Passed. The title clearly identifies the main issue being addressed: fixing a region leak after subscription removal in logpuller.
  • Description check: ✅ Passed. The description includes the required issue reference (close #4217) and lists applicable test types, but lacks a detailed explanation of what was changed and how it works.
  • Linked Issues check: ✅ Passed. The PR addresses issue #4217 by introducing flow control and proper slot release mechanisms to prevent regions from leaking after subscription removal.
  • Out of Scope Changes check: ✅ Passed. All changes focus on fixing region leak issues through flow control improvements and request lifecycle management in the logpuller subscription handling.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request effectively addresses a resource leak issue in the logpuller by ensuring the pending region request counter is correctly decremented in all relevant scenarios, particularly for stopped or canceled subscriptions. The introduction of the markDone helper function and its consistent application is a solid improvement. Additionally, the new logic to retry enqueuing stop requests enhances the system's robustness. The changes are logical and well-supported by a new unit test. I have one minor suggestion to optimize the process of enqueuing stop requests.

Comment on lines 651 to 653
            if !ok {
                enqueued = false
            }


Severity: medium

When worker.add returns !ok, it indicates the worker's queue is full. In this case, it's more efficient to break out of the inner loop that iterates over workers for the current store. Since the entire enqueueRegionToAllStores operation will be retried later if enqueued is false, continuing to try other workers for the same (and likely busy) store is unnecessary. This change is a small optimization to avoid redundant work.

            if !ok {
                enqueued = false
                break
            }


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
logservice/logpuller/region_req_cache.go (1)

305-319: markStopped/resolve duplicate the markDone notification tail — consider extracting

markStopped (lines 179–184) and resolve (lines 204–216) both call decPendingCount() followed by an identical non-blocking spaceAvailable send — exactly what markDone does. Since both methods handle sentRequests removal separately, they could end with c.markDone() instead of inlining the same two-step pattern, eliminating the duplication.

♻️ Proposed refactor for `markStopped` and `resolve`
 func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
     ...
     delete(regionReqs, regionID)
-    c.decPendingCount()
-    select {
-    case c.spaceAvailable <- struct{}{}:
-    default:
-    }
+    c.markDone()
 }

 // inside resolve, replace the tail:
-    c.decPendingCount()
-    ...
-    select {
-    case c.spaceAvailable <- struct{}{}:
-    default:
-    }
+    c.markDone()
logservice/logpuller/region_request_worker.go (1)

353-356: regionReq wraps a fresh createTime, discarding the original enqueue time

regionReq := newRegionReq(region) (line 355) re-timestamps the pre-fetched region with time.Now(). The original enqueue time from waitForPreFetching is lost, so RegionRequestFinishScanDuration metrics will slightly undercount latency (the time waiting in the prefetch queue is excluded).

This is minor, but if accurate latency tracking matters, you can pass the original req through preFetchForConnecting (or store both regionInfo and createTime):

💡 Preserving original timestamp
-// preFetchForConnecting *regionInfo
+// preFetchForConnecting *regionReq   // keep original createTime

 // in waitForPreFetching:
-worker.preFetchForConnecting = new(regionInfo)
-*worker.preFetchForConnecting = req.regionInfo
+worker.preFetchForConnecting = new(regionReq)
+*worker.preFetchForConnecting = req

 // in processRegionSendTask:
-region := *s.preFetchForConnecting
-s.preFetchForConnecting = nil
-regionReq := newRegionReq(region)
+regionReq := *s.preFetchForConnecting
+s.preFetchForConnecting = nil
+region := regionReq.regionInfo
logservice/logpuller/subscription_client_test.go (1)

219-261: TestEnqueueRegionToAllStoresNoRedundantStopRequests — LGTM; consider adding an error-path test

The deduplication invariant (worker1's already-enqueued stop is not re-sent on retry) is correctly validated. For completeness, consider a third test covering the worker.add error path (returning a non-nil error) to assert that enqueueRegionToAllStores returns (false, err) and that enqueuedWorkers entries are not leaked (since the cleanup at lines 690–695 of subscription_client.go is skipped on error).

Would you like me to draft a TestEnqueueRegionToAllStoresErrorPath test? I can also open a new issue to track this gap.


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a13afb3 and ba00712.

📒 Files selected for processing (4)
  • logservice/logpuller/region_req_cache.go
  • logservice/logpuller/region_request_worker.go
  • logservice/logpuller/subscription_client.go
  • logservice/logpuller/subscription_client_test.go

Comment on lines +600 to +608
enqueued, err := s.enqueueRegionToAllStores(ctx, region)
if err != nil {
	return err
}
if !enqueued {
	log.Debug("enqueue stop request failed, retry later",
		zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
	s.regionTaskQueue.Push(regionTask)
}

⚠️ Potential issue | 🟠 Major

Tight retry spin when workers are busy may cause high CPU burn

When enqueueRegionToAllStores returns enqueued=false (all cache slots occupied), the task is re-pushed to regionTaskQueue with no delay. If the queue is otherwise empty, Pop returns it immediately, and the 3 ms of back-off inside add is the only throttle, resulting in a busy-polling loop that burns CPU while waiting for a worker to drain its cache.

Consider adding a brief sleep before re-pushing, or using a small time.After in the retry branch:

💡 Suggested mitigation
 			if !enqueued {
 				log.Debug("enqueue stop request failed, retry later",
 					zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
+				if err := util.Hang(ctx, 10*time.Millisecond); err != nil {
+					return err
+				}
 				s.regionTaskQueue.Push(regionTask)
 			}
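The util.Hang call in the suggested mitigation above is shown without a definition; assuming it is (or behaves like) a context-aware delay, a minimal equivalent is this sketch:

import (
	"context"
	"time"
)

// sleepCtx waits for d or until ctx is cancelled, whichever comes first.
// It stands in for the util.Hang helper referenced above, whose exact
// signature in this codebase is an assumption.
func sleepCtx(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

Sleeping on the enqueued==false path bounds the retry loop to roughly one attempt per delay interval instead of spinning on Pop/Push.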

Comment on lines 644 to 698
func (s *subscriptionClient) enqueueRegionToAllStores(ctx context.Context, region regionInfo) (bool, error) {
	subID := region.subscribedSpan.subID
	enqueued := true
	var firstErr error
	s.stores.Range(func(_ any, value any) bool {
		rs := value.(*requestedStore)
		rs.requestWorkers.RLock()
		workers := rs.requestWorkers.s
		rs.requestWorkers.RUnlock()
		for _, worker := range workers {
			s.stopTasks.Lock()
			workersEnqueued, ok := s.stopTasks.enqueuedWorkers[subID]
			if ok {
				_, ok = workersEnqueued[worker.workerID]
			}
			s.stopTasks.Unlock()
			if ok {
				continue
			}

			ok, err := worker.add(ctx, region, true)
			if err != nil {
				firstErr = err
				enqueued = false
				return false
			}
			if !ok {
				enqueued = false
				// It is likely the store is busy, no need to try other workers in this store now.
				break
			}

			s.stopTasks.Lock()
			if s.stopTasks.enqueuedWorkers == nil {
				s.stopTasks.enqueuedWorkers = make(map[SubscriptionID]map[uint64]struct{})
			}
			workersEnqueued = s.stopTasks.enqueuedWorkers[subID]
			if workersEnqueued == nil {
				workersEnqueued = make(map[uint64]struct{})
				s.stopTasks.enqueuedWorkers[subID] = workersEnqueued
			}
			workersEnqueued[worker.workerID] = struct{}{}
			s.stopTasks.Unlock()
		}
		return true
	})
	if enqueued {
		s.stopTasks.Lock()
		if s.stopTasks.enqueuedWorkers != nil {
			delete(s.stopTasks.enqueuedWorkers, subID)
		}
		s.stopTasks.Unlock()
	}
	return enqueued, firstErr
}

⚠️ Potential issue | 🟡 Minor

enqueuedWorkers[subID] is never cleaned up on the error-return path

When firstErr != nil (line 666), the function returns (false, firstErr) immediately. Any workers that were already recorded in enqueuedWorkers[subID] during this invocation remain there permanently. In practice this is harmless because handleRegions propagates the error and the entire client shuts down (line 601–603), but it does make the cleanup logic asymmetric and would cause a stale-map entry if the shutdown path were ever changed to be recoverable.

🛡️ Defensive cleanup on error path
 		ok, err := worker.add(ctx, region, true)
 		if err != nil {
 			firstErr = err
 			enqueued = false
+			s.stopTasks.Lock()
+			if s.stopTasks.enqueuedWorkers != nil {
+				delete(s.stopTasks.enqueuedWorkers, subID)
+			}
+			s.stopTasks.Unlock()
 			return false
 		}


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
logservice/logpuller/subscription_client.go (1)

597-601: ⚠️ Potential issue | 🟠 Major

No backoff before retry when workers are busy

When enqueueRegionToAllStores returns enqueued=false, the task is re-pushed immediately to regionTaskQueue. If the queue is otherwise empty, Pop returns it right away, causing a tight polling loop that burns CPU while waiting for workers to drain their caches.

Consider adding a brief delay before re-pushing:

 			if !enqueued {
 				log.Debug("enqueue stop request failed, retry later",
 					zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
+				if err := util.Hang(ctx, 10*time.Millisecond); err != nil {
+					return err
+				}
 				s.regionTaskQueue.Push(regionTask)
 			}

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ba00712 and 98074a7.

📒 Files selected for processing (2)
  • logservice/logpuller/subscription_client.go
  • logservice/logpuller/subscription_client_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • logservice/logpuller/subscription_client_test.go


@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
logservice/logpuller/region_req_cache_test.go (1)

87-87: ⚠️ Potential issue | 🟡 Minor

Typo: "ann" should be "and".

📝 Fix
-	// consume the pending queue ann add with force
+	// consume the pending queue and add with force
🧹 Nitpick comments (1)
logservice/logpuller/region_req_cache.go (1)

100-100: Consider renaming local variable to avoid shadowing the package constant.

addReqRetryLimit := addReqRetryLimit shadows the package-level constant with the same name, which can be confusing. Consider using a distinct name like retriesRemaining.

♻️ Suggested rename
-	addReqRetryLimit := addReqRetryLimit
+	retriesRemaining := addReqRetryLimit

 	for {
 		current := c.pendingCount.Load()
 		if current < c.maxPendingCount || force {
 			// Try to add the request
 			req := newRegionReq(region)
 			select {
 			case <-ctx.Done():
 				return false, ctx.Err()
 			case c.pendingQueue <- req:
 				c.pendingCount.Inc()
 				cost := time.Since(start)
 				metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds())
 				return true, nil
 			case <-c.spaceAvailable:
 				continue
 			case <-ticker.C:
-				addReqRetryLimit--
-				if addReqRetryLimit <= 0 {
+				retriesRemaining--
+				if retriesRemaining <= 0 {
 					return false, nil
 				}
 				continue
 			}
 		}

 		// Wait for space to become available
 		select {
 		case <-ticker.C:
-			addReqRetryLimit--
-			if addReqRetryLimit <= 0 {
+			retriesRemaining--
+			if retriesRemaining <= 0 {
 				return false, nil
 			}

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 98074a7 and 451baff.

📒 Files selected for processing (2)
  • logservice/logpuller/region_req_cache.go
  • logservice/logpuller/region_req_cache_test.go


Labels

  • do-not-merge/needs-triage-completed
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • size/L: Denotes a PR that changes 100-499 lines, ignoring generated files.


Development

Successfully merging this pull request may close these issues.

Some regions failed to be deregistered after unregistering the subscription
