diff --git a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
index 926f220370b0..e9e8435ea1a1 100644
--- a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
+++ b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
@@ -16,7 +16,7 @@
import functools
import typing
-from typing import cast, Any
+from typing import Any, cast
import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py
index ce01252b68b2..61831f4cc399 100644
--- a/packages/bigframes/bigframes/core/events.py
+++ b/packages/bigframes/bigframes/core/events.py
@@ -20,7 +20,7 @@
import datetime
import threading
import uuid
-from typing import Any, Callable, Optional, Set
+from typing import Any, Callable, Literal, Set
import google.cloud.bigquery._job_helpers
import google.cloud.bigquery.job.query
@@ -28,9 +28,19 @@
import bigframes.session.executor
+_DEFAULT: Literal["default"] = "default"
+
+ProgressBarType = Literal["default", "auto", "notebook", "terminal"] | None
+QueryPlanType = list[google.cloud.bigquery.job.query.QueryPlanEntry] | None
+
class Subscriber:
- def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
+ def __init__(
+ self,
+ callback: Callable[[EventEnvelope], None],
+ *,
+ publisher: Publisher,
+ ):
self._publisher = publisher
self._callback = callback
self._subscriber_id = uuid.uuid4()
@@ -57,10 +67,12 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
if exc_value is not None:
self(
- UnknownErrorEvent(
- exc_type=exc_type,
- exc_value=exc_value,
- traceback=traceback,
+ EventEnvelope(
+ UnknownErrorEvent(
+ exc_type=exc_type,
+ exc_value=exc_value,
+ traceback=traceback,
+ )
)
)
self.close()
@@ -74,7 +86,10 @@ def __init__(self):
concurrent.futures.ThreadPoolExecutor()
)
- def subscribe(self, callback: Callable[[Event], None]) -> Subscriber:
+ def subscribe(
+ self,
+ callback: Callable[[EventEnvelope], None],
+ ) -> Subscriber:
# TODO(b/448176657): figure out how to handle subscribers/publishers in
# a background thread. Maybe subscribers should be thread-local?
subscriber = Subscriber(callback, publisher=self)
@@ -86,17 +101,21 @@ def unsubscribe(self, subscriber: Subscriber):
with self._subscribers_lock:
self._subscribers.remove(subscriber)
- def publish(self, event: Event):
+ def publish(self, envelope: EventEnvelope | Event):
+ if not isinstance(envelope, EventEnvelope):
+ envelope = EventEnvelope(event=envelope)
with self._subscribers_lock:
for subscriber in self._subscribers:
- subscriber(event)
+ subscriber(envelope)
- async def publish_async(self, event: Event):
+ async def publish_async(self, envelope: EventEnvelope | Event):
+ if not isinstance(envelope, EventEnvelope):
+ envelope = EventEnvelope(event=envelope)
with self._subscribers_lock:
subscribers_snapshot = list(self._subscribers)
loop = asyncio.get_running_loop()
tasks = [
- loop.run_in_executor(self._executor, subscriber, event)
+ loop.run_in_executor(self._executor, subscriber, envelope)
for subscriber in subscribers_snapshot
]
return await asyncio.gather(*tasks, return_exceptions=True)
@@ -106,6 +125,12 @@ class Event:
pass
+@dataclasses.dataclass(frozen=True)
+class EventEnvelope:
+ event: Event
+ progress_bar: ProgressBarType = _DEFAULT
+
+
@dataclasses.dataclass(frozen=True)
class SessionClosed(Event):
session_id: str
@@ -121,7 +146,7 @@ class ExecutionRunning(Event):
@dataclasses.dataclass(frozen=True)
class ExecutionFinished(Event):
- result: Optional[bigframes.session.executor.ExecuteResult] = None
+ result: bigframes.session.executor.ExecuteResult | None = None
@dataclasses.dataclass(frozen=True)
@@ -136,13 +161,16 @@ class BigQuerySentEvent(ExecutionRunning):
"""Query sent to BigQuery."""
query: str
- billing_project: Optional[str] = None
- location: Optional[str] = None
- job_id: Optional[str] = None
- request_id: Optional[str] = None
+ billing_project: str | None = None
+ location: str | None = None
+ job_id: str | None = None
+ request_id: str | None = None
@classmethod
- def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent):
+ def from_bqclient(
+ cls,
+ event: google.cloud.bigquery._job_helpers.QuerySentEvent,
+ ):
return cls(
query=event.query,
billing_project=event.billing_project,
@@ -157,13 +185,16 @@ class BigQueryRetryEvent(ExecutionRunning):
"""Query sent another time because the previous attempt failed."""
query: str
- billing_project: Optional[str] = None
- location: Optional[str] = None
- job_id: Optional[str] = None
- request_id: Optional[str] = None
+ billing_project: str | None = None
+ location: str | None = None
+ job_id: str | None = None
+ request_id: str | None = None
@classmethod
- def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent):
+ def from_bqclient(
+ cls,
+ event: google.cloud.bigquery._job_helpers.QueryRetryEvent,
+ ):
return cls(
query=event.query,
billing_project=event.billing_project,
@@ -177,19 +208,20 @@ def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent
class BigQueryReceivedEvent(ExecutionRunning):
"""Query received and acknowledged by the BigQuery API."""
- billing_project: Optional[str] = None
- location: Optional[str] = None
- job_id: Optional[str] = None
- statement_type: Optional[str] = None
- state: Optional[str] = None
- query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None
- created: Optional[datetime.datetime] = None
- started: Optional[datetime.datetime] = None
- ended: Optional[datetime.datetime] = None
+ billing_project: str | None = None
+ location: str | None = None
+ job_id: str | None = None
+ statement_type: str | None = None
+ state: str | None = None
+ query_plan: QueryPlanType = None
+ created: datetime.datetime | None = None
+ started: datetime.datetime | None = None
+ ended: datetime.datetime | None = None
@classmethod
def from_bqclient(
- cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent
+ cls,
+ event: google.cloud.bigquery._job_helpers.QueryReceivedEvent,
):
return cls(
billing_project=event.billing_project,
@@ -208,21 +240,22 @@ def from_bqclient(
class BigQueryFinishedEvent(ExecutionRunning):
"""Query finished successfully."""
- billing_project: Optional[str] = None
- location: Optional[str] = None
- query_id: Optional[str] = None
- job_id: Optional[str] = None
- destination: Optional[google.cloud.bigquery.table.TableReference] = None
- total_rows: Optional[int] = None
- total_bytes_processed: Optional[int] = None
- slot_millis: Optional[int] = None
- created: Optional[datetime.datetime] = None
- started: Optional[datetime.datetime] = None
- ended: Optional[datetime.datetime] = None
+ billing_project: str | None = None
+ location: str | None = None
+ query_id: str | None = None
+ job_id: str | None = None
+ destination: google.cloud.bigquery.table.TableReference | None = None
+ total_rows: int | None = None
+ total_bytes_processed: int | None = None
+ slot_millis: int | None = None
+ created: datetime.datetime | None = None
+ started: datetime.datetime | None = None
+ ended: datetime.datetime | None = None
@classmethod
def from_bqclient(
- cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent
+ cls,
+ event: google.cloud.bigquery._job_helpers.QueryFinishedEvent,
):
return cls(
billing_project=event.billing_project,
diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py
index 6e902a9a9802..9ab259519320 100644
--- a/packages/bigframes/bigframes/formatting_helpers.py
+++ b/packages/bigframes/bigframes/formatting_helpers.py
@@ -73,7 +73,10 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]):
if query_job is None:
return "No job information available"
if query_job.dry_run:
- return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}"
+ return (
+ f"Computation deferred. Computation will process "
+ f"{get_formatted_bytes(query_job.total_bytes_processed)}"
+ )
res = "Query Job Info"
for key, value in query_job_prop_pairs.items():
job_val = getattr(query_job, value)
@@ -107,11 +110,15 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):
if query_job is None:
return "No job information available"
if query_job.dry_run:
- return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}"
+ return (
+ f"Computation deferred. Computation will process "
+ f"{get_formatted_bytes(query_job.total_bytes_processed)}"
+ )
# We can reuse the plaintext repr for now or make a nicer table.
- # For deferred mode consistency, let's just wrap the text in a pre block or similar,
- # but the request implies we want a distinct HTML representation if possible.
+ # For deferred mode consistency, let's just wrap the text in a pre
+ # block or similar, but the request implies we want a distinct HTML
+ # representation if possible.
# However, existing repr_query_job returns a simple string.
# Let's format it as a simple table or list.
@@ -125,7 +132,10 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):
location=query_job.location,
job_id=query_job.job_id,
)
- res += f'
Job: {query_job.job_id}'
+ res += (
+ f'Job: '
+ f"{query_job.job_id}"
+ )
elif key == "Slot Time":
res += f"{key}: {get_formatted_time(job_val)}"
elif key == "Bytes Processed":
@@ -145,7 +155,7 @@ def create_progress_callback():
display_opts = bigframes._config.options.display
def progress_callback(
- event: bigframes.core.events.Event,
+ envelope: Any,
):
"""Displays a progress bar while the query is running"""
global current_display_id
@@ -159,7 +169,14 @@ def progress_callback(
# This will allow cleanup to continue.
return
- progress_bar = display_opts.progress_bar
+ # Publisher.publish automatically wraps raw Event objects in an
+ # EventEnvelope, ensuring subscribers receive a consistent contract.
+ assert isinstance(envelope, bigframes.core.events.EventEnvelope)
+ event = envelope.event
+ progress_bar = envelope.progress_bar
+
+ if progress_bar == bigframes.core.events._DEFAULT:
+ progress_bar = display_opts.progress_bar
if progress_bar == "auto":
progress_bar = "notebook" if in_ipython() else "terminal"
@@ -241,7 +258,8 @@ def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None):
job.result()
job.reload()
display.update_display(
- display.HTML(get_base_job_loading_html(job)), display_id=display_id
+ display.HTML(get_base_job_loading_html(job)),
+ display_id=display_id,
)
elif progress_bar == "terminal":
inital_loading_bar = get_base_job_loading_string(job)
@@ -295,7 +313,10 @@ def render_job_link_html(
job_id=job_id,
)
if job_url:
- job_link = f' [Job {project_id}:{location}.{job_id} details]'
+ job_link = (
+ f' ['
+ f"Job {project_id}:{location}.{job_id} details]"
+ )
else:
job_link = ""
return job_link
@@ -332,7 +353,10 @@ def get_job_url(
"""
if project_id is None or location is None or job_id is None:
return None
- return f"""https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"""
+ return (
+ f"https://console.cloud.google.com/bigquery?project={project_id}"
+ f"&j=bq:{location}:{job_id}&page=queryresults"
+ )
def render_bqquery_sent_event_html(
@@ -357,7 +381,10 @@ def render_bqquery_sent_event_html(
job_id=event.job_id,
request_id=event.request_id,
)
- query_text_details = f"SQL
{html.escape(event.query)} "
+ query_text_details = (
+ f"SQL
"
+ f"{html.escape(event.query)} "
+ )
return f"""
Query started{query_id}.{job_link}{query_text_details}
@@ -406,7 +433,10 @@ def render_bqquery_retry_event_html(
job_id=event.job_id,
request_id=event.request_id,
)
- query_text_details = f"SQL
{html.escape(event.query)} "
+ query_text_details = (
+ f"SQL
"
+ f"{html.escape(event.query)} "
+ )
return f"""
Retrying query{query_id}.{job_link}{query_text_details}
@@ -452,7 +482,10 @@ def render_bqquery_received_event_html(
query_plan_details = ""
if event.query_plan:
plan_str = "\n".join([str(entry) for entry in event.query_plan])
- query_plan_details = f"Query Plan
{html.escape(plan_str)} "
+ query_plan_details = (
+ f"Query Plan
"
+ f"{html.escape(plan_str)} "
+ )
return f"""
Query{query_id} is {event.state}.{job_link}{query_plan_details}
@@ -515,7 +548,8 @@ def render_bqquery_finished_event_plaintext(
bytes_str = ""
if event.total_bytes_processed is not None:
- bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)} processed."
+ size_str = humanize.naturalsize(event.total_bytes_processed)
+ bytes_str = f" {size_str} processed."
slot_time_str = ""
if event.slot_millis is not None:
@@ -581,7 +615,8 @@ def get_formatted_time(val):
Duration string
"""
try:
- return humanize.naturaldelta(datetime.timedelta(milliseconds=float(val)))
+ delta = datetime.timedelta(milliseconds=float(val))
+ return humanize.naturaldelta(delta)
except Exception:
return val
@@ -600,7 +635,10 @@ def get_formatted_bytes(val):
def get_bytes_processed_string(val: Any):
- """Try to get bytes processed string. Return empty if passed non int value"""
+ """Try to get bytes processed string.
+
+ Return empty if passed non int value.
+ """
bytes_processed_string = ""
if isinstance(val, int):
bytes_processed_string = f"""{get_formatted_bytes(val)} processed. """
diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py
index 7cd7634b08e4..17534e59273d 100644
--- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py
+++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py
@@ -22,7 +22,14 @@
import textwrap
import types
import typing
-from typing import Dict, Iterable, Literal, Mapping, Optional, Tuple, Union, overload
+from typing import (
+ Dict,
+ Iterable,
+ Mapping,
+ Optional,
+ Tuple,
+ Union,
+)
import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry
import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
@@ -38,7 +45,10 @@
from bigframes.core.compile.sqlglot import sql as sg_sql
from bigframes.core.logging import log_adapter
-CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
+CHECK_DRIVE_PERMISSIONS = (
+ "\nCheck https://cloud.google.com/bigquery/docs/"
+ "query-drive-data#Google_Drive_permissions."
+)
IO_ORDERING_ID = "bqdf_row_nums"
@@ -80,7 +90,10 @@ def create_job_configs_labels(
def create_export_data_statement(
- table_id: str, uri: str, format: str, export_options: Dict[str, Union[bool, str]]
+ table_id: str,
+ uri: str,
+ format: str,
+ export_options: Dict[str, Union[bool, str]],
) -> str:
all_options: Dict[str, Union[bool, str]] = {
"uri": uri,
@@ -134,10 +147,10 @@ def create_temp_table(
if cluster_columns:
destination.clustering_fields = cluster_columns
if kms_key:
- destination.encryption_configuration = bigquery.EncryptionConfiguration(
- kms_key_name=kms_key
- )
- # Ok if already exists, since this will only happen from retries internal to this method
+ enc_config = bigquery.EncryptionConfiguration(kms_key_name=kms_key)
+ destination.encryption_configuration = enc_config
+ # Ok if already exists, since this will only happen from retries
+ # internal to this method
# as the requested table id has a random UUID4 component.
bqclient.create_table(destination, exists_ok=True)
return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
@@ -160,7 +173,8 @@ def create_temp_view(
destination.expires = expiration
destination.view_query = sql
- # Ok if already exists, since this will only happen from retries internal to this method
+ # Ok if already exists, since this will only happen from retries
+ # internal to this method
# as the requested table id has a random UUID4 component.
bqclient.create_table(destination, exists_ok=True)
return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
@@ -194,7 +208,10 @@ def bq_field_to_type_sql(field: bigquery.SchemaField):
if field.mode == "REPEATED":
nested_type = bq_field_to_type_sql(
bigquery.SchemaField(
- field.name, field.field_type, mode="NULLABLE", fields=field.fields
+ field.name,
+ field.field_type,
+ mode="NULLABLE",
+ fields=field.fields,
)
)
return f"ARRAY<{nested_type}>"
@@ -226,11 +243,14 @@ def format_option(key: str, value: Union[bool, str]) -> str:
def add_and_trim_labels(
- job_config, session=None, extra_query_labels: Optional[Mapping[str, str]] = None
+ job_config,
+ session=None,
+ extra_query_labels: Optional[Mapping[str, str]] = None,
):
"""
- Add additional labels to the job configuration and trim the total number of labels
- to ensure they do not exceed MAX_LABELS_COUNT labels per job.
+ Add additional labels to the job configuration and trim the total
+ number of labels to ensure they do not exceed MAX_LABELS_COUNT labels
+ per job.
"""
api_methods = log_adapter.get_and_reset_api_methods(
dry_run=job_config.dry_run, session=session
@@ -242,19 +262,31 @@ def add_and_trim_labels(
def create_bq_event_callback(publisher):
- def publish_bq_event(event):
- if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent):
- bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient(event)
- elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent):
- bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient(event)
- elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent):
- bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient(event)
- elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent):
- bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient(event)
- else:
- bf_event = bigframes.core.events.BigQueryUnknownEvent(event)
+ event_map = {
+ google.cloud.bigquery._job_helpers.QueryFinishedEvent: (
+ bigframes.core.events.BigQueryFinishedEvent
+ ),
+ google.cloud.bigquery._job_helpers.QueryReceivedEvent: (
+ bigframes.core.events.BigQueryReceivedEvent
+ ),
+ google.cloud.bigquery._job_helpers.QueryRetryEvent: (
+ bigframes.core.events.BigQueryRetryEvent
+ ),
+ google.cloud.bigquery._job_helpers.QuerySentEvent: (
+ bigframes.core.events.BigQuerySentEvent
+ ),
+ }
- publisher.publish(bf_event)
+ def publish_bq_event(event):
+ bf_event = bigframes.core.events.BigQueryUnknownEvent(event)
+ for bq_type, bf_type in event_map.items():
+ if isinstance(event, bq_type):
+ bf_event = bf_type.from_bqclient(event) # type: ignore
+ break
+ envelope = bigframes.core.events.EventEnvelope(
+ event=bf_event, progress_bar=bigframes.core.events._DEFAULT
+ )
+ publisher.publish(envelope)
return publish_bq_event
@@ -272,7 +304,7 @@ def start_query_with_job(
# google-cloud-bigquery version with
# https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
# version 3.36.0 or later.
- job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
+ job_retry: google.api_core.retry.Retry = (third_party_gcb_retry.DEFAULT_JOB_RETRY), # noqa: E501
publisher: bigframes.core.events.Publisher,
session=None,
) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
@@ -322,14 +354,15 @@ def start_query_job_optional(
# google-cloud-bigquery version with
# https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
# version 3.36.0 or later.
- job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
+ job_retry: google.api_core.retry.Retry = (third_party_gcb_retry.DEFAULT_JOB_RETRY), # noqa: E501
publisher: bigframes.core.events.Publisher,
session=None,
) -> google.cloud.bigquery.table.RowIterator:
"""
Run a bigquery query, with job optional.
- See: https://docs.cloud.google.com/bigquery/docs/running-queries#optional-job-creation
+ See:
+ https://docs.cloud.google.com/bigquery/docs/running-queries#optional-job-creation
"""
add_and_trim_labels(job_config, session=session)
try:
@@ -389,7 +422,9 @@ def _publish_events(
def delete_tables_matching_session_id(
- client: bigquery.Client, dataset: bigquery.DatasetReference, session_id: str
+ client: bigquery.Client,
+ dataset: bigquery.DatasetReference,
+ session_id: str,
) -> None:
"""Searches within the dataset for tables conforming to the
expected session_id form, and instructs bigquery to delete them.
@@ -443,7 +478,8 @@ def create_bq_dataset_reference(
The project id of the project to create the dataset in.
Returns:
- bigquery.DatasetReference: The constructed reference to the anonymous dataset.
+ bigquery.DatasetReference: The constructed reference to the
+ anonymous dataset.
"""
job_config = google.cloud.bigquery.QueryJobConfig()
@@ -476,7 +512,8 @@ def is_query(query_or_table: str) -> bool:
def is_table_with_wildcard_suffix(query_or_table: str) -> bool:
- """Determine if `query_or_table` is a table and contains a wildcard suffix."""
+ """Determine if `query_or_table` is a table and contains a wildcard
+ suffix."""
return not is_query(query_or_table) and query_or_table.endswith("*")
@@ -492,7 +529,8 @@ def to_query(
from_item = f"({query_or_table})"
else:
# Table ID can have 1, 2, 3, or 4 parts. Quoting all parts to be safe.
- # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#identifiers
+ # See:
+ # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#identifiers
parts = query_or_table.split(".")
from_item = ".".join(f"`{part}`" for part in parts)
@@ -509,7 +547,7 @@ def to_query(
time_travel_clause = ""
if time_travel_timestamp is not None:
- time_travel_literal = sg_sql.to_sql(sg_sql.literal(time_travel_timestamp))
+ time_travel_literal = sg_sql.to_sql(sg_sql.literal(time_travel_timestamp)) # noqa: E501
time_travel_clause = f" FOR SYSTEM_TIME AS OF {time_travel_literal}"
limit_clause = ""
@@ -542,10 +580,11 @@ def compile_filters(filters: third_party_pandas_gbq.FiltersType) -> str:
"!=": "!=",
}
- # If single layer filter, add another pseudo layer. So the single layer represents "and" logic.
+ # If single layer filter, add another pseudo layer. So the single
+ # layer represents "and" logic.
filters_list: list = list(filters)
if isinstance(filters_list[0], tuple) and (
- len(filters_list[0]) == 0 or not isinstance(list(filters_list[0])[0], tuple)
+ len(filters_list[0]) == 0 or not isinstance(list(filters_list[0])[0], tuple) # noqa: E501
):
filter_items = [filters_list]
else:
@@ -559,14 +598,16 @@ def compile_filters(filters: third_party_pandas_gbq.FiltersType) -> str:
for filter_item in group:
if not isinstance(filter_item, tuple) or (len(filter_item) != 3):
raise ValueError(
- f"Elements of filters must be tuples of length 3, but got {repr(filter_item)}.",
+ f"Elements of filters must be tuples of length 3, "
+ f"but got {repr(filter_item)}.",
)
column, operator, value = filter_item
if not isinstance(column, str):
raise ValueError(
- f"Column name should be a string, but received '{column}' of type {type(column).__name__}."
+ f"Column name should be a string, but received "
+ f"'{column}' of type {type(column).__name__}."
)
if operator not in valid_operators:
diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py
index d2682bbcaf7f..3712cce80726 100644
--- a/packages/bigframes/bigframes/session/metrics.py
+++ b/packages/bigframes/bigframes/session/metrics.py
@@ -54,7 +54,9 @@ class JobMetadata:
@classmethod
def from_job(
- cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
+ cls,
+ query_job: Union[QueryJob, LoadJob],
+ exec_seconds: Optional[float] = None,
) -> "JobMetadata":
query_text = getattr(query_job, "query", None)
if query_text and len(query_text) > 1024:
@@ -63,7 +65,11 @@ def from_job(
job_id = getattr(query_job, "job_id", None)
job_url = None
if job_id:
- job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults"
+ job_url = (
+ f"https://console.cloud.google.com/bigquery?"
+ f"project={query_job.project}&j=bq:{query_job.location}:"
+ f"{job_id}&page=queryresults"
+ )
metadata = cls(
job_id=query_job.job_id,
@@ -108,7 +114,9 @@ def from_job(
@classmethod
def from_row_iterator(
- cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
+ cls,
+ row_iterator: bq_table.RowIterator,
+ exec_seconds: Optional[float] = None,
) -> "JobMetadata":
query_text = getattr(row_iterator, "query", None)
if query_text and len(query_text) > 1024:
@@ -119,8 +127,12 @@ def from_row_iterator(
if job_id:
project = getattr(row_iterator, "project", "")
location = getattr(row_iterator, "location", "")
- job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults"
+ job_url = (
+ f"https://console.cloud.google.com/bigquery?"
+ f"project={project}&j=bq:{location}:{job_id}&page=queryresults"
+ )
+ # fmt: off
return cls(
job_id=job_id,
query_id=getattr(row_iterator, "query_id", None),
@@ -131,13 +143,16 @@ def from_row_iterator(
end_time=getattr(row_iterator, "ended", None),
duration_seconds=exec_seconds,
status="DONE",
- total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
+ total_bytes_processed=getattr(
+ row_iterator, "total_bytes_processed", None
+ ),
total_slot_ms=getattr(row_iterator, "slot_millis", None),
job_type="query",
cached=getattr(row_iterator, "cache_hit", None),
query=query_text,
job_url=job_url,
)
+ # fmt: on
@dataclasses.dataclass
@@ -149,6 +164,7 @@ class ExecutionMetrics:
query_char_count: int = 0
jobs: list[JobMetadata] = dataclasses.field(default_factory=list)
+ # fmt: off
def count_job_stats(
self,
query_job: Optional[Union[QueryJob, LoadJob]] = None,
@@ -157,8 +173,11 @@ def count_job_stats(
if query_job is None:
assert row_iterator is not None
- # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
- bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
+ # TODO(tswast): Pass None after making benchmark publishing robust
+ # to missing data.
+ bytes_processed = (
+ getattr(row_iterator, "total_bytes_processed", 0) or 0
+ )
query_char_count = len(getattr(row_iterator, "query", "") or "")
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
created = getattr(row_iterator, "created", None)
@@ -174,27 +193,40 @@ def count_job_stats(
self.execution_secs += exec_seconds
self.jobs.append(
- JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds)
+ JobMetadata.from_row_iterator(
+ row_iterator, exec_seconds=exec_seconds
+ )
)
- elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run:
+ elif (
+ isinstance(query_job, QueryJob)
+ and query_job.configuration.dry_run
+ ):
query_char_count = len(getattr(query_job, "query", ""))
- # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+ # TODO(tswast): Pass None after making benchmark publishing robust
+ # to missing data.
bytes_processed = 0
slot_millis = 0
exec_seconds = 0.0
elif isinstance(query_job, bigquery.QueryJob):
if (stats := get_performance_stats(query_job)) is not None:
- query_char_count, bytes_processed, slot_millis, exec_seconds = stats
+ (
+ query_char_count,
+ bytes_processed,
+ slot_millis,
+ exec_seconds,
+ ) = stats
self.execution_count += 1
self.query_char_count += query_char_count or 0
self.bytes_processed += bytes_processed or 0
self.slot_millis += slot_millis or 0
self.execution_secs += exec_seconds or 0
- metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds)
+ metadata = JobMetadata.from_job(
+ query_job, exec_seconds=exec_seconds
+ )
self.jobs.append(metadata)
else:
@@ -204,7 +236,9 @@ def count_job_stats(
if query_job.ended and query_job.created
else None
)
- self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration))
+ self.jobs.append(
+ JobMetadata.from_job(query_job, exec_seconds=duration)
+ )
# For pytest runs only, log information about the query job
# to a file in order to create a performance report.
@@ -221,7 +255,9 @@ def count_job_stats(
exec_seconds=stats[3],
)
elif row_iterator is not None:
- bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
+ bytes_processed = (
+ getattr(row_iterator, "total_bytes_processed", 0) or 0
+ )
query_char_count = len(getattr(row_iterator, "query", "") or "")
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
created = getattr(row_iterator, "created", None)
@@ -235,14 +271,20 @@ def count_job_stats(
slot_millis=slot_millis,
exec_seconds=exec_seconds,
)
+ # fmt: on
- def on_event(self, event: Any):
+ def on_event(self, envelope: Any):
try:
import bigframes.core.events
from bigframes.session.executor import LocalExecuteResult
except ImportError:
return
+ # Publisher.publish automatically wraps raw Event objects in an
+ # EventEnvelope, ensuring subscribers receive a consistent contract.
+ assert isinstance(envelope, bigframes.core.events.EventEnvelope)
+ event = envelope.event
+
if isinstance(event, bigframes.core.events.ExecutionFinished):
if event.result and isinstance(event.result, LocalExecuteResult):
self.execution_count += 1
diff --git a/packages/bigframes/noxfile.py b/packages/bigframes/noxfile.py
index 72dbccdc4f6f..0a60281bd35a 100644
--- a/packages/bigframes/noxfile.py
+++ b/packages/bigframes/noxfile.py
@@ -1048,7 +1048,7 @@ def mypy(session):
# Editable mode is not compatible with mypy when there are multiple
# package directories. See:
# https://github.com/python/mypy/issues/10564#issuecomment-851687749
- session.install(".")
+ session.install("--no-cache-dir", ".")
# Just install the dependencies' type info directly, since "mypy --install-types"
# might require an additional pass.
diff --git a/packages/bigframes/tests/system/small/test_progress_bar.py b/packages/bigframes/tests/system/small/test_progress_bar.py
index bc247f6078ce..a179e18332af 100644
--- a/packages/bigframes/tests/system/small/test_progress_bar.py
+++ b/packages/bigframes/tests/system/small/test_progress_bar.py
@@ -104,6 +104,23 @@ def test_progress_bar_load_jobs(
assert_loading_msg_exist(capsys.readouterr().out, pattern="Load")
+def test_progress_bar_uniqueness_check(session: bf.Session, capsys):
+ # Ensure strictly_ordered is True (default) to trigger uniqueness check
+ assert session._strictly_ordered
+
+ capsys.readouterr() # clear output
+
+ with bf.option_context("display.progress_bar", "terminal"):
+ # Read a table and specify a non-unique index_col to trigger the check.
+ # We use a public table to make it a "real" test.
+ session.read_gbq_table(
+ "bigquery-public-data.ml_datasets.penguins",
+ index_col="island",
+ )
+
+ assert_loading_msg_exist(capsys.readouterr().out)
+
+
def assert_loading_msg_exist(capstdout: str, pattern=job_load_message_regex):
num_loading_msg = 0
lines = capstdout.split("\n")
diff --git a/packages/bigframes/tests/unit/session/test_metrics.py b/packages/bigframes/tests/unit/session/test_metrics.py
index 296c6e96c5af..ebd6e210fbe2 100644
--- a/packages/bigframes/tests/unit/session/test_metrics.py
+++ b/packages/bigframes/tests/unit/session/test_metrics.py
@@ -251,12 +251,17 @@ def test_on_event_with_local_execute_result():
import bigframes.core.events
from bigframes.session.executor import LocalExecuteResult
- local_result = unittest.mock.create_autospec(LocalExecuteResult, instance=True)
+ # fmt: off
+ local_result = unittest.mock.create_autospec(
+ LocalExecuteResult, instance=True
+ )
+ # fmt: on
local_result.total_bytes_processed = 1024
event = bigframes.core.events.ExecutionFinished(result=local_result)
+ envelope = bigframes.core.events.EventEnvelope(event)
execution_metrics = metrics.ExecutionMetrics()
- execution_metrics.on_event(event)
+ execution_metrics.on_event(envelope)
assert execution_metrics.execution_count == 1
assert len(execution_metrics.jobs) == 1
diff --git a/packages/bigframes/tests/unit/test_formatting_helpers.py b/packages/bigframes/tests/unit/test_formatting_helpers.py
index ec681b36ab05..8917f540501d 100644
--- a/packages/bigframes/tests/unit/test_formatting_helpers.py
+++ b/packages/bigframes/tests/unit/test_formatting_helpers.py
@@ -212,3 +212,29 @@ def test_get_job_url():
job_id=job_id, location=location, project_id=project_id
)
assert actual_url == expected_url
+
+
+def test_progress_callback_falls_back_to_global():
+ event = bfevents.BigQuerySentEvent(
+ query="SELECT * FROM my_table",
+ )
+ envelope = bfevents.EventEnvelope(event=event, progress_bar=bfevents._DEFAULT)
+
+ with mock.patch("bigframes._config.options.display.progress_bar", "terminal"):
+ with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False):
+ with mock.patch("builtins.print") as mock_print:
+ formatting_helpers.create_progress_callback()(envelope)
+ mock_print.assert_called_once()
+
+
+def test_progress_callback_respects_envelope_progress_bar():
+ event = bfevents.BigQuerySentEvent(
+ query="SELECT * FROM my_table",
+ )
+ envelope = bfevents.EventEnvelope(event=event, progress_bar=None)
+
+ with mock.patch("bigframes._config.options.display.progress_bar", "terminal"):
+ with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False):
+ with mock.patch("builtins.print") as mock_print:
+ formatting_helpers.create_progress_callback()(envelope)
+ mock_print.assert_not_called()
diff --git a/packages/bigframes/third_party/bigframes_vendored/pandas/core/config_init.py b/packages/bigframes/third_party/bigframes_vendored/pandas/core/config_init.py
index 15425b6a9828..bd40d05154b9 100644
--- a/packages/bigframes/third_party/bigframes_vendored/pandas/core/config_init.py
+++ b/packages/bigframes/third_party/bigframes_vendored/pandas/core/config_init.py
@@ -1,4 +1,5 @@
-# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/config_init.py
+# Contains code from
+# https://github.com/pandas-dev/pandas/blob/main/pandas/core/config_init.py
"""
This module is imported from the pandas package __init__.py file
in order to ensure that the core.config options registered here will
@@ -33,7 +34,9 @@ class DisplayOptions:
>>> df.head(20) # will no longer run the job # doctest: +SKIP
Computation deferred. Computation will process 28.9 kB
- Users can also get a dry run of the job by accessing the query_job property before they've run the job. This will return a dry run instance of the job they can inspect.
+ Users can also get a dry run of the job by accessing the query_job
+ property before they've run the job. This will return a dry run
+ instance of the job they can inspect.
>>> df.query_job.total_bytes_processed # doctest: +SKIP
28947
@@ -56,7 +59,8 @@ class DisplayOptions:
or just remove it.
- Setting to default value "auto" will detect and show progress bar automatically.
+ Setting to default value "auto" will detect and show progress bar
+ automatically.
>>> bpd.options.display.progress_bar = "auto" # doctest: +SKIP
"""
@@ -99,7 +103,7 @@ class DisplayOptions:
"""
# Options unique to BigQuery DataFrames.
- progress_bar: Optional[str] = "auto"
+ progress_bar: Optional[Literal["auto", "notebook", "terminal"]] = "auto"
"""
Determines if progress bars are shown during job runs. Default "auto".
@@ -121,7 +125,8 @@ class DisplayOptions:
Dataframe and Series objects during repr.
`deferred`
- Prevent executions from repr statements in DataFrame and Series objects.
+ Prevent executions from repr statements in DataFrame and
+ Series objects.
Instead, estimated bytes processed will be shown. DataFrame and Series
objects can still be computed with methods that explicitly execute and
download results.
@@ -175,7 +180,8 @@ class DisplayOptions:
max_info_rows: Optional[int] = 200_000
"""
- Limit null check in ``df.info()`` only to frames with smaller dimensions than
+ Limit null check in ``df.info()`` only to frames with smaller
+ dimensions than
max_info_rows. Default 200,000.
df.info() will usually show null-counts for each column.