diff --git a/include/cucascade/data/representation_converter.hpp b/include/cucascade/data/representation_converter.hpp
index 3114483..5bd8399 100644
--- a/include/cucascade/data/representation_converter.hpp
+++ b/include/cucascade/data/representation_converter.hpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include <shared_mutex>
 #include
 #include
 #include
@@ -240,7 +241,7 @@ class representation_converter_registry {
                                   rmm::cuda_stream_view stream) const;
 
   bool unregister_converter_impl(const converter_key& key);
 
-  mutable std::mutex _mutex;
+  mutable std::shared_mutex _mutex;
   std::unordered_map _converters;
 };
diff --git a/src/data/representation_converter.cpp b/src/data/representation_converter.cpp
index 117eb75..079dba5 100644
--- a/src/data/representation_converter.cpp
+++ b/src/data/representation_converter.cpp
@@ -54,7 +54,7 @@ namespace cucascade {
 void representation_converter_registry::register_converter_impl(
   const converter_key& key, representation_converter_fn converter)
 {
-  std::lock_guard lock(_mutex);
+  std::unique_lock lock(_mutex);
 
   if (_converters.find(key) != _converters.end()) {
     std::ostringstream oss;
@@ -68,7 +68,7 @@ void representation_converter_registry::register_converter_impl(
 
 bool representation_converter_registry::has_converter_impl(const converter_key& key) const
 {
-  std::lock_guard lock(_mutex);
+  std::shared_lock lock(_mutex);
 
   return _converters.find(key) != _converters.end();
 }
@@ -80,7 +80,7 @@ std::unique_ptr representation_converter_registry::convert
 {
   representation_converter_fn converter;
   {
-    std::lock_guard lock(_mutex);
+    std::shared_lock lock(_mutex);
 
     auto it = _converters.find(key);
     if (it == _converters.end()) {
@@ -108,13 +108,13 @@ std::unique_ptr representation_converter_registry::convert
 
 bool representation_converter_registry::unregister_converter_impl(const converter_key& key)
 {
-  std::lock_guard lock(_mutex);
+  std::unique_lock lock(_mutex);
 
   return _converters.erase(key) > 0;
 }
 
 void representation_converter_registry::clear()
 {
-  std::lock_guard lock(_mutex);
+  std::unique_lock lock(_mutex);
   _converters.clear();
 }
@@ -124,6 +124,55 @@ void representation_converter_registry::clear()
 
 namespace {
 
+/**
+ * @brief Accumulates (dst, src, size) copy operations for a single cudaMemcpyBatchAsync call.
+ *
+ * Collect all copies first, then call flush() once to submit them all to the GPU in one driver
+ * call. This eliminates per-buffer driver overhead versus issuing individual cudaMemcpyAsync calls.
+ */
+struct BatchCopyAccumulator {
+  std::vector<void*> dsts;
+  std::vector<const void*> srcs;
+  std::vector<std::size_t> sizes;
+
+  void add(void* dst, const void* src, std::size_t size)
+  {
+    if (size == 0 || src == nullptr || dst == nullptr) { return; }
+    dsts.push_back(dst);
+    srcs.push_back(src);
+    sizes.push_back(size);
+  }
+
+  std::size_t count() const { return dsts.size(); }
+
+  void flush(rmm::cuda_stream_view stream, cudaMemcpySrcAccessOrder src_order)
+  {
+    if (count() == 0) { return; }
+#if CUDART_VERSION >= 12080
+    cudaMemcpyAttributes attr{};
+    attr.srcAccessOrder = src_order;
+    attr.flags          = cudaMemcpyFlagDefault;
+    // Single-attribute template overload (cuda_runtime.h): deduces direction from pointer types.
+    // NOTE: cudaMemcpyBatchAsync requires a real (non-default) CUDA stream.
+    // CUDA 12.x has a failIdx parameter that was removed in CUDA 13.
+#if CUDART_VERSION < 13000
+    CUCASCADE_CUDA_TRY(cudaMemcpyBatchAsync(
+      dsts.data(), srcs.data(), sizes.data(), count(), attr, nullptr, stream.value()));
+#else
+    CUCASCADE_CUDA_TRY(
+      cudaMemcpyBatchAsync(dsts.data(), srcs.data(), sizes.data(), count(), attr, stream.value()));
+#endif
+#else
+    // cudaMemcpyBatchAsync requires CUDA 12.8+; fall back to individual copies.
+    (void)src_order;
+    for (std::size_t i = 0; i < count(); ++i) {
+      CUCASCADE_CUDA_TRY(
+        cudaMemcpyAsync(dsts[i], srcs[i], sizes[i], cudaMemcpyDefault, stream.value()));
+    }
+#endif
+  }
+};
+
 /**
  * @brief Convert gpu_table_representation to gpu_table_representation (cross-GPU copy)
  */
@@ -132,10 +181,6 @@ std::unique_ptr convert_gpu_to_gpu(
   const memory::memory_space* target_memory_space,
   rmm::cuda_stream_view stream)
 {
-  // Synchronize the stream to ensure any prior operations (like table creation)
-  // are complete before we read from the source table
-  stream.synchronize();
-
   auto& gpu_source = source.cast<gpu_table_representation>();
 
   auto packed_data = cudf::pack(gpu_source.get_table(), stream);
@@ -238,10 +283,12 @@ std::unique_ptr convert_host_to_gpu(
   auto& host_table = host_source.get_host_table();
   auto const data_size = host_table->data_size;
 
-  auto mr             = target_memory_space->get_default_allocator();
-  int previous_device = -1;
+  auto mr                  = target_memory_space->get_default_allocator();
+  int previous_device      = -1;
+  auto const target_device = target_memory_space->get_device_id();
   CUCASCADE_CUDA_TRY(cudaGetDevice(&previous_device));
-  CUCASCADE_CUDA_TRY(cudaSetDevice(target_memory_space->get_device_id()));
+  bool const switch_device = (previous_device != target_device);
+  if (switch_device) { CUCASCADE_CUDA_TRY(cudaSetDevice(target_device)); }
 
   rmm::device_buffer dst_buffer(data_size, stream, mr);
   size_t src_block_index = 0;
@@ -266,15 +313,16 @@ std::unique_ptr convert_host_to_gpu(
     }
   }
 
-  auto new_metadata = std::make_unique<std::vector<uint8_t>>(*host_table->metadata);
   auto new_gpu_data = std::make_unique<rmm::device_buffer>(std::move(dst_buffer));
-  stream.synchronize();
+  // cudf::unpack only reads metadata to produce a non-owning table_view — no GPU work.
+  // Use source metadata directly since it remains alive for the duration of this call.
+  // The H→D memcpy and table copy are on the same stream, so ordering is guaranteed.
   auto new_table_view =
-    cudf::unpack(new_metadata->data(), static_cast<uint8_t const*>(new_gpu_data->data()));
+    cudf::unpack(host_table->metadata->data(), static_cast<uint8_t const*>(new_gpu_data->data()));
   auto new_table = std::make_unique<cudf::table>(new_table_view, stream, mr);
 
   stream.synchronize();
-  CUCASCADE_CUDA_TRY(cudaSetDevice(previous_device));
+  if (switch_device) { CUCASCADE_CUDA_TRY(cudaSetDevice(previous_device)); }
   return std::make_unique<gpu_table_representation>(
     std::move(new_table), *const_cast<memory::memory_space*>(target_memory_space));
 }
@@ -375,54 +423,6 @@ static std::size_t element_size_bytes(const cudf::column_view& col)
   return cudf::size_of(col.type());
 }
 
-/**
- * @brief Accumulates (dst, src, size) copy operations for a single cudaMemcpyBatchAsync call.
- *
- * Collect all copies first, then call flush() once to submit them all to the GPU in one driver
- * call. This eliminates per-buffer driver overhead versus issuing individual cudaMemcpyAsync calls.
- */
-struct BatchCopyAccumulator {
-  std::vector<void*> dsts;
-  std::vector<const void*> srcs;
-  std::vector<std::size_t> sizes;
-
-  void add(void* dst, const void* src, std::size_t size)
-  {
-    if (size == 0 || src == nullptr || dst == nullptr) { return; }
-    dsts.push_back(dst);
-    srcs.push_back(src);
-    sizes.push_back(size);
-  }
-
-  std::size_t count() const { return dsts.size(); }
-
-  void flush(rmm::cuda_stream_view stream, cudaMemcpySrcAccessOrder src_order)
-  {
-    if (count() == 0) { return; }
-#if CUDART_VERSION >= 12080
-    cudaMemcpyAttributes attr{};
-    attr.srcAccessOrder = src_order;
-    attr.flags          = cudaMemcpyFlagDefault;
-    // Single-attribute template overload (cuda_runtime.h): deduces direction from pointer types.
-    // NOTE: cudaMemcpyBatchAsync requires a real (non-default) CUDA stream.
-    // CUDA 12.x has a failIdx parameter that was removed in CUDA 13.
-#if CUDART_VERSION < 13000
-    CUCASCADE_CUDA_TRY(cudaMemcpyBatchAsync(
-      dsts.data(), srcs.data(), sizes.data(), count(), attr, nullptr, stream.value()));
-#else
-    CUCASCADE_CUDA_TRY(
-      cudaMemcpyBatchAsync(dsts.data(), srcs.data(), sizes.data(), count(), attr, stream.value()));
-#endif
-#else
-    // cudaMemcpyBatchAsync requires CUDA 12.8+; fall back to individual copies.
-    (void)src_order;
-    for (std::size_t i = 0; i < count(); ++i) {
-      CUCASCADE_CUDA_TRY(cudaMemcpyAsync(dsts[i], srcs[i], sizes[i], cudaMemcpyDefault, stream.value()));
-    }
-#endif
-  }
-};
-
 /**
  * @brief Recursively plan the buffer layout for one column, filling in column_metadata.
  *
@@ -736,9 +736,11 @@ std::unique_ptr convert_host_fast_to_gpu(
     throw std::runtime_error("convert_host_fast_to_gpu: host table allocation is null");
   }
 
-  int previous_device = -1;
+  int previous_device      = -1;
+  auto const target_device = target_memory_space->get_device_id();
   CUCASCADE_CUDA_TRY(cudaGetDevice(&previous_device));
-  CUCASCADE_CUDA_TRY(cudaSetDevice(target_memory_space->get_device_id()));
+  bool const switch_device = (previous_device != target_device);
+  if (switch_device) { CUCASCADE_CUDA_TRY(cudaSetDevice(target_device)); }
 
   auto mr = target_memory_space->get_default_allocator();
 
@@ -755,7 +757,7 @@ std::unique_ptr convert_host_fast_to_gpu(
   auto new_table = std::make_unique<cudf::table>(std::move(gpu_columns));
 
   stream.synchronize();
-  CUCASCADE_CUDA_TRY(cudaSetDevice(previous_device));
+  if (switch_device) { CUCASCADE_CUDA_TRY(cudaSetDevice(previous_device)); }
   return std::make_unique<gpu_table_representation>(
     std::move(new_table), *const_cast<memory::memory_space*>(target_memory_space));
 }