Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
d887714
fix: Respect display.progress_bar=None in background threads
shuoweil Apr 17, 2026
42e4a50
refactor: Refactor BQ event progress bar config
shuoweil Apr 17, 2026
d6757bc
Refactor BQ event progress bar config and add system test
shuoweil Apr 17, 2026
3e8ddde
refactor: refactor code
shuoweil Apr 17, 2026
be825b5
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 20, 2026
96aaff3
docs: add ignore
shuoweil Apr 20, 2026
bfba719
format file
shuoweil Apr 20, 2026
4ffd647
format file
shuoweil Apr 20, 2026
ffbc397
Roll back .pre-commit-config.yaml changes
shuoweil Apr 20, 2026
27010d3
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 22, 2026
a6c42af
chore: format files
shuoweil Apr 22, 2026
9742165
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil Apr 29, 2026
cdcbf0c
rename _FALLBACK_TO_GLOBAL to _DEFAULT
shuoweil Apr 29, 2026
c20751b
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 1, 2026
a139c65
format: format code
shuoweil May 1, 2026
85bf48f
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 8, 2026
67c16c6
Apply review comments in events.py
shuoweil May 8, 2026
13b75e7
format code
shuoweil May 8, 2026
7b9f249
format code
shuoweil May 8, 2026
dd69a8d
fix mypy
shuoweil May 8, 2026
a3e4ac5
update code for lint
shuoweil May 8, 2026
f393a0d
format code
shuoweil May 8, 2026
193d33c
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 11, 2026
d001a9b
refactor: remove progress_bar from events
shuoweil May 11, 2026
6377840
refactor: use EventEnvelope for progress_bar context
shuoweil May 11, 2026
5584531
style: fix line length lints in formatting_helpers and __init__
shuoweil May 11, 2026
a7d0171
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 11, 2026
d372472
format file
shuoweil May 12, 2026
8879473
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 12, 2026
457cf84
fix lint
shuoweil May 12, 2026
2339d17
format code
shuoweil May 12, 2026
44dd6de
format code
shuoweil May 12, 2026
5e4dcd8
reformat stubs and fix flake8 E704
shuoweil May 12, 2026
26420bf
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 13, 2026
e46c163
chore: add clarifying comment for on_event envelope check
shuoweil May 13, 2026
27bba29
chore: add clarifying comment for formatting_helpers envelope check
shuoweil May 13, 2026
7f8d621
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 14, 2026
aa911cc
Refactor Publisher to automatically wrap Event in EventEnvelope
shuoweil May 14, 2026
b9726f4
style: remove flake8 noqa comments
shuoweil May 14, 2026
8a42ceb
style: remove flake8 noqa and wrap long lines in metrics.py
shuoweil May 14, 2026
20dd16e
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 14, 2026
6d8b079
fix tests
shuoweil May 14, 2026
929c053
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 14, 2026
33a7298
test: fix test_formatting_helpers after async executor merge
shuoweil May 14, 2026
402b88f
refactor: align Executor base class and TestExecutor with async subcl…
shuoweil May 14, 2026
a5dbf82
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 14, 2026
4b536fd
style: format codebase with ruff
shuoweil May 14, 2026
4356c59
refactor: restore Executor.execute to synchronous def to match public…
shuoweil May 14, 2026
40d241e
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 15, 2026
49f3243
Merge branch 'main' into shuowei-anywidget-extraneous-output
shuoweil May 15, 2026
acfcedf
fix: respect thread-local display options in background events
shuoweil May 16, 2026
7400097
format code
shuoweil May 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import functools
import typing
from typing import cast, Any
from typing import Any, cast

import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
Expand Down
121 changes: 77 additions & 44 deletions packages/bigframes/bigframes/core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,27 @@
import datetime
import threading
import uuid
from typing import Any, Callable, Optional, Set
from typing import Any, Callable, Literal, Set

import google.cloud.bigquery._job_helpers
import google.cloud.bigquery.job.query
import google.cloud.bigquery.table

import bigframes.session.executor

_DEFAULT: Literal["default"] = "default"

