Skip to content

streamsorter aborted with "accumulator task not found" in case of certain generator inputs #327

@tmenjo

Description

@tmenjo

Describe the bug

streamsorter, an accumulator example, aborted when it took certain inputs from generator source (yes, I modified example's spec). Reproduced on both v0.10.0 and 23bc5d0 (Feb 28 2026).

Full logs are attached in Logs section. Here is some logs of streamsorter:

2026-03-05 05:45:51 CRITICAL accumulator task not found
NoneType: None
2026-03-05 05:45:51 CRITICAL BaseException: accumulator task not found
2026-03-05 05:45:51 CRITICAL accumulator task not found
2026-03-05 05:45:51 CRITICAL Accumulator Error: AbortError('Locally aborted.')
Traceback (most recent call last):
  File "/opt/pysetup/examples/accumulator/streamsorter/.venv/lib/python3.10/site-packages/pynumaflow/accumulator/servicer/task_manager.py", line 225, in process_input_stream
    async for request in request_iterator:
  File "/opt/pysetup/examples/accumulator/streamsorter/.venv/lib/python3.10/site-packages/pynumaflow/accumulator/servicer/async_servicer.py", line 24, in datum_generator
    async for d in request_iterator:
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 607, in _async_message_receiver
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 132, in read
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 82, in grpc._cython.cygrpc.RPCState.raise_for_termination
grpc._cython.cygrpc.AbortError: Locally aborted.
2026-03-05 05:45:51 INFO     Killing process: Got exception UDF_EXECUTION_ERROR(udf): BaseException('accumulator task not found')
Killed

There are two generators. Their specs are as follows. Note that the duration (20s) is longer than the accumulator's timeout (10s), so I believe the accumulator will be timed out.

    - name: input-one
      scale:
        min: 1
        max: 1
      source:
        generator:
          rpu: 10
          keyCount: 10
          duration: 20s
    - name: input-two
      scale:
        min: 1
        max: 1
      source:
        generator:
          rpu: 10
          keyCount: 10
          duration: 20s

To Reproduce

I have my two branches for reproducing the issue:

Steps:

  1. Run make image
  2. Run ./reproduce.sh to apply pipeline.yaml
  3. Wait until the status of the stream-sorter-py-accum-0-XXXXX pod changes

Expected behavior

streamsorter should not abort. It should keep running.

Logs

Environment

  • Kubernetes: v1.33.2
  • Numaflow: v1.7.1
  • Numaflow-python: v0.10.0 or 23bc5d0

Message from the maintainers:

Impacted by this bug? Give it a 👍. We often sort issues this way to know what to prioritize.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions