
feat: add chunked file upload support Streaming Upload API (rx.upload_files_chunk) #6190

Open

FarhanAliRaza wants to merge 8 commits into reflex-dev:main from FarhanAliRaza:chunked-upload

Conversation

@FarhanAliRaza (Collaborator) commented Mar 18, 2026

Implement chunked/streaming file uploads to handle large files without loading them entirely into memory. Moves upload handling logic from app.py to event.py, adds chunked upload JS helpers, and updates the upload component to support the new upload_files_chunk API. Includes unit and integration tests for chunked upload, cancel, and streaming.

All Submissions:

  • Have you followed the guidelines stated in CONTRIBUTING.md file?
  • Have you checked to ensure there aren't any other open Pull Requests for the desired change?

Type of change

Please delete options that are not relevant.

  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

New Feature Submission:

  • Does your submission pass the tests?
  • Have you linted your code locally prior to submission?

Changes To Core Features:

  • Have you added an explanation of what your changes do and why you'd like us to include them?
  • Have you written new tests for your core changes, as applicable?
  • Have you successfully run tests with your changes locally?

closes #6184

…_files_chunk)

Implement chunked/streaming file uploads to handle large files without
loading them entirely into memory. Moves upload handling logic from
app.py to event.py, adds chunked upload JS helpers, and updates the
upload component to support the new upload_files_chunk API. Includes
unit and integration tests for chunked upload, cancel, and streaming.
codspeed-hq bot commented Mar 18, 2026

Merging this PR will not alter performance

✅ 8 untouched benchmarks


Comparing FarhanAliRaza:chunked-upload (ba6a28e) with main (d45a1bb)


greptile-apps bot (Contributor) commented Mar 18, 2026

Greptile Summary

This PR introduces a streaming/chunked file upload API (rx.upload_files_chunk) that avoids loading entire files into memory. Upload handling is extracted from app.py into a new reflex/uploads.py module, a new /_upload_chunk endpoint is registered, and a custom async multipart parser (_UploadChunkMultipartParser) pushes raw bytes incrementally to a background event handler via the new UploadChunkIterator. The frontend gains a parallel uploadFilesChunk JS helper that posts to the new endpoint. Test coverage includes both unit and integration tests for streaming, cancellation, and progress tracking.
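The route-to-handler handoff the summary describes can be pictured with a stdlib-only sketch. ChunkIterator below is a hypothetical stand-in for the PR's UploadChunkIterator, not its actual implementation: the route pushes parsed chunks in, a background task iterates them out, and finish() signals end-of-stream.

```python
import asyncio

_SENTINEL = object()  # marks end-of-stream


class ChunkIterator:
    """Minimal analog of the push/consume pattern: producer pushes
    chunks, a background consumer iterates them asynchronously."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def push(self, chunk: bytes) -> None:
        await self._queue.put(chunk)

    async def finish(self) -> None:
        await self._queue.put(_SENTINEL)

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        item = await self._queue.get()
        if item is _SENTINEL:
            raise StopAsyncIteration
        return item


async def main() -> list[bytes]:
    chunks = ChunkIterator()

    async def handler() -> list[bytes]:
        # Background task: consume chunks as they stream in.
        return [c async for c in chunks]

    task = asyncio.create_task(handler())
    for part in (b"hello ", b"world"):
        await chunks.push(part)
    await chunks.finish()
    return await task


print(asyncio.run(main()))  # [b'hello ', b'world']
```

The real parser additionally runs the consumer as a state background event and pushes structured UploadChunk objects rather than raw bytes.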

Key issues found:

  • on_drop_rejected incorrectly typed with _on_drop_args_spec — rejected-file callbacks should never receive an UploadChunkIterator; using the chunk spec here allows users to accidentally wire a streaming handler to on_drop_rejected. The on_drop_rejected field in both GhostUpload and Upload should keep the original _on_drop_spec.
  • No timeout when awaiting a cancelled background task — on ClientDisconnect, task.cancel() is followed by an unbounded await task. A handler with a slow finally block will stall the route indefinitely.
  • O(n) list.pop(0) in _flush_emitted_chunks — high-throughput small-chunk uploads repeatedly shift a list; use a deque or swap-and-iterate pattern instead.
  • UploadFile imported via reflex.app re-export in event.py — since UploadFile now lives in reflex.uploads, the deferred import in resolve_upload_handler_param should reference reflex.uploads directly.
  • Hardcoded handler name strings "uploadFiles" / "uploadFilesChunk" — used as identifiers in both upload.py and state.js; should be extracted into shared constants.

Confidence Score: 2/5

  • Two logic bugs (unbounded await on disconnect and incorrect on_drop_rejected spec) need to be fixed before merging.
  • The architecture is sound and tests are thorough, but the unbounded await task on client disconnect can permanently stall the upload route, and the on_drop_rejected spec change can silently allow users to wire an incompatible streaming handler — both are observable runtime issues that should be addressed before shipping.
  • Pay close attention to reflex/uploads.py (disconnect handling) and reflex/components/core/upload.py (on_drop_rejected spec).

Important Files Changed

reflex/uploads.py: New module housing upload route handlers and the streaming multipart parser; contains an unbounded await task on disconnect and an O(n) pop(0) in the hot flush path.
reflex/event.py: Adds UploadChunk, UploadChunkIterator, and UploadFilesChunk; imports UploadFile via reflex.app re-export instead of directly from reflex.uploads.
reflex/components/core/upload.py: Introduces _on_drop_args_spec to support chunk uploads on on_drop, but incorrectly applies it to on_drop_rejected as well; also uses hardcoded handler name string literals.
reflex/.templates/web/utils/helpers/upload.js: Refactors upload logic into shared sendUploadRequest helper and adds uploadFilesChunk for the new _upload_chunk endpoint; env.UPLOAD_CHUNK is populated automatically via set_env_json.
reflex/.templates/web/utils/state.js: Routes uploadFilesChunk handler name to the new JS function; uses hardcoded handler name strings that should be constants.
reflex/app.py: Moves upload route logic to reflex/uploads.py and registers the new _upload_chunk endpoint; clean refactor with no new issues.
reflex/constants/event.py: Adds UPLOAD_CHUNK = "_upload_chunk" to the Endpoint enum; this is automatically picked up by set_env_json() so env.UPLOAD_CHUNK is correctly populated in the frontend.
tests/units/test_app.py: Adds thorough unit tests for chunk streaming, invalid multipart data (returns 400), and background task draining.
tests/integration/test_upload.py: Adds integration tests for streaming upload (chunk verification), cancel-on-disconnect, and progress tracking.

Sequence Diagram

sequenceDiagram
    participant Browser
    participant UploadChunkRoute as POST /_upload_chunk
    participant StateManager
    participant BackgroundTask as Background Handler
    participant UploadChunkIterator

    Browser->>UploadChunkRoute: POST multipart/form-data (headers: token, handler)
    UploadChunkRoute->>StateManager: get_state + _process_background(event)
    StateManager-->>UploadChunkRoute: asyncio.Task
    UploadChunkRoute->>UploadChunkIterator: set_consumer_task(task)
    Note over BackgroundTask: Task scheduled, awaiting chunks

    loop Stream parsing
        Browser->>UploadChunkRoute: HTTP body bytes (streamed)
        UploadChunkRoute->>UploadChunkIterator: push(UploadChunk)
        UploadChunkIterator-->>BackgroundTask: yield UploadChunk
        BackgroundTask->>StateManager: async with self (emit state update)
        StateManager-->>Browser: WebSocket state delta
    end

    UploadChunkRoute->>UploadChunkIterator: finish()
    BackgroundTask-->>StateManager: (completes, final state update)
    UploadChunkRoute-->>Browser: 202 Accepted

    alt Client Disconnects Mid-Upload
        Browser->>UploadChunkRoute: TCP close / ClientDisconnect
        UploadChunkRoute->>BackgroundTask: task.cancel()
        UploadChunkRoute->>BackgroundTask: await task (no timeout ⚠️)
        UploadChunkRoute-->>Browser: 200 (empty)
    end

Last reviewed commit: "feat: add chunked fi..."

reflex/upload.py Outdated
Comment on lines +522 to +526
except ClientDisconnect:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
return Response()

P1 Unbounded await task on client disconnect

When the client disconnects, task.cancel() is called and then await task waits for it to complete. However, if the background handler has a long finally block or ignores cancellation (e.g., due to a blocking I/O call), this await will block indefinitely, leaving the upload route handler permanently stalled.

A timeout should be added so that the handler eventually returns regardless:

except ClientDisconnect:
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError, asyncio.TimeoutError):
        await asyncio.wait_for(task, timeout=5.0)
    return Response()
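The effect of the proposed timeout can be reproduced with a stdlib-only sketch. stubborn_handler below is a contrived stand-in for a handler whose cleanup ignores the first cancellation (it is not code from the PR), and 0.1 s stands in for the suggested 5 s to keep the demo fast:

```python
import asyncio
import contextlib
import time


async def stubborn_handler() -> None:
    try:
        await asyncio.sleep(3600)  # simulates streaming work
    except asyncio.CancelledError:
        # Swallow the first cancel and keep "cleaning up": a bare
        # `await task` would stall here for the full 10 seconds.
        await asyncio.sleep(10)
        raise


async def main() -> float:
    task = asyncio.create_task(stubborn_handler())
    await asyncio.sleep(0)  # let the handler start running
    task.cancel()
    start = time.monotonic()
    with contextlib.suppress(asyncio.CancelledError, asyncio.TimeoutError):
        # Bounded wait: on timeout, wait_for cancels the task again,
        # which interrupts even the cleanup sleep above.
        await asyncio.wait_for(task, timeout=0.1)
    return time.monotonic() - start


elapsed = asyncio.run(main())
print(f"route returned after {elapsed:.2f}s")  # well under the 10 s cleanup
```

Note that wait_for's second cancellation is what breaks the stall: it interrupts whatever the handler is awaiting in its cleanup path, so the route returns shortly after the timeout rather than whenever the handler deigns to finish.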

Comment on lines +220 to +223
on_drop: EventHandler[_on_drop_args_spec]

# Fired when dropped files do not meet the specified criteria.
on_drop_rejected: EventHandler[_on_drop_spec]
on_drop_rejected: EventHandler[_on_drop_args_spec]

P1 on_drop_rejected should not use _on_drop_args_spec

on_drop_rejected fires when dropped files fail validation (wrong type, too large, etc.) — it is called with a list of rejected file records, never with an UploadChunkIterator. Changing its spec to _on_drop_args_spec (which includes passthrough_event_spec(UploadChunkIterator)) allows users to accidentally attach a streaming-upload handler to on_drop_rejected, which is semantically wrong and will never work as intended.

This is also true for the Upload class at line 274.

Suggested change
on_drop: EventHandler[_on_drop_args_spec]
# Fired when dropped files do not meet the specified criteria.
on_drop_rejected: EventHandler[_on_drop_spec]
on_drop_rejected: EventHandler[_on_drop_args_spec]
# Fired when files are dropped.
on_drop: EventHandler[_on_drop_args_spec]
# Fired when dropped files do not meet the specified criteria.
on_drop_rejected: EventHandler[_on_drop_spec]

Comment on lines +216 to +219
async def _flush_emitted_chunks(self) -> None:
"""Push parsed upload chunks into the handler iterator."""
while self._chunks_to_emit:
await self.chunk_iter.push(self._chunks_to_emit.pop(0))

P2 O(n) list.pop(0) in hot upload path

self._chunks_to_emit is declared as list[UploadChunk]. Calling pop(0) on a list is O(n) because it shifts all elements left. For a high-throughput streaming upload that emits many small chunks, this degrades performance.

Either change _chunks_to_emit to a collections.deque and use popleft(), or simply iterate and clear:

async def _flush_emitted_chunks(self) -> None:
    """Push parsed upload chunks into the handler iterator."""
    chunks, self._chunks_to_emit = self._chunks_to_emit, []
    for chunk in chunks:
        await self.chunk_iter.push(chunk)
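For scale, a stdlib-only comparison of the two drain strategies on the same payload (the payload size and function names here are illustrative, not from the PR):

```python
import time
from collections import deque


def drain_list(items: list[bytes]) -> list[bytes]:
    out = []
    while items:
        out.append(items.pop(0))  # O(n): shifts every remaining element
    return out


def drain_deque(items: deque) -> list[bytes]:
    out = []
    while items:
        out.append(items.popleft())  # O(1)
    return out


payload = [b"x"] * 20_000  # many small chunks

t0 = time.perf_counter()
a = drain_list(list(payload))
t1 = time.perf_counter()
b = drain_deque(deque(payload))
t2 = time.perf_counter()

assert a == b  # identical drain order, very different cost
print(f"list.pop(0): {t1 - t0:.4f}s  deque.popleft(): {t2 - t1:.4f}s")
```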

reflex/event.py Outdated
UploadTypeError: If the handler is a background task.
UploadValueError: If the handler does not accept ``list[rx.UploadFile]``.
"""
from reflex.app import UploadFile

P2 Indirect import of UploadFile via reflex.app re-export

UploadFile was moved from reflex.app to reflex.uploads in this PR, but this function still imports it through reflex.app's re-export. reflex.app itself imports from reflex.uploads, so this creates an unnecessary level of indirection. The deferred import should reference the defining module directly:

Suggested change
from reflex.app import UploadFile
from reflex.uploads import UploadFile

Comment on lines +318 to +321
if event.client_handler_name not in {
"uploadFiles",
"uploadFilesChunk",
}:

P2 Hardcoded string identifiers should be extracted into constants

The string literals "uploadFiles" and "uploadFilesChunk" are used as identifiers in multiple places — here in upload.py and also inside state.js (applyRestEvent). Per the project's style guidelines, string literals used as identifiers or keys should be extracted into named constants to avoid typos and make future renaming easier.

Consider defining them as module-level constants (e.g., in reflex/constants/event.py or a dedicated location):

_UPLOAD_FILES_HANDLER = "uploadFiles"
_UPLOAD_FILES_CHUNK_HANDLER = "uploadFilesChunk"

and referencing them from both upload.py and state.js.

Rule Used: String literals that are used as identifiers or ke... (source)

Learnt From
reflex-dev/flexgen#2170

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

@masenf (Collaborator) left a comment

If possible, the frontend code should be consolidated. I don't think there's a need to change the frontend code at all. You should be able to detect which type of upload is used in the backend and dispatch to the correct upload type based on the resolved handler arg type.

)

assert chunk.filename == "foo.txt"
assert isinstance(rx.UploadChunkIterator(), rx.UploadChunkIterator)

🤔

up_comp_5 = Upload.create(
id="foo_id",
on_drop=StreamingUploadStateTest.chunk_drop_handler(
cast(Any, rx.upload_files_chunk(upload_id="foo_id"))

use # pyright: ignore[...] comments so if we eventually fix the typing down the road, we can detect "useless type ignore" and remove them

@@ -0,0 +1,537 @@
"""Backend upload helpers and routes for Reflex apps."""

To match the other non-plural names throughout reflex, can this be renamed to reflex/upload.py?

@FarhanAliRaza (Collaborator Author)

Renamed it to _upload; naming it upload would conflict with the upload component.

@masenf (Collaborator)

Sounds good. I do want to move more toward framework internals being marked with a leading underscore and only exposing a well-vetted public API for our upcoming 1.0 release.

Move upload helpers from reflex/upload.py to reflex/_upload.py, unify
the frontend to use a single uploadFiles function instead of separate
uploadFiles/uploadFilesChunk paths, and normalize upload payload keys
server-side in state.py instead of branching in the JS client.
@FarhanAliRaza (Collaborator Author)

Don't know why pre-commit is failing.

@masenf (Collaborator) commented Mar 18, 2026

Don't know why pre-commit is failing.

The pyi_generator script created files that differ from the last known hash, i.e. "something" in the pyi output changed, and it looks like it was something that most components inherit, so probably in the base component class.

EDIT: actually, I see that you added UploadFile as a default import, so every pyi file got its hash changed. Do we need UploadFile as a default import?

@FarhanAliRaza (Collaborator Author)

EDIT: actually, I see that you added UploadFile as a default import, so every pyi file got its hash changed. Do we need UploadFile as a default import?

No, we don't need it in the default imports.
But there was another issue: it was generating verbose pyi output like this:
(screenshot of verbose .pyi import output)

Even when upload.pyi already has a top-level import of UploadFile, it still tries to do an absolute import, but this can be made better.
I fixed this in pyi_generator.

@FarhanAliRaza requested a review from masenf on March 19, 2026 09:03