feat(hydro_lang): add flatten_stream, flat_map_stream to Stream#2693
Merged
MingweiSamuel merged 2 commits intomainfrom Apr 6, 2026
Merged
feat(hydro_lang): add flatten_stream, flat_map_stream to Stream#2693MingweiSamuel merged 2 commits intomainfrom
MingweiSamuel merged 2 commits intomainfrom
Conversation
Deploying hydro with
|
| Latest commit: |
3e53ecd
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://4ad58126.hydroflow.pages.dev |
| Branch Preview URL: | https://mingwei-hydro-flat-stream.hydroflow.pages.dev |
6a281e6 to
5bfd0e2
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
Adds first-class support for flattening/flat-mapping async streams in Hydro/DFIR by introducing a new IR node and surfacing the corresponding operators/methods across the stack.
Changes:
- Introduces
HydroNode::FlatMapStreamand threads it through IR traversal/cloning/lowering/metadata/printing. - Adds DFIR surface operators
flat_map_streamandflatten_stream(pull + push codegen paths). - Adds
Stream::{flat_map_stream, flatten_stream}inhydro_langand adds DFIR surface tests covering basic + empty cases.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| hydro_lang/src/viz/render.rs | Renders FlatMapStream in the “single-expr transform” group for visualization. |
| hydro_lang/src/live_collections/stream/mod.rs | Adds flat_map_stream / flatten_stream Stream APIs that build HydroNode::FlatMapStream. |
| hydro_lang/src/compile/ir/mod.rs | Adds the FlatMapStream IR variant and lowers it to flat_map_stream(#f) in emitted DFIR. |
| dfir_rs/tests/surface_flat_stream.rs | Adds surface-level tests for flat_map_stream and flatten_stream. |
| dfir_lang/src/graph/ops/mod.rs | Registers the new operators so they’re available to DFIR syntax/codegen. |
| dfir_lang/src/graph/ops/flatten_stream.rs | Defines flatten_stream operator constraints and codegen for pull/push. |
| dfir_lang/src/graph/ops/flat_map_stream.rs | Defines flat_map_stream operator constraints and codegen for pull/push. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
shadaj
reviewed
Apr 2, 2026
shadaj
reviewed
Apr 3, 2026
MingweiSamuel
added a commit
that referenced
this pull request
Apr 3, 2026
…tten_stream_blocking Address review comment on PR #2693 requesting the `_blocking` suffix for these operators, consistent with the naming convention used by resolve_future_blocking and resolve_futures_blocking. Changes: - dfir_lang: Renamed operator files flat_map_stream.rs → flat_map_stream_blocking.rs and flatten_stream.rs → flatten_stream_blocking.rs, with updated constants (FLAT_MAP_STREAM_BLOCKING, FLATTEN_STREAM_BLOCKING) and operator names - dfir_lang/src/graph/ops/mod.rs: Updated declare_ops! entries - dfir_rs/tests/surface_flat_stream.rs: Updated operator names and test function names - hydro_lang/src/compile/ir/mod.rs: Renamed HydroNode::FlatMapStream to FlatMapStreamBlocking, updated lowering to emit flat_map_stream_blocking - hydro_lang/src/live_collections/stream/mod.rs: Renamed methods flat_map_stream → flat_map_stream_blocking, flatten_stream → flatten_stream_blocking - hydro_lang/src/viz/render.rs: Updated FlatMapStream → FlatMapStreamBlocking Co-authored-by: Infinity 🤖 <infinity@hydro.run>
shadaj
approved these changes
Apr 6, 2026
- Added FlatMapStream variant to HydroNode enum in compile/ir/mod.rs with the same shape as FlatMap (f, input, metadata). - Handled FlatMapStream in all match arms: traversal, deep_clone, lowering (emits flat_map_stream(#f)), expression refs, metadata/metadata_mut, input(), and print_root(). - Updated viz/render.rs to handle FlatMapStream in the transform group. - Added flat_map_stream() and flatten_stream() methods to Stream in hydro_lang/src/live_collections/stream/mod.rs. flat_map_stream takes a closure mapping items to futures::Stream; flatten_stream flattens items that are already Streams.
MingweiSamuel
added a commit
that referenced
this pull request
Apr 6, 2026
…tten_stream_blocking Address review comment on PR #2693 requesting the `_blocking` suffix for these operators, consistent with the naming convention used by resolve_future_blocking and resolve_futures_blocking. Changes: - dfir_lang: Renamed operator files flat_map_stream.rs → flat_map_stream_blocking.rs and flatten_stream.rs → flatten_stream_blocking.rs, with updated constants (FLAT_MAP_STREAM_BLOCKING, FLATTEN_STREAM_BLOCKING) and operator names - dfir_lang/src/graph/ops/mod.rs: Updated declare_ops! entries - dfir_rs/tests/surface_flat_stream.rs: Updated operator names and test function names - hydro_lang/src/compile/ir/mod.rs: Renamed HydroNode::FlatMapStream to FlatMapStreamBlocking, updated lowering to emit flat_map_stream_blocking - hydro_lang/src/live_collections/stream/mod.rs: Renamed methods flat_map_stream → flat_map_stream_blocking, flatten_stream → flatten_stream_blocking - hydro_lang/src/viz/render.rs: Updated FlatMapStream → FlatMapStreamBlocking Co-authored-by: Infinity 🤖 <infinity@hydro.run>
96e9697 to
617530e
Compare
…tten_stream_blocking Address review comment on PR #2693 requesting the `_blocking` suffix for these operators, consistent with the naming convention used by resolve_future_blocking and resolve_futures_blocking. Changes: - dfir_lang: Renamed operator files flat_map_stream.rs → flat_map_stream_blocking.rs and flatten_stream.rs → flatten_stream_blocking.rs, with updated constants (FLAT_MAP_STREAM_BLOCKING, FLATTEN_STREAM_BLOCKING) and operator names - dfir_lang/src/graph/ops/mod.rs: Updated declare_ops! entries - dfir_rs/tests/surface_flat_stream.rs: Updated operator names and test function names - hydro_lang/src/compile/ir/mod.rs: Renamed HydroNode::FlatMapStream to FlatMapStreamBlocking, updated lowering to emit flat_map_stream_blocking - hydro_lang/src/live_collections/stream/mod.rs: Renamed methods flat_map_stream → flat_map_stream_blocking, flatten_stream → flatten_stream_blocking - hydro_lang/src/viz/render.rs: Updated FlatMapStream → FlatMapStreamBlocking Co-authored-by: Infinity 🤖 <infinity@hydro.run> fix formatting
617530e to
3e53ecd
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
STACK PREV: #2688
the same shape as FlatMap (f, input, metadata).
(emits flat_map_stream(#f)), expression refs, metadata/metadata_mut,
input(), and print_root().
hydro_lang/src/live_collections/stream/mod.rs. flat_map_stream takes a
closure mapping items to futures::Stream; flatten_stream flattens items
that are already Streams.