Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/cucascade/data/representation_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <stdexcept>
#include <typeindex>
#include <unordered_map>
Expand Down Expand Up @@ -240,7 +241,7 @@ class representation_converter_registry {
rmm::cuda_stream_view stream) const;
bool unregister_converter_impl(const converter_key& key);

mutable std::mutex _mutex;
mutable std::shared_mutex _mutex;
std::unordered_map<converter_key, representation_converter_fn, converter_key_hash> _converters;
};

Expand Down
136 changes: 69 additions & 67 deletions src/data/representation_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace cucascade {
void representation_converter_registry::register_converter_impl(
const converter_key& key, representation_converter_fn converter)
{
std::lock_guard<std::mutex> lock(_mutex);
std::unique_lock lock(_mutex);

if (_converters.find(key) != _converters.end()) {
std::ostringstream oss;
Expand All @@ -68,7 +68,7 @@ void representation_converter_registry::register_converter_impl(

/**
 * @brief Report whether a converter has been registered under @p key.
 *
 * Takes the registry mutex in shared (reader) mode, so concurrent lookups
 * do not serialize against each other.
 *
 * @param key Converter lookup key (source/target representation pair).
 * @return true if a converter is registered for @p key, false otherwise.
 */
bool representation_converter_registry::has_converter_impl(const converter_key& key) const
{
  std::shared_lock read_guard(_mutex);
  return _converters.count(key) != 0;
}

Expand All @@ -80,7 +80,7 @@ std::unique_ptr<idata_representation> representation_converter_registry::convert
{
representation_converter_fn converter;
{
std::lock_guard<std::mutex> lock(_mutex);
std::shared_lock lock(_mutex);

auto it = _converters.find(key);
if (it == _converters.end()) {
Expand Down Expand Up @@ -108,13 +108,13 @@ std::unique_ptr<idata_representation> representation_converter_registry::convert

/**
 * @brief Remove the converter registered under @p key, if any.
 *
 * Takes the registry mutex exclusively since the map is mutated.
 *
 * @param key Converter lookup key (source/target representation pair).
 * @return true if an entry was removed, false if no converter was registered.
 */
bool representation_converter_registry::unregister_converter_impl(const converter_key& key)
{
  std::unique_lock write_guard(_mutex);
  auto const erased_count = _converters.erase(key);
  return erased_count != 0;
}

/**
 * @brief Remove every registered converter.
 *
 * Takes the registry mutex exclusively since the map is mutated.
 */
void representation_converter_registry::clear()
{
  std::unique_lock write_guard(_mutex);
  _converters.clear();
}

Expand All @@ -124,6 +124,55 @@ void representation_converter_registry::clear()

namespace {

/**
 * @brief Accumulates (dst, src, size) copy operations for a single cudaMemcpyBatchAsync call.
 *
 * Collect all copies first, then call flush() once to submit them all to the GPU in one driver
 * call. This eliminates per-buffer driver overhead versus issuing individual cudaMemcpyAsync calls.
 *
 * NOTE(review): flush() does not clear the accumulated vectors, so an instance supports one
 * collect-then-flush cycle; confirm no caller reuses an instance across multiple flushes.
 */
struct BatchCopyAccumulator {
  std::vector<void*> dsts;         // destination pointers, parallel to srcs/sizes
  std::vector<const void*> srcs;   // source pointers, parallel to dsts/sizes
  std::vector<std::size_t> sizes;  // per-copy byte counts, parallel to dsts/srcs

  // Queue one copy. Zero-length or null-pointer entries are silently skipped so
  // callers may add unconditionally without pre-filtering.
  void add(void* dst, const void* src, std::size_t size)
  {
    if (size == 0 || src == nullptr || dst == nullptr) { return; }
    dsts.push_back(dst);
    srcs.push_back(src);
    sizes.push_back(size);
  }

  // Number of copies queued so far.
  std::size_t count() const { return dsts.size(); }

  // Submit all queued copies on `stream`, ordered per `src_order`. No-op when empty.
  // NOTE(review): `cudaMemcpySrcAccessOrder` is itself introduced with the CUDA 12.8
  // batch-memcpy API, yet a pre-12.8 fallback branch exists below — verify this signature
  // actually compiles against toolkits older than 12.8.
  void flush(rmm::cuda_stream_view stream, cudaMemcpySrcAccessOrder src_order)
  {
    if (count() == 0) { return; }
#if CUDART_VERSION >= 12080
    cudaMemcpyAttributes attr{};
    attr.srcAccessOrder = src_order;
    attr.flags = cudaMemcpyFlagDefault;
    // Single-attribute template overload (cuda_runtime.h): deduces direction from pointer types.
    // NOTE: cudaMemcpyBatchAsync requires a real (non-default) CUDA stream.
    // CUDA 12.x has a failIdx parameter that was removed in CUDA 13.
#if CUDART_VERSION < 13000
    CUCASCADE_CUDA_TRY(cudaMemcpyBatchAsync(
      dsts.data(), srcs.data(), sizes.data(), count(), attr, nullptr, stream.value()));
#else
    CUCASCADE_CUDA_TRY(
      cudaMemcpyBatchAsync(dsts.data(), srcs.data(), sizes.data(), count(), attr, stream.value()));
#endif
#else
    // cudaMemcpyBatchAsync requires CUDA 12.8+; fall back to individual copies.
    (void)src_order;
    for (std::size_t i = 0; i < count(); ++i) {
      CUCASCADE_CUDA_TRY(
        cudaMemcpyAsync(dsts[i], srcs[i], sizes[i], cudaMemcpyDefault, stream.value()));
    }
#endif
  }
};

/**
* @brief Convert gpu_table_representation to gpu_table_representation (cross-GPU copy)
*/
Expand All @@ -132,10 +181,6 @@ std::unique_ptr<idata_representation> convert_gpu_to_gpu(
const memory::memory_space* target_memory_space,
rmm::cuda_stream_view stream)
{
// Synchronize the stream to ensure any prior operations (like table creation)
// are complete before we read from the source table
stream.synchronize();

auto& gpu_source = source.cast<gpu_table_representation>();
auto packed_data = cudf::pack(gpu_source.get_table(), stream);

Expand Down Expand Up @@ -238,10 +283,12 @@ std::unique_ptr<idata_representation> convert_host_to_gpu(
auto& host_table = host_source.get_host_table();
auto const data_size = host_table->data_size;

auto mr = target_memory_space->get_default_allocator();
int previous_device = -1;
auto mr = target_memory_space->get_default_allocator();
int previous_device = -1;
auto const target_device = target_memory_space->get_device_id();
CUCASCADE_CUDA_TRY(cudaGetDevice(&previous_device));
CUCASCADE_CUDA_TRY(cudaSetDevice(target_memory_space->get_device_id()));
bool const switch_device = (previous_device != target_device);
if (switch_device) { CUCASCADE_CUDA_TRY(cudaSetDevice(target_device)); }

rmm::device_buffer dst_buffer(data_size, stream, mr);
size_t src_block_index = 0;
Expand All @@ -266,15 +313,16 @@ std::unique_ptr<idata_representation> convert_host_to_gpu(
}
}

auto new_metadata = std::make_unique<std::vector<uint8_t>>(*host_table->metadata);
auto new_gpu_data = std::make_unique<rmm::device_buffer>(std::move(dst_buffer));
stream.synchronize();
// cudf::unpack only reads metadata to produce a non-owning table_view — no GPU work.
// Use source metadata directly since it remains alive for the duration of this call.
// The H→D memcpy and table copy are on the same stream, so ordering is guaranteed.
auto new_table_view =
cudf::unpack(new_metadata->data(), static_cast<uint8_t const*>(new_gpu_data->data()));
cudf::unpack(host_table->metadata->data(), static_cast<uint8_t const*>(new_gpu_data->data()));
auto new_table = std::make_unique<cudf::table>(new_table_view, stream, mr);
stream.synchronize();

CUCASCADE_CUDA_TRY(cudaSetDevice(previous_device));
if (switch_device) { CUCASCADE_CUDA_TRY(cudaSetDevice(previous_device)); }
return std::make_unique<gpu_table_representation>(
std::move(new_table), *const_cast<memory::memory_space*>(target_memory_space));
}
Expand Down Expand Up @@ -375,54 +423,6 @@ static std::size_t element_size_bytes(const cudf::column_view& col)
return cudf::size_of(col.type());
}

/**
 * @brief Accumulates (dst, src, size) copy operations for a single cudaMemcpyBatchAsync call.
 *
 * Collect all copies first, then call flush() once to submit them all to the GPU in one driver
 * call. This eliminates per-buffer driver overhead versus issuing individual cudaMemcpyAsync calls.
 *
 * NOTE(review): flush() does not clear the accumulated vectors, so an instance supports one
 * collect-then-flush cycle; confirm no caller reuses an instance across multiple flushes.
 */
struct BatchCopyAccumulator {
  std::vector<void*> dsts;         // destination pointers, parallel to srcs/sizes
  std::vector<const void*> srcs;   // source pointers, parallel to dsts/sizes
  std::vector<std::size_t> sizes;  // per-copy byte counts, parallel to dsts/srcs

  // Queue one copy. Zero-length or null-pointer entries are silently skipped so
  // callers may add unconditionally without pre-filtering.
  void add(void* dst, const void* src, std::size_t size)
  {
    if (size == 0 || src == nullptr || dst == nullptr) { return; }
    dsts.push_back(dst);
    srcs.push_back(src);
    sizes.push_back(size);
  }

  // Number of copies queued so far.
  std::size_t count() const { return dsts.size(); }

  // Submit all queued copies on `stream`, ordered per `src_order`. No-op when empty.
  // NOTE(review): `cudaMemcpySrcAccessOrder` is itself introduced with the CUDA 12.8
  // batch-memcpy API, yet a pre-12.8 fallback branch exists below — verify this signature
  // actually compiles against toolkits older than 12.8.
  void flush(rmm::cuda_stream_view stream, cudaMemcpySrcAccessOrder src_order)
  {
    if (count() == 0) { return; }
#if CUDART_VERSION >= 12080
    cudaMemcpyAttributes attr{};
    attr.srcAccessOrder = src_order;
    attr.flags = cudaMemcpyFlagDefault;
    // Single-attribute template overload (cuda_runtime.h): deduces direction from pointer types.
    // NOTE: cudaMemcpyBatchAsync requires a real (non-default) CUDA stream.
    // CUDA 12.x has a failIdx parameter that was removed in CUDA 13.
#if CUDART_VERSION < 13000
    CUCASCADE_CUDA_TRY(cudaMemcpyBatchAsync(
      dsts.data(), srcs.data(), sizes.data(), count(), attr, nullptr, stream.value()));
#else
    CUCASCADE_CUDA_TRY(
      cudaMemcpyBatchAsync(dsts.data(), srcs.data(), sizes.data(), count(), attr, stream.value()));
#endif
#else
    // cudaMemcpyBatchAsync requires CUDA 12.8+; fall back to individual copies.
    (void)src_order;
    for (std::size_t i = 0; i < count(); ++i) {
      CUCASCADE_CUDA_TRY(cudaMemcpyAsync(dsts[i], srcs[i], sizes[i], cudaMemcpyDefault, stream.value()));
    }
#endif
  }
};

/**
* @brief Recursively plan the buffer layout for one column, filling in column_metadata.
*
Expand Down Expand Up @@ -736,9 +736,11 @@ std::unique_ptr<idata_representation> convert_host_fast_to_gpu(
throw std::runtime_error("convert_host_fast_to_gpu: host table allocation is null");
}

int previous_device = -1;
int previous_device = -1;
auto const target_device = target_memory_space->get_device_id();
CUCASCADE_CUDA_TRY(cudaGetDevice(&previous_device));
CUCASCADE_CUDA_TRY(cudaSetDevice(target_memory_space->get_device_id()));
bool const switch_device = (previous_device != target_device);
if (switch_device) { CUCASCADE_CUDA_TRY(cudaSetDevice(target_device)); }

auto mr = target_memory_space->get_default_allocator();

Expand All @@ -755,7 +757,7 @@ std::unique_ptr<idata_representation> convert_host_fast_to_gpu(
auto new_table = std::make_unique<cudf::table>(std::move(gpu_columns));
stream.synchronize();

CUCASCADE_CUDA_TRY(cudaSetDevice(previous_device));
if (switch_device) { CUCASCADE_CUDA_TRY(cudaSetDevice(previous_device)); }
return std::make_unique<gpu_table_representation>(
std::move(new_table), *const_cast<memory::memory_space*>(target_memory_space));
}
Expand Down
Loading