Streaming Upload API (rx.upload_files_chunk) #6184

@masenf

Description

Problem

Reflex's current upload API (rx.upload_files) buffers the entire file into memory on the server before the event handler runs. For large files (video, datasets, etc.), this causes high memory usage and prevents the developer from processing data as it arrives (e.g., streaming to S3, computing checksums incrementally, showing server-side progress).

Goal

Add a new streaming upload API that allows event handlers to process file data chunk-by-chunk as it arrives, without buffering the entire file.

Proposed API

rx.upload_files_chunk(...) is used in the same way as rx.upload_files(...) — provided as an argument to an event handler passed to the rx.upload on_drop trigger or a button's on_click. The difference is that instead of the handler receiving a list[rx.UploadFile], it receives an rx.UploadChunkIterator.

import reflex as rx
from pathlib import Path
from typing import BinaryIO

class UploadState(rx.State):
    files_received: list[str] = []

    @rx.event(background=True)
    async def handle_upload_chunk(self, chunk_iter: rx.UploadChunkIterator):
        """Process uploaded file data as it streams in.

        A single upload request may contain multiple files.
        The iterator yields chunks across all files in the request.
        """
        upload_dir = Path(rx.get_upload_dir())
        file_handles: dict[str, BinaryIO] = {}
        local_paths: dict[str, Path] = {}
        try:
            async for chunk in chunk_iter:
                # chunk.filename: str — original file name
                # chunk.offset: int — byte offset of this chunk within the file
                # chunk.content_type: str — MIME type
                # chunk.data: bytes — the chunk payload

                # Resolve local path once per filename.
                if chunk.filename not in local_paths:
                    p = upload_dir / chunk.filename
                    p.parent.mkdir(parents=True, exist_ok=True)
                    local_paths[chunk.filename] = p

                # Open one handle per file on first sight. Chunks may arrive
                # interleaved across files, so seek to each chunk's offset
                # before writing ("wb" plus explicit seeks; append mode would
                # silently ignore the seek).
                fh = file_handles.get(chunk.filename)
                if fh is None:
                    fh = open(local_paths[chunk.filename], "wb")
                    file_handles[chunk.filename] = fh
                fh.seek(chunk.offset)
                fh.write(chunk.data)
        finally:
            for fh in file_handles.values():
                fh.close()

        async with self:
            self.files_received = list(file_handles.keys())


def index():
    return rx.upload.root(
        id="my_upload",
        on_drop=UploadState.handle_upload_chunk(
            rx.upload_files_chunk(
                upload_id="my_upload",
                on_upload_progress=lambda u: rx.console_log(u),
            )
        ),
    )

Key design points:

  • rx.upload_files_chunk(...) works like rx.upload_files(...) — it is passed to the upload trigger. It supports the same upload_id and on_upload_progress arguments
  • The handler must be a @rx.event(background=True) task (enforce this with a validation error at compile time)
  • The handler must accept an rx.UploadChunkIterator argument (async iterator yielding UploadChunk objects)
  • UploadChunk is a dataclass with fields: filename: str, offset: int, content_type: str, data: bytes (sketched after this list)
  • A single upload request may contain multiple files — the iterator yields chunks across all files interleaved
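
A minimal sketch of the two proposed types (the asyncio.Queue plumbing and the None end-of-stream sentinel are illustrative assumptions, not part of the proposal):

import asyncio
import dataclasses


@dataclasses.dataclass
class UploadChunk:
    filename: str      # original file name
    offset: int        # byte offset of this chunk within the file
    content_type: str  # MIME type reported by the client
    data: bytes        # the chunk payload


class UploadChunkIterator:
    """Async iterator that yields UploadChunk objects as they are parsed.

    A None pulled from the queue signals end-of-upload (assumed sentinel).
    """

    def __init__(self, queue: asyncio.Queue):
        self._queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self) -> UploadChunk:
        chunk = await self._queue.get()
        if chunk is None:
            raise StopAsyncIteration
        return chunk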

Notes

  • Background events push their own updates to the frontend via the async with self mechanism, so the upload handler is not responsible for yielding ndjson updates as it is with the current API.
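
For contrast, a sketch of the current API's update flow, where the handler yields to push incremental state to the client (based on existing rx.upload_files behavior; file.filename comes from the wrapped Starlette UploadFile):

import reflex as rx


class LegacyUploadState(rx.State):
    status: str = ""

    @rx.event
    async def handle_upload(self, files: list[rx.UploadFile]):
        # Current API: each file is fully buffered before this handler runs.
        for file in files:
            data = await file.read()
            self.status = f"received {file.filename} ({len(data)} bytes)"
            yield  # pushes an incremental state update to the client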

Acceptance Criteria

  • New UploadChunk dataclass defined and exported from rx namespace
  • New UploadChunkIterator type defined (async iterator of UploadChunk)
  • Backend /_upload endpoint (or a new streaming variant) supports chunked/streaming multipart parsing — investigate streaming_form_data library or similar
  • Handler validation: raise a clear error if the given event handler is not a background task
  • Handler validation: raise a clear error if no UploadChunkIterator parameter is found in the handler signature (see the validation sketch after this list)
  • The frontend upload JS (reflex/.templates/web/utils/helpers/upload.js) sends file data in a streaming-compatible way (this may mean chunked uploads via multiple requests, or a streaming multipart body)
  • Integration test demonstrating a large file upload processed chunk-by-chunk
  • Existing rx.upload_files API continues to work unchanged (backward compatible)
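
A rough sketch of the two validation checks (the __rx_background__ flag and the validate_chunk_handler helper are hypothetical names; only the checks themselves are proposed):

import inspect

import reflex as rx


def validate_chunk_handler(fn) -> None:
    """Reject handlers that cannot receive a streaming upload."""
    # Hypothetical flag; assumes @rx.event(background=True) is detectable
    # on the decorated function.
    if not getattr(fn, "__rx_background__", False):
        raise TypeError(
            f"{fn.__name__} must be declared with @rx.event(background=True) "
            "to receive rx.upload_files_chunk uploads."
        )
    params = inspect.signature(fn).parameters.values()
    if not any(p.annotation is rx.UploadChunkIterator for p in params):
        raise TypeError(
            f"{fn.__name__} must accept an rx.UploadChunkIterator parameter."
        )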

Key Files

  • reflex/app.py:1894-2026 — current upload_file() endpoint; reads the entire file via await request.form() then await file.read(). The new streaming path goes here or in a parallel endpoint
  • reflex/app.py:248-285 — UploadFile class wrapping Starlette's UploadFile
  • reflex/.templates/web/utils/helpers/upload.js — frontend upload handler; uses XMLHttpRequest + FormData. May need chunking logic
  • reflex/components/core/upload.py — Upload component, upload_file() / selected_files() helpers, FileUpload event
  • reflex/event.py:845-937 — FileUpload / upload_files event class; as_event_spec() creates the client-side upload trigger
  • reflex/constants/event.py — Endpoint.UPLOAD constant

Prior Art / Libraries

  • streaming_form_data — streaming multipart parser for Python; avoids buffering files into memory
    • See ext-upload.tar.gz for an idea of how this was used with Reflex previously, but don't get hung up on that implementation: the new API handles chunking with background events and async iterators instead of calling an on_chunk event repeatedly, which would take the costly state lock on every chunk. A sketch of wiring streaming_form_data into the proposed iterator follows below.
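
A sketch of how a streaming endpoint could feed streaming_form_data output into the proposed iterator's queue. StreamingFormDataParser, BaseTarget, data_received, and the multipart_filename / multipart_content_type attributes are real streaming_form_data APIs; the QueueTarget class, the "files" field name, and the None sentinel are assumptions:

import asyncio

from starlette.requests import Request
from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import BaseTarget


class QueueTarget(BaseTarget):
    """Pushes each parsed chunk onto an asyncio.Queue (illustrative)."""

    def __init__(self, queue: asyncio.Queue):
        super().__init__()
        self._queue = queue
        self._offset = 0

    def on_start(self):
        self._offset = 0  # a new multipart part (file) begins

    def on_data_received(self, chunk: bytes):
        # multipart_filename / multipart_content_type are set by the parser.
        self._queue.put_nowait(
            UploadChunk(  # the proposed dataclass sketched above
                filename=self.multipart_filename or "",
                offset=self._offset,
                content_type=self.multipart_content_type or "",
                data=chunk,
            )
        )
        self._offset += len(chunk)


async def stream_upload(request: Request, queue: asyncio.Queue) -> None:
    """Parse the multipart body incrementally; no whole-file buffering."""
    parser = StreamingFormDataParser(headers=dict(request.headers))
    parser.register("files", QueueTarget(queue))  # field name is an assumption
    async for body_chunk in request.stream():
        parser.data_received(body_chunk)
    queue.put_nowait(None)  # assumed end-of-upload sentinel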
