-
Notifications
You must be signed in to change notification settings - Fork 25
Open
Labels
bugSomething isn't workingSomething isn't working
Description
Describe the bug
streamsorter, an accumulator example, aborted when it took certain inputs from generator source (yes, I modified example's spec). Reproduced on both v0.10.0 and 23bc5d0 (Feb 28 2026).
Full logs are attached in Logs section. Here is some logs of streamsorter:
2026-03-05 05:45:51 CRITICAL accumulator task not found
NoneType: None
2026-03-05 05:45:51 CRITICAL BaseException: accumulator task not found
2026-03-05 05:45:51 CRITICAL accumulator task not found
2026-03-05 05:45:51 CRITICAL Accumulator Error: AbortError('Locally aborted.')
Traceback (most recent call last):
File "/opt/pysetup/examples/accumulator/streamsorter/.venv/lib/python3.10/site-packages/pynumaflow/accumulator/servicer/task_manager.py", line 225, in process_input_stream
async for request in request_iterator:
File "/opt/pysetup/examples/accumulator/streamsorter/.venv/lib/python3.10/site-packages/pynumaflow/accumulator/servicer/async_servicer.py", line 24, in datum_generator
async for d in request_iterator:
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 607, in _async_message_receiver
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 132, in read
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi", line 82, in grpc._cython.cygrpc.RPCState.raise_for_termination
grpc._cython.cygrpc.AbortError: Locally aborted.
2026-03-05 05:45:51 INFO Killing process: Got exception UDF_EXECUTION_ERROR(udf): BaseException('accumulator task not found')
Killed
There are two generators. Their specs are as follows. Note that the duration (20s) is longer than the accumulator's timeout (10s), so I believe the accumulator will be timed out.
- name: input-one
scale:
min: 1
max: 1
source:
generator:
rpu: 10
keyCount: 10
duration: 20s
- name: input-two
scale:
min: 1
max: 1
source:
generator:
rpu: 10
keyCount: 10
duration: 20sTo Reproduce
I have my two branches for reproducing the issue:
- https://github.com/tmenjo/numaflow-python/tree/streamsorter-v0.10.0
- https://github.com/tmenjo/numaflow-python/tree/streamsorter-main
Steps:
- Run
make image - Run
./reproduce.shto apply pipeline.yaml - Wait until the status of the
stream-sorter-py-accum-0-XXXXXpod changes
Expected behavior
streamsorter should not abort. It should keep running.
Logs
Environment
- Kubernetes: v1.33.2
- Numaflow: v1.7.1
- Numaflow-python: v0.10.0 or 23bc5d0
Message from the maintainers:
Impacted by this bug? Give it a 👍. We often sort issues this way to know what to prioritize.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't working