Skip to content

feat(bigframes): Add substrait-datafusion backend#17215

Draft
TrevorBergeron wants to merge 5 commits into
mainfrom
tbergeron_substrait
Draft

feat(bigframes): Add substrait-datafusion backend#17215
TrevorBergeron wants to merge 5 commits into
mainfrom
tbergeron_substrait

Conversation

@TrevorBergeron
Copy link
Copy Markdown
Contributor

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a Substrait compilation and execution framework for BigFrames, featuring a compiler that translates logical plans into Substrait format and an executor that utilizes Apache DataFusion for local execution. The implementation handles various relational operations such as joins, aggregations, and window functions, and includes a testing suite with a dedicated test session. Review feedback focuses on enhancing code quality by addressing a potential NameError, removing debug print and stderr statements, replacing hardcoded magic numbers with existing mappings, and optimizing import placement for better performance.

project_rel2.common.emit.output_mapping.extend(output_mapping)
return rel2

def _compile_bound(self, val: typing.Optional[int], bound_msg: algebra_pb2.Expression.WindowFunction.Bound):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The typing module is not imported, which will cause a NameError when accessing typing.Optional. Since Optional is already imported from typing at the top of the file, use it directly.

def _compile_bound(self, val: Optional[int], bound_msg: algebra_pb2.Expression.WindowFunction.Bound):

pb_rel = self._compile_node(plan)

pb_plan = plan_pb2.Plan()
pb_plan.version.minor_number = 42
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Substrait version minor number is hardcoded to 42. This appears to be a placeholder and should be replaced with a valid version number or a configurable parameter.

Comment on lines +83 to +87
import sys
for n in plan.unique_nodes():
if not isinstance(n, supported_nodes):
sys.stderr.write(f"UNSUPPORTED NODE TYPE: {type(n).__name__} -> {n}\n")
sys.stderr.flush()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block uses sys.stderr for debug logging. Replace this with proper logging to aid in debugging. Additionally, the loop is redundant as the same check is performed by the all() call on line 88; remove it to keep the codebase clean and avoid redundancy.

References
  1. Remove duplicate lines of code to keep the codebase clean and avoid redundancy.
  2. Avoid using direct stderr writes for logging; use a logger to aid in debugging and prevent masking issues.

"names": fields,
"struct": {"types": types}
}
print("SCHEMA_DICT:", schema_dict)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Remove this debug print statement.

Comment on lines +218 to +219
sys.stderr.write(f"JOIN CONDITIONS: {node.conditions}\n")
sys.stderr.flush()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Remove these debug stderr writes.

expr = eq_expressions[0]
for e in eq_expressions[1:]:
and_expr = algebra_pb2.Expression()
and_expr.scalar_function.function_reference = 13 # and
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Avoid using hardcoded magic numbers for function references. Use the _EXTENSIONS mapping instead for consistency and maintainability.

and_expr.scalar_function.function_reference = self._EXTENSIONS["and"]

table = table.select([item.source_id for item in node.scan_list.items])
table = table.rename_columns([item.id.sql for item in node.scan_list.items])
if node.offsets_col is not None:
from bigframes.core import pyarrow_utils
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This import is inside a loop. For better performance and to follow best practices, move it to the top of the method or the file.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant