Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/bigframes/bigframes/core/compile/substrait/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

# Re-export the compiler so callers can import it from the package itself
# rather than reaching into the ``compiler`` submodule.
from .compiler import SubstraitCompiler

# Public API of this package: only the compiler class is exported.
__all__ = ["SubstraitCompiler"]
1,494 changes: 1,494 additions & 0 deletions packages/bigframes/bigframes/core/compile/substrait/compiler.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import bigframes
import bigframes.pandas as bpd
import pandas as pd
import sys

# Route computation through the Substrait backend before any frame is built.
bpd.options.compute.backend = "substrait"

# Seven-row fixture with a bytes column, an integer column and a payload column.
source_data = pd.DataFrame(
    {
        "bytes_col": [b"a", b"b", b"c", b"d", b"e", b"f", b"g"],
        "numeric_col": [1, 2, 3, 4, 5, 6, 7],
        "val": [10, 20, 30, 40, 50, 60, 70],
    }
)
index_columns = ["bytes_col", "numeric_col"]

df = bpd.read_pandas(source_data)

# Pick rows 4, 1 and 2 (in that order) and keep only their two-level index;
# these are the labels that get dropped below.
drop_index = df.iloc[[4, 1, 2]].set_index(index_columns).index

# Give the full frame the same two-level index so the labels line up.
df = df.set_index(index_columns)

print("DF INDEX:")
print(df.index)
print("DROP INDEX:")
print(drop_index)

# Drop the selected labels and materialize the result locally for inspection.
res = df.drop(index=drop_index)
print("RESULT:")
print(res.to_pandas())
154 changes: 154 additions & 0 deletions packages/bigframes/bigframes/session/substrait_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import abc
from typing import TYPE_CHECKING, Optional

from bigframes.core import bigframe_node
from bigframes.session import executor, semi_executor
import bigframes.core.rewrite.slices as slices_rewrite
from bigframes.core import nodes
import asyncio

if TYPE_CHECKING:
import pyarrow as pa


class SubstraitConsumer(abc.ABC):
    """Abstract plugin interface for engines that execute Substrait plans.

    Each concrete subclass wraps one Substrait-capable execution engine and
    implements :meth:`consume`.
    """

    @abc.abstractmethod
    def consume(self, plan: bytes, tables: dict[str, pa.Table]) -> pa.Table:
        """Run a Substrait plan and hand back the result as a PyArrow Table.

        Args:
            plan: The Substrait plan as bytes (usually a serialized Protobuf).
            tables: Local data for the plan, as a mapping from table name to
                PyArrow Table.

        Returns:
            A PyArrow Table containing the results.
        """
        ...


class DataFusionSubstraitConsumer(SubstraitConsumer):
    """Executes Substrait plans using Apache DataFusion."""

    # NOTE(review): the abstract method names its first parameter ``plan``;
    # this override calls it ``plan_proto``. The name is kept so existing
    # keyword callers of this class keep working -- consider aligning the two.
    def consume(self, plan_proto: bytes, tables: dict[str, pa.Table]) -> pa.Table:
        """Deserialize a Substrait plan and run it in a fresh DataFusion context.

        Args:
            plan_proto: The serialized Substrait plan.
            tables: Mapping of table name to PyArrow Table. Every entry is
                registered in the DataFusion session under its key before the
                plan is executed.

        Returns:
            The plan's result as a PyArrow Table.

        Raises:
            ImportError: If the ``datafusion`` package (or its ``substrait``
                submodule) cannot be imported. The original import failure is
                chained as the cause.
        """
        # Import datafusion lazily to avoid a hard dependency.
        try:
            import datafusion
            import datafusion.substrait
        except ImportError as err:
            raise ImportError(
                "The datafusion package is required to use DataFusionSubstraitConsumer. "
                "Install it with `pip install datafusion`."
            ) from err

        ctx = datafusion.SessionContext()

        # Expose each local table to the plan under the name it was given.
        for name, table in tables.items():
            ctx.register_table(name, ctx.from_arrow_table(table))

        substrait_plan = datafusion.substrait.Serde.deserialize_bytes(plan_proto)
        logical_plan = datafusion.substrait.Consumer.from_substrait_plan(
            ctx, substrait_plan
        )
        result_df = ctx.create_dataframe_from_logical_plan(logical_plan)
        return result_df.to_arrow_table()