ProgressBarType = Literal["default", "auto", "notebook", "terminal"] | None
QueryPlanType = list[google.cloud.bigquery.job.query.QueryPlanEntry] | None


class Subscriber:
def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
def __init__(
self,
callback: Callable[[EventEnvelope], None],
*,
publisher: Publisher,
):
self._publisher = publisher
self._callback = callback
self._subscriber_id = uuid.uuid4()
Expand All @@ -57,10 +67,12 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
if exc_value is not None:
self(
UnknownErrorEvent(
exc_type=exc_type,
exc_value=exc_value,
traceback=traceback,
EventEnvelope(
UnknownErrorEvent(
exc_type=exc_type,
exc_value=exc_value,
traceback=traceback,
)
)
)
self.close()
Expand All @@ -74,7 +86,10 @@ def __init__(self):
concurrent.futures.ThreadPoolExecutor()
)

def subscribe(self, callback: Callable[[Event], None]) -> Subscriber:
def subscribe(
self,
callback: Callable[[EventEnvelope], None],
) -> Subscriber:
# TODO(b/448176657): figure out how to handle subscribers/publishers in
# a background thread. Maybe subscribers should be thread-local?
subscriber = Subscriber(callback, publisher=self)
Expand All @@ -86,17 +101,21 @@ def unsubscribe(self, subscriber: Subscriber):
with self._subscribers_lock:
self._subscribers.remove(subscriber)

def publish(self, event: Event):
def publish(self, envelope: EventEnvelope | Event):
if not isinstance(envelope, EventEnvelope):
envelope = EventEnvelope(event=envelope)
with self._subscribers_lock:
for subscriber in self._subscribers:
subscriber(event)
subscriber(envelope)

async def publish_async(self, event: Event):
async def publish_async(self, envelope: EventEnvelope | Event):
if not isinstance(envelope, EventEnvelope):
envelope = EventEnvelope(event=envelope)
with self._subscribers_lock:
subscribers_snapshot = list(self._subscribers)
loop = asyncio.get_running_loop()
tasks = [
loop.run_in_executor(self._executor, subscriber, event)
loop.run_in_executor(self._executor, subscriber, envelope)
for subscriber in subscribers_snapshot
]
return await asyncio.gather(*tasks, return_exceptions=True)
Expand All @@ -106,6 +125,12 @@ class Event:
pass


@dataclasses.dataclass(frozen=True)
class EventEnvelope:
event: Event
progress_bar: ProgressBarType = _DEFAULT


@dataclasses.dataclass(frozen=True)
class SessionClosed(Event):
session_id: str
Expand All @@ -121,7 +146,7 @@ class ExecutionRunning(Event):

@dataclasses.dataclass(frozen=True)
class ExecutionFinished(Event):
result: Optional[bigframes.session.executor.ExecuteResult] = None
result: bigframes.session.executor.ExecuteResult | None = None


@dataclasses.dataclass(frozen=True)
Expand All @@ -136,13 +161,16 @@ class BigQuerySentEvent(ExecutionRunning):
"""Query sent to BigQuery."""

query: str
billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
request_id: Optional[str] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
request_id: str | None = None

