Skip to content

feat(hydro_lang): add flatten_stream, flat_map_stream to Stream#2693

Merged
MingweiSamuel merged 2 commits intomainfrom
mingwei/hydro-flat-stream
Apr 6, 2026
Merged

feat(hydro_lang): add flatten_stream, flat_map_stream to Stream#2693
MingweiSamuel merged 2 commits intomainfrom
mingwei/hydro-flat-stream

Conversation

@MingweiSamuel
Copy link
Copy Markdown
Member

@MingweiSamuel MingweiSamuel commented Mar 25, 2026

STACK PREV: #2688

  • Added FlatMapStream variant to HydroNode enum in compile/ir/mod.rs with
    the same shape as FlatMap (f, input, metadata).
  • Handled FlatMapStream in all match arms: traversal, deep_clone, lowering
    (emits flat_map_stream(#f)), expression refs, metadata/metadata_mut,
    input(), and print_root().
  • Updated viz/render.rs to handle FlatMapStream in the transform group.
  • Added flat_map_stream() and flatten_stream() methods to Stream in
    hydro_lang/src/live_collections/stream/mod.rs. flat_map_stream takes a
    closure mapping items to futures::Stream; flatten_stream flattens items
    that are already Streams.

@cloudflare-workers-and-pages
Copy link
Copy Markdown

cloudflare-workers-and-pages Bot commented Mar 25, 2026

Deploying hydro with  Cloudflare Pages  Cloudflare Pages

Latest commit: 3e53ecd
Status: ✅  Deploy successful!
Preview URL: https://4ad58126.hydroflow.pages.dev
Branch Preview URL: https://mingwei-hydro-flat-stream.hydroflow.pages.dev

View logs

@MingweiSamuel MingweiSamuel force-pushed the mingwei/hydro-flat-stream branch 2 times, most recently from 6a281e6 to 5bfd0e2 Compare April 1, 2026 17:32
@MingweiSamuel MingweiSamuel marked this pull request as ready for review April 1, 2026 17:32
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds first-class support for flattening/flat-mapping async streams in Hydro/DFIR by introducing a new IR node and surfacing the corresponding operators/methods across the stack.

Changes:

  • Introduces HydroNode::FlatMapStream and threads it through IR traversal/cloning/lowering/metadata/printing.
  • Adds DFIR surface operators flat_map_stream and flatten_stream (pull + push codegen paths).
  • Adds Stream::{flat_map_stream, flatten_stream} in hydro_lang and adds DFIR surface tests covering basic + empty cases.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

Show a summary per file
File Description
hydro_lang/src/viz/render.rs Renders FlatMapStream in the “single-expr transform” group for visualization.
hydro_lang/src/live_collections/stream/mod.rs Adds flat_map_stream / flatten_stream Stream APIs that build HydroNode::FlatMapStream.
hydro_lang/src/compile/ir/mod.rs Adds the FlatMapStream IR variant and lowers it to flat_map_stream(#f) in emitted DFIR.
dfir_rs/tests/surface_flat_stream.rs Adds surface-level tests for flat_map_stream and flatten_stream.
dfir_lang/src/graph/ops/mod.rs Registers the new operators so they’re available to DFIR syntax/codegen.
dfir_lang/src/graph/ops/flatten_stream.rs Defines flatten_stream operator constraints and codegen for pull/push.
dfir_lang/src/graph/ops/flat_map_stream.rs Defines flat_map_stream operator constraints and codegen for pull/push.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread hydro_lang/src/live_collections/stream/mod.rs Outdated
Comment thread dfir_lang/src/graph/ops/flat_map_stream.rs Outdated
MingweiSamuel added a commit that referenced this pull request Apr 3, 2026
…tten_stream_blocking

Address review comment on PR #2693 requesting the `_blocking` suffix for
these operators, consistent with the naming convention used by
resolve_future_blocking and resolve_futures_blocking.

Changes:
- dfir_lang: Renamed operator files flat_map_stream.rs → flat_map_stream_blocking.rs
  and flatten_stream.rs → flatten_stream_blocking.rs, with updated constants
  (FLAT_MAP_STREAM_BLOCKING, FLATTEN_STREAM_BLOCKING) and operator names
- dfir_lang/src/graph/ops/mod.rs: Updated declare_ops! entries
- dfir_rs/tests/surface_flat_stream.rs: Updated operator names and test function names
- hydro_lang/src/compile/ir/mod.rs: Renamed HydroNode::FlatMapStream to
  FlatMapStreamBlocking, updated lowering to emit flat_map_stream_blocking
- hydro_lang/src/live_collections/stream/mod.rs: Renamed methods
  flat_map_stream → flat_map_stream_blocking, flatten_stream → flatten_stream_blocking
- hydro_lang/src/viz/render.rs: Updated FlatMapStream → FlatMapStreamBlocking

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
- Added FlatMapStream variant to HydroNode enum in compile/ir/mod.rs with
  the same shape as FlatMap (f, input, metadata).
- Handled FlatMapStream in all match arms: traversal, deep_clone, lowering
  (emits flat_map_stream(#f)), expression refs, metadata/metadata_mut,
  input(), and print_root().
- Updated viz/render.rs to handle FlatMapStream in the transform group.
- Added flat_map_stream() and flatten_stream() methods to Stream in
  hydro_lang/src/live_collections/stream/mod.rs. flat_map_stream takes a
  closure mapping items to futures::Stream; flatten_stream flattens items
  that are already Streams.
MingweiSamuel added a commit that referenced this pull request Apr 6, 2026
…tten_stream_blocking

Address review comment on PR #2693 requesting the `_blocking` suffix for
these operators, consistent with the naming convention used by
resolve_future_blocking and resolve_futures_blocking.

Changes:
- dfir_lang: Renamed operator files flat_map_stream.rs → flat_map_stream_blocking.rs
  and flatten_stream.rs → flatten_stream_blocking.rs, with updated constants
  (FLAT_MAP_STREAM_BLOCKING, FLATTEN_STREAM_BLOCKING) and operator names
- dfir_lang/src/graph/ops/mod.rs: Updated declare_ops! entries
- dfir_rs/tests/surface_flat_stream.rs: Updated operator names and test function names
- hydro_lang/src/compile/ir/mod.rs: Renamed HydroNode::FlatMapStream to
  FlatMapStreamBlocking, updated lowering to emit flat_map_stream_blocking
- hydro_lang/src/live_collections/stream/mod.rs: Renamed methods
  flat_map_stream → flat_map_stream_blocking, flatten_stream → flatten_stream_blocking
- hydro_lang/src/viz/render.rs: Updated FlatMapStream → FlatMapStreamBlocking

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
@MingweiSamuel MingweiSamuel force-pushed the mingwei/hydro-flat-stream branch from 96e9697 to 617530e Compare April 6, 2026 20:46
@MingweiSamuel MingweiSamuel enabled auto-merge (squash) April 6, 2026 20:49
…tten_stream_blocking

Address review comment on PR #2693 requesting the `_blocking` suffix for
these operators, consistent with the naming convention used by
resolve_future_blocking and resolve_futures_blocking.

Changes:
- dfir_lang: Renamed operator files flat_map_stream.rs → flat_map_stream_blocking.rs
  and flatten_stream.rs → flatten_stream_blocking.rs, with updated constants
  (FLAT_MAP_STREAM_BLOCKING, FLATTEN_STREAM_BLOCKING) and operator names
- dfir_lang/src/graph/ops/mod.rs: Updated declare_ops! entries
- dfir_rs/tests/surface_flat_stream.rs: Updated operator names and test function names
- hydro_lang/src/compile/ir/mod.rs: Renamed HydroNode::FlatMapStream to
  FlatMapStreamBlocking, updated lowering to emit flat_map_stream_blocking
- hydro_lang/src/live_collections/stream/mod.rs: Renamed methods
  flat_map_stream → flat_map_stream_blocking, flatten_stream → flatten_stream_blocking
- hydro_lang/src/viz/render.rs: Updated FlatMapStream → FlatMapStreamBlocking

Co-authored-by: Infinity 🤖 <infinity@hydro.run>

fix formatting
@MingweiSamuel MingweiSamuel force-pushed the mingwei/hydro-flat-stream branch from 617530e to 3e53ecd Compare April 6, 2026 21:12
@MingweiSamuel MingweiSamuel merged commit 354ede5 into main Apr 6, 2026
19 checks passed
@MingweiSamuel MingweiSamuel deleted the mingwei/hydro-flat-stream branch April 6, 2026 21:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants