Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 5;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO;

static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
Expand Down
34 changes: 34 additions & 0 deletions src/Core/Range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@
#include <Core/Range.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <Common/FieldVisitorToString.h>
#include <Common/FieldAccurateComparison.h>
#include <Common/Base64.h>


namespace DB
{

/// Error codes used by Range::deserialize below.
namespace ErrorCodes
{
    extern const int INCORRECT_DATA;
}


FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_)
: Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_)
{
Expand Down Expand Up @@ -283,6 +291,32 @@ bool Range::nearByWith(const Range & r) const
return false;
}

/// Serialize the range into a compact binary dump: the two inclusion flags
/// followed by the binary-encoded left and right bounds.
/// If `base64` is set, the dump is additionally base64-encoded so it can be
/// embedded into text formats (e.g. JSON) safely.
String Range::serialize(bool base64) const
{
    WriteBufferFromOwnString out;

    out << left_included << right_included;
    writeFieldBinary(left, out);
    writeFieldBinary(right, out);

    String dump = out.str();
    return base64 ? base64Encode(dump) : dump;
}

void Range::deserialize(const String & range, bool base64)
{
if (range.empty())
throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump");

ReadBufferFromOwnString str(base64 ? base64Decode(range) : range);

str >> left_included >> right_included;
left = readFieldBinary(str);
right = readFieldBinary(str);
}

Range intersect(const Range & a, const Range & b)
{
Range res = Range::createWholeUniverse();
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Range.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ struct Range
bool nearByWith(const Range & r) const;

String toString() const;

String serialize(bool base64 = false) const;
void deserialize(const String & range, bool base64 = false);
};

Range intersect(const Range & a, const Range & b);
Expand Down
9 changes: 8 additions & 1 deletion src/Disks/ObjectStorages/IObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,12 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string

is_valid = true;

if (json->has("file_path"))
file_path = json->getValue<std::string>("file_path");
if (json->has("retry_after_us"))
retry_after_us = json->getValue<size_t>("retry_after_us");
if (json->has("meta_info"))
file_meta_info = std::make_shared<DataFileMetaInfo>(json->getObject("meta_info"));
}
catch (const Poco::JSON::JSONException &)
{ /// Not a JSON
Expand All @@ -132,16 +136,19 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string
/// Render the command response as a JSON string for sending to a worker.
/// Only the fields that were actually set are emitted; an empty response
/// serializes to "{}". Counterpart of the JSON-parsing constructor.
std::string PathWithMetadata::CommandInTaskResponse::toString() const
{
    Poco::JSON::Object json;
    if (file_path.has_value())
        json.set("file_path", file_path.value());
    if (retry_after_us.has_value())
        json.set("retry_after_us", retry_after_us.value());
    if (file_meta_info.has_value())
        json.set("meta_info", file_meta_info.value()->toJson()); /// nested per-column metadata object

    std::ostringstream oss;
    /// Turn stream errors into exceptions instead of silently producing a truncated string.
    oss.exceptions(std::ios::failbit);
    Poco::JSON::Stringifier::stringify(json, oss);
    return oss.str();
}


void PathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file)
{
if (!metadata)
Expand Down
19 changes: 18 additions & 1 deletion src/Disks/ObjectStorages/IObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,33 @@ struct PathWithMetadata
explicit CommandInTaskResponse(const std::string & task);

bool isValid() const { return is_valid; }
/// Attach the resolved file path and mark the response as valid.
void setFilePath(const std::string & file_path_)
{
    is_valid = true;
    file_path = file_path_;
}
/// Ask the worker to retry after the given delay (microseconds) and mark the response as valid.
void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us)
{
    is_valid = true;
    retry_after_us = time_us;
}
/// Attach per-file column metadata and mark the response as valid.
void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_)
{
    is_valid = true;
    file_meta_info = file_meta_info_;
}

std::string toString() const;

std::optional<std::string> getFilePath() const { return file_path; }
std::optional<Poco::Timestamp::TimeDiff> getRetryAfterUs() const { return retry_after_us; }
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }

private:
bool is_valid = false;
std::optional<std::string> file_path;
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
std::optional<DataFileMetaInfoPtr> file_meta_info;
};

String relative_path;
Expand Down Expand Up @@ -166,7 +180,10 @@ struct PathWithMetadata
, object_storage_to_use(object_storage_to_use_)
{
if (command.isValid())
relative_path = "";
{
relative_path = command.getFilePath().value_or("");
file_meta_info = command.getFileMetaInfo();
}
}

PathWithMetadata(const PathWithMetadata & other) = default;
Expand Down
27 changes: 5 additions & 22 deletions src/Interpreters/ClusterFunctionReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ namespace ErrorCodes
namespace Setting
{
extern const SettingsBool cluster_function_process_archive_on_multiple_nodes;
extern const SettingsBool allow_experimental_iceberg_read_optimization;
}

ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr object, const ContextPtr & context)
: iceberg_read_optimization_enabled(context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization])
{
if (!object)
throw Exception(ErrorCodes::LOGICAL_ERROR, "`object` cannot be null");
Expand All @@ -42,9 +40,10 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
{
const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
absolute_path = object->getAbsolutePath();
file_bucket_info = object->file_bucket_info;
}

absolute_path = object->getAbsolutePath();
file_bucket_info = object->file_bucket_info;
}

ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(const std::string & path_)
Expand All @@ -59,7 +58,8 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const

auto object = std::make_shared<ObjectInfo>(path);
object->data_lake_metadata = data_lake_metadata;
object->file_meta_info = file_meta_info;
if (file_meta_info.has_value())
object->file_meta_info = file_meta_info;
if (absolute_path.has_value() && !absolute_path.value().empty())
object->absolute_path = absolute_path;

Expand Down Expand Up @@ -96,15 +96,6 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc
writeStringBinary("", out);
}
}

if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
{
/// This info is not used when optimization is disabled, so there is no need to send it.
if (iceberg_read_optimization_enabled && file_meta_info.has_value())
file_meta_info.value()->serialize(out);
else
DataFileMetaInfo().serialize(out);
}
}

void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
Expand Down Expand Up @@ -142,14 +133,6 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
file_bucket_info->deserialize(in);
}
}

if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
{
auto info = std::make_shared<DataFileMetaInfo>(DataFileMetaInfo::deserialize(in));

if (!path.empty() && !info->empty())
file_meta_info = info;
}
}

}
2 changes: 0 additions & 2 deletions src/Interpreters/ClusterFunctionReadTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ struct ClusterFunctionReadTaskResponse
/// File's columns info
std::optional<DataFileMetaInfoPtr> file_meta_info;

const bool iceberg_read_optimization_enabled = false;

/// Convert received response into ObjectInfo.
ObjectInfoPtr getObjectInfo() const;

Expand Down
77 changes: 77 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/Field.h>
#include <Common/logger_useful.h>

namespace DB
{
Expand Down Expand Up @@ -130,6 +131,82 @@ DataFileMetaInfo::DataFileMetaInfo(
}
}

/// Deserialize per-file column metadata from the JSON produced by toJson()
/// (used in distributed requests). Malformed entries are skipped with a
/// warning instead of failing the whole task.
DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info)
{
    if (!file_info)
        return;

    auto log = getLogger("DataFileMetaInfo");

    if (!file_info->has("columns"))
        return;

    auto columns = file_info->getArray("columns");
    /// Poco returns a null Ptr when "columns" exists but is not an array;
    /// dereferencing it unchecked would crash on malformed input.
    if (!columns)
    {
        LOG_WARNING(log, "'columns' is not a JSON array, ignored");
        return;
    }

    for (size_t i = 0; i < columns->size(); ++i)
    {
        auto column = columns->getObject(static_cast<UInt32>(i));
        /// Likewise, getObject() yields a null Ptr for non-object elements.
        if (!column)
        {
            LOG_WARNING(log, "Element {} of 'columns' is not a JSON object, ignored", i);
            continue;
        }

        std::string name;
        if (column->has("name"))
            name = column->get("name").toString();
        else
        {
            LOG_WARNING(log, "Can't read column name, ignored");
            continue;
        }

        DB::DataFileMetaInfo::ColumnInfo column_info;
        if (column->has("rows"))
            column_info.rows_count = column->get("rows");
        if (column->has("nulls"))
            column_info.nulls_count = column->get("nulls");
        if (column->has("range"))
        {
            Range range("");
            std::string r = column->get("range");
            try
            {
                /// Ranges are base64-wrapped by toJson() to survive JSON transport.
                range.deserialize(r, /*base64*/ true);
                column_info.hyperrectangle = std::move(range);
            }
            catch (const Exception & e)
            {
                LOG_WARNING(log, "Can't read range for column {}, range '{}' ignored, error: {}", name, r, e.what());
            }
        }

        columns_info[name] = column_info;
    }
}

