[WIP] #4242 (Open)

lidezhu wants to merge 1 commit into master from ldz/add-pipeline0223

Conversation

@lidezhu (Collaborator) commented Feb 23, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

Release Notes

  • Documentation

    • Added comprehensive design documentation for the Log Puller event delivery system.
  • Refactor

    • Improved the event delivery pipeline to enforce stricter ordering guarantees and coordinate data persistence with timestamp resolution.

ti-chi-bot bot commented Feb 23, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign tenfyzhong for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot added the following labels on Feb 23, 2026:

  • do-not-merge/needs-linked-issue
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • do-not-merge/work-in-progress: Indicates that a PR should not merge because it is a work in progress.
  • size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.
coderabbitai bot (Contributor) commented Feb 23, 2026

📝 Walkthrough

This PR replaces the log puller's dynstream-based event delivery system with a new Span Pipeline architecture. The new design features a sharded, multi-worker pipeline manager that coordinates data and resolved-timestamp delivery across subscriptions, with per-subscription state machines, quota-bounded in-flight data, and strict resolution barriers ensuring resolved-ts advances only after prior data is persisted.

Changes

Cohort / File(s) Summary
Span Pipeline Core
logservice/logpuller/span_pipeline.go, logservice/logpuller/span_pipeline_test.go
Introduces new spanPipelineManager with per-subscription worker goroutines, global quota semaphore for in-flight data, and state machine (spanPipelineState) managing nextDataSeq, ackedSeq, pendingResolved barriers. Implements Data/Resolved/Persisted event types with barrier compaction. Tests verify out-of-order persistence, resolved merge behavior, and multi-subscription independence.
Region Event Processing
logservice/logpuller/region_event.go, logservice/logpuller/region_event_processor.go, logservice/logpuller/region_event_metrics.go
Introduces regionEvent type, regionEventProcessor with worker pool for sharded region event dispatch, and per-region state tracking. Converts region entries to KV entries, updates resolved-ts, and coordinates with span pipeline. Adds metrics counters for resolved_ts and event tracking.
Integration & Refactoring
logservice/logpuller/subscription_client.go, logservice/logpuller/region_request_worker.go
Replaces dynstream with new pipeline and regionEventProcessor fields in SubscriptionClient. Updates Subscribe/Unsubscribe flow and metric tracking. Changes region_request_worker to dispatch events via dispatchRegionEvent instead of pushRegionEventToDS.
Test Updates & Removed Code
logservice/logpuller/region_event_handler_test.go, (removed) logservice/logpuller/region_event_handler.go
Removes region_event_handler.go (419 lines) containing prior event handling logic. Updates tests to use new spanPipelineManager and regionEventProcessor initialization instead of dynstream-based testing setup.
Metrics Expansion
pkg/metrics/log_puller.go
Adds seven new Prometheus metrics in log_puller_span_pipeline namespace: inflight_bytes, inflight_batches, pending_resolved_barriers, active_subscriptions, quota_acquire_duration_seconds, resolved_barrier_dropped_total, resolved_barrier_compaction_total.
Design Documentation
docs/design/2026-02-23-logpuller-span-pipeline.md
New design document detailing Span Pipeline architecture: per-subID state machine, event types, flushResolvedIfReady mechanism, barrier semantics, quota management, metrics, lifecycle, and relationship to region-based puller components.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Pipeline Manager
    participant Pipeline Worker
    participant Span State
    participant EventStore

    Client->>Pipeline Manager: EnqueueData(kvs)
    Pipeline Manager->>Pipeline Manager: Acquire Quota (weight)
    Pipeline Manager->>Pipeline Worker: Send Data Event
    Pipeline Worker->>Span State: Advance nextDataSeq
    Pipeline Worker->>Span State: Queue Persisted callback
    Span State-->>Client: Finish Callback

    Client->>Pipeline Manager: EnqueueResolved(ts)
    Pipeline Manager->>Pipeline Worker: Send Resolved Event
    Pipeline Worker->>Span State: Add Barrier (waitSeq, ts)
    Pipeline Worker->>Span State: Check flushResolvedIfReady
    alt All prior data acknowledged
        Span State->>EventStore: Advance resolved-ts
    else Waiting for persistence
        Span State->>Span State: Hold barrier pending
    end

    Client->>Pipeline Manager: Persisted Callback
    Pipeline Manager->>Pipeline Worker: Send Persisted Event
    Pipeline Worker->>Span State: Update ackedSeq, add to doneSet
    Pipeline Worker->>Pipeline Manager: Release Quota
    Pipeline Worker->>Span State: flushResolvedIfReady (retry)
    alt Now ready
        Span State->>EventStore: Advance resolved-ts
    end
sequenceDiagram
    participant Region Request Worker
    participant Region Event Processor
    participant Processor Worker
    participant Span Pipeline
    
    Region Request Worker->>Region Event Processor: dispatch(regionEvent)
    Region Event Processor->>Processor Worker: Shard by regionID to worker queue
    Processor Worker->>Processor Worker: handle(regionEvent)
    
    alt Event Type: Entries
        Processor Worker->>Processor Worker: appendKVEntriesFromRegionEntries
        Processor Worker->>Span Pipeline: EnqueueData(kvs)
    else Event Type: ResolvedTs
        Processor Worker->>Processor Worker: updateRegionResolvedTs
        Processor Worker->>Processor Worker: maybeAdvanceSpanResolvedTs
        Processor Worker->>Span Pipeline: EnqueueResolved(ts)
    else Event Type: RegionError
        Processor Worker->>Processor Worker: Handle error, reschedule if needed
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

lgtm, size/XXL

Suggested reviewers

  • tenfyzhong
  • flowbehappy
  • wk989898
  • asddongmen

Poem

🐰 A pipeline springs forth, new and bright,
Replacing dynstreams in the night,
With barriers and quotas, state so tight,
Data flows resolved, in perfect sight! 🎀
The span pipeline hops to delight!

🚥 Pre-merge checks | ❌ 3

❌ Failed checks (2 warnings, 1 inconclusive)

  • Description check (⚠️ Warning): The PR description is entirely unmodified template content with placeholder values; no concrete problem statement, change description, issue reference, test details, or release notes are provided. Resolution: fill in all required sections: provide a real issue reference, describe the span pipeline design and region event processor changes, confirm test coverage, and document any compatibility impacts and metrics additions.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 12.50%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): The PR title '[WIP]' is vague and non-descriptive; it does not convey meaningful information about the substantial pipeline and architecture changes in the changeset. Resolution: update the title to clearly describe the main change, such as 'Introduce span pipeline and region event processor for log puller'.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist

Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the log puller's event delivery architecture by replacing the dynstream component with a custom-built 'Span Pipeline.' This change aims to improve the efficiency and correctness of event processing by enabling true pipelining of data and resolved-timestamp signals for individual subscription spans. The new design ensures that resolved timestamps only advance once all preceding data has been persisted, while also providing robust backpressure and memory control through a global quota system.

Highlights

  • Span Pipeline Introduction: Introduced a new 'Span Pipeline' as the core event delivery layer for the log puller, replacing the legacy dynstream implementation. This enables true pipelining of events (receive -> compress/group -> persist) for individual subscription spans.
  • Strict Resolved-TS Barrier Semantics: Implemented a strict resolved-timestamp barrier mechanism, ensuring that a resolved-ts signal is only advanced after all preceding data batches for that span (in puller receive order) have been successfully persisted.
  • New Concurrency Model: Adopted a shared worker pool (spanPipelineManager) to handle events. Events are routed by subID to a single worker goroutine, eliminating the need for per-span goroutines and associated locks, thereby simplifying state management.
  • Enhanced Backpressure and Memory Control: Replaced dynstream's pause/resume mechanism with a global semaphore-based quota system. This new system bounds in-flight data bytes during EnqueueData operations, preventing unbounded memory growth.
  • Refactored Event Processing Flow: The regionEventProcessor now directly dispatches region-level events (KV batches and resolved-ts signals) to the new Span Pipeline, streamlining the dataflow and removing the dependency on the old regionEventHandler.
  • Comprehensive Metrics: Added new Prometheus metrics to monitor the Span Pipeline's internal state, including in-flight bytes, in-flight batches, pending resolved barriers, active subscriptions, quota acquisition duration, and dropped/compacted barriers.


Changelog
  • docs/design/2026-02-23-logpuller-span-pipeline.md
    • Added a comprehensive design document detailing the Span Pipeline's architecture, semantics, concurrency model, and dataflow.
  • logservice/logpuller/region_event.go
    • Added a new file to define the regionEvent struct, centralizing its definition.
  • logservice/logpuller/region_event_handler.go
    • Removed the legacy regionEventHandler and its associated dynstream integration.
  • logservice/logpuller/region_event_handler_test.go
    • Updated tests to reflect the removal of dynstream and the introduction of the new pipeline components.
  • logservice/logpuller/region_event_metrics.go
    • Added a new file to define metrics specifically for region event processing.
  • logservice/logpuller/region_event_processor.go
    • Added a new component responsible for processing region-level events and dispatching them to the Span Pipeline.
  • logservice/logpuller/region_request_worker.go
    • Modified to dispatch region events to the new regionEventProcessor instead of the dynstream.
  • logservice/logpuller/span_pipeline.go
    • Added the core implementation of the Span Pipeline, including its manager, workers, state machine, and event handling logic.
  • logservice/logpuller/span_pipeline_test.go
    • Added unit tests to validate the functionality and semantics of the new Span Pipeline.
  • logservice/logpuller/subscription_client.go
    • Updated to integrate the new Span Pipeline and region event processor, removing all dynstream related code.
  • pkg/metrics/log_puller.go
    • Added new Prometheus metrics to monitor the Span Pipeline's internal state and performance.
Activity
  • The pull request is currently marked as Work In Progress (WIP), indicating that the author is still actively developing and refining the changes.
  • The description body is a template, suggesting that detailed information about the problem solved, specific changes, and testing has not yet been provided by the author.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

ti-chi-bot bot commented Feb 23, 2026

@lidezhu: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

  • Test name: pull-error-log-review
    Commit: aab7a8f
    Required: true
    Rerun command: /test pull-error-log-review

Full PR test history. Your PR dashboard.


Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

ti-chi-bot bot commented Feb 23, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

gemini-code-assist bot left a comment


Code Review

This pull request introduces the Span Pipeline, a new delivery layer for the log puller, replacing the legacy dynstream implementation. Its aim is to enable true pipelining of data batches and resolved-ts signals for subscription spans, removing serialization bottlenecks and improving performance. A critical security concern is that invalid external input from TiKV can trigger process-wide crashes via log.Fatal or log.Panic. Additionally, some panic statements log raw database events which may contain sensitive information (PII) without redaction. It is recommended to handle protocol violations more gracefully and ensure all logged data is properly redacted. Other feedback includes potential quota leaks during shutdown, a risk of stalling background threads in the EventStore due to blocking callbacks, and a minor logic issue in resolved-ts advancement that could affect stale lock detection.

Comment on lines +213 to +225
```go
func (m *spanPipelineManager) enqueue(subID SubscriptionID, ev spanPipelineEvent) bool {
	if len(m.workers) == 0 {
		return false
	}
	idx := int(uint64(subID) % uint64(len(m.workers)))
	w := m.workers[idx]
	select {
	case <-m.ctx.Done():
		return false
	case w.ch <- ev:
		return true
	}
}
```


high

The enqueue method blocks if the worker's channel is full. Since enqueuePersisted is called from a persistence callback (likely from an EventStore background thread), blocking here can stall critical background processes in the EventStore (e.g., Pebble's write pipeline). Consider using a non-blocking enqueue with a fallback mechanism or a separate prioritized control channel for persistence signals to ensure callbacks return promptly.

```go
resolvedTs := state.getLastResolvedTs()
if entry.CommitTs <= resolvedTs {
	log.Fatal("The CommitTs must be greater than the resolvedTs",
		zap.Int64("tableID", span.span.TableID),
```


security-medium medium

The use of log.Fatal here can lead to a Denial of Service (DoS). If TiKV sends a COMMITTED event with a CommitTs less than or equal to the last resolvedTs, the entire TiCDC process will terminate. Since this data comes from an external component, it should be handled more gracefully, for example by stopping the affected changefeed instead of crashing the whole process.

```go
	continue
}
log.Fatal("prewrite not match",
	zap.Int64("tableID", span.span.TableID),
```


security-medium medium

The use of log.Fatal here can lead to a Denial of Service (DoS). If a COMMIT event is received without a matching PREWRITE, the entire TiCDC process will terminate. This external input should be handled gracefully to avoid process-wide crashes.

```go
	zap.String("EventType", "COMMIT"),
	zap.Uint64("CommitTs", entry.CommitTs),
	zap.Uint64("resolvedTs", resolvedTs),
	zap.String("key", util.RedactKey(entry.GetKey())))
```


security-medium medium

The use of log.Fatal here can lead to a Denial of Service (DoS). If TiKV sends a COMMIT event with a CommitTs less than or equal to the last resolvedTs, the entire TiCDC process will terminate. This external input should be handled gracefully.


```go
func (p *regionEventProcessor) dispatch(event regionEvent) {
	if len(event.states) == 0 {
		log.Panic("region event has empty states", zap.Any("event", event))
```


security-medium medium

Logging the entire event object in a panic statement can lead to sensitive data leakage. The regionEvent object contains raw database keys and values which are not redacted when logged via zap.Any. Avoid logging the entire object or ensure sensitive fields are redacted.

```go
	return
}

log.Panic("unknown region event type", zap.Any("event", event))
```


security-medium medium

Logging the entire event object in a panic statement can lead to sensitive data leakage. The regionEvent object contains raw database keys and values which are not redacted when logged via zap.Any.

```go
	opType = common.OpTypePut
default:
	log.Panic("meet unknown op type", zap.Any("entry", entry))
}
```


security-medium medium

Logging the entire entry object in a panic statement can lead to sensitive data leakage. The cdcpb.Event_Row object contains raw database keys and values which are not redacted when logged via zap.Any.

```go
}

func (event regionEvent) mustFirstState() *regionFeedState {
	if len(event.states) == 0 || event.states[0] == nil {
```


security-medium medium

Logging the entire event object in a panic statement can lead to sensitive data leakage. The regionEvent object contains raw database keys and values which are not redacted when logged via zap.Any.

Comment on lines +204 to +211
```go
func (m *spanPipelineManager) enqueuePersisted(subID SubscriptionID, seq uint64, weight int64) {
	_ = m.enqueue(subID, spanPipelineEvent{
		subID:  subID,
		typ:    spanPipelineEventPersisted,
		seq:    seq,
		weight: weight,
	})
}
```


medium

If m.enqueue returns false (e.g., because the manager's context is cancelled during shutdown), the persistence signal is lost and the associated quota weight is never released. This could lead to a quota leak if the manager or semaphore is reused, or simply inconsistent metrics during shutdown.

```go
func (m *spanPipelineManager) enqueuePersisted(subID SubscriptionID, seq uint64, weight int64) {
	if !m.enqueue(subID, spanPipelineEvent{
		subID:  subID,
		typ:    spanPipelineEventPersisted,
		seq:    seq,
		weight: weight,
	}) {
		m.releaseQuota(weight)
	}
}
```

Comment on lines +396 to +401
if lastResolvedTs == span.startTs {
span.resolvedTsUpdated.Store(time.Now().Unix())
return ts
}
return 0
}


medium

When ts == lastResolvedTs == span.startTs, the code repeatedly returns ts and updates span.resolvedTsUpdated. This behavior can hide a stuck region from the stale lock checker, as resolvedTsUpdated will keep being refreshed even though no progress is being made beyond the initial checkpoint. It should only return and update if it's the very first time the span is being initialized.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
logservice/logpuller/subscription_client.go (1)

402-410: ⚠️ Potential issue | 🟠 Major

Unregister is synchronous/blocking, but onTableDrained is called from onRegionFail which must not block.

pipeline.Unregister(rt.subID) (line 406) creates a channel and blocks waiting on it to be closed after the worker processes the unregister event. The underlying enqueue call uses a select with no default case, so it will block if the worker channel is full. However, onTableDrained can be reached from onRegionFail (line 418), which has an explicit comment on line 412: "don't block the caller, otherwise there may be deadlock".

This blocking call in onTableDrained violates the non-blocking guarantee on the onRegionFail path, creating a potential deadlock risk when worker channels experience backpressure.

Consider making the Unregister call in onTableDrained non-blocking — or refactor to avoid calling blocking operations on paths that must remain non-blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/subscription_client.go` around lines 402 - 410, The call
to pipeline.Unregister in subscriptionClient.onTableDrained can block and thus
violate the non-blocking requirement of the onRegionFail path; make the
Unregister non-blocking by invoking s.pipeline.Unregister(rt.subID) in a
separate goroutine (e.g., go func(id uint64){ s.pipeline.Unregister(id)
}(rt.subID)) so onTableDrained returns immediately, and keep the rest of the
method (s.totalSpans.Lock/Unlock and delete) intact; alternatively, if you
prefer a library change, update pipeline.Unregister to use a non-blocking
enqueue (select with default) or accept a context/timeout and call it with a
short timeout from onTableDrained to avoid blocking.
🧹 Nitpick comments (7)
logservice/logpuller/region_request_worker.go (1)

209-210: Stale comment: still references "ds" (dynstream).

The comment says "dispatches them to ds" but dynstream has been removed. Update to reflect the new dispatch path.

✏️ Suggested fix
```diff
-// receiveAndDispatchChangeEventsToProcessor receives events from the grpc stream and dispatches them to ds.
+// receiveAndDispatchChangeEvents receives events from the grpc stream and dispatches them to the region event processor.
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_request_worker.go` around lines 209 - 210, The
function comment for receiveAndDispatchChangeEvents (on type
regionRequestWorker) still says "dispatches them to ds" which is stale; update
the comment to reflect the current dispatch target (e.g., "dispatches them to
the processor" or the actual component now used) so it no longer references
dynstream/ds—edit the comment line above receiveAndDispatchChangeEvents to name
the new dispatch path/component consistent with how events are forwarded in the
function body.
logservice/logpuller/region_event_handler_test.go (1)

136-142: Consider using require.Fail instead of require.True(t, false, ...).

Multiple places (lines 139, 169, 201, 300, 305) use require.True(t, false, msg) to signal failure. require.Fail(t, msg) or require.FailNow(t, msg) is more idiomatic and communicates intent more clearly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_event_handler_test.go` around lines 136 - 142,
Replace the anti-idiomatic assertions that use require.True(t, false, ...) with
require.Fail or require.FailNow to clearly signal test failures; locate the
occurrences in region_event_handler_test.go where the select on eventCh uses
require.True(t, false, ...) (and the other similar asserts referencing
require.True) and change them to require.Fail(t, "unexpected event received") or
require.FailNow(t, "unexpected event received") depending on whether you want
the test to continue, ensuring the failure message describes the unexpected
event.
logservice/logpuller/region_event_processor.go (2)

77-84: Worker goroutines are fire-and-forget — no WaitGroup or errgroup for lifecycle tracking.

Workers are started with bare go w.run() and rely solely on ctx.Done() for shutdown. If the processor's owner needs to wait for clean shutdown (e.g., to ensure all in-flight events are drained), there's no mechanism for that.

Consider adding a sync.WaitGroup to track workers for graceful shutdown, or document that the caller must not depend on drain semantics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_event_processor.go` around lines 77 - 84, The
worker goroutines in regionEventWorker are started with bare go w.run() and
aren't tracked, so add lifecycle tracking: add a sync.WaitGroup field (e.g., wg)
to the processor (or to regionEventWorker), call wg.Add(1) before starting each
worker in the loop that creates regionEventWorker (referencing workerCount,
p.workers), have run() call wg.Done() on exit (and continue to respect
ctx.Done()), and expose a Shutdown/Wait method on the processor that calls
wg.Wait() so callers can wait for all workers to drain; alternatively document
explicitly that there is no drain guarantee if you choose not to add the
WaitGroup.

159-172: EnqueueData blocks the worker goroutine under quota pressure, coupling unrelated subscriptions.

When EnqueueData blocks on the quota semaphore (line 171), all region events mapped to this worker (by regionID % workerCount) are stalled — including regions belonging to other subscriptions. This is a consequence of sharding by regionID in the processor but applying a global quota in the pipeline.

This is the designed behavior per the design doc, but operationally: a single slow-persisting subscription can delay resolved-ts progress for unrelated subscriptions whose regions happen to hash to the same worker.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_event_processor.go` around lines 159 - 172, The
worker goroutine is being blocked when calling
w.processor.pipeline.EnqueueData(span, kvs), stalling other regions mapped to
the same worker; change the call so EnqueueData does not block the worker:
capture local variables (span, kvs, state.region.ID/subscription id if needed)
and offload the pipeline.EnqueueData invocation to a separate goroutine or a
dedicated per-subscription goroutine/queue so the worker returns immediately
after metricsEventCount.Add(...); ensure context propagation by passing
w.processor.ctx (or a derived context) into the offloaded call and preserve
error handling/metrics in that goroutine rather than blocking inside
region_event_processor.go's handling of event.entries / mustFirstState /
subscribedSpan.
logservice/logpuller/subscription_client.go (1)

240-241: All-zero arguments rely on hidden defaults; consider making configuration explicit.

Passing 0, 0, 0 to newSpanPipelineManager silently falls through to runtime.GOMAXPROCS(0) workers, a 4096-element queue, and 1 GiB quota. Similarly for newRegionEventProcessor. This makes tuning opaque and disconnects the subscription client's config struct from the pipeline's parameters.

Consider wiring these through SubscriptionClientConfig or at least adding a brief comment explaining the defaults.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/subscription_client.go` around lines 240 - 241, The
constructor calls to newSpanPipelineManager(subClient.ctx, 0, 0, 0) and
newRegionEventProcessor(subClient.ctx, subClient, subClient.pipeline, 0, 0) rely
on implicit zero-argument defaults (GOMAXPROCS(0), large queue, 1GiB quota)
which hides configuration; update SubscriptionClient to surface these parameters
(e.g., add fields to SubscriptionClientConfig for workerCount, queueSize,
memoryQuota) and thread those config values into the calls to
newSpanPipelineManager and newRegionEventProcessor instead of passing zeros,
and/or add a concise comment next to the calls documenting the exact defaults
used if exposing config is not possible.
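One way to surface the hidden defaults, sketched under the assumption that a config struct like the one below would be added (the field names are hypothetical, not from the PR): resolve the zero values once, in one visible place, instead of inside each constructor.

```go
package main

import "runtime"

// pipelineConfig mirrors the reviewer's suggestion; the real
// SubscriptionClientConfig in the PR does not carry these fields yet.
type pipelineConfig struct {
	WorkerCount int   // 0 means runtime.GOMAXPROCS(0)
	QueueSize   int   // 0 means 4096
	QuotaBytes  int64 // 0 means 1 GiB
}

// withDefaults documents and applies the fallbacks in one place,
// so callers passing zeros can see exactly what they get.
func (c pipelineConfig) withDefaults() pipelineConfig {
	if c.WorkerCount <= 0 {
		c.WorkerCount = runtime.GOMAXPROCS(0)
	}
	if c.QueueSize <= 0 {
		c.QueueSize = 4096
	}
	if c.QuotaBytes <= 0 {
		c.QuotaBytes = 1 << 30
	}
	return c
}
```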
logservice/logpuller/span_pipeline.go (2)

277-291: Re-registration preserves stale pipeline state (sequences, barriers, doneSet).

If handleRegister is called for an already-registered subID, it only updates the span pointer (line 288) but retains the old nextDataSeq, ackedSeq, doneSet, and pendingResolved. If a subscription is ever re-registered (even accidentally), the stale sequence/barrier state will corrupt the ordering guarantees.

Since SubscriptionID is atomically incremented, this path shouldn't be hit in normal operation. But for defensiveness, consider either resetting the state on re-register or panicking to surface the unexpected condition.

♻️ Suggested defensive handling
 func (w *spanPipelineWorker) handleRegister(ev spanPipelineEvent) {
 	state := w.states[ev.subID]
 	if state == nil {
 		state = &spanPipelineState{
 			span:        ev.span,
 			nextDataSeq: 1,
 			ackedSeq:    0,
 		}
 		w.states[ev.subID] = state
 		metrics.LogPullerSpanPipelineActiveSubscriptions.Inc()
 	} else {
-		state.span = ev.span
+		log.Warn("span pipeline re-registration for existing subID, resetting state",
+			zap.Uint64("subID", uint64(ev.subID)))
+		state.span = ev.span
+		state.nextDataSeq = 1
+		state.ackedSeq = 0
+		state.doneSet = nil
+		state.pendingResolved = state.pendingResolved[:0]
+		state.pendingResolvedHead = 0
 	}
 	close(ev.doneCh)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/span_pipeline.go` around lines 277 - 291, handleRegister
currently only updates the span for an existing subscription and preserves stale
ordering state; update the handler in spanPipelineWorker::handleRegister to
defensively reset the per-subscription spanPipelineState when ev.subID already
exists (or alternatively panic to make the unexpected re-registration explicit).
Specifically, when w.states[ev.subID] != nil, reinitialize that entry
(spanPipelineState) so nextDataSeq, ackedSeq, doneSet, and pendingResolved are
cleared and span is set to ev.span (or call panic with a clear message if you
prefer failing fast) to avoid preserving stale sequencing/barrier state.

306-323: Quota and resolved-ts barrier hinge on the callback being invoked exactly once.

If consumeKVEvents returns true (line 319: await) but the wakeCallback closure is never called (consumer bug, panic, or lost reference), the acquired quota (ev.weight) is leaked permanently and the subscription's resolved-ts is stuck because ackedSeq will never advance past seq.

Consider adding a safety net, such as a timeout or periodic audit of long-outstanding sequences, to detect and recover from dropped callbacks — especially important given this is a WIP change.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/span_pipeline.go` around lines 306 - 323, The callback
from state.span.consumeKVEvents must be guarded so a lost/never-invoked
wakeCallback doesn't leak quota or stall ack advancement; update
spanPipelineWorker.handleData and the state tracking to record outstanding seq
with a deadline/timestamp when await==true, and add a background auditor
goroutine (e.g., on spanPipelineWorker) that periodically scans long-pending
seqs and for timeouts invokes the same recovery path: releaseQuota(ev.weight)
and call onPersisted(state, seq) (or otherwise simulate the wakeCallback) to
advance ackedSeq; alternatively pass a cancellable context/timeout into
consumeKVEvents if supported and treat a timeout as a failed callback, ensuring
every path that sets await==true has a corresponding guaranteed eventual
cleanup.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@logservice/logpuller/region_event_processor.go`:
- Around line 134-143: run() currently returns immediately on
w.processor.ctx.Done(), discarding buffered events and leaking quota; modify
run() in regionEventWorker so that when ctx.Done() is observed it drains w.ch
and calls w.handle(event) for each buffered event before returning (e.g., after
the ctx.Done() branch enter a non-blocking loop: repeatedly select { case event
:= <-w.ch: w.handle(event) default: return } ), ensuring all pending data events
are processed and their finishCallback executed.

In `@logservice/logpuller/span_pipeline.go`:
- Around line 102-133: The worker goroutines started in newSpanPipelineManager
via go w.run() can silently die on panic; update spanPipelineWorker.run to
include a top-level defer that recovers panics, logs the error (including stack)
and signals the manager (e.g., call a failure method on spanPipelineManager or
cancel the manager context) so enqueue won't block forever; alternatively,
construct workers inside an errgroup in newSpanPipelineManager and run w.run()
via the group's goroutine so worker errors propagate and trigger shutdown—ensure
the chosen approach references spanPipelineWorker.run, spanPipelineWorker,
newSpanPipelineManager and the manager's context/quit handling.
- Around line 167-191: The EnqueueData path can block forever when a single
batch weight (from approximateRawKVEntriesSize) exceeds the semaphore capacity
used by m.quota.Acquire; change EnqueueData to guard against oversized batches
by checking weight against the configured quotaBytes (or manager capacity), and
if weight > quotaBytes either split kvs into multiple smaller batches that each
have weight <= quotaBytes and enqueue them as separate spanPipelineEvent entries
(recomputing weight per chunk) or cap the chunk size to quotaBytes and loop
until all kvs are enqueued; also emit a warning log when an incoming batch is
near or above the quota threshold, and ensure any early returns still call
m.releaseQuota where appropriate and use the existing spanPipelineEvent,
m.quota.Acquire, m.releaseQuota, EnqueueData and approximateRawKVEntriesSize
symbols to locate the code to change.

---

Outside diff comments:
In `@logservice/logpuller/subscription_client.go`:
- Around line 402-410: The call to pipeline.Unregister in
subscriptionClient.onTableDrained can block and thus violate the non-blocking
requirement of the onRegionFail path; make the Unregister non-blocking by
invoking s.pipeline.Unregister(rt.subID) in a separate goroutine (e.g., go
func(id uint64){ s.pipeline.Unregister(id) }(rt.subID)) so onTableDrained
returns immediately, and keep the rest of the method (s.totalSpans.Lock/Unlock
and delete) intact; alternatively, if you prefer a library change, update
pipeline.Unregister to use a non-blocking enqueue (select with default) or
accept a context/timeout and call it with a short timeout from onTableDrained to
avoid blocking.

---

Nitpick comments:
In `@logservice/logpuller/region_event_handler_test.go`:
- Around line 136-142: Replace the anti-idiomatic assertions that use
require.True(t, false, ...) with require.Fail or require.FailNow to clearly
signal test failures; locate the occurrences in region_event_handler_test.go
where the select on eventCh uses require.True(t, false, ...) (and the other
similar asserts referencing require.True) and change them to require.Fail(t,
"unexpected event received") or require.FailNow(t, "unexpected event received")
depending on whether you want the test to continue, ensuring the failure message
describes the unexpected event.

In `@logservice/logpuller/region_event_processor.go`:
- Around line 77-84: The worker goroutines in regionEventWorker are started with
bare go w.run() and aren't tracked, so add lifecycle tracking: add a
sync.WaitGroup field (e.g., wg) to the processor (or to regionEventWorker), call
wg.Add(1) before starting each worker in the loop that creates regionEventWorker
(referencing workerCount, p.workers), have run() call wg.Done() on exit (and
continue to respect ctx.Done()), and expose a Shutdown/Wait method on the
processor that calls wg.Wait() so callers can wait for all workers to drain;
alternatively document explicitly that there is no drain guarantee if you choose
not to add the WaitGroup.
- Around line 159-172: The worker goroutine is being blocked when calling
w.processor.pipeline.EnqueueData(span, kvs), stalling other regions mapped to
the same worker; change the call so EnqueueData does not block the worker:
capture local variables (span, kvs, state.region.ID/subscription id if needed)
and offload the pipeline.EnqueueData invocation to a separate goroutine or a
dedicated per-subscription goroutine/queue so the worker returns immediately
after metricsEventCount.Add(...); ensure context propagation by passing
w.processor.ctx (or a derived context) into the offloaded call and preserve
error handling/metrics in that goroutine rather than blocking inside
region_event_processor.go's handling of event.entries / mustFirstState /
subscribedSpan.

In `@logservice/logpuller/region_request_worker.go`:
- Around line 209-210: The function comment for receiveAndDispatchChangeEvents
(on type regionRequestWorker) still says "dispatches them to ds" which is stale;
update the comment to reflect the current dispatch target (e.g., "dispatches
them to the processor" or the actual component now used) so it no longer
references dynstream/ds—edit the comment line above
receiveAndDispatchChangeEvents to name the new dispatch path/component
consistent with how events are forwarded in the function body.

In `@logservice/logpuller/span_pipeline.go`:
- Around line 277-291: handleRegister currently only updates the span for an
existing subscription and preserves stale ordering state; update the handler in
spanPipelineWorker::handleRegister to defensively reset the per-subscription
spanPipelineState when ev.subID already exists (or alternatively panic to make
the unexpected re-registration explicit). Specifically, when w.states[ev.subID]
!= nil, reinitialize that entry (spanPipelineState) so nextDataSeq, ackedSeq,
doneSet, and pendingResolved are cleared and span is set to ev.span (or call
panic with a clear message if you prefer failing fast) to avoid preserving stale
sequencing/barrier state.
- Around line 306-323: The callback from state.span.consumeKVEvents must be
guarded so a lost/never-invoked wakeCallback doesn't leak quota or stall ack
advancement; update spanPipelineWorker.handleData and the state tracking to
record outstanding seq with a deadline/timestamp when await==true, and add a
background auditor goroutine (e.g., on spanPipelineWorker) that periodically
scans long-pending seqs and for timeouts invokes the same recovery path:
releaseQuota(ev.weight) and call onPersisted(state, seq) (or otherwise simulate
the wakeCallback) to advance ackedSeq; alternatively pass a cancellable
context/timeout into consumeKVEvents if supported and treat a timeout as a
failed callback, ensuring every path that sets await==true has a corresponding
guaranteed eventual cleanup.

In `@logservice/logpuller/subscription_client.go`:
- Around line 240-241: The constructor calls to
newSpanPipelineManager(subClient.ctx, 0, 0, 0) and
newRegionEventProcessor(subClient.ctx, subClient, subClient.pipeline, 0, 0) rely
on implicit zero-argument defaults (GOMAXPROCS(0), large queue, 1GiB quota)
which hides configuration; update SubscriptionClient to surface these parameters
(e.g., add fields to SubscriptionClientConfig for workerCount, queueSize,
memoryQuota) and thread those config values into the calls to
newSpanPipelineManager and newRegionEventProcessor instead of passing zeros,
and/or add a concise comment next to the calls documenting the exact defaults
used if exposing config is not possible.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f221af5 and aab7a8f.

📒 Files selected for processing (11)
  • docs/design/2026-02-23-logpuller-span-pipeline.md
  • logservice/logpuller/region_event.go
  • logservice/logpuller/region_event_handler.go
  • logservice/logpuller/region_event_handler_test.go
  • logservice/logpuller/region_event_metrics.go
  • logservice/logpuller/region_event_processor.go
  • logservice/logpuller/region_request_worker.go
  • logservice/logpuller/span_pipeline.go
  • logservice/logpuller/span_pipeline_test.go
  • logservice/logpuller/subscription_client.go
  • pkg/metrics/log_puller.go
💤 Files with no reviewable changes (1)
  • logservice/logpuller/region_event_handler.go

Comment on lines +134 to +143
func (w *regionEventWorker) run() {
for {
select {
case <-w.processor.ctx.Done():
return
case event := <-w.ch:
w.handle(event)
}
}
}
⚠️ Potential issue | 🟡 Minor

run() silently discards buffered data events on context cancellation, causing quota leaks.

When ctx.Done() fires, the worker exits immediately without draining w.ch. Any buffered data events are lost without being processed by spanPipelineWorker.handleData(), preventing their finishCallback from triggering the persisted event that would release acquired quota. This occurs because subscriptionClient.Close() cancels the context directly without first calling Unregister for in-flight subscriptions—Unregister only happens asynchronously when a table is drained, not during shutdown.

For resolved-ts events this is benign (no quota acquired), but data events that acquired quota will leak the quota allocation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_event_processor.go` around lines 134 - 143, run()
currently returns immediately on w.processor.ctx.Done(), discarding buffered
events and leaking quota; modify run() in regionEventWorker so that when
ctx.Done() is observed it drains w.ch and calls w.handle(event) for each
buffered event before returning (e.g., after the ctx.Done() branch enter a
non-blocking loop: repeatedly select { case event := <-w.ch: w.handle(event)
default: return } ), ensuring all pending data events are processed and their
finishCallback executed.

Comment on lines +102 to +133
func newSpanPipelineManager(
ctx context.Context, workerCount int, queueSize int, quotaBytes int64,
) *spanPipelineManager {
if workerCount <= 0 {
workerCount = runtime.GOMAXPROCS(0)
if workerCount <= 0 {
workerCount = 1
}
}
if queueSize <= 0 {
queueSize = defaultSpanPipelineQueueSize
}
if quotaBytes <= 0 {
quotaBytes = defaultSpanPipelineQuotaBytes
}

mgr := &spanPipelineManager{
ctx: ctx,
quota: semaphore.NewWeighted(quotaBytes),
workers: make([]*spanPipelineWorker, 0, workerCount),
}
for i := 0; i < workerCount; i++ {
w := &spanPipelineWorker{
ch: make(chan spanPipelineEvent, queueSize),
states: make(map[SubscriptionID]*spanPipelineState),
mgr: mgr,
}
mgr.workers = append(mgr.workers, w)
go w.run()
}
return mgr
}
⚠️ Potential issue | 🟠 Major

Worker goroutines are fire-and-forget with no panic recovery or lifecycle tracking.

Goroutines started with go w.run() (line 130) have no defer recover(), no errgroup membership, and no mechanism to signal failure. If a handler panics (e.g., a nil-pointer in consumeKVEvents), the worker silently dies, and every subscription hashed to that worker becomes permanently stuck — enqueue will block on the full/dead channel until the context is cancelled.

Consider:

  1. Adding a defer with panic recovery and logging in run(), or
  2. Starting workers inside an errgroup so that a failed worker propagates its error and triggers a shutdown.
🛡️ Suggested panic recovery in the worker loop
 func (w *spanPipelineWorker) run() {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Error("span pipeline worker panicked",
+				zap.Any("recover", r),
+				zap.Stack("stack"))
+		}
+	}()
 	for {
 		select {
 		case <-w.mgr.ctx.Done():
 			return
 		case ev := <-w.ch:
 			w.handleEvent(ev)
 		}
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/span_pipeline.go` around lines 102 - 133, The worker
goroutines started in newSpanPipelineManager via go w.run() can silently die on
panic; update spanPipelineWorker.run to include a top-level defer that recovers
panics, logs the error (including stack) and signals the manager (e.g., call a
failure method on spanPipelineManager or cancel the manager context) so enqueue
won't block forever; alternatively, construct workers inside an errgroup in
newSpanPipelineManager and run w.run() via the group's goroutine so worker
errors propagate and trigger shutdown—ensure the chosen approach references
spanPipelineWorker.run, spanPipelineWorker, newSpanPipelineManager and the
manager's context/quit handling.

Comment on lines +167 to +191
func (m *spanPipelineManager) EnqueueData(
ctx context.Context, span *subscribedSpan, kvs []common.RawKVEntry,
) {
if span == nil || len(kvs) == 0 {
return
}
weight := approximateRawKVEntriesSize(kvs)
start := time.Now()
err := m.quota.Acquire(ctx, weight)
metrics.LogPullerSpanPipelineQuotaAcquireDuration.Observe(time.Since(start).Seconds())
if err != nil {
return
}
metrics.LogPullerSpanPipelineInflightBytes.Add(float64(weight))
metrics.LogPullerSpanPipelineInflightBatches.Inc()

if !m.enqueue(span.subID, spanPipelineEvent{
subID: span.subID,
typ: spanPipelineEventData,
kvs: kvs,
weight: weight,
}) {
m.releaseQuota(weight)
}
}
⚠️ Potential issue | 🟡 Minor

Acquire will block forever if a single batch's weight exceeds quotaBytes.

semaphore.Weighted.Acquire(ctx, n) blocks until n tokens are available. If weight exceeds the total semaphore capacity (default 1 GiB), the call can never succeed and will hang until ctx is cancelled. While unlikely with typical KV batches, there's no guard against it.

Consider capping weight at quotaBytes, or splitting oversized batches, or at minimum logging a warning when weight approaches the limit.

🛡️ Suggested guard
 func (m *spanPipelineManager) EnqueueData(
 	ctx context.Context, span *subscribedSpan, kvs []common.RawKVEntry,
 ) {
 	if span == nil || len(kvs) == 0 {
 		return
 	}
 	weight := approximateRawKVEntriesSize(kvs)
+	// Cap weight to quota capacity to prevent permanent blocking.
+	if weight > defaultSpanPipelineQuotaBytes {
+		log.Warn("span pipeline data batch exceeds quota capacity, capping weight",
+			zap.Int64("weight", weight),
+			zap.Int64("quota", defaultSpanPipelineQuotaBytes))
+		weight = defaultSpanPipelineQuotaBytes
+	}
 	start := time.Now()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/span_pipeline.go` around lines 167 - 191, The
EnqueueData path can block forever when a single batch weight (from
approximateRawKVEntriesSize) exceeds the semaphore capacity used by
m.quota.Acquire; change EnqueueData to guard against oversized batches by
checking weight against the configured quotaBytes (or manager capacity), and if
weight > quotaBytes either split kvs into multiple smaller batches that each
have weight <= quotaBytes and enqueue them as separate spanPipelineEvent entries
(recomputing weight per chunk) or cap the chunk size to quotaBytes and loop
until all kvs are enqueued; also emit a warning log when an incoming batch is
near or above the quota threshold, and ensure any early returns still call
m.releaseQuota where appropriate and use the existing spanPipelineEvent,
m.quota.Acquire, m.releaseQuota, EnqueueData and approximateRawKVEntriesSize
symbols to locate the code to change.
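The splitting alternative mentioned in the prompt above can be sketched as a pure chunking helper. The `sizes` slice and cap stand in for `approximateRawKVEntriesSize` per entry and `quotaBytes`; the function name is invented, not from the PR.

```go
package main

// splitByWeight groups entry weights into chunks whose totals never
// exceed capBytes, so each chunk's semaphore Acquire can succeed.
// A single entry larger than the cap is clamped rather than dropped,
// mirroring the capping fallback in the suggested diff.
func splitByWeight(sizes []int64, capBytes int64) [][]int64 {
	var chunks [][]int64
	var cur []int64
	var curWeight int64
	for _, s := range sizes {
		if s > capBytes {
			s = capBytes // clamp oversized single entry
		}
		if curWeight+s > capBytes && len(cur) > 0 {
			chunks = append(chunks, cur)
			cur, curWeight = nil, 0
		}
		cur = append(cur, s)
		curWeight += s
	}
	if len(cur) > 0 {
		chunks = append(chunks, cur)
	}
	return chunks
}
```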
