From fcdf3f78f9570f9994f6671f3cdc079781f61e88 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 2 Feb 2026 19:26:31 +0100 Subject: [PATCH 1/5] Change protocol version on JSON for file column info --- src/Core/ProtocolDefines.h | 3 +- src/Core/Range.cpp | 30 ++++++++ src/Core/Range.h | 3 + src/Disks/ObjectStorages/IObjectStorage.cpp | 9 ++- src/Disks/ObjectStorages/IObjectStorage.h | 19 ++++- src/Interpreters/ClusterFunctionReadTask.cpp | 22 +----- .../DataLakes/IDataLakeMetadata.cpp | 77 +++++++++++++++++++ .../DataLakes/IDataLakeMetadata.h | 7 +- .../StorageObjectStorageSource.cpp | 1 + ...rageObjectStorageStableTaskDistributor.cpp | 8 ++ 10 files changed, 155 insertions(+), 24 deletions(-) diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index b8bbf2c9e945..b2d9eae7bb01 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -37,8 +37,7 @@ static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 5; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO; static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4; diff --git a/src/Core/Range.cpp b/src/Core/Range.cpp index 6d037d7e9004..54d2ae87d200 100644 --- a/src/Core/Range.cpp +++ b/src/Core/Range.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include 
#include @@ -9,6 +10,12 @@ namespace DB { +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; +}; + + FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_) : Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_) { @@ -283,6 +290,29 @@ bool Range::nearByWith(const Range & r) const return false; } +String Range::serialize() const +{ + WriteBufferFromOwnString str; + + str << left_included << right_included; + writeFieldBinary(left, str); + writeFieldBinary(right, str); + + return str.str(); +} + +void Range::deserialize(const String & range) +{ + if (range.empty()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump"); + + ReadBufferFromOwnString str(range); + + str >> left_included >> right_included; + left = readFieldBinary(str); + right = readFieldBinary(str); +} + Range intersect(const Range & a, const Range & b) { Range res = Range::createWholeUniverse(); diff --git a/src/Core/Range.h b/src/Core/Range.h index 921e1e6aa3f0..12463d448945 100644 --- a/src/Core/Range.h +++ b/src/Core/Range.h @@ -116,6 +116,9 @@ struct Range bool nearByWith(const Range & r) const; String toString() const; + + String serialize() const; + void deserialize(const String & range); }; Range intersect(const Range & a, const Range & b); diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index 2541d71c49f2..fa342cd36d6c 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -120,8 +120,12 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string is_valid = true; + if (json->has("file_path")) + file_path = json->getValue("file_path"); if (json->has("retry_after_us")) retry_after_us = json->getValue("retry_after_us"); + if (json->has("meta_info")) + file_meta_info = std::make_shared(json->getObject("meta_info")); } catch (const 
Poco::JSON::JSONException &) { /// Not a JSON @@ -132,8 +136,12 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string std::string PathWithMetadata::CommandInTaskResponse::toString() const { Poco::JSON::Object json; + if (file_path.has_value()) + json.set("file_path", file_path.value()); if (retry_after_us.has_value()) json.set("retry_after_us", retry_after_us.value()); + if (file_meta_info.has_value()) + json.set("meta_info", file_meta_info.value()->toJson()); std::ostringstream oss; oss.exceptions(std::ios::failbit); @@ -141,7 +149,6 @@ std::string PathWithMetadata::CommandInTaskResponse::toString() const return oss.str(); } - void PathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file) { if (!metadata) diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index a6093d4d92f9..2204fd49d29e 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -123,19 +123,33 @@ struct PathWithMetadata explicit CommandInTaskResponse(const std::string & task); bool isValid() const { return is_valid; } + void setFilePath(const std::string & file_path_ ) + { + file_path = file_path_; + is_valid = true; + } void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; is_valid = true; } + void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) + { + file_meta_info = file_meta_info_; + is_valid = true; + } std::string toString() const; + std::optional getFilePath() const { return file_path; } std::optional getRetryAfterUs() const { return retry_after_us; } + std::optional getFileMetaInfo() const { return file_meta_info; } private: bool is_valid = false; + std::optional file_path; std::optional retry_after_us; + std::optional file_meta_info; }; String relative_path; @@ -166,7 +180,10 @@ struct PathWithMetadata , object_storage_to_use(object_storage_to_use_) { if (command.isValid()) - relative_path = 
""; + { + relative_path = command.getFilePath().value_or(""); + file_meta_info = command.getFileMetaInfo(); + } } PathWithMetadata(const PathWithMetadata & other) = default; diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp index 23bd3a73843d..5d1671ce05b4 100644 --- a/src/Interpreters/ClusterFunctionReadTask.cpp +++ b/src/Interpreters/ClusterFunctionReadTask.cpp @@ -42,9 +42,10 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o { const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]; path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath(); - absolute_path = object->getAbsolutePath(); - file_bucket_info = object->file_bucket_info; } + + absolute_path = object->getAbsolutePath(); + file_bucket_info = object->file_bucket_info; } ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(const std::string & path_) @@ -96,15 +97,6 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc writeStringBinary("", out); } } - - if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA) - { - /// This info is not used when optimization is disabled, so there is no need to send it. 
- if (iceberg_read_optimization_enabled && file_meta_info.has_value()) - file_meta_info.value()->serialize(out); - else - DataFileMetaInfo().serialize(out); - } } void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in) @@ -142,14 +134,6 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in) file_bucket_info->deserialize(in); } } - - if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA) - { - auto info = std::make_shared(DataFileMetaInfo::deserialize(in)); - - if (!path.empty() && !info->empty()) - file_meta_info = info; - } } } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index 3205da746054..3e913e7df400 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -130,6 +131,82 @@ DataFileMetaInfo::DataFileMetaInfo( } } +DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info) +{ + if (!file_info) + return; + + auto log = getLogger("DataFileMetaInfo"); + + if (file_info->has("columns")) + { + auto columns = file_info->getArray("columns"); + for (size_t i = 0; i < columns->size(); ++i) + { + auto column = columns->getObject(static_cast(i)); + + std::string name; + if (column->has("name")) + name = column->get("name").toString(); + else + { + LOG_WARNING(log, "Can't read column name, ignored"); + continue; + } + + DB::DataFileMetaInfo::ColumnInfo column_info; + if (column->has("rows")) + column_info.rows_count = column->get("rows"); + if (column->has("nulls")) + column_info.nulls_count = column->get("nulls"); + if (column->has("range")) + { + Range range(""); + std::string r = column->get("range"); + try + { + range.deserialize(r); + column_info.hyperrectangle = std::move(range); + } + catch (const Exception & e) + { + LOG_WARNING(log, "Can't read range 
for column {}, range '{}' ignored, error: {}", name, r, e.what()); + } + } + + columns_info[name] = column_info; + } + } +} + +Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const +{ + Poco::JSON::Object::Ptr file_info = new Poco::JSON::Object(); + + if (!columns_info.empty()) + { + Poco::JSON::Array::Ptr columns = new Poco::JSON::Array(); + + for (const auto & column : columns_info) + { + Poco::JSON::Object::Ptr column_info = new Poco::JSON::Object(); + column_info->set("name", column.first); + if (column.second.rows_count.has_value()) + column_info->set("rows", column.second.rows_count.value()); + if (column.second.nulls_count.has_value()) + column_info->set("nulls", column.second.nulls_count.value()); + if (column.second.hyperrectangle.has_value()) + column_info->set("range", column.second.hyperrectangle.value().serialize()); + + columns->add(column_info); + } + + file_info->set("columns", columns); + } + + return file_info; +} + constexpr size_t FIELD_MASK_ROWS = 0x1; constexpr size_t FIELD_MASK_NULLS = 0x2; constexpr size_t FIELD_MASK_RECT = 0x4; diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 28d7faf1a765..41916f2b7188 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -44,6 +44,12 @@ class DataFileMetaInfo public: DataFileMetaInfo() = default; + // Deserialize from json in distributed requests + explicit DataFileMetaInfo(const Poco::JSON::Object::Ptr file_info); + + // Serialize to json in distributed requests + Poco::JSON::Object::Ptr toJson() const; + // subset of Iceberg::ColumnInfo now struct ColumnInfo { @@ -92,7 +98,6 @@ class StorageObjectStorageConfiguration; using StorageObjectStorageConfigurationPtr = std::shared_ptr; struct StorageID; struct IObjectIterator; -struct RelativePathWithMetadata; class IObjectStorage; using ObjectIterator = std::shared_ptr; using ObjectStoragePtr 
= std::shared_ptr; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 1fc4860831e1..40e0e32ff6fe 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -528,6 +528,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade sleepForMicroseconds(wait_time); continue; } + object_info->setFileMetaInfo(object_info->getCommand().getFileMetaInfo()); } if (object_info->getPath().empty()) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index aec80ef8e082..4d65c42b361f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -179,6 +179,14 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter file_identifier = object_info->getIdentifier(); } + auto file_meta_info = object_info->getFileMetaInfo(); + if (file_meta_info.has_value()) + { + auto file_path = send_over_whole_archive ? 
file_identifier : object_info->getPath(); + object_info->command.setFilePath(file_path); + object_info->command.setFileMetaInfo(file_meta_info.value()); + } + size_t file_replica_idx = getReplicaForFile(file_identifier); if (file_replica_idx == number_of_current_replica) { From 8a8f1e4b63d62a2c1ebd05e6499e678c695b876a Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 3 Feb 2026 15:10:11 +0100 Subject: [PATCH 2/5] Serialize range in Base64 --- src/Core/Range.cpp | 12 ++++++++---- src/Core/Range.h | 4 ++-- .../ObjectStorage/DataLakes/IDataLakeMetadata.cpp | 4 ++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/Core/Range.cpp b/src/Core/Range.cpp index 54d2ae87d200..766c2d4719c8 100644 --- a/src/Core/Range.cpp +++ b/src/Core/Range.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -290,7 +291,7 @@ bool Range::nearByWith(const Range & r) const return false; } -String Range::serialize() const +String Range::serialize(bool base64) const { WriteBufferFromOwnString str; @@ -298,15 +299,18 @@ String Range::serialize() const writeFieldBinary(left, str); writeFieldBinary(right, str); - return str.str(); + if (base64) + return base64Encode(str.str()); + else + return str.str(); } -void Range::deserialize(const String & range) +void Range::deserialize(const String & range, bool base64) { if (range.empty()) throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump"); - ReadBufferFromOwnString str(range); + ReadBufferFromOwnString str(base64 ? 
base64Decode(range) : range); str >> left_included >> right_included; left = readFieldBinary(str); diff --git a/src/Core/Range.h b/src/Core/Range.h index 12463d448945..059b7470d1d7 100644 --- a/src/Core/Range.h +++ b/src/Core/Range.h @@ -117,8 +117,8 @@ struct Range String toString() const; - String serialize() const; - void deserialize(const String & range); + String serialize(bool base64 = false) const; + void deserialize(const String & range, bool base64 = false); }; Range intersect(const Range & a, const Range & b); diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index 3e913e7df400..b4322f390805 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -165,7 +165,7 @@ DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info) std::string r = column->get("range"); try { - range.deserialize(r); + range.deserialize(r, /*base64*/ true); column_info.hyperrectangle = std::move(range); } catch (const Exception & e) @@ -196,7 +196,7 @@ Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const if (column.second.nulls_count.has_value()) column_info->set("nulls", column.second.nulls_count.value()); if (column.second.hyperrectangle.has_value()) - column_info->set("range", column.second.hyperrectangle.value().serialize()); + column_info->set("range", column.second.hyperrectangle.value().serialize(/*base64*/ true)); columns->add(column_info); } From 8a4dfad2aa72c0b9d59838198af6d1d0e7816684 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 3 Feb 2026 15:12:21 +0100 Subject: [PATCH 3/5] Check setting before JSON sending --- src/Interpreters/ClusterFunctionReadTask.cpp | 2 -- src/Interpreters/ClusterFunctionReadTask.h | 2 -- .../StorageObjectStorageCluster.cpp | 15 +++++++++++---- ...orageObjectStorageStableTaskDistributor.cpp | 17 +++++++++++------ ...StorageObjectStorageStableTaskDistributor.h 
| 4 +++- .../tests/gtest_rendezvous_hashing.cpp | 18 +++++++++--------- 6 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp index 5d1671ce05b4..104e6b8adb2d 100644 --- a/src/Interpreters/ClusterFunctionReadTask.cpp +++ b/src/Interpreters/ClusterFunctionReadTask.cpp @@ -22,11 +22,9 @@ namespace ErrorCodes namespace Setting { extern const SettingsBool cluster_function_process_archive_on_multiple_nodes; - extern const SettingsBool allow_experimental_iceberg_read_optimization; } ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr object, const ContextPtr & context) - : iceberg_read_optimization_enabled(context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]) { if (!object) throw Exception(ErrorCodes::LOGICAL_ERROR, "`object` cannot be null"); diff --git a/src/Interpreters/ClusterFunctionReadTask.h b/src/Interpreters/ClusterFunctionReadTask.h index 7e6af60424f8..365374f8a0e2 100644 --- a/src/Interpreters/ClusterFunctionReadTask.h +++ b/src/Interpreters/ClusterFunctionReadTask.h @@ -32,8 +32,6 @@ struct ClusterFunctionReadTaskResponse /// File's columns info std::optional file_meta_info; - const bool iceberg_read_optimization_enabled = false; - /// Convert received response into ObjectInfo. 
ObjectInfoPtr getObjectInfo() const; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index f02f65e5ad80..3631fcf04daa 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -34,6 +34,7 @@ namespace Setting extern const SettingsBool cluster_function_process_archive_on_multiple_nodes; extern const SettingsUInt64 lock_object_storage_task_distribution_ms; extern const SettingsString object_storage_cluster; + extern const SettingsBool allow_experimental_iceberg_read_optimization; } namespace ErrorCodes @@ -493,9 +494,14 @@ class TaskDistributor : public TaskIterator std::vector && ids_of_hosts, bool send_over_whole_archive, uint64_t lock_object_storage_task_distribution_ms, - ContextPtr context_ - ) - : task_distributor(iterator, std::move(ids_of_hosts), send_over_whole_archive, lock_object_storage_task_distribution_ms) + ContextPtr context_, + bool iceberg_read_optimization_enabled) + : task_distributor( + iterator, + std::move(ids_of_hosts), + send_over_whole_archive, + lock_object_storage_task_distribution_ms, + iceberg_read_optimization_enabled) , context(context_) {} ~TaskDistributor() override = default; bool supportRerunTask() const override { return true; } @@ -569,7 +575,8 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten std::move(ids_of_hosts), /* send_over_whole_archive */!local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes], lock_object_storage_task_distribution_ms, - local_context); + local_context, + /* iceberg_read_optimization_enabled */local_context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]); return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp 
b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 4d65c42b361f..2356644acd66 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -22,13 +22,15 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib std::shared_ptr iterator_, std::vector && ids_of_nodes_, bool send_over_whole_archive_, - uint64_t lock_object_storage_task_distribution_ms_) + uint64_t lock_object_storage_task_distribution_ms_, + bool iceberg_read_optimization_enabled_) : iterator(std::move(iterator_)) , send_over_whole_archive(send_over_whole_archive_) , connection_to_files(ids_of_nodes_.size()) , ids_of_nodes(std::move(ids_of_nodes_)) , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000) , iterator_exhausted(false) + , iceberg_read_optimization_enabled(iceberg_read_optimization_enabled_) { Poco::Timestamp now; size_t nodes = ids_of_nodes.size(); @@ -179,12 +181,15 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter file_identifier = object_info->getIdentifier(); } - auto file_meta_info = object_info->getFileMetaInfo(); - if (file_meta_info.has_value()) + if (iceberg_read_optimization_enabled) { - auto file_path = send_over_whole_archive ? file_identifier : object_info->getPath(); - object_info->command.setFilePath(file_path); - object_info->command.setFileMetaInfo(file_meta_info.value()); + auto file_meta_info = object_info->getFileMetaInfo(); + if (file_meta_info.has_value()) + { + auto file_path = send_over_whole_archive ? 
file_identifier : object_info->getPath(); + object_info->command.setFilePath(file_path); + object_info->command.setFileMetaInfo(file_meta_info.value()); + } } size_t file_replica_idx = getReplicaForFile(file_identifier); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 25673b3eeb02..0cd10ac7188e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -25,7 +25,8 @@ class StorageObjectStorageStableTaskDistributor std::shared_ptr iterator_, std::vector && ids_of_nodes_, bool send_over_whole_archive_, - uint64_t lock_object_storage_task_distribution_ms_); + uint64_t lock_object_storage_task_distribution_ms_, + bool iceberg_read_optimization_enabled_); ObjectInfoPtr getNextTask(size_t number_of_current_replica); @@ -54,6 +55,7 @@ class StorageObjectStorageStableTaskDistributor std::mutex mutex; bool iterator_exhausted = false; + bool iceberg_read_optimization_enabled = false; LoggerPtr log = getLogger("StorageClusterTaskDistributor"); }; diff --git a/src/Storages/ObjectStorage/tests/gtest_rendezvous_hashing.cpp b/src/Storages/ObjectStorage/tests/gtest_rendezvous_hashing.cpp index 47a45d925ebf..c9fc2831b214 100644 --- a/src/Storages/ObjectStorage/tests/gtest_rendezvous_hashing.cpp +++ b/src/Storages/ObjectStorage/tests/gtest_rendezvous_hashing.cpp @@ -101,7 +101,7 @@ TEST(RendezvousHashing, SingleNode) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 0, 10)); ASSERT_TRUE(checkHead(paths, {6})); @@ -110,7 +110,7 @@ 
TEST(RendezvousHashing, SingleNode) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 1, 10)); ASSERT_TRUE(checkHead(paths, {0, 2, 4})); @@ -119,7 +119,7 @@ TEST(RendezvousHashing, SingleNode) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 2, 10)); ASSERT_TRUE(checkHead(paths, {1, 5, 7, 8})); @@ -128,7 +128,7 @@ TEST(RendezvousHashing, SingleNode) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 3, 10)); ASSERT_TRUE(checkHead(paths, {3, 9})); @@ -139,7 +139,7 @@ TEST(RendezvousHashing, MultipleNodes) { auto iterator = makeIterator(); std::vector replicas = {"replica0", "replica1", "replica2", "replica3"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); { std::vector paths; @@ -171,7 +171,7 @@ TEST(RendezvousHashing, SingleNodeReducedCluster) { auto iterator = makeIterator(); std::vector replicas = {"replica2", "replica1"}; - 
StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 0, 10)); ASSERT_TRUE(checkHead(paths, {1, 5, 6, 7, 8, 9})); @@ -180,7 +180,7 @@ TEST(RendezvousHashing, SingleNodeReducedCluster) { auto iterator = makeIterator(); std::vector replicas = {"replica2", "replica1"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); std::vector paths; ASSERT_TRUE(extractNForReplica(distributor, paths, 1, 10)); ASSERT_TRUE(checkHead(paths, {0, 2, 3, 4})); @@ -191,7 +191,7 @@ TEST(RendezvousHashing, MultipleNodesReducedCluster) { auto iterator = makeIterator(); std::vector replicas = {"replica2", "replica1"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); { std::vector paths; @@ -210,7 +210,7 @@ TEST(RendezvousHashing, MultipleNodesReducedClusterOneByOne) { auto iterator = makeIterator(); std::vector replicas = {"replica2", "replica1"}; - StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0); + StorageObjectStorageStableTaskDistributor distributor(iterator, std::move(replicas), false, 0, false); std::vector paths0; std::vector paths1; From f9ab0c6ab2c89f9e7b854c4d32ca141d9efe27ac Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 3 Feb 2026 15:46:29 +0100 Subject: [PATCH 4/5] Fix path when `send_over_whole_archive` is true and object is not an archive --- .../ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 2356644acd66..f26e2a5019a8 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -186,7 +186,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter auto file_meta_info = object_info->getFileMetaInfo(); if (file_meta_info.has_value()) { - auto file_path = send_over_whole_archive ? file_identifier : object_info->getPath(); + auto file_path = send_over_whole_archive ? object_info->getPathOrPathToArchiveIfArchive() : object_info->getPath(); object_info->command.setFilePath(file_path); object_info->command.setFileMetaInfo(file_meta_info.value()); } From ef00829426c881af531e4689c87d03e43bfb3d62 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 3 Feb 2026 16:38:34 +0100 Subject: [PATCH 5/5] Set file_meta_info only when filled --- src/Interpreters/ClusterFunctionReadTask.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp index 104e6b8adb2d..5efeef0d7724 100644 --- a/src/Interpreters/ClusterFunctionReadTask.cpp +++ b/src/Interpreters/ClusterFunctionReadTask.cpp @@ -58,7 +58,8 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const auto object = std::make_shared(path); object->data_lake_metadata = data_lake_metadata; - object->file_meta_info = file_meta_info; + if (file_meta_info.has_value()) + object->file_meta_info = file_meta_info; if (absolute_path.has_value() && !absolute_path.value().empty()) object->absolute_path = absolute_path;