feat(bigframes): Add substrait-datafusion backend #17215
Code Review
This pull request introduces a Substrait compilation and execution framework for BigFrames. It adds a compiler that translates logical plans into Substrait format and an executor that runs them locally with Apache DataFusion. The implementation handles relational operations such as joins, aggregations, and window functions, and includes a test suite with a dedicated test session. Review feedback focuses on code quality: addressing a potential NameError, removing debug print and stderr statements, replacing hardcoded magic numbers with existing mappings, and improving import placement for better performance.
        project_rel2.common.emit.output_mapping.extend(output_mapping)
        return rel2

    def _compile_bound(self, val: typing.Optional[int], bound_msg: algebra_pb2.Expression.WindowFunction.Bound):
    pb_rel = self._compile_node(plan)

    pb_plan = plan_pb2.Plan()
    pb_plan.version.minor_number = 42

    import sys
    for n in plan.unique_nodes():
        if not isinstance(n, supported_nodes):
            sys.stderr.write(f"UNSUPPORTED NODE TYPE: {type(n).__name__} -> {n}\n")
            sys.stderr.flush()
This block writes debug output directly to sys.stderr. Replace it with proper logging so the messages can be enabled when debugging and stay silent otherwise. The loop is also redundant, because the all() call on line 88 performs the same check; remove it.
References
- Remove duplicate lines of code to keep the codebase clean and avoid redundancy.
- Avoid using direct stderr writes for logging; use a logger to aid in debugging and prevent masking issues.
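One way to apply this feedback, offered as a minimal sketch and not as the PR's actual code: a single pass collects the unsupported nodes, reports them through a module-level logger at debug level, and returns the same boolean the all() check would. The node classes and the `SUPPORTED_NODES` tuple below are hypothetical stand-ins, since the real definitions are not visible in this diff.

```python
import logging

logger = logging.getLogger(__name__)


# Hypothetical stand-ins for the plan node classes (not the real BigFrames types).
class ReadNode:
    pass


class JoinNode:
    pass


class ExplodeNode:
    pass


# Assumed name; the diff refers to a variable called `supported_nodes`.
SUPPORTED_NODES = (ReadNode, JoinNode)


def unsupported_nodes(nodes):
    """Return the nodes whose type is not in SUPPORTED_NODES."""
    return [n for n in nodes if not isinstance(n, SUPPORTED_NODES)]


def can_compile(nodes) -> bool:
    """Check support in one pass, logging each unsupported node at debug level."""
    bad = unsupported_nodes(nodes)
    for n in bad:
        # Lazy %-formatting: the message is only built if debug logging is enabled.
        logger.debug("Unsupported node type: %s -> %r", type(n).__name__, n)
    return not bad
```

Because the messages go through `logging`, they stay silent unless the caller opts in, for example with `logging.basicConfig(level=logging.DEBUG)`.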
        "names": fields,
        "struct": {"types": types}
    }
    print("SCHEMA_DICT:", schema_dict)

    sys.stderr.write(f"JOIN CONDITIONS: {node.conditions}\n")
    sys.stderr.flush()

    expr = eq_expressions[0]
    for e in eq_expressions[1:]:
        and_expr = algebra_pb2.Expression()
        and_expr.scalar_function.function_reference = 13 # and

    table = table.select([item.source_id for item in node.scan_list.items])
    table = table.rename_columns([item.id.sql for item in node.scan_list.items])
    if node.offsets_col is not None:
        from bigframes.core import pyarrow_utils
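The summary's point about magic numbers applies to the `function_reference = 13 # and` line in the join-condition fragment. Below is a minimal sketch of a name-based lookup, assuming a hypothetical `FUNCTION_ANCHORS` mapping. The PR's existing mapping is not shown here; only the value 13 for `and` comes from the diff, and the other entry is illustrative. `ScalarCall` is a plain stand-in for the protobuf expression message, not a Substrait API.

```python
from dataclasses import dataclass
from functools import reduce

# Hypothetical name -> function anchor mapping.
# Only "and": 13 is taken from the diff; "equal": 1 is an illustrative value.
FUNCTION_ANCHORS = {"equal": 1, "and": 13}


@dataclass
class ScalarCall:
    """Stand-in for a scalar function expression (not the real protobuf message)."""

    function_reference: int
    arguments: list


def function_anchor(name: str) -> int:
    """Look up the anchor by name so call sites never hardcode the number."""
    try:
        return FUNCTION_ANCHORS[name]
    except KeyError:
        raise ValueError(f"no anchor registered for function {name!r}") from None


def and_all(expressions: list):
    """Left-fold the expressions into nested binary `and` calls."""
    if not expressions:
        raise ValueError("at least one expression is required")
    return reduce(
        lambda left, right: ScalarCall(function_anchor("and"), [left, right]),
        expressions,
    )
```

The fold mirrors the loop in the diff: the first expression seeds the result and each later one is wrapped in a new `and` call. A misspelled function name then fails loudly at lookup time instead of silently producing a wrong reference.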