class SubstraitExecutor(semi_executor.SemiExecutor):
    """Executes plans by compiling them to Substrait and running them via a consumer."""

    def __init__(self, consumer: SubstraitConsumer):
        """Create an executor that delegates plan execution to ``consumer``.

        Args:
            consumer: The engine adapter that runs the compiled Substrait plan.
        """
        self._consumer = consumer
        # Lazy import to avoid circular dependencies
        from bigframes.core.compile.substrait.compiler import SubstraitCompiler

        self._compiler = SubstraitCompiler()

    async def execute(
        self,
        plan: bigframe_node.BigFrameNode,
        ordered: bool,
        peek: Optional[int] = None,
    ) -> Optional[executor.ExecuteResult]:
        """Rewrite, compile and run ``plan`` through the Substrait consumer.

        Args:
            plan: The plan tree to execute.
            ordered: When true and the rewritten plan carries ordering
                columns, an explicit ``OrderByNode`` is added on top.
            peek: If given, the result table is sliced to its first ``peek``
                rows after the consumer returns.

        Returns:
            A ``LocalExecuteResult`` wrapping the consumer's PyArrow table, or
            ``None`` when the compiler reports it cannot compile the plan or
            compilation yields no plan.
        """
        # Function-scope imports, gathered in one place at the top of the
        # method instead of being scattered through the body.
        import typing

        from bigframes.core import expression, rewrite

        plan = plan.bottom_up(slices_rewrite.rewrite_slice)

        # Wrap the plan in a ResultNode so column pruning and order deferral
        # can be applied, then unwrap the rewritten child again.
        output_cols = tuple(
            (expression.DerefOp(col_id), col_id.name) for col_id in plan.ids
        )
        result_node = nodes.ResultNode(plan, output_cols=output_cols)
        result_node = typing.cast(
            nodes.ResultNode, rewrite.column_pruning(result_node)
        )
        result_node = rewrite.defer_order(result_node, output_hidden_row_keys=False)

        rewritten_plan = result_node.child

        order_by = result_node.order_by
        if ordered and order_by and order_by.all_ordering_columns:
            rewritten_plan = nodes.OrderByNode(
                rewritten_plan,
                by=tuple(order_by.all_ordering_columns),
            )

        # The rewrites may have changed the output columns; select the
        # original ids back so the result matches the input plan.
        original_ids = tuple(plan.ids)
        if rewritten_plan.ids != original_ids:
            rewritten_plan = nodes.SelectionNode(
                rewritten_plan,
                input_output_pairs=tuple(
                    nodes.AliasedRef.identity(col_id) for col_id in original_ids
                ),
            )

        if not self._can_execute(rewritten_plan):
            return None

        substrait_plan_proto = self._compiler.compile(rewritten_plan)
        if substrait_plan_proto is None:
            return None

        tables = self._collect_local_tables(rewritten_plan)

        # The consumer call is synchronous; run it in a worker thread so the
        # event loop is not blocked.
        pa_table = await asyncio.to_thread(
            self._consumer.consume, substrait_plan_proto, tables
        )

        if peek is not None:
            pa_table = pa_table.slice(0, peek)

        return executor.LocalExecuteResult(
            data=pa_table,
            bf_schema=rewritten_plan.schema,
        )

    @staticmethod
    def _collect_local_tables(
        plan: bigframe_node.BigFrameNode,
    ) -> dict[str, pa.Table]:
        """Build the name -> PyArrow table mapping for every local read in ``plan``."""
        # Imported once, before the loop, rather than on every iteration.
        from bigframes.core import pyarrow_utils

        tables: dict[str, pa.Table] = {}
        for node in plan.unique_nodes():
            if not isinstance(node, nodes.ReadLocalNode):
                continue
            scan_items = node.scan_list.items
            # Keep only the scanned columns and rename them to the plan's ids.
            table = node.local_data_source.data
            table = table.select([item.source_id for item in scan_items])
            table = table.rename_columns([item.id.sql for item in scan_items])
            if node.offsets_col is not None:
                table = pyarrow_utils.append_offsets(table, node.offsets_col.sql)
            # NOTE(review): the name is derived from the node's object identity;
            # presumably the compiler refers to the table by the same name --
            # confirm against SubstraitCompiler.
            tables[f"table_{id(node)}"] = table
        return tables

    def _can_execute(self, plan: bigframe_node.BigFrameNode) -> bool:
        """Return whether the Substrait compiler reports it can compile ``plan``."""
        return self._compiler.can_compile(plan)
112 changes: 112 additions & 0 deletions packages/bigframes/bigframes/testing/substrait_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import dataclasses
import weakref
from typing import TYPE_CHECKING, Union

import pandas

import bigframes
import bigframes.core.blocks
import bigframes.dataframe
import bigframes.session.execution_spec
import bigframes.session.executor
import bigframes.session.metrics

if TYPE_CHECKING:
import bigframes.core


