From 44c983d5714aecec02a5b18aaa94f79db7240dc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kasia=20Strza=C5=82kowska?= Date: Tue, 18 Nov 2025 11:24:51 +0100 Subject: [PATCH 01/15] feat: Add async sample for incrementing a value --- .../snippets/writes/write_increment_async.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 samples/snippets/writes/write_increment_async.py diff --git a/samples/snippets/writes/write_increment_async.py b/samples/snippets/writes/write_increment_async.py new file mode 100644 index 000000000..cc1c047bd --- /dev/null +++ b/samples/snippets/writes/write_increment_async.py @@ -0,0 +1,30 @@ +# [START bigtable_write_increment_async] +from google.cloud.bigtable.data import BigtableDataClientAsync +from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry +from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup + +async def write_increment_async(project_id, instance_id, table_id): + """Increments a value in a Bigtable table using the async client.""" + async with BigtableDataClientAsync(project=project_id) as client: + table = client.get_table(instance_id, table_id) + row_key = "unique_device_ids_1" + try: + async with table.mutations_batcher() as batcher: + # The AddToCell mutation increments the value of a cell. + # The value must be a positive or negative integer. + reading = AddToCell( + family="counters", + qualifier="odometer", + value=32304 + ) + await batcher.append(RowMutationEntry(row_key.encode("utf-8"), [reading])) + except MutationsExceptionGroup as e: + # MutationsExceptionGroup contains a FailedMutationEntryError for + # each mutation that failed. + for sub_exception in e.exceptions: + failed_entry: RowMutationEntry = sub_exception.entry + cause: Exception = sub_exception.__cause__ + print( + f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}" + ) +# [END bigtable_write_increment_async] From 291d3ea4907298c83a5d1db7ddafb43990d99800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kasia=20Strza=C5=82kowska?= Date: Tue, 18 Nov 2025 11:24:51 +0100 Subject: [PATCH 02/15] feat: Add async sample for incrementing a value --- .../snippets/writes/write_increment_async.py | 30 +++++++++++++++++++ samples/snippets/writes/writes_test.py | 12 +++++++- 2 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 samples/snippets/writes/write_increment_async.py diff --git a/samples/snippets/writes/write_increment_async.py b/samples/snippets/writes/write_increment_async.py new file mode 100644 index 000000000..cc1c047bd --- /dev/null +++ b/samples/snippets/writes/write_increment_async.py @@ -0,0 +1,30 @@ +# [START bigtable_write_increment_async] +from google.cloud.bigtable.data import BigtableDataClientAsync +from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry +from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup + +async def write_increment_async(project_id, instance_id, table_id): + """Increments a value in a Bigtable table using the async client.""" + async with BigtableDataClientAsync(project=project_id) as client: + table = client.get_table(instance_id, table_id) + row_key = "unique_device_ids_1" + try: + async with table.mutations_batcher() as batcher: + # The AddToCell mutation increments the value of a cell. + # The value must be a positive or negative integer. + reading = AddToCell( + family="counters", + qualifier="odometer", + value=32304 + ) + await batcher.append(RowMutationEntry(row_key.encode("utf-8"), [reading])) + except MutationsExceptionGroup as e: + # MutationsExceptionGroup contains a FailedMutationEntryError for + # each mutation that failed. + for sub_exception in e.exceptions: + failed_entry: RowMutationEntry = sub_exception.entry + cause: Exception = sub_exception.__cause__ + print( + f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}" + ) +# [END bigtable_write_increment_async] diff --git a/samples/snippets/writes/writes_test.py b/samples/snippets/writes/writes_test.py index 2c7a3d62b..5bf5675fd 100644 --- a/samples/snippets/writes/writes_test.py +++ b/samples/snippets/writes/writes_test.py @@ -12,13 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import os +import uuid import backoff from google.api_core.exceptions import DeadlineExceeded import pytest -import uuid +from .write_increment_async import write_increment_async from .write_batch import write_batch from .write_conditionally import write_conditional from .write_increment import write_increment @@ -71,3 +73,11 @@ def _write_batch(): _write_batch() out, _ = capsys.readouterr() assert "Successfully wrote 2 rows" in out + + @backoff.on_exception(backoff.expo, DeadlineExceeded, max_time=60) + def _write_increment_async(): + asyncio.run(write_increment_async(PROJECT, BIGTABLE_INSTANCE, table_id)) + + _write_increment_async() + out, _ = capsys.readouterr() + assert "Successfully incremented row" in out From d39e4228d7d24d209b89e6d081aa8ba0362570bb Mon Sep 17 00:00:00 2001 From: KasiaStrz Date: Thu, 20 Nov 2025 13:41:15 +0100 Subject: [PATCH 03/15] Resolved comments --- samples/snippets/increment.py | 0 samples/snippets/writes/write_increment_async.py | 9 +++++---- samples/snippets/writes/writes_test.py | 2 +- samples/writes/increment.py | 0 4 files changed, 6 insertions(+), 5 deletions(-) create mode 100644 samples/snippets/increment.py create mode 100644 samples/writes/increment.py diff --git a/samples/snippets/increment.py b/samples/snippets/increment.py new file mode 100644 index 000000000..e69de29bb diff --git a/samples/snippets/writes/write_increment_async.py b/samples/snippets/writes/write_increment_async.py index cc1c047bd..253f95a2b 100644 --- a/samples/snippets/writes/write_increment_async.py +++ b/samples/snippets/writes/write_increment_async.py @@ -7,7 +7,8 @@ async def write_increment_async(project_id, instance_id, table_id): """Increments a value in a Bigtable table using the async client.""" async with BigtableDataClientAsync(project=project_id) as client: table = client.get_table(instance_id, table_id) - row_key = "unique_device_ids_1" + # Define the row key as a byte string + row_key = b"unique_device_ids_1" try: async with table.mutations_batcher() as batcher: # The AddToCell mutation increments the value of a cell. @@ -17,10 +18,10 @@ async def write_increment_async(project_id, instance_id, table_id): qualifier="odometer", value=32304 ) - await batcher.append(RowMutationEntry(row_key.encode("utf-8"), [reading])) + await batcher.append(RowMutationEntry(row_key, [reading])) + print(f"Successfully incremented row {row_key.decode('utf-8')}.") except MutationsExceptionGroup as e: - # MutationsExceptionGroup contains a FailedMutationEntryError for - # each mutation that failed. + print(f"Failed to increment row {row_key.decode('utf-8')}") for sub_exception in e.exceptions: failed_entry: RowMutationEntry = sub_exception.entry cause: Exception = sub_exception.__cause__ diff --git a/samples/snippets/writes/writes_test.py b/samples/snippets/writes/writes_test.py index 5bf5675fd..89954e337 100644 --- a/samples/snippets/writes/writes_test.py +++ b/samples/snippets/writes/writes_test.py @@ -80,4 +80,4 @@ def _write_increment_async(): _write_increment_async() out, _ = capsys.readouterr() - assert "Successfully incremented row" in out + assert "Successfully incremented row" in out \ No newline at end of file diff --git a/samples/writes/increment.py b/samples/writes/increment.py new file mode 100644 index 000000000..e69de29bb From 31a5c9cacfddb053da6603042522d461bc533a5a Mon Sep 17 00:00:00 2001 From: KasiaStrz Date: Thu, 20 Nov 2025 13:55:24 +0100 Subject: [PATCH 04/15] Deleted redundant files --- samples/snippets/increment.py | 0 samples/writes/increment.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 samples/snippets/increment.py delete mode 100644 samples/writes/increment.py diff --git a/samples/snippets/increment.py b/samples/snippets/increment.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/samples/writes/increment.py b/samples/writes/increment.py deleted file mode 100644 index e69de29bb..000000000 From 33e6fa31ca350a47f1df9bc1939553f18f89288f Mon Sep 17 00:00:00 2001 From: KasiaStrz Date: Wed, 3 Dec 2025 18:12:40 +0100 Subject: [PATCH 05/15] Fix lint on b/452032333 --- samples/snippets/writes/noxfile.py | 15 ++++++++------- samples/snippets/writes/write_increment_async.py | 11 +++++++---- samples/snippets/writes/writes_test.py | 2 +- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/samples/snippets/writes/noxfile.py b/samples/snippets/writes/noxfile.py index a169b5b5b..c9a3d1ecb 100644 --- a/samples/snippets/writes/noxfile.py +++ b/samples/snippets/writes/noxfile.py @@ -160,6 +160,7 @@ def blacken(session: nox.sessions.Session) -> None: # format = isort + black # + @nox.session def format(session: nox.sessions.Session) -> None: """ @@ -187,7 +188,9 @@ def _session_tests( session: nox.sessions.Session, post_install: Callable = None ) -> None: # check for presence of tests - test_list = glob.glob("**/*_test.py", recursive=True) + glob.glob("**/test_*.py", recursive=True) + test_list = glob.glob("**/*_test.py", recursive=True) + glob.glob( + "**/test_*.py", recursive=True + ) test_list.extend(glob.glob("**/tests", recursive=True)) if len(test_list) == 0: @@ -209,9 +212,7 @@ def _session_tests( if os.path.exists("requirements-test.txt"): if os.path.exists("constraints-test.txt"): - session.install( - "-r", "requirements-test.txt", "-c", "constraints-test.txt" - ) + session.install("-r", "requirements-test.txt", "-c", "constraints-test.txt") else: session.install("-r", "requirements-test.txt") with open("requirements-test.txt") as rtfile: @@ -224,9 +225,9 @@ def _session_tests( post_install(session) if "pytest-parallel" in packages: - concurrent_args.extend(['--workers', 'auto', '--tests-per-worker', 'auto']) + concurrent_args.extend(["--workers", "auto", "--tests-per-worker", "auto"]) elif "pytest-xdist" in packages: - concurrent_args.extend(['-n', 'auto']) + concurrent_args.extend(["-n", "auto"]) session.run( "pytest", @@ -256,7 +257,7 @@ def py(session: nox.sessions.Session) -> None: def _get_repo_root() -> Optional[str]: - """ Returns the root folder of the project. """ + """Returns the root folder of the project.""" # Get root of this repository. Assume we don't have directories nested deeper than 10 items. p = Path(os.getcwd()) for i in range(10): diff --git a/samples/snippets/writes/write_increment_async.py b/samples/snippets/writes/write_increment_async.py index cc1c047bd..b4306872a 100644 --- a/samples/snippets/writes/write_increment_async.py +++ b/samples/snippets/writes/write_increment_async.py @@ -3,6 +3,7 @@ from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup + async def write_increment_async(project_id, instance_id, table_id): """Increments a value in a Bigtable table using the async client.""" async with BigtableDataClientAsync(project=project_id) as client: @@ -13,11 +14,11 @@ async def write_increment_async(project_id, instance_id, table_id): # The AddToCell mutation increments the value of a cell. # The value must be a positive or negative integer. reading = AddToCell( - family="counters", - qualifier="odometer", - value=32304 + family="counters", qualifier="odometer", value=32304 + ) + await batcher.append( + RowMutationEntry(row_key.encode("utf-8"), [reading]) ) - await batcher.append(RowMutationEntry(row_key.encode("utf-8"), [reading])) except MutationsExceptionGroup as e: # MutationsExceptionGroup contains a FailedMutationEntryError for # each mutation that failed. @@ -27,4 +28,6 @@ async def write_increment_async(project_id, instance_id, table_id): print( f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}" ) + + # [END bigtable_write_increment_async] diff --git a/samples/snippets/writes/writes_test.py b/samples/snippets/writes/writes_test.py index 89954e337..5bf5675fd 100644 --- a/samples/snippets/writes/writes_test.py +++ b/samples/snippets/writes/writes_test.py @@ -80,4 +80,4 @@ def _write_increment_async(): _write_increment_async() out, _ = capsys.readouterr() - assert "Successfully incremented row" in out \ No newline at end of file + assert "Successfully incremented row" in out From 4245e988cc2e4469901b10d5eafe32b8a908e463 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 30 Jan 2026 14:27:29 -0800 Subject: [PATCH 06/15] moved sample to new location --- .../data_client/data_client_snippets_async.py | 36 +++++++++++++++++++ .../data_client_snippets_async_test.py | 5 +++ .../snippets/writes/write_increment_async.py | 33 ----------------- samples/snippets/writes/writes_test.py | 12 +------ 4 files changed, 42 insertions(+), 44 deletions(-) delete mode 100644 samples/snippets/writes/write_increment_async.py diff --git a/samples/snippets/data_client/data_client_snippets_async.py b/samples/snippets/data_client/data_client_snippets_async.py index dabbcb839..91918a1d2 100644 --- a/samples/snippets/data_client/data_client_snippets_async.py +++ b/samples/snippets/data_client/data_client_snippets_async.py @@ -136,6 +136,42 @@ async def write_conditional(project_id, instance_id, table_id): await write_conditional(table.client.project, table.instance_id, table.table_id) +async def write_aggregate(table): + # [START bigtable_async_write_aggregate] + from google.cloud.bigtable.data import BigtableDataClientAsync + from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry + from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup + + + async def write_aggregate(project_id, instance_id, table_id): + """Increments a value in a Bigtable table using AddToCell mutation.""" + async with BigtableDataClientAsync(project=project_id) as client: + table = client.get_table(instance_id, table_id) + row_key = "unique_device_ids_1" + try: + async with table.mutations_batcher() as batcher: + # The AddToCell mutation increments the value of a cell. + # The value must be a positive or negative integer. + reading = AddToCell( + family="counters", qualifier="odometer", value=32304 + ) + await batcher.append( + RowMutationEntry(row_key.encode("utf-8"), [reading]) + ) + except MutationsExceptionGroup as e: + # MutationsExceptionGroup contains a FailedMutationEntryError for + # each mutation that failed. + for sub_exception in e.exceptions: + failed_entry: RowMutationEntry = sub_exception.entry + cause: Exception = sub_exception.__cause__ + print( + f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}" + ) + + + # [END bigtable_async_write_aggregate] + await write_aggregate(table.client.project, table.instance_id, table.table_id) + async def read_row(table): # [START bigtable_async_reads_row] from google.cloud.bigtable.data import BigtableDataClientAsync diff --git a/samples/snippets/data_client/data_client_snippets_async_test.py b/samples/snippets/data_client/data_client_snippets_async_test.py index 8dfff50d1..24219439e 100644 --- a/samples/snippets/data_client/data_client_snippets_async_test.py +++ b/samples/snippets/data_client/data_client_snippets_async_test.py @@ -59,6 +59,11 @@ async def test_write_conditional(table): await data_snippets.write_conditional(table) +@pytest.mark.asyncio +async def test_write_aggregate(table): + await data_snippets.write_aggregate(table) + + @pytest.mark.asyncio async def test_read_row(table): await data_snippets.read_row(table) diff --git a/samples/snippets/writes/write_increment_async.py b/samples/snippets/writes/write_increment_async.py deleted file mode 100644 index b4306872a..000000000 --- a/samples/snippets/writes/write_increment_async.py +++ /dev/null @@ -1,33 +0,0 @@ -# [START bigtable_write_increment_async] -from google.cloud.bigtable.data import BigtableDataClientAsync -from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry -from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup - - -async def write_increment_async(project_id, instance_id, table_id): - """Increments a value in a Bigtable table using the async client.""" - async with BigtableDataClientAsync(project=project_id) as client: - table = client.get_table(instance_id, table_id) - row_key = "unique_device_ids_1" - try: - async with table.mutations_batcher() as batcher: - # The AddToCell mutation increments the value of a cell. - # The value must be a positive or negative integer. - reading = AddToCell( - family="counters", qualifier="odometer", value=32304 - ) - await batcher.append( - RowMutationEntry(row_key.encode("utf-8"), [reading]) - ) - except MutationsExceptionGroup as e: - # MutationsExceptionGroup contains a FailedMutationEntryError for - # each mutation that failed. - for sub_exception in e.exceptions: - failed_entry: RowMutationEntry = sub_exception.entry - cause: Exception = sub_exception.__cause__ - print( - f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}" - ) - - -# [END bigtable_write_increment_async] diff --git a/samples/snippets/writes/writes_test.py b/samples/snippets/writes/writes_test.py index 5bf5675fd..2c7a3d62b 100644 --- a/samples/snippets/writes/writes_test.py +++ b/samples/snippets/writes/writes_test.py @@ -12,15 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asyncio import os -import uuid import backoff from google.api_core.exceptions import DeadlineExceeded import pytest +import uuid -from .write_increment_async import write_increment_async from .write_batch import write_batch from .write_conditionally import write_conditional from .write_increment import write_increment @@ -73,11 +71,3 @@ def _write_batch(): _write_batch() out, _ = capsys.readouterr() assert "Successfully wrote 2 rows" in out - - @backoff.on_exception(backoff.expo, DeadlineExceeded, max_time=60) - def _write_increment_async(): - asyncio.run(write_increment_async(PROJECT, BIGTABLE_INSTANCE, table_id)) - - _write_increment_async() - out, _ = capsys.readouterr() - assert "Successfully incremented row" in out From c4e7d8947fdc97aa229dc081cabfbfe5115a7390 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 30 Jan 2026 14:39:09 -0800 Subject: [PATCH 07/15] set up table with aggregate family --- .../data_client/data_client_snippets_async.py | 3 ++- .../data_client_snippets_async_test.py | 21 +++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/samples/snippets/data_client/data_client_snippets_async.py b/samples/snippets/data_client/data_client_snippets_async.py index 91918a1d2..0be43514a 100644 --- a/samples/snippets/data_client/data_client_snippets_async.py +++ b/samples/snippets/data_client/data_client_snippets_async.py @@ -151,7 +151,8 @@ async def write_aggregate(project_id, instance_id, table_id): try: async with table.mutations_batcher() as batcher: # The AddToCell mutation increments the value of a cell. - # The value must be a positive or negative integer. + # The `counters` family must be set up to be an aggregate + # family with an int64 input type. reading = AddToCell( family="counters", qualifier="odometer", value=32304 ) diff --git a/samples/snippets/data_client/data_client_snippets_async_test.py b/samples/snippets/data_client/data_client_snippets_async_test.py index 24219439e..fe35f4ac3 100644 --- a/samples/snippets/data_client/data_client_snippets_async_test.py +++ b/samples/snippets/data_client/data_client_snippets_async_test.py @@ -23,10 +23,27 @@ BIGTABLE_INSTANCE = os.environ["BIGTABLE_INSTANCE"] TABLE_ID = f"data-client-{str(uuid.uuid4())[:16]}" +@pytest.fixture(scope="session") +def column_family_config(self): + from google.cloud.bigtable_admin_v2 import types + + int_aggregate_type = types.Type.Aggregate( + input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}), + sum={}, + ) + + returb { + "family": types.ColumnFamily(), + "stats_summary": types.ColumnFamily(), + "counters": types.ColumnFamily( + value_type=types.Type(aggregate_type=int_aggregate_type) + ), + } @pytest.fixture(scope="session") -def table_id(): - with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, {"family": None, "stats_summary": None}): +def table_id(column_family_config): + + with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, column_family_config): yield TABLE_ID From 3237c9f989cf7e6da214a5391453f14a298f10d7 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 30 Jan 2026 14:42:44 -0800 Subject: [PATCH 08/15] reverted changes to noxfile --- samples/snippets/writes/noxfile.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/samples/snippets/writes/noxfile.py b/samples/snippets/writes/noxfile.py index c9a3d1ecb..a169b5b5b 100644 --- a/samples/snippets/writes/noxfile.py +++ b/samples/snippets/writes/noxfile.py @@ -160,7 +160,6 @@ def blacken(session: nox.sessions.Session) -> None: # format = isort + black # - @nox.session def format(session: nox.sessions.Session) -> None: """ @@ -188,9 +187,7 @@ def _session_tests( session: nox.sessions.Session, post_install: Callable = None ) -> None: # check for presence of tests - test_list = glob.glob("**/*_test.py", recursive=True) + glob.glob( - "**/test_*.py", recursive=True - ) + test_list = glob.glob("**/*_test.py", recursive=True) + glob.glob("**/test_*.py", recursive=True) test_list.extend(glob.glob("**/tests", recursive=True)) if len(test_list) == 0: @@ -212,7 +209,9 @@ def _session_tests( if os.path.exists("requirements-test.txt"): if os.path.exists("constraints-test.txt"): - session.install("-r", "requirements-test.txt", "-c", "constraints-test.txt") + session.install( + "-r", "requirements-test.txt", "-c", "constraints-test.txt" + ) else: session.install("-r", "requirements-test.txt") with open("requirements-test.txt") as rtfile: @@ -225,9 +224,9 @@ def _session_tests( post_install(session) if "pytest-parallel" in packages: - concurrent_args.extend(["--workers", "auto", "--tests-per-worker", "auto"]) + concurrent_args.extend(['--workers', 'auto', '--tests-per-worker', 'auto']) elif "pytest-xdist" in packages: - concurrent_args.extend(["-n", "auto"]) + concurrent_args.extend(['-n', 'auto']) session.run( "pytest", @@ -257,7 +256,7 @@ def py(session: nox.sessions.Session) -> None: def _get_repo_root() -> Optional[str]: - """Returns the root folder of the project.""" + """ Returns the root folder of the project. """ # Get root of this repository. Assume we don't have directories nested deeper than 10 items. p = Path(os.getcwd()) for i in range(10): From 4509d6ef0e7040f7ad72a294d2dd3055462e11c8 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 30 Jan 2026 15:00:53 -0800 Subject: [PATCH 09/15] added timestamp_micros --- samples/snippets/data_client/data_client_snippets_async.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/samples/snippets/data_client/data_client_snippets_async.py b/samples/snippets/data_client/data_client_snippets_async.py index 0be43514a..2dc86b8f0 100644 --- a/samples/snippets/data_client/data_client_snippets_async.py +++ b/samples/snippets/data_client/data_client_snippets_async.py @@ -138,6 +138,7 @@ async def write_conditional(project_id, instance_id, table_id): async def write_aggregate(table): # [START bigtable_async_write_aggregate] + import time from google.cloud.bigtable.data import BigtableDataClientAsync from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup @@ -154,7 +155,10 @@ async def write_aggregate(project_id, instance_id, table_id): # The `counters` family must be set up to be an aggregate # family with an int64 input type. reading = AddToCell( - family="counters", qualifier="odometer", value=32304 + family="counters", + qualifier="odometer", + value=32304, + timestamp_micros=time.time_ns() // 1000, ) await batcher.append( RowMutationEntry(row_key.encode("utf-8"), [reading]) From a3afee5092b97b121519ba5c32e0db1f5b0f79a4 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 30 Jan 2026 15:01:02 -0800 Subject: [PATCH 10/15] fixed typos --- .../snippets/data_client/data_client_snippets_async_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/snippets/data_client/data_client_snippets_async_test.py b/samples/snippets/data_client/data_client_snippets_async_test.py index fe35f4ac3..5fd4b6aa8 100644 --- a/samples/snippets/data_client/data_client_snippets_async_test.py +++ b/samples/snippets/data_client/data_client_snippets_async_test.py @@ -24,7 +24,7 @@ TABLE_ID = f"data-client-{str(uuid.uuid4())[:16]}" @pytest.fixture(scope="session") -def column_family_config(self): +def column_family_config(): from google.cloud.bigtable_admin_v2 import types int_aggregate_type = types.Type.Aggregate( @@ -32,7 +32,7 @@ def column_family_config(self): sum={}, ) - returb { + return { "family": types.ColumnFamily(), "stats_summary": types.ColumnFamily(), "counters": types.ColumnFamily( From 7ace1799022d302d3eea05d4d8a3c4c0156a4037 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 30 Jan 2026 15:01:21 -0800 Subject: [PATCH 11/15] create table using gapic client --- samples/utils.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/samples/utils.py b/samples/utils.py index eb0ca68f9..c6322dca9 100644 --- a/samples/utils.py +++ b/samples/utils.py @@ -59,10 +59,14 @@ def create_table(project, instance_id, table_id, column_families={}): if table.exists(): table.delete() - kwargs = {} - if column_families: - kwargs["column_families"] = column_families - table.create(**kwargs) + # create table using gapic layer + instance._client.table_admin_client.create_table( + request={ + "parent": instance.name, + "table_id": table_id, + "table": {"column_families": column_families}, + } + ) wait_for_table(table) From 2d633bc8da2ca1cab3361a65603830e850dacd34 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 30 Jan 2026 15:02:26 -0800 Subject: [PATCH 12/15] removed extra line --- samples/snippets/data_client/data_client_snippets_async_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/data_client/data_client_snippets_async_test.py b/samples/snippets/data_client/data_client_snippets_async_test.py index 5fd4b6aa8..b1ecd1652 100644 --- a/samples/snippets/data_client/data_client_snippets_async_test.py +++ b/samples/snippets/data_client/data_client_snippets_async_test.py @@ -42,7 +42,6 @@ def column_family_config(): @pytest.fixture(scope="session") def table_id(column_family_config): - with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, column_family_config): yield TABLE_ID From 34976cadffab567ae0f97a4507653e7fe89e2f06 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 30 Jan 2026 15:16:13 -0800 Subject: [PATCH 13/15] fixed lint --- samples/snippets/data_client/data_client_snippets_async.py | 3 +-- .../snippets/data_client/data_client_snippets_async_test.py | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/samples/snippets/data_client/data_client_snippets_async.py b/samples/snippets/data_client/data_client_snippets_async.py index 2dc86b8f0..13552cbf3 100644 --- a/samples/snippets/data_client/data_client_snippets_async.py +++ b/samples/snippets/data_client/data_client_snippets_async.py @@ -143,7 +143,6 @@ async def write_aggregate(table): from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup - async def write_aggregate(project_id, instance_id, table_id): """Increments a value in a Bigtable table using AddToCell mutation.""" async with BigtableDataClientAsync(project=project_id) as client: @@ -173,10 +172,10 @@ async def write_aggregate(project_id, instance_id, table_id): f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}" ) - # [END bigtable_async_write_aggregate] await write_aggregate(table.client.project, table.instance_id, table.table_id) + async def read_row(table): # [START bigtable_async_reads_row] from google.cloud.bigtable.data import BigtableDataClientAsync diff --git a/samples/snippets/data_client/data_client_snippets_async_test.py b/samples/snippets/data_client/data_client_snippets_async_test.py index b1ecd1652..2761bd487 100644 --- a/samples/snippets/data_client/data_client_snippets_async_test.py +++ b/samples/snippets/data_client/data_client_snippets_async_test.py @@ -23,6 +23,7 @@ BIGTABLE_INSTANCE = os.environ["BIGTABLE_INSTANCE"] TABLE_ID = f"data-client-{str(uuid.uuid4())[:16]}" + @pytest.fixture(scope="session") def column_family_config(): from google.cloud.bigtable_admin_v2 import types @@ -40,6 +41,7 @@ def column_family_config(): ), } + @pytest.fixture(scope="session") def table_id(column_family_config): with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, column_family_config): From f647dbcc59a11d7ccf0060860b7a9e04428f829a Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 3 Feb 2026 11:48:50 -0800 Subject: [PATCH 14/15] added comment --- samples/snippets/data_client/data_client_snippets_async.py | 1 + 1 file changed, 1 insertion(+) diff --git a/samples/snippets/data_client/data_client_snippets_async.py b/samples/snippets/data_client/data_client_snippets_async.py index 13552cbf3..332dbd56f 100644 --- a/samples/snippets/data_client/data_client_snippets_async.py +++ b/samples/snippets/data_client/data_client_snippets_async.py @@ -157,6 +157,7 @@ async def write_aggregate(project_id, instance_id, table_id): family="counters", qualifier="odometer", value=32304, + # Convert nanoseconds to microseconds timestamp_micros=time.time_ns() // 1000, ) await batcher.append( From dfc070ce99222ff72cf9f34c4652a98b4e6d73e4 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 3 Feb 2026 12:00:25 -0800 Subject: [PATCH 15/15] fixed table setup --- samples/utils.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/samples/utils.py b/samples/utils.py index c6322dca9..49b7ab994 100644 --- a/samples/utils.py +++ b/samples/utils.py @@ -16,6 +16,7 @@ from google.cloud import bigtable +from google.cloud.bigtable.column_family import ColumnFamily from google.api_core import exceptions from google.api_core.retry import Retry from google.api_core.retry import if_exception_type @@ -59,12 +60,18 @@ def create_table(project, instance_id, table_id, column_families={}): if table.exists(): table.delete() + # convert column families to pb if needed + pb_families = { + id: ColumnFamily(id, table, rule).to_pb() if not isinstance(rule, ColumnFamily) else rule + for (id, rule) in column_families.items() + } + # create table using gapic layer instance._client.table_admin_client.create_table( request={ "parent": instance.name, "table_id": table_id, - "table": {"column_families": column_families}, + "table": {"column_families": pb_families}, } )