feat: add chunked file upload support Streaming Upload API (rx.upload_files_chunk) #6190
FarhanAliRaza wants to merge 8 commits into reflex-dev:main
Conversation
…_files_chunk)
Implement chunked/streaming file uploads to handle large files without loading them entirely into memory. Moves upload handling logic from app.py to event.py, adds chunked upload JS helpers, and updates the upload component to support the new upload_files_chunk API. Includes unit and integration tests for chunked upload, cancel, and streaming.
Greptile Summary
This PR introduces a streaming/chunked file upload API (rx.upload_files_chunk). Key issues found:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

sequenceDiagram
participant Browser
participant UploadChunkRoute as POST /_upload_chunk
participant StateManager
participant BackgroundTask as Background Handler
participant UploadChunkIterator
Browser->>UploadChunkRoute: POST multipart/form-data (headers: token, handler)
UploadChunkRoute->>StateManager: get_state + _process_background(event)
StateManager-->>UploadChunkRoute: asyncio.Task
UploadChunkRoute->>UploadChunkIterator: set_consumer_task(task)
Note over BackgroundTask: Task scheduled, awaiting chunks
loop Stream parsing
Browser->>UploadChunkRoute: HTTP body bytes (streamed)
UploadChunkRoute->>UploadChunkIterator: push(UploadChunk)
UploadChunkIterator-->>BackgroundTask: yield UploadChunk
BackgroundTask->>StateManager: async with self (emit state update)
StateManager-->>Browser: WebSocket state delta
end
UploadChunkRoute->>UploadChunkIterator: finish()
BackgroundTask-->>StateManager: (completes, final state update)
UploadChunkRoute-->>Browser: 202 Accepted
alt Client Disconnects Mid-Upload
Browser->>UploadChunkRoute: TCP close / ClientDisconnect
UploadChunkRoute->>BackgroundTask: task.cancel()
UploadChunkRoute->>BackgroundTask: await task (no timeout ⚠️)
UploadChunkRoute-->>Browser: 200 (empty)
end
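The push/consume contract in the diagram above can be sketched with a minimal asyncio queue-backed iterator. This is an illustrative standalone toy, not Reflex's actual UploadChunkIterator implementation; all names here are hypothetical:

```python
import asyncio

# End-of-stream marker: finish() enqueues it so the consumer's
# `async for` loop terminates cleanly.
_SENTINEL = object()


class ChunkIterator:
    """Queue-backed async iterator: the route pushes, the handler consumes."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def push(self, chunk: bytes) -> None:
        await self._queue.put(chunk)

    async def finish(self) -> None:
        await self._queue.put(_SENTINEL)

    def __aiter__(self) -> "ChunkIterator":
        return self

    async def __anext__(self) -> bytes:
        item = await self._queue.get()
        if item is _SENTINEL:
            raise StopAsyncIteration
        return item


async def demo() -> bytes:
    it = ChunkIterator()

    async def handler() -> bytes:
        # Background task: accumulate streamed chunks as they arrive.
        received = b""
        async for chunk in it:
            received += chunk
        return received

    task = asyncio.create_task(handler())  # mirrors _process_background
    for part in (b"hello ", b"world"):
        await it.push(part)                # mirrors the stream-parsing loop
    await it.finish()                      # mirrors finish() after parsing
    return await task


print(asyncio.run(demo()))  # b'hello world'
```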
Last reviewed commit: "feat: add chunked fi..."
reflex/upload.py
Outdated
except ClientDisconnect:
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
return Response()
Unbounded await task on client disconnect
When the client disconnects, task.cancel() is called and then await task waits for it to complete. However, if the background handler has a long finally block or ignores cancellation (e.g., due to a blocking I/O call), this await will block indefinitely, leaving the upload route handler permanently stalled.
A timeout should be added so that the handler eventually returns regardless:
except ClientDisconnect:
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError, asyncio.TimeoutError):
        await asyncio.wait_for(task, timeout=5.0)
return Response()
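A standalone toy (not Reflex code) illustrating why the bounded wait matters: a handler that swallows its first cancellation would stall an unbounded await task, while asyncio.wait_for still lets the route return. The handler and timeout values here are invented for demonstration:

```python
import asyncio
import contextlib


async def stubborn_handler() -> None:
    """Simulates a handler whose cleanup ignores the first cancellation."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # A long `finally`-style cleanup that keeps running after cancel;
        # an unbounded `await task` would block here indefinitely.
        await asyncio.sleep(3600)


async def cancel_with_timeout() -> str:
    task = asyncio.create_task(stubborn_handler())
    await asyncio.sleep(0)  # let the task start and enter its first sleep
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError, asyncio.TimeoutError):
        # wait_for re-cancels the task on timeout and raises TimeoutError,
        # so control always comes back to the route handler.
        await asyncio.wait_for(task, timeout=0.1)
    return "returned"


print(asyncio.run(cancel_with_timeout()))  # returned
```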
reflex/components/core/upload.py
Outdated
on_drop: EventHandler[_on_drop_args_spec]

# Fired when dropped files do not meet the specified criteria.
- on_drop_rejected: EventHandler[_on_drop_spec]
+ on_drop_rejected: EventHandler[_on_drop_args_spec]
on_drop_rejected should not use _on_drop_args_spec
on_drop_rejected fires when dropped files fail validation (wrong type, too large, etc.) — it is called with a list of rejected file records, never with an UploadChunkIterator. Changing its spec to _on_drop_args_spec (which includes passthrough_event_spec(UploadChunkIterator)) allows users to accidentally attach a streaming-upload handler to on_drop_rejected, which is semantically wrong and will never work as intended.
This is also true for the Upload class at line 274.
Suggested change:
  # Fired when files are dropped.
  on_drop: EventHandler[_on_drop_args_spec]
  # Fired when dropped files do not meet the specified criteria.
- on_drop_rejected: EventHandler[_on_drop_args_spec]
+ on_drop_rejected: EventHandler[_on_drop_spec]
reflex/_upload.py
Outdated
async def _flush_emitted_chunks(self) -> None:
    """Push parsed upload chunks into the handler iterator."""
    while self._chunks_to_emit:
        await self.chunk_iter.push(self._chunks_to_emit.pop(0))
O(n) list.pop(0) in hot upload path
self._chunks_to_emit is declared as list[UploadChunk]. Calling pop(0) on a list is O(n) because it shifts all elements left. For a high-throughput streaming upload that emits many small chunks, this degrades performance.
Either change _chunks_to_emit to a collections.deque and use popleft(), or simply iterate and clear:
async def _flush_emitted_chunks(self) -> None:
    """Push parsed upload chunks into the handler iterator."""
    chunks, self._chunks_to_emit = self._chunks_to_emit, []
    for chunk in chunks:
        await self.chunk_iter.push(chunk)
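A standalone comparison (not Reflex code) of the two drain strategies. Both drain n items, but the list version shifts every remaining element on each pop, while deque.popleft() is O(1):

```python
import collections
import timeit


def drain_list(n: int) -> int:
    """Drain a list from the front with pop(0): O(n) per pop."""
    items = list(range(n))
    count = 0
    while items:
        items.pop(0)  # shifts all remaining elements left
        count += 1
    return count


def drain_deque(n: int) -> int:
    """Drain a deque from the front with popleft(): O(1) per pop."""
    items = collections.deque(range(n))
    count = 0
    while items:
        items.popleft()
        count += 1
    return count


n = 20_000
t_list = timeit.timeit(lambda: drain_list(n), number=1)
t_deque = timeit.timeit(lambda: drain_deque(n), number=1)
print(f"list.pop(0): {t_list:.4f}s  deque.popleft(): {t_deque:.4f}s")
```

For small chunk counts the difference is negligible, which is why the swap-and-iterate fix above is an equally valid alternative to switching container types.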
reflex/event.py
Outdated
    UploadTypeError: If the handler is a background task.
    UploadValueError: If the handler does not accept ``list[rx.UploadFile]``.
"""
from reflex.app import UploadFile
Indirect import of UploadFile via reflex.app re-export
UploadFile was moved from reflex.app to reflex.uploads in this PR, but this function still imports it through reflex.app's re-export. reflex.app itself imports from reflex.uploads, so this creates an unnecessary level of indirection. The deferred import should reference the defining module directly:
Suggested change:
- from reflex.app import UploadFile
+ from reflex.uploads import UploadFile
reflex/components/core/upload.py
Outdated
if event.client_handler_name not in {
    "uploadFiles",
    "uploadFilesChunk",
}:
Hardcoded string identifiers should be extracted into constants
The string literals "uploadFiles" and "uploadFilesChunk" are used as identifiers in multiple places — here in upload.py and also inside state.js (applyRestEvent). Per the project's style guidelines, string literals used as identifiers or keys should be extracted into named constants to avoid typos and make future renaming easier.
Consider defining them as module-level constants (e.g., in reflex/constants/event.py or a dedicated location):
_UPLOAD_FILES_HANDLER = "uploadFiles"
_UPLOAD_FILES_CHUNK_HANDLER = "uploadFilesChunk"

and referencing them from both upload.py and state.js.
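A minimal self-contained sketch of that layout; the constant and helper names here are illustrative, not what Reflex actually defines:

```python
# Module-level constants replacing the hardcoded handler-name literals,
# so upload.py and the JS template share a single source of truth.
UPLOAD_FILES_HANDLER = "uploadFiles"
UPLOAD_FILES_CHUNK_HANDLER = "uploadFilesChunk"
UPLOAD_HANDLERS = frozenset({UPLOAD_FILES_HANDLER, UPLOAD_FILES_CHUNK_HANDLER})


def is_upload_handler(client_handler_name: str) -> bool:
    """Membership check to use in upload.py instead of inline literals."""
    return client_handler_name in UPLOAD_HANDLERS


print(is_upload_handler("uploadFilesChunk"))  # True
print(is_upload_handler("somethingElse"))     # False
```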
Rule Used: String literals that are used as identifiers or ke... (source)
Learnt From
reflex-dev/flexgen#2170
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
masenf
left a comment
if possible, the frontend code should be consolidated. i don't think there's a need to change the frontend code at all. you should be able to detect which type of upload is used in the backend and dispatch to the correct upload type based on the resolved handler arg type
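A rough sketch of this backend-dispatch idea, assuming a placeholder UploadChunkIterator class and plain annotated functions in place of Reflex's resolved EventHandler specs (the real resolver would work on those specs, not raw callables):

```python
import inspect
from typing import get_type_hints


class UploadChunkIterator:
    """Placeholder standing in for rx.UploadChunkIterator."""


def resolve_upload_mode(handler) -> str:
    """Pick the upload mode from the handler's annotated parameter type."""
    hints = get_type_hints(handler)
    hints.pop("return", None)
    for annotation in hints.values():
        # A parameter annotated with the chunk iterator type means the
        # handler wants a streamed upload; anything else gets whole files.
        if inspect.isclass(annotation) and issubclass(
            annotation, UploadChunkIterator
        ):
            return "chunked"
    return "whole-file"


async def classic_handler(files: list) -> None: ...
async def streaming_handler(chunks: UploadChunkIterator) -> None: ...


print(resolve_upload_mode(classic_handler))    # whole-file
print(resolve_upload_mode(streaming_handler))  # chunked
```

With a check like this on the server, the frontend could always call one upload function and let the backend route to the streaming or whole-file path.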
)

assert chunk.filename == "foo.txt"
assert isinstance(rx.UploadChunkIterator(), rx.UploadChunkIterator)
up_comp_5 = Upload.create(
    id="foo_id",
    on_drop=StreamingUploadStateTest.chunk_drop_handler(
        cast(Any, rx.upload_files_chunk(upload_id="foo_id"))
use # pyright: ignore[...] comments so if we eventually fix the typing down the road, we can detect "useless type ignore" and remove them
@@ -0,0 +1,537 @@
"""Backend upload helpers and routes for Reflex apps."""
to match the other non plural names throughout reflex, can this be renamed as reflex/upload.py
Renamed it to _upload; naming it upload would conflict with the upload component.
sounds good. i do want to move more toward framework internals being marked with the leading underscore and only exposing a well vetted public API for our upcoming 1.0 release
Move upload helpers from reflex/upload.py to reflex/_upload.py, unify the frontend to use a single uploadFiles function instead of separate uploadFiles/uploadFilesChunk paths, and normalize upload payload keys server-side in state.py instead of branching in the JS client.
Don't know why pre-commit is failing.
The pyi_generator script created files that differ from the last known hash, i.e. "something" in the pyi output changed, and it looks like it was something that most components are inheriting, so probably in the base component class.
EDIT: actually I see that you added UploadFile as a default import, so every pyi file got its hash changed. Do we need UploadFile as a default import?

Implement chunked/streaming file uploads to handle large files without loading them entirely into memory. Moves upload handling logic from app.py to event.py, adds chunked upload JS helpers, and updates the upload component to support the new upload_files_chunk API. Includes unit and integration tests for chunked upload, cancel, and streaming.
All Submissions:
Type of change
Please delete options that are not relevant.
New Feature Submission:
Changes To Core Features:
closes #6184