@dataclasses.dataclass
class SubstraitTestExecutor(bigframes.session.executor.Executor):
    """Test-only executor that runs every plan through the DataFusion Substrait path."""

    def __init__(self):
        # Imported here rather than at module level, so importing this module
        # does not load the Substrait executor.
        from bigframes.session.substrait_executor import (
            DataFusionSubstraitConsumer,
            SubstraitExecutor,
        )

        self.executor = SubstraitExecutor(DataFusionSubstraitConsumer())

    def execute(
        self,
        array_value: bigframes.core.ArrayValue,
        execution_spec: bigframes.session.execution_spec.ExecutionSpec,
    ):
        """Execute ``array_value`` locally and return the execution result.

        Args:
            array_value: The value whose plan node is executed.
            execution_spec: Execution options. Only ``peek`` is honoured; a
                destination spec is rejected.

        Returns:
            The result produced by the wrapped ``SubstraitExecutor``.

        Raises:
            ValueError: If ``execution_spec.destination_spec`` is set.
            NotImplementedError: If the wrapped executor declines the plan
                (returns ``None``).
        """
        import asyncio

        if execution_spec.destination_spec is not None:
            raise ValueError(
                f"SubstraitTestExecutor does not support destination spec: {execution_spec.destination_spec}"
            )

        result = self.executor.execute(
            array_value.node, ordered=True, peek=execution_spec.peek
        )
        # SubstraitExecutor.execute is a coroutine function, so the call above
        # only creates a coroutine object. Drive it to completion here;
        # otherwise the ``None`` check below can never fire and callers would
        # receive an un-awaited coroutine instead of a result.
        if asyncio.iscoroutine(result):
            result = asyncio.run(result)
        if result is None:
            raise NotImplementedError("SubstraitExecutor cannot execute this plan")

        return result

    def cached(
        self,
        array_value: bigframes.core.ArrayValue,
        *,
        config,
    ) -> None:
        """Caching is a no-op for this test executor."""
        return


class TestSession(bigframes.session.Session):
    """Session stand-in for tests that executes through ``SubstraitTestExecutor``.

    The constructor assigns the session's private attributes directly and does
    not call the parent initializer; BigQuery-related attributes are set to
    ``None``.
    """

    def __init__(self):
        # BigQuery-related attributes are set to None for this session.
        self._location = None  # type: ignore
        self._bq_kms_key_name = None  # type: ignore
        self._clients_provider = None  # type: ignore
        self._bq_connection = None  # type: ignore
        self._skip_bq_connection_check = True
        self._session_id: str = "substrait_test_session"
        self._objects: list[
            weakref.ReferenceType[
                Union[
                    bigframes.core.indexes.Index,
                    bigframes.series.Series,
                    bigframes.dataframe.DataFrame,
                ]
            ]
        ] = []
        self._strictly_ordered: bool = True
        self._allow_ambiguity = False  # type: ignore
        self._default_index_type = bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
        self._metrics = bigframes.session.metrics.ExecutionMetrics()
        self._function_session = None  # type: ignore
        self._temp_storage_manager = None  # type: ignore
        # All execution goes through the Substrait test executor.
        self._executor = SubstraitTestExecutor()
        self._loader = None  # type: ignore

    def read_pandas(self, pandas_dataframe, write_engine="default"):
        """Wrap a pandas object in the matching BigFrames object backed by local data.

        A ``pandas.Series`` yields a Series carrying the original name, a
        ``pandas.Index`` yields the index of the wrapped frame, and anything
        else yields a DataFrame. ``write_engine`` is accepted but not used.
        """
        is_series = isinstance(pandas_dataframe, pandas.Series)
        is_index = isinstance(pandas_dataframe, pandas.Index)

        # Series and Index inputs are promoted to a frame for loading.
        if is_series or is_index:
            frame = pandas_dataframe.to_frame()
        else:
            frame = pandas_dataframe

        block = bigframes.core.blocks.Block.from_local(frame, self)
        bf_df = bigframes.dataframe.DataFrame(block)

        if is_series:
            series = bf_df[bf_df.columns[0]]
            series.name = pandas_dataframe.name
            return series
        if is_index:
            return bf_df.index
        return bf_df

    @property
    def bqclient(self):
        """Always ``None`` for this session."""
        return None
15 changes: 13 additions & 2 deletions packages/bigframes/tests/system/small/engines/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
local_scan_executor,
polars_executor,
semi_executor,
substrait_executor,
)

CURRENT_DIR = pathlib.Path(__file__).parent
Expand Down Expand Up @@ -81,9 +82,17 @@ def sqlglot_engine(
)


@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot"])
@pytest.fixture(scope="session")
def substrait_datafusion_engine() -> semi_executor.SemiExecutor:
    """Session-scoped Substrait executor that runs plans with DataFusion."""
    datafusion_consumer = substrait_executor.DataFusionSubstraitConsumer()
    return substrait_executor.SubstraitExecutor(consumer=datafusion_consumer)


@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot", "substrait-datafusion"])
def engine(
request, pyarrow_engine, polars_engine, bq_engine, sqlglot_engine
request, pyarrow_engine, polars_engine, bq_engine, sqlglot_engine, substrait_datafusion_engine
) -> semi_executor.SemiExecutor:
if request.param == "pyarrow":
return pyarrow_engine
Expand All @@ -93,6 +102,8 @@ def engine(
return bq_engine
if request.param == "bq-sqlglot":
return sqlglot_engine
if request.param == "substrait-datafusion":
return substrait_datafusion_engine
raise ValueError(f"Unrecognized param: {request.param}")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def apply_agg_to_all_valid(
return new_arr


@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True)
@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot", "substrait-datafusion"], indirect=True)
def test_engines_aggregate_post_filter_size(
scalars_array_value: array_value.ArrayValue,
engine,
Expand Down
Loading
Loading