diff --git a/samples/snippets/data_client/data_client_snippets_async.py b/samples/snippets/data_client/data_client_snippets_async.py index dabbcb839..13552cbf3 100644 --- a/samples/snippets/data_client/data_client_snippets_async.py +++ b/samples/snippets/data_client/data_client_snippets_async.py @@ -136,6 +136,46 @@ async def write_conditional(project_id, instance_id, table_id): await write_conditional(table.client.project, table.instance_id, table.table_id) +async def write_aggregate(table): + # [START bigtable_async_write_aggregate] + import time + from google.cloud.bigtable.data import BigtableDataClientAsync + from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry + from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup + + async def write_aggregate(project_id, instance_id, table_id): + """Increments a value in a Bigtable table using AddToCell mutation.""" + async with BigtableDataClientAsync(project=project_id) as client: + table = client.get_table(instance_id, table_id) + row_key = "unique_device_ids_1" + try: + async with table.mutations_batcher() as batcher: + # The AddToCell mutation increments the value of a cell. + # The `counters` family must be set up to be an aggregate + # family with an int64 input type. + reading = AddToCell( + family="counters", + qualifier="odometer", + value=32304, + timestamp_micros=time.time_ns() // 1000, + ) + await batcher.append( + RowMutationEntry(row_key.encode("utf-8"), [reading]) + ) + except MutationsExceptionGroup as e: + # MutationsExceptionGroup contains a FailedMutationEntryError for + # each mutation that failed. + for sub_exception in e.exceptions: + failed_entry: RowMutationEntry = sub_exception.entry + cause: Exception = sub_exception.__cause__ + print( + f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}" + ) + + # [END bigtable_async_write_aggregate] + await write_aggregate(table.client.project, table.instance_id, table.table_id) + + async def read_row(table): # [START bigtable_async_reads_row] from google.cloud.bigtable.data import BigtableDataClientAsync diff --git a/samples/snippets/data_client/data_client_snippets_async_test.py b/samples/snippets/data_client/data_client_snippets_async_test.py index 8dfff50d1..2761bd487 100644 --- a/samples/snippets/data_client/data_client_snippets_async_test.py +++ b/samples/snippets/data_client/data_client_snippets_async_test.py @@ -25,8 +25,26 @@ @pytest.fixture(scope="session") -def table_id(): - with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, {"family": None, "stats_summary": None}): +def column_family_config(): + from google.cloud.bigtable_admin_v2 import types + + int_aggregate_type = types.Type.Aggregate( + input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}), + sum={}, + ) + + return { + "family": types.ColumnFamily(), + "stats_summary": types.ColumnFamily(), + "counters": types.ColumnFamily( + value_type=types.Type(aggregate_type=int_aggregate_type) + ), + } + + +@pytest.fixture(scope="session") +def table_id(column_family_config): + with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, column_family_config): yield TABLE_ID @@ -59,6 +77,11 @@ async def test_write_conditional(table): await data_snippets.write_conditional(table) +@pytest.mark.asyncio +async def test_write_aggregate(table): + await data_snippets.write_aggregate(table) + + @pytest.mark.asyncio async def test_read_row(table): await data_snippets.read_row(table) diff --git a/samples/utils.py b/samples/utils.py index eb0ca68f9..c6322dca9 100644 --- a/samples/utils.py +++ b/samples/utils.py @@ -59,10 +59,14 @@ def create_table(project, instance_id, table_id, column_families={}): if table.exists(): table.delete() - kwargs = {} - if column_families: - kwargs["column_families"] = column_families - table.create(**kwargs) + # create table using gapic layer + instance._client.table_admin_client.create_table( + request={ + "parent": instance.name, + "table_id": table_id, + "table": {"column_families": column_families}, + } + ) wait_for_table(table)