@classmethod
def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent):
def from_bqclient(
cls,
event: google.cloud.bigquery._job_helpers.QuerySentEvent,
):
return cls(
query=event.query,
billing_project=event.billing_project,
Expand All @@ -157,13 +185,16 @@ class BigQueryRetryEvent(ExecutionRunning):
"""Query sent another time because the previous attempt failed."""

query: str
billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
request_id: Optional[str] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
request_id: str | None = None

@classmethod
def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent):
def from_bqclient(
cls,
event: google.cloud.bigquery._job_helpers.QueryRetryEvent,
):
return cls(
query=event.query,
billing_project=event.billing_project,
Expand All @@ -177,19 +208,20 @@ def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent
class BigQueryReceivedEvent(ExecutionRunning):
"""Query received and acknowledged by the BigQuery API."""

billing_project: Optional[str] = None
location: Optional[str] = None
job_id: Optional[str] = None
statement_type: Optional[str] = None
state: Optional[str] = None
query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None
created: Optional[datetime.datetime] = None
started: Optional[datetime.datetime] = None
ended: Optional[datetime.datetime] = None
billing_project: str | None = None
location: str | None = None
job_id: str | None = None
statement_type: str | None = None
state: str | None = None
query_plan: QueryPlanType = None
created: datetime.datetime | None = None
started: datetime.datetime | None = None
ended: datetime.datetime | None = None

@classmethod
def from_bqclient(
cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent
cls,
event: google.cloud.bigquery._job_helpers.QueryReceivedEvent,
):
return cls(
billing_project=event.billing_project,
Expand All @@ -208,21 +240,22 @@ def from_bqclient(
class BigQueryFinishedEvent(ExecutionRunning):
"""Query finished successfully."""

billing_project: Optional[str] = None
location: Optional[str] = None
query_id: Optional[str] = None
job_id: Optional[str] = None
destination: Optional[google.cloud.bigquery.table.TableReference] = None
total_rows: Optional[int] = None
total_bytes_processed: Optional[int] = None
slot_millis: Optional[int] = None
created: Optional[datetime.datetime] = None
started: Optional[datetime.datetime] = None
ended: Optional[datetime.datetime] = None
billing_project: str | None = None
location: str | None = None
query_id: str | None = None
job_id: str | None = None
destination: google.cloud.bigquery.table.TableReference | None = None
total_rows: int | None = None
total_bytes_processed: int | None = None
slot_millis: int | None = None
created: datetime.datetime | None = None
started: datetime.datetime | None = None
ended: datetime.datetime | None = None

@classmethod
def from_bqclient(
cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent
cls,
event: google.cloud.bigquery._job_helpers.QueryFinishedEvent,
):
return cls(
billing_project=event.billing_project,
Expand Down
70 changes: 54 additions & 16 deletions packages/bigframes/bigframes/formatting_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]):
if query_job is None:
return "No job information available"
if query_job.dry_run:
return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}"
return (
f"Computation deferred. Computation will process "
f"{get_formatted_bytes(query_job.total_bytes_processed)}"
)
res = "Query Job Info"
for key, value in query_job_prop_pairs.items():
job_val = getattr(query_job, value)
Expand Down Expand Up @@ -107,11 +110,15 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):
if query_job is None:
return "No job information available"
if query_job.dry_run:
return f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}"
return (
f"Computation deferred. Computation will process "
f"{get_formatted_bytes(query_job.total_bytes_processed)}"
)

# We can reuse the plaintext repr for now or make a nicer table.
# For deferred mode consistency, let's just wrap the text in a pre block or similar,
# but the request implies we want a distinct HTML representation if possible.
# For deferred mode consistency, let's just wrap the text in a pre
# block or similar, but the request implies we want a distinct HTML
# representation if possible.
# However, existing repr_query_job returns a simple string.
# Let's format it as a simple table or list.

Expand All @@ -125,7 +132,10 @@ def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):
location=query_job.location,
job_id=query_job.job_id,
)
res += f'<li>Job: <a target="_blank" href="{url}">{query_job.job_id}</a></li>'
res += (
f'<li>Job: <a target="_blank" href="{url}">'
f"{query_job.job_id}</a></li>"
)
elif key == "Slot Time":
res += f"<li>{key}: {get_formatted_time(job_val)}</li>"
elif key == "Bytes Processed":
Expand All @@ -145,7 +155,7 @@ def create_progress_callback():
display_opts = bigframes._config.options.display

