40 changes: 40 additions & 0 deletions samples/snippets/data_client/data_client_snippets_async.py
@@ -136,6 +136,46 @@ async def write_conditional(project_id, instance_id, table_id):
     await write_conditional(table.client.project, table.instance_id, table.table_id)
+
+
+async def write_aggregate(table):
+    # [START bigtable_async_write_aggregate]
+    import time
+    from google.cloud.bigtable.data import BigtableDataClientAsync
+    from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry
+    from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
+
+    async def write_aggregate(project_id, instance_id, table_id):
+        """Increments a value in a Bigtable table using an AddToCell mutation."""
+        async with BigtableDataClientAsync(project=project_id) as client:
+            table = client.get_table(instance_id, table_id)
+            row_key = "unique_device_ids_1"
+            try:
+                async with table.mutations_batcher() as batcher:
+                    # The AddToCell mutation increments the value of a cell.
+                    # The `counters` family must be set up as an aggregate
+                    # family with an int64 input type.
+                    reading = AddToCell(
+                        family="counters",
+                        qualifier="odometer",
+                        value=32304,
+                        timestamp_micros=time.time_ns() // 1000,
+                    )
+                    await batcher.append(
+                        RowMutationEntry(row_key.encode("utf-8"), [reading])
+                    )
+            except MutationsExceptionGroup as e:
+                # MutationsExceptionGroup contains a FailedMutationEntryError for
+                # each mutation that failed.
+                for sub_exception in e.exceptions:
+                    failed_entry: RowMutationEntry = sub_exception.entry
+                    cause: Exception = sub_exception.__cause__
+                    print(
+                        f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}"
+                    )
+
+    # [END bigtable_async_write_aggregate]
+    await write_aggregate(table.client.project, table.instance_id, table.table_id)
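
For context (not part of this diff): because the `counters` family is configured in the test fixture below with a big-endian int64 encoding and sum aggregation, the server-maintained total can be read back like any other cell and decoded from its eight big-endian bytes. A minimal sketch, where `read_aggregate_sum` is a name chosen here for illustration:

from google.cloud.bigtable.data import BigtableDataClientAsync


async def read_aggregate_sum(project_id, instance_id, table_id):
    # Sketch only: read the cell written by write_aggregate above and decode
    # the aggregated sum from its big-endian int64 representation.
    async with BigtableDataClientAsync(project=project_id) as client:
        table = client.get_table(instance_id, table_id)
        row = await table.read_row(b"unique_device_ids_1")
        if row is not None:
            for cell in row.get_cells(family="counters", qualifier="odometer"):
                print(int.from_bytes(cell.value, byteorder="big", signed=True))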


 async def read_row(table):
     # [START bigtable_async_reads_row]
     from google.cloud.bigtable.data import BigtableDataClientAsync
27 changes: 25 additions & 2 deletions samples/snippets/data_client/data_client_snippets_async_test.py
@@ -25,8 +25,26 @@


 @pytest.fixture(scope="session")
-def table_id():
-    with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, {"family": None, "stats_summary": None}):
+def column_family_config():
+    from google.cloud.bigtable_admin_v2 import types
+
+    int_aggregate_type = types.Type.Aggregate(
+        input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}),
+        sum={},
+    )
+
+    return {
+        "family": types.ColumnFamily(),
+        "stats_summary": types.ColumnFamily(),
+        "counters": types.ColumnFamily(
+            value_type=types.Type(aggregate_type=int_aggregate_type)
+        ),
+    }
+
+
+@pytest.fixture(scope="session")
+def table_id(column_family_config):
+    with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, column_family_config):
         yield TABLE_ID
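
As a quick sanity check (a sketch, not part of this PR, assuming default credentials), the resulting family configuration can be inspected through the synchronous admin client:

from google.cloud.bigtable_admin_v2 import BigtableTableAdminClient

admin = BigtableTableAdminClient()
table = admin.get_table(
    name=admin.table_path(PROJECT, BIGTABLE_INSTANCE, TABLE_ID)
)
# Expect an Aggregate value type with an int64 input and sum semantics.
print(table.column_families["counters"].value_type.aggregate_type)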


@@ -59,6 +77,11 @@ async def test_write_conditional(table):
     await data_snippets.write_conditional(table)
 
 
+@pytest.mark.asyncio
+async def test_write_aggregate(table):
+    await data_snippets.write_aggregate(table)
+
+
 @pytest.mark.asyncio
 async def test_read_row(table):
     await data_snippets.read_row(table)
12 changes: 8 additions & 4 deletions samples/utils.py
@@ -59,10 +59,14 @@ def create_table(project, instance_id, table_id, column_families={}):
     if table.exists():
         table.delete()
 
-    kwargs = {}
-    if column_families:
-        kwargs["column_families"] = column_families
-    table.create(**kwargs)
+    # Create the table via the GAPIC admin client so full ColumnFamily configs (e.g. aggregate types) pass through.
+    instance._client.table_admin_client.create_table(
+        request={
+            "parent": instance.name,
+            "table_id": table_id,
+            "table": {"column_families": column_families},
+        }
+    )
 
     wait_for_table(table)
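
For context, wait_for_table is defined elsewhere in samples/utils.py and is not shown in this diff. Assuming it simply polls until the newly created table is visible, a minimal sketch might look like this (hypothetical, not the repo's actual implementation):

import time


def wait_for_table(table, timeout_seconds=60):
    # Hypothetical sketch: poll until the table reports as existing, since
    # create_table above can return before the table is ready to serve.
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        if table.exists():
            return
        time.sleep(1)
    raise TimeoutError(f"Table {table.table_id} was not ready in time")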
