68 changes: 43 additions & 25 deletions tests/perf/microbenchmarks/writes/test_writes.py
@@ -38,10 +38,10 @@
from tests.perf.microbenchmarks._utils import (
publish_benchmark_extra_info,
RandomBytesIO,
+    get_irq_affinity,
)
from tests.perf.microbenchmarks.conftest import publish_resource_metrics
import tests.perf.microbenchmarks.writes.config as config
-from google.cloud import storage

# Get write parameters
all_params = config.get_write_params()
@@ -318,10 +318,34 @@ def target_wrapper(*args, **kwargs):
)


+# --- Global Variables for Worker Process ---
+worker_loop = None
+worker_client = None
+worker_json_client = None
+
+
+def _worker_init(bucket_type):
+    """Initializes a persistent event loop and client for each worker process."""
+    cpu_affinity = get_irq_affinity()
+    if cpu_affinity:
+        os.sched_setaffinity(
+            0, {i for i in range(1, os.cpu_count()) if i not in cpu_affinity}
+        )
+    global worker_loop, worker_client, worker_json_client
+    if bucket_type == "zonal":
+        worker_loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(worker_loop)
+        worker_client = worker_loop.run_until_complete(create_client())
+    else:  # regional
+        from google.cloud import storage
+
+        worker_json_client = storage.Client()
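The `initializer` hook on `multiprocessing.Pool` is what makes this work: each spawned worker runs `_worker_init` exactly once, parks its event loop and client in module-level globals, and every task later dispatched to that worker reuses them instead of paying loop and client setup on every call. A minimal, self-contained sketch of the same pattern (the `_slow_double` coroutine, pool size, and simplified affinity guard below are illustrative stand-ins, not code from this PR):

```python
import asyncio
import multiprocessing
import os

_loop = None  # set once per worker by the initializer


def _init_worker():
    """Runs exactly once in each worker process created by the pool."""
    global _loop
    # Simplified affinity guard: keep workers off CPU 0 (Linux-only call).
    n_cpus = os.cpu_count() or 1
    if hasattr(os, "sched_setaffinity") and n_cpus > 1:
        os.sched_setaffinity(0, set(range(1, n_cpus)))
    _loop = asyncio.new_event_loop()
    asyncio.set_event_loop(_loop)


async def _slow_double(x):
    # Stand-in for the real async gRPC upload coroutine.
    await asyncio.sleep(0.01)
    return 2 * x


def _task(x):
    # Reuses the persistent per-worker loop; no loop setup/teardown per task.
    return _loop.run_until_complete(_slow_double(x))


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=2, initializer=_init_worker) as pool:
        print(pool.map(_task, range(8)))  # [0, 2, 4, ..., 14]
```

The PR's `_worker_init` follows the same shape, additionally selecting between a gRPC client (zonal buckets) and a JSON `storage.Client()` (regional buckets).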


def _upload_files_worker(files_to_upload, other_params, bucket_type):
"""A worker function for multi-processing uploads.

-    Initializes a client and calls the appropriate multi-coroutine upload function.
+    Calls the appropriate multi-coroutine upload function using the global client.
This function is intended to be called in a separate process.

Args:
@@ -333,41 +357,28 @@ def _upload_files_worker(files_to_upload, other_params, bucket_type):
float: The maximum latency from the uploads performed by this worker.
"""
if bucket_type == "zonal":
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        client = loop.run_until_complete(create_client())
-        try:
-            result = upload_files_using_grpc_multi_coro(
-                loop, client, files_to_upload, other_params
-            )
-        finally:
-            # cleanup loop
-            tasks = asyncio.all_tasks(loop=loop)
-            for task in tasks:
-                task.cancel()
-            loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
-            loop.close()
-        return result
+        return upload_files_using_grpc_multi_coro(
+            worker_loop, worker_client, files_to_upload, other_params
+        )
else: # regional
-        json_client = storage.Client()
return upload_files_using_json_multi_threaded(
-            None, json_client, files_to_upload, other_params
+            None, worker_json_client, files_to_upload, other_params
)


-def upload_files_mp_mc_wrapper(files_names, params):
+def upload_files_mp_mc_wrapper(pool, files_names, params):
"""Wrapper for multi-process, multi-coroutine uploads.

Distributes files among a pool of processes and calls the worker function.

Args:
+        pool: The multiprocessing pool.
files_names (list): The full list of filenames to upload.
-        params: An object containing benchmark parameters (num_processes, num_coros).
+        params: An object containing benchmark parameters (num_coros).

Returns:
float: The maximum latency observed across all processes.
"""
-    num_processes = params.num_processes
num_coros = params.num_coros

filenames_per_process = [
@@ -383,9 +394,7 @@ def upload_files_mp_mc_wrapper(files_names, params):
for filenames in filenames_per_process
]

-    ctx = multiprocessing.get_context("spawn")
-    with ctx.Pool(processes=num_processes) as pool:
-        results = pool.starmap(_upload_files_worker, args)
+    results = pool.starmap(_upload_files_worker, args)

return max(results)
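With the pool now passed in from the caller, the wrapper only has to split `files_names` into one chunk per process, fan the chunks out with `pool.starmap`, and reduce the per-process maxima with `max`. The chunking code itself is collapsed in this diff, so the slicing below is an illustrative sketch under that assumption rather than the PR's exact scheme:

```python
def fan_out_max(pool, files_names, num_processes, other_params, bucket_type):
    # Hypothetical helper mirroring the shape of upload_files_mp_mc_wrapper.
    # Round-robin split: process i gets files i, i + N, i + 2N, ...
    chunks = [files_names[i::num_processes] for i in range(num_processes)]
    args = [(chunk, other_params, bucket_type) for chunk in chunks if chunk]
    # starmap blocks until every worker has uploaded its chunk and returned its max latency.
    results = pool.starmap(_upload_files_worker, args)
    return max(results)
```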

@@ -414,18 +423,27 @@ def target_wrapper(*args, **kwargs):
output_times.append(result)
return output_times

+    ctx = multiprocessing.get_context("spawn")
+    pool = ctx.Pool(
+        processes=params.num_processes,
+        initializer=_worker_init,
+        initargs=(params.bucket_type,),
+    )
try:
with monitor() as m:
output_times = benchmark.pedantic(
target=target_wrapper,
iterations=1,
rounds=params.rounds,
args=(
+                    pool,
files_names,
params,
),
)
finally:
+        pool.close()
+        pool.join()
publish_benchmark_extra_info(
benchmark, params, benchmark_group="write", true_times=output_times
)
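Creating the pool once, outside the benchmarked target, is what lets the warm per-worker state survive across rounds: `_worker_init` runs once per worker process rather than once per round, and `pool.close()`/`pool.join()` in the `finally` block tear the workers down only after `benchmark.pedantic` has finished every round. A small stand-alone sketch (with a trivial task in place of the upload wrapper, and a plain loop in place of pytest-benchmark's rounds) that shows the same worker processes being reused across repeated dispatches:

```python
import multiprocessing
import os


def _init():
    # Runs once per worker process; in the benchmark this role is played by _worker_init.
    print(f"init in pid {os.getpid()}")


def _work(i):
    return os.getpid()


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    pool = ctx.Pool(processes=2, initializer=_init)
    try:
        for round_no in range(3):  # stands in for benchmark.pedantic's rounds
            pids = set(pool.map(_work, range(8)))
            print(f"round {round_no}: served by worker pids {sorted(pids)}")
    finally:
        pool.close()  # stop accepting new work
        pool.join()   # wait for workers to exit
```

Expected output: two `init` lines (one per worker), then the same pair of pids reported for all three rounds.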