/// Serialize per-file column metadata to JSON for distributed requests.
/// Counterpart of the JSON-parsing constructor: only fields that are set
/// are emitted, and ranges are base64-encoded for safe JSON transport.
Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const
{
    Poco::JSON::Object::Ptr result = new Poco::JSON::Object();

    if (columns_info.empty())
        return result;

    Poco::JSON::Array::Ptr columns = new Poco::JSON::Array();
    for (const auto & [name, info] : columns_info)
    {
        Poco::JSON::Object::Ptr entry = new Poco::JSON::Object();
        entry->set("name", name);
        if (info.rows_count.has_value())
            entry->set("rows", *info.rows_count);
        if (info.nulls_count.has_value())
            entry->set("nulls", *info.nulls_count);
        if (info.hyperrectangle.has_value())
            entry->set("range", info.hyperrectangle->serialize(/*base64*/ true));

        columns->add(entry);
    }

    result->set("columns", columns);
    return result;
}

constexpr size_t FIELD_MASK_ROWS = 0x1;
constexpr size_t FIELD_MASK_NULLS = 0x2;
constexpr size_t FIELD_MASK_RECT = 0x4;
Expand Down
7 changes: 6 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class DataFileMetaInfo
public:
DataFileMetaInfo() = default;

// Deserialize from json in distributed requests
explicit DataFileMetaInfo(const Poco::JSON::Object::Ptr file_info);

// Serialize to json in distributed requests
Poco::JSON::Object::Ptr toJson() const;

// subset of Iceberg::ColumnInfo now
struct ColumnInfo
{
Expand Down Expand Up @@ -92,7 +98,6 @@ class StorageObjectStorageConfiguration;
using StorageObjectStorageConfigurationPtr = std::shared_ptr<StorageObjectStorageConfiguration>;
struct StorageID;
struct IObjectIterator;
struct RelativePathWithMetadata;
class IObjectStorage;
using ObjectIterator = std::shared_ptr<IObjectIterator>;
using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;
Expand Down
15 changes: 11 additions & 4 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace Setting
extern const SettingsBool cluster_function_process_archive_on_multiple_nodes;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
extern const SettingsString object_storage_cluster;
extern const SettingsBool allow_experimental_iceberg_read_optimization;
}

namespace ErrorCodes
Expand Down Expand Up @@ -493,9 +494,14 @@ class TaskDistributor : public TaskIterator
std::vector<std::string> && ids_of_hosts,
bool send_over_whole_archive,
uint64_t lock_object_storage_task_distribution_ms,
ContextPtr context_
)
: task_distributor(iterator, std::move(ids_of_hosts), send_over_whole_archive, lock_object_storage_task_distribution_ms)
ContextPtr context_,
bool iceberg_read_optimization_enabled)
: task_distributor(
iterator,
std::move(ids_of_hosts),
send_over_whole_archive,
lock_object_storage_task_distribution_ms,
iceberg_read_optimization_enabled)
, context(context_) {}
~TaskDistributor() override = default;
bool supportRerunTask() const override { return true; }
Expand Down Expand Up @@ -569,7 +575,8 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
std::move(ids_of_hosts),
/* send_over_whole_archive */!local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes],
lock_object_storage_task_distribution_ms,
local_context);
local_context,
/* iceberg_read_optimization_enabled */local_context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]);

return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
sleepForMicroseconds(wait_time);
continue;
}
object_info->setFileMetaInfo(object_info->getCommand().getFileMetaInfo());
}

if (object_info->getPath().empty())
Expand Down
Loading
Loading