diff --git a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index 926f220370b0..e9e8435ea1a1 100644 --- a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -16,7 +16,7 @@ import functools import typing -from typing import cast, Any +from typing import Any, cast import bigframes_vendored.ibis.expr.api as ibis_api import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index ce01252b68b2..61831f4cc399 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -20,7 +20,7 @@ import datetime import threading import uuid -from typing import Any, Callable, Optional, Set +from typing import Any, Callable, Literal, Set import google.cloud.bigquery._job_helpers import google.cloud.bigquery.job.query @@ -28,9 +28,19 @@ import bigframes.session.executor +_DEFAULT: Literal["default"] = "default" + +ProgressBarType = Literal["default", "auto", "notebook", "terminal"] | None +QueryPlanType = list[google.cloud.bigquery.job.query.QueryPlanEntry] | None + class Subscriber: - def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): + def __init__( + self, + callback: Callable[[EventEnvelope], None], + *, + publisher: Publisher, + ): self._publisher = publisher self._callback = callback self._subscriber_id = uuid.uuid4() @@ -57,10 +67,12 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): if exc_value is not None: self( - UnknownErrorEvent( - exc_type=exc_type, - exc_value=exc_value, - traceback=traceback, + EventEnvelope( + UnknownErrorEvent( + exc_type=exc_type, + exc_value=exc_value, + traceback=traceback, + ) ) ) self.close() @@ -74,7 +86,10 @@ def __init__(self): concurrent.futures.ThreadPoolExecutor() ) - def subscribe(self, callback: Callable[[Event], None]) -> Subscriber: + def subscribe( + self, + callback: Callable[[EventEnvelope], None], + ) -> Subscriber: # TODO(b/448176657): figure out how to handle subscribers/publishers in # a background thread. Maybe subscribers should be thread-local? subscriber = Subscriber(callback, publisher=self) @@ -86,17 +101,21 @@ def unsubscribe(self, subscriber: Subscriber): with self._subscribers_lock: self._subscribers.remove(subscriber) - def publish(self, event: Event): + def publish(self, envelope: EventEnvelope | Event): + if not isinstance(envelope, EventEnvelope): + envelope = EventEnvelope(event=envelope) with self._subscribers_lock: for subscriber in self._subscribers: - subscriber(event) + subscriber(envelope) - async def publish_async(self, event: Event): + async def publish_async(self, envelope: EventEnvelope | Event): + if not isinstance(envelope, EventEnvelope): + envelope = EventEnvelope(event=envelope) with self._subscribers_lock: subscribers_snapshot = list(self._subscribers) loop = asyncio.get_running_loop() tasks = [ - loop.run_in_executor(self._executor, subscriber, event) + loop.run_in_executor(self._executor, subscriber, envelope) for subscriber in subscribers_snapshot ] return await asyncio.gather(*tasks, return_exceptions=True) @@ -106,6 +125,12 @@ class Event: pass +@dataclasses.dataclass(frozen=True) +class EventEnvelope: + event: Event + progress_bar: ProgressBarType = _DEFAULT + + @dataclasses.dataclass(frozen=True) class SessionClosed(Event): session_id: str @@ -121,7 +146,7 @@ class ExecutionRunning(Event): @dataclasses.dataclass(frozen=True) class ExecutionFinished(Event): - result: Optional[bigframes.session.executor.ExecuteResult] = None + result: bigframes.session.executor.ExecuteResult | None = None @dataclasses.dataclass(frozen=True) @@ -136,13 +161,16 @@ class BigQuerySentEvent(ExecutionRunning): """Query sent to BigQuery.""" query: str - billing_project: Optional[str] = None - location: Optional[str] = None - job_id: Optional[str] = None - request_id: Optional[str] = None + billing_project: str | None = None + location: str | None = None + job_id: str | None = None + request_id: str | None = None @classmethod - def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent): + def from_bqclient( + cls, + event: google.cloud.bigquery._job_helpers.QuerySentEvent, + ): return cls( query=event.query, billing_project=event.billing_project, @@ -157,13 +185,16 @@ class BigQueryRetryEvent(ExecutionRunning): """Query sent another time because the previous attempt failed.""" query: str - billing_project: Optional[str] = None - location: Optional[str] = None - job_id: Optional[str] = None - request_id: Optional[str] = None + billing_project: str | None = None + location: str | None = None + job_id: str | None = None + request_id: str | None = None @classmethod - def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent): + def from_bqclient( + cls, + event: google.cloud.bigquery._job_helpers.QueryRetryEvent, + ): return cls( query=event.query, billing_project=event.billing_project, @@ -177,19 +208,20 @@ def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent class BigQueryReceivedEvent(ExecutionRunning): """Query received and acknowledged by the BigQuery API.""" - billing_project: Optional[str] = None - location: Optional[str] = None - job_id: Optional[str] = None - statement_type: Optional[str] = None - state: Optional[str] = None - query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None - created: Optional[datetime.datetime] = None - started: Optional[datetime.datetime] = None - ended: Optional[datetime.datetime] = None + billing_project: str | None = None + location: str | None = None + job_id: str | None = None + statement_type: str | None = None + state: str | None = None + query_plan: QueryPlanType = None + created: datetime.datetime | None = None + started: datetime.datetime | None = None + ended: datetime.datetime | None = None @classmethod def from_bqclient( - cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent + cls, + event: google.cloud.bigquery._job_helpers.QueryReceivedEvent, ): return cls( billing_project=event.billing_project, @@ -208,21 +240,22 @@ def from_bqclient( class BigQueryFinishedEvent(ExecutionRunning): """Query finished successfully.""" - billing_project: Optional[str] = None - location: Optional[str] = None - query_id: Optional[str] = None - job_id: Optional[str] = None - destination: Optional[google.cloud.bigquery.table.TableReference] = None - total_rows: Optional[int] = None - total_bytes_processed: Optional[int] = None - slot_millis: Optional[int] = None - created: Optional[datetime.datetime] = None - started: Optional[datetime.datetime] = None - ended: Optional[datetime.datetime] = None + billing_project: str | None = None + location: str | None = None + query_id: str | None = None + job_id: str | None = None + destination: google.cloud.bigquery.table.TableReference | None = None + total_rows: int | None = None + total_bytes_processed: int | None = None + slot_millis: int | None = None + created: datetime.datetime | None = None + started: datetime.datetime | None = None + ended: datetime.datetime | None = None @classmethod def from_bqclient( - cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent + cls, + event: google.cloud.bigquery._job_helpers.QueryFinishedEvent, ): return cls( billing_project=event.billing_project, diff --git a/packages/bigframes/bigframes/formatting_helpers.py b/packages/bigframes/bigframes/formatting_helpers.py index 6e902a9a9802..9ab259519320 100644 --- a/packages/bigframes/bigframes/formatting_helpers.py +++ b/packages/bigframes/bigframes/formatting_helpers.py @@ -73,7 +73,10 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]): if query_job is None: return "No job information available" if query_job.dry_run: - return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}" + return ( + f"Computation deferred. Computation will process " + f"{get_formatted_bytes(query_job.total_bytes_processed)}" + ) res = "Query Job Info" for key, value in query_job_prop_pairs.items(): job_val = getattr(query_job, value) @@ -107,11 +110,15 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]): if query_job is None: return "No job information available" if query_job.dry_run: - return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}" + return ( + f"Computation deferred. Computation will process " + f"{get_formatted_bytes(query_job.total_bytes_processed)}" + ) # We can reuse the plaintext repr for now or make a nicer table. - # For deferred mode consistency, let's just wrap the text in a pre block or similar, - # but the request implies we want a distinct HTML representation if possible. + # For deferred mode consistency, let's just wrap the text in a pre + # block or similar, but the request implies we want a distinct HTML + # representation if possible. # However, existing repr_query_job returns a simple string. # Let's format it as a simple table or list. @@ -125,7 +132,10 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]): location=query_job.location, job_id=query_job.job_id, ) - res += f'
  • Job: {query_job.job_id}
  • ' + res += ( + f'
  • Job: ' + f"{query_job.job_id}
  • " + ) elif key == "Slot Time": res += f"
  • {key}: {get_formatted_time(job_val)}
  • " elif key == "Bytes Processed": @@ -145,7 +155,7 @@ def create_progress_callback(): display_opts = bigframes._config.options.display def progress_callback( - event: bigframes.core.events.Event, + envelope: Any, ): """Displays a progress bar while the query is running""" global current_display_id @@ -159,7 +169,14 @@ def progress_callback( # This will allow cleanup to continue. return - progress_bar = display_opts.progress_bar + # Publisher.publish automatically wraps raw Event objects in an + # EventEnvelope, ensuring subscribers receive a consistent contract. + assert isinstance(envelope, bigframes.core.events.EventEnvelope) + event = envelope.event + progress_bar = envelope.progress_bar + + if progress_bar == bigframes.core.events._DEFAULT: + progress_bar = display_opts.progress_bar if progress_bar == "auto": progress_bar = "notebook" if in_ipython() else "terminal" @@ -241,7 +258,8 @@ def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None): job.result() job.reload() display.update_display( - display.HTML(get_base_job_loading_html(job)), display_id=display_id + display.HTML(get_base_job_loading_html(job)), + display_id=display_id, ) elif progress_bar == "terminal": inital_loading_bar = get_base_job_loading_string(job) @@ -295,7 +313,10 @@ def render_job_link_html( job_id=job_id, ) if job_url: - job_link = f' [Job {project_id}:{location}.{job_id} details]' + job_link = ( + f' [' + f"Job {project_id}:{location}.{job_id} details]" + ) else: job_link = "" return job_link @@ -332,7 +353,10 @@ def get_job_url( """ if project_id is None or location is None or job_id is None: return None - return f"""https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults""" + return ( + f"https://console.cloud.google.com/bigquery?project={project_id}" + f"&j=bq:{location}:{job_id}&page=queryresults" + ) def render_bqquery_sent_event_html( @@ -357,7 +381,10 @@ def render_bqquery_sent_event_html( job_id=event.job_id, request_id=event.request_id, ) - query_text_details = f"
    SQL
    {html.escape(event.query)}
    " + query_text_details = ( + f"
    SQL
    "
    +        f"{html.escape(event.query)}
    " + ) return f""" Query started{query_id}.{job_link}{query_text_details} @@ -406,7 +433,10 @@ def render_bqquery_retry_event_html( job_id=event.job_id, request_id=event.request_id, ) - query_text_details = f"
    SQL
    {html.escape(event.query)}
    " + query_text_details = ( + f"
    SQL
    "
    +        f"{html.escape(event.query)}
    " + ) return f""" Retrying query{query_id}.{job_link}{query_text_details} @@ -452,7 +482,10 @@ def render_bqquery_received_event_html( query_plan_details = "" if event.query_plan: plan_str = "\n".join([str(entry) for entry in event.query_plan]) - query_plan_details = f"
    Query Plan
    {html.escape(plan_str)}
    " + query_plan_details = ( + f"
    Query Plan
    "
    +            f"{html.escape(plan_str)}
    " + ) return f""" Query{query_id} is {event.state}.{job_link}{query_plan_details} @@ -515,7 +548,8 @@ def render_bqquery_finished_event_plaintext( bytes_str = "" if event.total_bytes_processed is not None: - bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)} processed." + size_str = humanize.naturalsize(event.total_bytes_processed) + bytes_str = f" {size_str} processed." slot_time_str = "" if event.slot_millis is not None: @@ -581,7 +615,8 @@ def get_formatted_time(val): Duration string """ try: - return humanize.naturaldelta(datetime.timedelta(milliseconds=float(val))) + delta = datetime.timedelta(milliseconds=float(val)) + return humanize.naturaldelta(delta) except Exception: return val @@ -600,7 +635,10 @@ def get_formatted_bytes(val): def get_bytes_processed_string(val: Any): - """Try to get bytes processed string. Return empty if passed non int value""" + """Try to get bytes processed string. + + Return empty if passed non int value. + """ bytes_processed_string = "" if isinstance(val, int): bytes_processed_string = f"""{get_formatted_bytes(val)} processed. """ diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index 7cd7634b08e4..17534e59273d 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -22,7 +22,14 @@ import textwrap import types import typing -from typing import Dict, Iterable, Literal, Mapping, Optional, Tuple, Union, overload +from typing import ( + Dict, + Iterable, + Mapping, + Optional, + Tuple, + Union, +) import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq @@ -38,7 +45,10 @@ from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.logging import log_adapter -CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." +CHECK_DRIVE_PERMISSIONS = ( + "\nCheck https://cloud.google.com/bigquery/docs/" + "query-drive-data#Google_Drive_permissions." +) IO_ORDERING_ID = "bqdf_row_nums" @@ -80,7 +90,10 @@ def create_job_configs_labels( def create_export_data_statement( - table_id: str, uri: str, format: str, export_options: Dict[str, Union[bool, str]] + table_id: str, + uri: str, + format: str, + export_options: Dict[str, Union[bool, str]], ) -> str: all_options: Dict[str, Union[bool, str]] = { "uri": uri, @@ -134,10 +147,10 @@ def create_temp_table( if cluster_columns: destination.clustering_fields = cluster_columns if kms_key: - destination.encryption_configuration = bigquery.EncryptionConfiguration( - kms_key_name=kms_key - ) - # Ok if already exists, since this will only happen from retries internal to this method + enc_config = bigquery.EncryptionConfiguration(kms_key_name=kms_key) + destination.encryption_configuration = enc_config + # Ok if already exists, since this will only happen from retries + # internal to this method # as the requested table id has a random UUID4 component. bqclient.create_table(destination, exists_ok=True) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" @@ -160,7 +173,8 @@ def create_temp_view( destination.expires = expiration destination.view_query = sql - # Ok if already exists, since this will only happen from retries internal to this method + # Ok if already exists, since this will only happen from retries + # internal to this method # as the requested table id has a random UUID4 component. bqclient.create_table(destination, exists_ok=True) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" @@ -194,7 +208,10 @@ def bq_field_to_type_sql(field: bigquery.SchemaField): if field.mode == "REPEATED": nested_type = bq_field_to_type_sql( bigquery.SchemaField( - field.name, field.field_type, mode="NULLABLE", fields=field.fields + field.name, + field.field_type, + mode="NULLABLE", + fields=field.fields, ) ) return f"ARRAY<{nested_type}>" @@ -226,11 +243,14 @@ def format_option(key: str, value: Union[bool, str]) -> str: def add_and_trim_labels( - job_config, session=None, extra_query_labels: Optional[Mapping[str, str]] = None + job_config, + session=None, + extra_query_labels: Optional[Mapping[str, str]] = None, ): """ - Add additional labels to the job configuration and trim the total number of labels - to ensure they do not exceed MAX_LABELS_COUNT labels per job. + Add additional labels to the job configuration and trim the total + number of labels to ensure they do not exceed MAX_LABELS_COUNT labels + per job. """ api_methods = log_adapter.get_and_reset_api_methods( dry_run=job_config.dry_run, session=session @@ -242,19 +262,31 @@ def add_and_trim_labels( def create_bq_event_callback(publisher): - def publish_bq_event(event): - if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent): - bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient(event) - elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent): - bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient(event) - elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent): - bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient(event) - elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent): - bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient(event) - else: - bf_event = bigframes.core.events.BigQueryUnknownEvent(event) + event_map = { + google.cloud.bigquery._job_helpers.QueryFinishedEvent: ( + bigframes.core.events.BigQueryFinishedEvent + ), + google.cloud.bigquery._job_helpers.QueryReceivedEvent: ( + bigframes.core.events.BigQueryReceivedEvent + ), + google.cloud.bigquery._job_helpers.QueryRetryEvent: ( + bigframes.core.events.BigQueryRetryEvent + ), + google.cloud.bigquery._job_helpers.QuerySentEvent: ( + bigframes.core.events.BigQuerySentEvent + ), + } - publisher.publish(bf_event) + def publish_bq_event(event): + bf_event = bigframes.core.events.BigQueryUnknownEvent(event) + for bq_type, bf_type in event_map.items(): + if isinstance(event, bq_type): + bf_event = bf_type.from_bqclient(event) # type: ignore + break + envelope = bigframes.core.events.EventEnvelope( + event=bf_event, progress_bar=bigframes.core.events._DEFAULT + ) + publisher.publish(envelope) return publish_bq_event @@ -272,7 +304,7 @@ def start_query_with_job( # google-cloud-bigquery version with # https://github.com/googleapis/python-bigquery/pull/2256 merged, likely # version 3.36.0 or later. - job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY, + job_retry: google.api_core.retry.Retry = (third_party_gcb_retry.DEFAULT_JOB_RETRY), # noqa: E501 publisher: bigframes.core.events.Publisher, session=None, ) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: @@ -322,14 +354,15 @@ def start_query_job_optional( # google-cloud-bigquery version with # https://github.com/googleapis/python-bigquery/pull/2256 merged, likely # version 3.36.0 or later. - job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY, + job_retry: google.api_core.retry.Retry = (third_party_gcb_retry.DEFAULT_JOB_RETRY), # noqa: E501 publisher: bigframes.core.events.Publisher, session=None, ) -> google.cloud.bigquery.table.RowIterator: """ Run a bigquery query, with job optional. - See: https://docs.cloud.google.com/bigquery/docs/running-queries#optional-job-creation + See: + https://docs.cloud.google.com/bigquery/docs/running-queries#optional-job-creation """ add_and_trim_labels(job_config, session=session) try: @@ -389,7 +422,9 @@ def _publish_events( def delete_tables_matching_session_id( - client: bigquery.Client, dataset: bigquery.DatasetReference, session_id: str + client: bigquery.Client, + dataset: bigquery.DatasetReference, + session_id: str, ) -> None: """Searches within the dataset for tables conforming to the expected session_id form, and instructs bigquery to delete them. @@ -443,7 +478,8 @@ def create_bq_dataset_reference( The project id of the project to create the dataset in. Returns: - bigquery.DatasetReference: The constructed reference to the anonymous dataset. + bigquery.DatasetReference: The constructed reference to the + anonymous dataset. """ job_config = google.cloud.bigquery.QueryJobConfig() @@ -476,7 +512,8 @@ def is_query(query_or_table: str) -> bool: def is_table_with_wildcard_suffix(query_or_table: str) -> bool: - """Determine if `query_or_table` is a table and contains a wildcard suffix.""" + """Determine if `query_or_table` is a table and contains a wildcard + suffix.""" return not is_query(query_or_table) and query_or_table.endswith("*") @@ -492,7 +529,8 @@ def to_query( from_item = f"({query_or_table})" else: # Table ID can have 1, 2, 3, or 4 parts. Quoting all parts to be safe. - # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#identifiers + # See: + # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#identifiers parts = query_or_table.split(".") from_item = ".".join(f"`{part}`" for part in parts) @@ -509,7 +547,7 @@ def to_query( time_travel_clause = "" if time_travel_timestamp is not None: - time_travel_literal = sg_sql.to_sql(sg_sql.literal(time_travel_timestamp)) + time_travel_literal = sg_sql.to_sql(sg_sql.literal(time_travel_timestamp)) # noqa: E501 time_travel_clause = f" FOR SYSTEM_TIME AS OF {time_travel_literal}" limit_clause = "" @@ -542,10 +580,11 @@ def compile_filters(filters: third_party_pandas_gbq.FiltersType) -> str: "!=": "!=", } - # If single layer filter, add another pseudo layer. So the single layer represents "and" logic. + # If single layer filter, add another pseudo layer. So the single + # layer represents "and" logic. filters_list: list = list(filters) if isinstance(filters_list[0], tuple) and ( - len(filters_list[0]) == 0 or not isinstance(list(filters_list[0])[0], tuple) + len(filters_list[0]) == 0 or not isinstance(list(filters_list[0])[0], tuple) # noqa: E501 ): filter_items = [filters_list] else: @@ -559,14 +598,16 @@ def compile_filters(filters: third_party_pandas_gbq.FiltersType) -> str: for filter_item in group: if not isinstance(filter_item, tuple) or (len(filter_item) != 3): raise ValueError( - f"Elements of filters must be tuples of length 3, but got {repr(filter_item)}.", + f"Elements of filters must be tuples of length 3, " + f"but got {repr(filter_item)}.", ) column, operator, value = filter_item if not isinstance(column, str): raise ValueError( - f"Column name should be a string, but received '{column}' of type {type(column).__name__}." + f"Column name should be a string, but received " + f"'{column}' of type {type(column).__name__}." ) if operator not in valid_operators: diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index d2682bbcaf7f..3712cce80726 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -54,7 +54,9 @@ class JobMetadata: @classmethod def from_job( - cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None + cls, + query_job: Union[QueryJob, LoadJob], + exec_seconds: Optional[float] = None, ) -> "JobMetadata": query_text = getattr(query_job, "query", None) if query_text and len(query_text) > 1024: @@ -63,7 +65,11 @@ def from_job( job_id = getattr(query_job, "job_id", None) job_url = None if job_id: - job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults" + job_url = ( + f"https://console.cloud.google.com/bigquery?" + f"project={query_job.project}&j=bq:{query_job.location}:" + f"{job_id}&page=queryresults" + ) metadata = cls( job_id=query_job.job_id, @@ -108,7 +114,9 @@ def from_job( @classmethod def from_row_iterator( - cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None + cls, + row_iterator: bq_table.RowIterator, + exec_seconds: Optional[float] = None, ) -> "JobMetadata": query_text = getattr(row_iterator, "query", None) if query_text and len(query_text) > 1024: @@ -119,8 +127,12 @@ def from_row_iterator( if job_id: project = getattr(row_iterator, "project", "") location = getattr(row_iterator, "location", "") - job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults" + job_url = ( + f"https://console.cloud.google.com/bigquery?" + f"project={project}&j=bq:{location}:{job_id}&page=queryresults" + ) + # fmt: off return cls( job_id=job_id, query_id=getattr(row_iterator, "query_id", None), @@ -131,13 +143,16 @@ def from_row_iterator( end_time=getattr(row_iterator, "ended", None), duration_seconds=exec_seconds, status="DONE", - total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None), + total_bytes_processed=getattr( + row_iterator, "total_bytes_processed", None + ), total_slot_ms=getattr(row_iterator, "slot_millis", None), job_type="query", cached=getattr(row_iterator, "cache_hit", None), query=query_text, job_url=job_url, ) + # fmt: on @dataclasses.dataclass @@ -149,6 +164,7 @@ class ExecutionMetrics: query_char_count: int = 0 jobs: list[JobMetadata] = dataclasses.field(default_factory=list) + # fmt: off def count_job_stats( self, query_job: Optional[Union[QueryJob, LoadJob]] = None, @@ -157,8 +173,11 @@ def count_job_stats( if query_job is None: assert row_iterator is not None - # TODO(tswast): Pass None after making benchmark publishing robust to missing data. - bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0 + # TODO(tswast): Pass None after making benchmark publishing robust + # to missing data. + bytes_processed = ( + getattr(row_iterator, "total_bytes_processed", 0) or 0 + ) query_char_count = len(getattr(row_iterator, "query", "") or "") slot_millis = getattr(row_iterator, "slot_millis", 0) or 0 created = getattr(row_iterator, "created", None) @@ -174,27 +193,40 @@ def count_job_stats( self.execution_secs += exec_seconds self.jobs.append( - JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds) + JobMetadata.from_row_iterator( + row_iterator, exec_seconds=exec_seconds + ) ) - elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run: + elif ( + isinstance(query_job, QueryJob) + and query_job.configuration.dry_run + ): query_char_count = len(getattr(query_job, "query", "")) - # TODO(tswast): Pass None after making benchmark publishing robust to missing data. + # TODO(tswast): Pass None after making benchmark publishing robust + # to missing data. bytes_processed = 0 slot_millis = 0 exec_seconds = 0.0 elif isinstance(query_job, bigquery.QueryJob): if (stats := get_performance_stats(query_job)) is not None: - query_char_count, bytes_processed, slot_millis, exec_seconds = stats + ( + query_char_count, + bytes_processed, + slot_millis, + exec_seconds, + ) = stats self.execution_count += 1 self.query_char_count += query_char_count or 0 self.bytes_processed += bytes_processed or 0 self.slot_millis += slot_millis or 0 self.execution_secs += exec_seconds or 0 - metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds) + metadata = JobMetadata.from_job( + query_job, exec_seconds=exec_seconds + ) self.jobs.append(metadata) else: @@ -204,7 +236,9 @@ def count_job_stats( if query_job.ended and query_job.created else None ) - self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration)) + self.jobs.append( + JobMetadata.from_job(query_job, exec_seconds=duration) + ) # For pytest runs only, log information about the query job # to a file in order to create a performance report. @@ -221,7 +255,9 @@ def count_job_stats( exec_seconds=stats[3], ) elif row_iterator is not None: - bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0 + bytes_processed = ( + getattr(row_iterator, "total_bytes_processed", 0) or 0 + ) query_char_count = len(getattr(row_iterator, "query", "") or "") slot_millis = getattr(row_iterator, "slot_millis", 0) or 0 created = getattr(row_iterator, "created", None) @@ -235,14 +271,20 @@ def count_job_stats( slot_millis=slot_millis, exec_seconds=exec_seconds, ) + # fmt: on - def on_event(self, event: Any): + def on_event(self, envelope: Any): try: import bigframes.core.events from bigframes.session.executor import LocalExecuteResult except ImportError: return + # Publisher.publish automatically wraps raw Event objects in an + # EventEnvelope, ensuring subscribers receive a consistent contract. + assert isinstance(envelope, bigframes.core.events.EventEnvelope) + event = envelope.event + if isinstance(event, bigframes.core.events.ExecutionFinished): if event.result and isinstance(event.result, LocalExecuteResult): self.execution_count += 1 diff --git a/packages/bigframes/noxfile.py b/packages/bigframes/noxfile.py index 72dbccdc4f6f..0a60281bd35a 100644 --- a/packages/bigframes/noxfile.py +++ b/packages/bigframes/noxfile.py @@ -1048,7 +1048,7 @@ def mypy(session): # Editable mode is not compatible with mypy when there are multiple # package directories. See: # https://github.com/python/mypy/issues/10564#issuecomment-851687749 - session.install(".") + session.install("--no-cache-dir", ".") # Just install the dependencies' type info directly, since "mypy --install-types" # might require an additional pass. diff --git a/packages/bigframes/tests/system/small/test_progress_bar.py b/packages/bigframes/tests/system/small/test_progress_bar.py index bc247f6078ce..a179e18332af 100644 --- a/packages/bigframes/tests/system/small/test_progress_bar.py +++ b/packages/bigframes/tests/system/small/test_progress_bar.py @@ -104,6 +104,23 @@ def test_progress_bar_load_jobs( assert_loading_msg_exist(capsys.readouterr().out, pattern="Load") +def test_progress_bar_uniqueness_check(session: bf.Session, capsys): + # Ensure strictly_ordered is True (default) to trigger uniqueness check + assert session._strictly_ordered + + capsys.readouterr() # clear output + + with bf.option_context("display.progress_bar", "terminal"): + # Read a table and specify a non-unique index_col to trigger the check. + # We use a public table to make it a "real" test. + session.read_gbq_table( + "bigquery-public-data.ml_datasets.penguins", + index_col="island", + ) + + assert_loading_msg_exist(capsys.readouterr().out) + + def assert_loading_msg_exist(capstdout: str, pattern=job_load_message_regex): num_loading_msg = 0 lines = capstdout.split("\n") diff --git a/packages/bigframes/tests/unit/session/test_metrics.py b/packages/bigframes/tests/unit/session/test_metrics.py index 296c6e96c5af..ebd6e210fbe2 100644 --- a/packages/bigframes/tests/unit/session/test_metrics.py +++ b/packages/bigframes/tests/unit/session/test_metrics.py @@ -251,12 +251,17 @@ def test_on_event_with_local_execute_result(): import bigframes.core.events from bigframes.session.executor import LocalExecuteResult - local_result = unittest.mock.create_autospec(LocalExecuteResult, instance=True) + # fmt: off + local_result = unittest.mock.create_autospec( + LocalExecuteResult, instance=True + ) + # fmt: on local_result.total_bytes_processed = 1024 event = bigframes.core.events.ExecutionFinished(result=local_result) + envelope = bigframes.core.events.EventEnvelope(event) execution_metrics = metrics.ExecutionMetrics() - execution_metrics.on_event(event) + execution_metrics.on_event(envelope) assert execution_metrics.execution_count == 1 assert len(execution_metrics.jobs) == 1 diff --git a/packages/bigframes/tests/unit/test_formatting_helpers.py b/packages/bigframes/tests/unit/test_formatting_helpers.py index ec681b36ab05..8917f540501d 100644 --- a/packages/bigframes/tests/unit/test_formatting_helpers.py +++ b/packages/bigframes/tests/unit/test_formatting_helpers.py @@ -212,3 +212,29 @@ def test_get_job_url(): job_id=job_id, location=location, project_id=project_id ) assert actual_url == expected_url + + +def test_progress_callback_falls_back_to_global(): + event = bfevents.BigQuerySentEvent( + query="SELECT * FROM my_table", + ) + envelope = bfevents.EventEnvelope(event=event, progress_bar=bfevents._DEFAULT) + + with mock.patch("bigframes._config.options.display.progress_bar", "terminal"): + with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False): + with mock.patch("builtins.print") as mock_print: + formatting_helpers.create_progress_callback()(envelope) + mock_print.assert_called_once() + + +def test_progress_callback_respects_envelope_progress_bar(): + event = bfevents.BigQuerySentEvent( + query="SELECT * FROM my_table", + ) + envelope = bfevents.EventEnvelope(event=event, progress_bar=None) + + with mock.patch("bigframes._config.options.display.progress_bar", "terminal"): + with mock.patch("bigframes.formatting_helpers.in_ipython", return_value=False): + with mock.patch("builtins.print") as mock_print: + formatting_helpers.create_progress_callback()(envelope) + mock_print.assert_not_called() diff --git a/packages/bigframes/third_party/bigframes_vendored/pandas/core/config_init.py b/packages/bigframes/third_party/bigframes_vendored/pandas/core/config_init.py index 15425b6a9828..bd40d05154b9 100644 --- a/packages/bigframes/third_party/bigframes_vendored/pandas/core/config_init.py +++ b/packages/bigframes/third_party/bigframes_vendored/pandas/core/config_init.py @@ -1,4 +1,5 @@ -# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/config_init.py +# Contains code from +# https://github.com/pandas-dev/pandas/blob/main/pandas/core/config_init.py """ This module is imported from the pandas package __init__.py file in order to ensure that the core.config options registered here will @@ -33,7 +34,9 @@ class DisplayOptions: >>> df.head(20) # will no longer run the job # doctest: +SKIP Computation deferred. Computation will process 28.9 kB - Users can also get a dry run of the job by accessing the query_job property before they've run the job. This will return a dry run instance of the job they can inspect. + Users can also get a dry run of the job by accessing the query_job + property before they've run the job. This will return a dry run + instance of the job they can inspect. >>> df.query_job.total_bytes_processed # doctest: +SKIP 28947 @@ -56,7 +59,8 @@ class DisplayOptions: or just remove it. - Setting to default value "auto" will detect and show progress bar automatically. + Setting to default value "auto" will detect and show progress bar + automatically. >>> bpd.options.display.progress_bar = "auto" # doctest: +SKIP """ @@ -99,7 +103,7 @@ class DisplayOptions: """ # Options unique to BigQuery DataFrames. - progress_bar: Optional[str] = "auto" + progress_bar: Optional[Literal["auto", "notebook", "terminal"]] = "auto" """ Determines if progress bars are shown during job runs. Default "auto". @@ -121,7 +125,8 @@ class DisplayOptions: Dataframe and Series objects during repr. `deferred` - Prevent executions from repr statements in DataFrame and Series objects. + Prevent executions from repr statements in DataFrame and + Series objects. Instead, estimated bytes processed will be shown. DataFrame and Series objects can still be computed with methods that explicitly execute and download results. @@ -175,7 +180,8 @@ class DisplayOptions: max_info_rows: Optional[int] = 200_000 """ - Limit null check in ``df.info()`` only to frames with smaller dimensions than + Limit null check in ``df.info()`` only to frames with smaller + dimensions than max_info_rows. Default 200,000. df.info() will usually show null-counts for each column.