def progress_callback(
event: bigframes.core.events.Event,
envelope: Any,
):
"""Displays a progress bar while the query is running"""
global current_display_id
Expand All @@ -159,7 +169,14 @@ def progress_callback(
# This will allow cleanup to continue.
return

progress_bar = display_opts.progress_bar
# Publisher.publish automatically wraps raw Event objects in an
# EventEnvelope, ensuring subscribers receive a consistent contract.
assert isinstance(envelope, bigframes.core.events.EventEnvelope)
event = envelope.event
progress_bar = envelope.progress_bar

if progress_bar == bigframes.core.events._DEFAULT:
progress_bar = display_opts.progress_bar

if progress_bar == "auto":
progress_bar = "notebook" if in_ipython() else "terminal"
Expand Down Expand Up @@ -241,7 +258,8 @@ def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None):
job.result()
job.reload()
display.update_display(
display.HTML(get_base_job_loading_html(job)), display_id=display_id
display.HTML(get_base_job_loading_html(job)),
display_id=display_id,
)
elif progress_bar == "terminal":
inital_loading_bar = get_base_job_loading_string(job)
Expand Down Expand Up @@ -295,7 +313,10 @@ def render_job_link_html(
job_id=job_id,
)
if job_url:
job_link = f' [<a target="_blank" href="{job_url}">Job {project_id}:{location}.{job_id} details</a>]'
job_link = (
f' [<a target="_blank" href="{job_url}">'
f"Job {project_id}:{location}.{job_id} details</a>]"
)
else:
job_link = ""
return job_link
Expand Down Expand Up @@ -332,7 +353,10 @@ def get_job_url(
"""
if project_id is None or location is None or job_id is None:
return None
return f"""https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"""
return (
f"https://console.cloud.google.com/bigquery?project={project_id}"
f"&j=bq:{location}:{job_id}&page=queryresults"
)


def render_bqquery_sent_event_html(
Expand All @@ -357,7 +381,10 @@ def render_bqquery_sent_event_html(
job_id=event.job_id,
request_id=event.request_id,
)
query_text_details = f"<details><summary>SQL</summary><pre>{html.escape(event.query)}</pre></details>"
query_text_details = (
f"<details><summary>SQL</summary><pre>"
f"{html.escape(event.query)}</pre></details>"
)

return f"""
Query started{query_id}.{job_link}{query_text_details}
Expand Down Expand Up @@ -406,7 +433,10 @@ def render_bqquery_retry_event_html(
job_id=event.job_id,
request_id=event.request_id,
)
query_text_details = f"<details><summary>SQL</summary><pre>{html.escape(event.query)}</pre></details>"
query_text_details = (
f"<details><summary>SQL</summary><pre>"
f"{html.escape(event.query)}</pre></details>"
)

return f"""
Retrying query{query_id}.{job_link}{query_text_details}
Expand Down Expand Up @@ -452,7 +482,10 @@ def render_bqquery_received_event_html(
query_plan_details = ""
if event.query_plan:
plan_str = "\n".join([str(entry) for entry in event.query_plan])
query_plan_details = f"<details><summary>Query Plan</summary><pre>{html.escape(plan_str)}</pre></details>"
query_plan_details = (
f"<details><summary>Query Plan</summary><pre>"
f"{html.escape(plan_str)}</pre></details>"
)

return f"""
Query{query_id} is {event.state}.{job_link}{query_plan_details}
Expand Down Expand Up @@ -515,7 +548,8 @@ def render_bqquery_finished_event_plaintext(

bytes_str = ""
if event.total_bytes_processed is not None:
bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)} processed."
size_str = humanize.naturalsize(event.total_bytes_processed)
bytes_str = f" {size_str} processed."

slot_time_str = ""
if event.slot_millis is not None:
Expand Down Expand Up @@ -581,7 +615,8 @@ def get_formatted_time(val):
Duration string
"""
try:
return humanize.naturaldelta(datetime.timedelta(milliseconds=float(val)))
delta = datetime.timedelta(milliseconds=float(val))
return humanize.naturaldelta(delta)
except Exception:
return val

Expand All @@ -600,7 +635,10 @@ def get_formatted_bytes(val):


def get_bytes_processed_string(val: Any):
"""Try to get bytes processed string. Return empty if passed non int value"""
"""Try to get bytes processed string.

Return empty if passed non int value.
"""
bytes_processed_string = ""
if isinstance(val, int):
bytes_processed_string = f"""{get_formatted_bytes(val)} processed. """
Expand Down
Loading
Loading