logpuller: fix region leak after remove subscription (#4267)
Conversation
Summary of Changes: This pull request addresses a region leak that occurred when subscriptions were removed in the logpuller component. It refactors the handling of stopped regions to ensure that pending request counts are correctly decremented and resources are released, preventing resource exhaustion and improving system stability during subscription lifecycle management.
📝 Walkthrough

Centralizes request-slot accounting by adding the markDone helper.
Sequence Diagram

```mermaid
sequenceDiagram
    participant SC as SubscriptionClient
    participant Store as Store (store with workers)
    participant W as StoreWorker
    participant RC as RequestCache
    SC->>SC: enqueueRegionToAllStores(region)
    SC->>Store: for each store
    Store->>W: for each worker: add(ctx, region, stop=true)
    alt worker busy (cache full)
        W-->>Store: ok==false
        Note over Store: skip remaining workers in this store
    else enqueued
        W-->>Store: ok==true
    end
    Store-->>SC: (enqueued/busy/err)
    alt err encountered
        SC-->>SC: return (false, err)
    else all enqueued
        SC-->>SC: return (true, nil)
    end
    Note over W,RC: when a request completes or is removed
    W->>RC: markDone()
    RC->>RC: pendingCount atomic.Dec (clamped >=0)
    RC-->>W: non-blocking signal on spaceAvailable (if any waiter)
```
Code Review
This pull request effectively addresses a resource leak issue in the logpuller by ensuring the pending region request counter is correctly decremented in all relevant scenarios, particularly for stopped or canceled subscriptions. The introduction of the markDone helper function and its consistent application is a solid improvement. Additionally, the new logic to retry enqueuing stop requests enhances the system's robustness. The changes are logical and well-supported by a new unit test. I have one minor suggestion to optimize the process of enqueuing stop requests.
```go
if !ok {
    enqueued = false
}
```
When worker.add returns !ok, it indicates the worker's queue is full. In this case, it's more efficient to break out of the inner loop that iterates over workers for the current store. Since the entire enqueueRegionToAllStores operation will be retried later if enqueued is false, continuing to try other workers for the same (and likely busy) store is unnecessary. This change is a small optimization to avoid redundant work.
```go
if !ok {
    enqueued = false
    break
}
```
Actionable comments posted: 2
🧹 Nitpick comments (3)
logservice/logpuller/region_req_cache.go (1)
305-319: markStopped/resolve duplicate the markDone notification tail — consider extracting

markStopped (lines 179–184) and resolve (lines 204–216) both call decPendingCount() followed by an identical non-blocking spaceAvailable send — exactly what markDone does. Since both methods handle sentRequests removal separately, they could end with c.markDone() instead of inlining the same two-step pattern, eliminating the duplication.

♻️ Proposed refactor for markStopped and resolve

```diff
 func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
     ...
     delete(regionReqs, regionID)
-    c.decPendingCount()
-    select {
-    case c.spaceAvailable <- struct{}{}:
-    default:
-    }
+    c.markDone()
 }

 // inside resolve, replace the tail:
-    c.decPendingCount()
-    ...
-    select {
-    case c.spaceAvailable <- struct{}{}:
-    default:
-    }
+    c.markDone()
```
logservice/logpuller/region_request_worker.go (1)
353-356: regionReq wraps a fresh createTime, discarding the original enqueue time

regionReq := newRegionReq(region) (line 355) re-timestamps the pre-fetched region with time.Now(). The original enqueue time from waitForPreFetching is lost, so RegionRequestFinishScanDuration metrics will slightly undercount latency (the time waiting in the prefetch queue is excluded).

This is minor, but if accurate latency tracking matters, you can pass the original req through preFetchForConnecting (or store both regionInfo and createTime):

💡 Preserving original timestamp

```diff
-// preFetchForConnecting *regionInfo
+// preFetchForConnecting *regionReq // keep original createTime

 // in waitForPreFetching:
-worker.preFetchForConnecting = new(regionInfo)
-*worker.preFetchForConnecting = req.regionInfo
+worker.preFetchForConnecting = new(regionReq)
+*worker.preFetchForConnecting = req

 // in processRegionSendTask:
-region := *s.preFetchForConnecting
-s.preFetchForConnecting = nil
-regionReq := newRegionReq(region)
+regionReq := *s.preFetchForConnecting
+s.preFetchForConnecting = nil
+region := regionReq.regionInfo
```
logservice/logpuller/subscription_client_test.go (1)
219-261: TestEnqueueRegionToAllStoresNoRedundantStopRequests — LGTM; consider adding an error-path test

The deduplication invariant (worker1's already-enqueued stop is not re-sent on retry) is correctly validated. For completeness, consider a third test covering the worker.add error path (returning a non-nil error) to assert that enqueueRegionToAllStores returns (false, err) and that enqueuedWorkers entries are not leaked (since the cleanup at lines 690–695 of subscription_client.go is skipped on error).

Would you like me to draft a TestEnqueueRegionToAllStoresErrorPath test? I can also open a new issue to track this gap.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- logservice/logpuller/region_req_cache.go
- logservice/logpuller/region_request_worker.go
- logservice/logpuller/subscription_client.go
- logservice/logpuller/subscription_client_test.go
```go
enqueued, err := s.enqueueRegionToAllStores(ctx, region)
if err != nil {
    return err
}
if !enqueued {
    log.Debug("enqueue stop request failed, retry later",
        zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
    s.regionTaskQueue.Push(regionTask)
}
```
Tight retry spin when workers are busy may cause high CPU burn
When enqueueRegionToAllStores returns enqueued=false (all cache slots occupied), the task is re-pushed to regionTaskQueue with no delay. If the queue is otherwise empty, Pop returns it immediately, and the 3 ms of back-off inside add is the only throttle, resulting in a busy-polling loop that burns CPU while waiting for a worker to drain its cache.
Consider adding a brief sleep before re-pushing, or using a small time.After in the retry branch:
💡 Suggested mitigation

```diff
 if !enqueued {
     log.Debug("enqueue stop request failed, retry later",
         zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
+    if err := util.Hang(ctx, 10*time.Millisecond); err != nil {
+        return err
+    }
     s.regionTaskQueue.Push(regionTask)
 }
```
```go
func (s *subscriptionClient) enqueueRegionToAllStores(ctx context.Context, region regionInfo) (bool, error) {
	subID := region.subscribedSpan.subID
	enqueued := true
	var firstErr error
	s.stores.Range(func(_ any, value any) bool {
		rs := value.(*requestedStore)
		rs.requestWorkers.RLock()
		workers := rs.requestWorkers.s
		rs.requestWorkers.RUnlock()
		for _, worker := range workers {
			s.stopTasks.Lock()
			workersEnqueued, ok := s.stopTasks.enqueuedWorkers[subID]
			if ok {
				_, ok = workersEnqueued[worker.workerID]
			}
			s.stopTasks.Unlock()
			if ok {
				continue
			}

			ok, err := worker.add(ctx, region, true)
			if err != nil {
				firstErr = err
				enqueued = false
				return false
			}
			if !ok {
				enqueued = false
				// It is likely the store is busy, no need to try other workers in this store now.
				break
			}

			s.stopTasks.Lock()
			if s.stopTasks.enqueuedWorkers == nil {
				s.stopTasks.enqueuedWorkers = make(map[SubscriptionID]map[uint64]struct{})
			}
			workersEnqueued = s.stopTasks.enqueuedWorkers[subID]
			if workersEnqueued == nil {
				workersEnqueued = make(map[uint64]struct{})
				s.stopTasks.enqueuedWorkers[subID] = workersEnqueued
			}
			workersEnqueued[worker.workerID] = struct{}{}
			s.stopTasks.Unlock()
		}
		return true
	})
	if enqueued {
		s.stopTasks.Lock()
		if s.stopTasks.enqueuedWorkers != nil {
			delete(s.stopTasks.enqueuedWorkers, subID)
		}
		s.stopTasks.Unlock()
	}
	return enqueued, firstErr
}
```
enqueuedWorkers[subID] is never cleaned up on the error-return path
When firstErr != nil (line 666), the function returns (false, firstErr) immediately. Any workers that were already recorded in enqueuedWorkers[subID] during this invocation remain there permanently. In practice this is harmless because handleRegions propagates the error and the entire client shuts down (line 601–603), but it does make the cleanup logic asymmetric and would cause a stale-map entry if the shutdown path were ever changed to be recoverable.
🛡️ Defensive cleanup on error path

```diff
 ok, err := worker.add(ctx, region, true)
 if err != nil {
     firstErr = err
     enqueued = false
+    s.stopTasks.Lock()
+    if s.stopTasks.enqueuedWorkers != nil {
+        delete(s.stopTasks.enqueuedWorkers, subID)
+    }
+    s.stopTasks.Unlock()
     return false
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func (s *subscriptionClient) enqueueRegionToAllStores(ctx context.Context, region regionInfo) (bool, error) {
	subID := region.subscribedSpan.subID
	enqueued := true
	var firstErr error
	s.stores.Range(func(_ any, value any) bool {
		rs := value.(*requestedStore)
		rs.requestWorkers.RLock()
		workers := rs.requestWorkers.s
		rs.requestWorkers.RUnlock()
		for _, worker := range workers {
			s.stopTasks.Lock()
			workersEnqueued, ok := s.stopTasks.enqueuedWorkers[subID]
			if ok {
				_, ok = workersEnqueued[worker.workerID]
			}
			s.stopTasks.Unlock()
			if ok {
				continue
			}
			ok, err := worker.add(ctx, region, true)
			if err != nil {
				firstErr = err
				enqueued = false
				s.stopTasks.Lock()
				if s.stopTasks.enqueuedWorkers != nil {
					delete(s.stopTasks.enqueuedWorkers, subID)
				}
				s.stopTasks.Unlock()
				return false
			}
			if !ok {
				enqueued = false
				// It is likely the store is busy, no need to try other workers in this store now.
				break
			}
			s.stopTasks.Lock()
			if s.stopTasks.enqueuedWorkers == nil {
				s.stopTasks.enqueuedWorkers = make(map[SubscriptionID]map[uint64]struct{})
			}
			workersEnqueued = s.stopTasks.enqueuedWorkers[subID]
			if workersEnqueued == nil {
				workersEnqueued = make(map[uint64]struct{})
				s.stopTasks.enqueuedWorkers[subID] = workersEnqueued
			}
			workersEnqueued[worker.workerID] = struct{}{}
			s.stopTasks.Unlock()
		}
		return true
	})
	if enqueued {
		s.stopTasks.Lock()
		if s.stopTasks.enqueuedWorkers != nil {
			delete(s.stopTasks.enqueuedWorkers, subID)
		}
		s.stopTasks.Unlock()
	}
	return enqueued, firstErr
}
```
♻️ Duplicate comments (1)
logservice/logpuller/subscription_client.go (1)
597-601: ⚠️ Potential issue | 🟠 Major — No backoff before retry when workers are busy

When enqueueRegionToAllStores returns enqueued=false, the task is re-pushed immediately to regionTaskQueue. If the queue is otherwise empty, Pop returns it right away, causing a tight polling loop that burns CPU while waiting for workers to drain their caches.

Consider adding a brief delay before re-pushing:

```diff
 if !enqueued {
     log.Debug("enqueue stop request failed, retry later",
         zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
+    if err := util.Hang(ctx, 10*time.Millisecond); err != nil {
+        return err
+    }
     s.regionTaskQueue.Push(regionTask)
 }
```
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- logservice/logpuller/subscription_client.go
- logservice/logpuller/subscription_client_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- logservice/logpuller/subscription_client_test.go
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
logservice/logpuller/region_req_cache_test.go (1)
87-87: ⚠️ Potential issue | 🟡 Minor — Typo: "ann" should be "and".

📝 Fix

```diff
-	// consume the pending queue ann add with force
+	// consume the pending queue and add with force
```
🧹 Nitpick comments (1)
logservice/logpuller/region_req_cache.go (1)
100-100: Consider renaming the local variable to avoid shadowing the package constant.

addReqRetryLimit := addReqRetryLimit shadows the package-level constant with the same name, which can be confusing. Consider using a distinct name like retriesRemaining.

♻️ Suggested rename

```diff
-	addReqRetryLimit := addReqRetryLimit
+	retriesRemaining := addReqRetryLimit
 	for {
 		current := c.pendingCount.Load()
 		if current < c.maxPendingCount || force {
 			// Try to add the request
 			req := newRegionReq(region)
 			select {
 			case <-ctx.Done():
 				return false, ctx.Err()
 			case c.pendingQueue <- req:
 				c.pendingCount.Inc()
 				cost := time.Since(start)
 				metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds())
 				return true, nil
 			case <-c.spaceAvailable:
 				continue
 			case <-ticker.C:
-				addReqRetryLimit--
-				if addReqRetryLimit <= 0 {
+				retriesRemaining--
+				if retriesRemaining <= 0 {
 					return false, nil
 				}
 				continue
 			}
 		}
 		// Wait for space to become available
 		select {
 		case <-ticker.C:
-			addReqRetryLimit--
-			if addReqRetryLimit <= 0 {
+			retriesRemaining--
+			if retriesRemaining <= 0 {
 				return false, nil
 			}
```
What problem does this PR solve?
Issue Number: close #4217
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Refactor
Tests