Problem
Reflex's current upload API (`rx.upload_files`) buffers the entire file into memory on the server before the event handler runs. For large files (video, datasets, etc.), this causes high memory usage and prevents the developer from processing data as it arrives (e.g., streaming to S3, computing checksums incrementally, showing server-side progress).
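For comparison, the incremental-processing pattern a streaming API would enable can be shown with plain file-like I/O (a generic sketch, not Reflex API): hashing a payload chunk-by-chunk keeps peak memory at the chunk size rather than the file size.

```python
import hashlib
from typing import BinaryIO

CHUNK_SIZE = 64 * 1024  # process 64 KiB at a time

def incremental_sha256(stream: BinaryIO) -> str:
    """Hash a stream chunk-by-chunk; only one chunk is ever in memory,
    no matter how large the total payload is."""
    digest = hashlib.sha256()
    while chunk := stream.read(CHUNK_SIZE):
        digest.update(chunk)
    return digest.hexdigest()
```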
Goal
Add a new streaming upload API that allows event handlers to process file data chunk-by-chunk as it arrives, without buffering the entire file.
Proposed API
`rx.upload_files_chunk(...)` is used in the same way as `rx.upload_files(...)`: provided as an argument to an event handler passed to the `rx.upload` `on_drop` trigger or a button's `on_click`. The difference is that instead of the handler receiving a `list[rx.UploadFile]`, it receives an `rx.UploadChunkIterator`.
```python
import reflex as rx
from pathlib import Path
from typing import BinaryIO


class UploadState(rx.State):
    files_received: list[str] = []

    @rx.event(background=True)
    async def handle_upload_chunk(self, chunk_iter: rx.UploadChunkIterator):
        """Process uploaded file data as it streams in.

        A single upload request may contain multiple files.
        The iterator yields chunks across all files in the request.
        """
        upload_dir = Path(rx.get_upload_dir())
        file_handles: dict[str, BinaryIO | None] = {}
        local_paths: dict[str, Path] = {}
        try:
            async for chunk in chunk_iter:
                # chunk.filename: str — original file name
                # chunk.offset: int — byte offset of this chunk within the file
                # chunk.content_type: str — MIME type
                # chunk.data: bytes — the chunk payload

                # Resolve the local path once per filename.
                if chunk.filename not in local_paths:
                    p = upload_dir / chunk.filename
                    p.parent.mkdir(parents=True, exist_ok=True)
                    local_paths[chunk.filename] = p

                fh = file_handles.get(chunk.filename)
                if chunk.filename not in file_handles:
                    # New file — open a fresh handle.
                    fh = open(local_paths[chunk.filename], "wb")
                    file_handles[chunk.filename] = fh
                elif fh is None:
                    # Previously seen file whose handle was closed — reopen.
                    # Use "r+b" rather than "ab": in append mode, seek() is
                    # ignored for writes, so chunk.offset would not be honored.
                    fh = open(local_paths[chunk.filename], "r+b")
                    file_handles[chunk.filename] = fh

                fh.seek(chunk.offset)
                fh.write(chunk.data)
        finally:
            for fh in file_handles.values():
                if fh is not None:
                    fh.close()

        async with self:
            self.files_received = list(file_handles.keys())


def index():
    return rx.upload.root(
        id="my_upload",
        on_drop=UploadState.handle_upload_chunk(
            rx.upload_files_chunk(
                upload_id="my_upload",
                on_upload_progress=lambda u: rx.console_log(u),
            )
        ),
    )
```

Key design points:
- `rx.upload_files_chunk(...)` works like `rx.upload_files(...)`: it is passed to the upload trigger, and it supports the same `upload_id` and `on_upload_progress` arguments
- The handler must be a `@rx.event(background=True)` task (enforce this with a validation error at compile time)
- The handler must accept an `rx.UploadChunkIterator` argument (an async iterator yielding `UploadChunk` objects)
- `UploadChunk` is a dataclass with fields: `filename: str`, `offset: int`, `content_type: str`, `data: bytes`
- A single upload request may contain multiple files; the iterator yields chunks across all files, interleaved
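As a sketch of the types named above (the names come from this proposal; the generator below is a toy stand-in, not the real chunk source), the dataclass and an interleaving source might look like:

```python
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class UploadChunk:
    """One piece of one uploaded file (fields as listed in the proposal)."""
    filename: str
    offset: int
    content_type: str
    data: bytes


# The proposed rx.UploadChunkIterator is just an async iterator of chunks.
UploadChunkIterator = AsyncIterator[UploadChunk]


async def fake_chunk_source(
    parts: list[tuple[str, bytes]],
) -> UploadChunkIterator:
    """Toy source that interleaves chunks across files, as one request may."""
    offsets: dict[str, int] = {}
    for filename, payload in parts:
        offset = offsets.get(filename, 0)
        yield UploadChunk(filename, offset, "application/octet-stream", payload)
        offsets[filename] = offset + len(payload)
```

A handler then reassembles each file by trusting `offset`, exactly as the example above does with `fh.seek(chunk.offset)`.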
Notes
- Background events handle their own updates to the frontend via the `async with self` mechanism, so the upload handler is not responsible for yielding ndjson updates the way handlers are under the current API.
Acceptance Criteria
- New `UploadChunk` dataclass defined and exported from the `rx` namespace
- New `UploadChunkIterator` type defined (an async iterator of `UploadChunk`)
- Backend `/_upload` endpoint (or a new streaming variant) supports chunked/streaming multipart parsing; investigate the `streaming_form_data` library or similar
- Handler validation: raise a clear error if the given event handler is not a background task
- Handler validation: raise a clear error if no `UploadChunkIterator` parameter is found in the handler signature
- The frontend upload JS (`reflex/.templates/web/utils/helpers/upload.js`) sends file data in a streaming-compatible way (this may mean chunked uploads via multiple requests, or a streaming multipart body)
- Integration test demonstrating a large file upload processed chunk-by-chunk
- Existing `rx.upload_files` API continues to work unchanged (backward compatible)
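The two handler-validation criteria can be prototyped with `inspect` and `typing.get_type_hints`. Everything below is a hypothetical sketch: `UploadChunk` is a stand-in type, and "background task" is approximated as "async function" since the real check would inspect the `@rx.event(background=True)` decoration.

```python
import inspect
from typing import AsyncIterator, get_type_hints


class UploadChunk:
    """Stand-in for the proposed rx.UploadChunk."""


UploadChunkIterator = AsyncIterator[UploadChunk]


def validate_chunk_handler(fn) -> None:
    """Raise a clear error for handlers that cannot stream chunks.

    Hypothetical sketch: the real validation would check for
    @rx.event(background=True) at compile time.
    """
    if not inspect.iscoroutinefunction(fn):
        raise TypeError(
            f"{fn.__name__}: chunked upload handlers must be background "
            "(async) event handlers"
        )
    hints = get_type_hints(fn)
    if UploadChunkIterator not in hints.values():
        raise TypeError(
            f"{fn.__name__}: no UploadChunkIterator parameter found in the "
            "handler signature"
        )
```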
Key Files
| File | Purpose |
|---|---|
| `reflex/app.py:1894-2026` | Current `upload_file()` endpoint; reads the entire file via `await request.form()` then `await file.read()`. The new streaming path goes here or in a parallel endpoint |
| `reflex/app.py:248-285` | `UploadFile` class wrapping Starlette's `UploadFile` |
| `reflex/.templates/web/utils/helpers/upload.js` | Frontend upload handler; uses `XMLHttpRequest` + `FormData`. May need chunking logic |
| `reflex/components/core/upload.py` | Upload component, `upload_file()` / `selected_files()` helpers, `FileUpload` event |
| `reflex/event.py:845-937` | `FileUpload` / `upload_files` event class; `as_event_spec()` creates the client-side upload trigger |
| `reflex/constants/event.py` | `Endpoint.UPLOAD` constant |
Prior Art / Libraries
- `streaming_form_data`: a streaming multipart parser for Python; avoids buffering files into memory
- See ext-upload.tar.gz for an idea of how this was used with Reflex previously, but don't get hung up on this implementation, as the new proposed API will handle chunking with background events and async iterators instead of calling an `on_chunk` event repeatedly, which would take the costly state lock.
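The state-lock cost mentioned above can be illustrated with a toy asyncio sketch (the `CountingLock` and both handler styles are hypothetical stand-ins, not Reflex internals): a per-chunk `on_chunk` event would acquire the state lock once per chunk, while one background task consuming an async iterator acquires it only when the state actually changes.

```python
import asyncio


class CountingLock:
    """Toy stand-in for Reflex's per-client state lock, counting acquisitions."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.acquisitions = 0

    async def __aenter__(self) -> "CountingLock":
        await self._lock.acquire()
        self.acquisitions += 1
        return self

    async def __aexit__(self, *exc) -> None:
        self._lock.release()


async def on_chunk_style(chunks: list[bytes], lock: CountingLock) -> int:
    """Rejected design: firing an event per chunk takes the lock every time."""
    total = 0
    for chunk in chunks:
        async with lock:
            total += len(chunk)
    return total


async def iterator_style(chunks: list[bytes], lock: CountingLock) -> int:
    """Proposed design: one background task consumes every chunk and
    touches state (under the lock) only once at the end."""
    total = sum(len(chunk) for chunk in chunks)
    async with lock:
        pass  # single `async with self`-style state update
    return total
```

With N chunks, the per-chunk style pays N lock round-trips while the iterator style pays one, which is the motivation for the background-event design.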