62 changes: 54 additions & 8 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp
@@ -1,22 +1,29 @@
 #include <Storages/MergeTree/ExportPartFromPartitionExportTask.h>
 #include <Storages/ExportReplicatedMergeTreePartitionManifest.h>
+#include <Storages/ExportReplicatedMergeTreePartitionTaskEntry.h>
 #include <Common/ProfileEvents.h>
+#include <Common/ZooKeeper/Types.h>
 
 namespace ProfileEvents
 {
     extern const Event ExportPartitionZooKeeperRequests;
+    extern const Event ExportPartitionZooKeeperGet;
+    extern const Event ExportPartitionZooKeeperGetChildren;
     extern const Event ExportPartitionZooKeeperCreate;
+    extern const Event ExportPartitionZooKeeperMulti;
 }
 namespace DB
 {
 
 ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask(
     StorageReplicatedMergeTree & storage_,
     const std::string & key_,
-    const MergeTreePartExportManifest & manifest_)
+    const MergeTreePartExportManifest & manifest_,
+    size_t max_retries_)
     : storage(storage_),
     key(key_),
-    manifest(manifest_)
+    manifest(manifest_),
+    max_retries(max_retries_)
 {
     export_part_task = std::make_shared<ExportPartTask>(storage, manifest);
 }
@@ -25,19 +32,58 @@ bool ExportPartFromPartitionExportTask::executeStep()
 {
     const auto zk = storage.getZooKeeper();
     const auto part_name = manifest.data_part->name;
+    const auto processing_part_path = fs::path(storage.zookeeper_path) / "exports" / key / "processing" / part_name;
 
-    LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name);
+    LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock and increment retry count for part: {}", part_name);
 
+    Coordination::Stat processing_part_stat;
+    std::string processing_part_string;
+
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);
+    if (!zk->tryGet(processing_part_path, processing_part_string, &processing_part_stat))
+    {
+        LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to get processing part for {}, skipping", part_name);
+        return false;
+    }
+
+    auto processing_part_entry = ExportReplicatedMergeTreePartitionProcessingPartEntry::fromJsonString(processing_part_string);
+
+    /// If retry count already exceeds limit, mark as failed (recovery for stale state).
+    if (processing_part_entry.retry_count > max_retries)
+    {
Comment on lines +52 to +54

P2: Use >= when enforcing retry cap

The new stale-state recovery check only trips when retry_count > max_retries, so a part that has already consumed its last allowed retry (e.g. a node crashes after incrementing retry_count but before handlePartExportFailure can mark FAILED) will still be re-attempted. That yields an extra export attempt beyond the configured export_merge_tree_partition_max_retries, and can lead to repeated retries if the same crash repeats. Using >= here (and in the analogous scheduler check) would keep behavior consistent with the failure handler's >= limit and avoid exceeding the configured cap in this crash/pending scenario.

+        LOG_INFO(
+            storage.log,
+            "ExportPartFromPartitionExportTask: Retry count limit exceeded for part {} and it is not marked as failed. Trying to mark it as failed",
+            part_name);
+
+        processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED;
+        processing_part_entry.finished_by = "unknown";
+
+        const auto export_path = fs::path(storage.zookeeper_path) / "exports" / key;
+        zk->trySet(processing_part_path, processing_part_entry.toJsonString(), processing_part_stat.version);
+        zk->trySet(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1);
+        return false;
+    }
+
+    processing_part_entry.retry_count++;
+
+    Coordination::Requests ops;
+    ops.emplace_back(zkutil::makeSetRequest(processing_part_path.string(), processing_part_entry.toJsonString(), processing_part_stat.version));
+    ops.emplace_back(zkutil::makeCreateRequest((fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name).string(), storage.replica_name, zkutil::CreateMode::Ephemeral));
 
     ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
-    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
-    if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
+    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
+
+    Coordination::Responses responses;
+    if (Coordination::Error::ZOK != zk->tryMulti(ops, responses))
     {
-        LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name);
-        export_part_task->executeStep();
+        LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to either lock or increment retry count for part {}, skipping", part_name);
         return false;
     }
 
-    LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name);
+    LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name);
+    export_part_task->executeStep();
     return false;
 }

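To make the boundary condition in the review comment above concrete, here is a minimal standalone sketch (not part of the PR): a stale PENDING entry that already consumed its last allowed attempt slips past the `>` check but is caught by `>=`.

#include <cstddef>
#include <iostream>

int main()
{
    const std::size_t max_retries = 3;
    /// A crash after the increment but before the FAILED write leaves a stale
    /// PENDING entry whose retry_count already equals the cap.
    const std::size_t retry_count = 3;

    std::cout << "'>'  triggers recovery: " << (retry_count > max_retries) << '\n';  /// prints 0: part gets a 4th attempt
    std::cout << "'>=' triggers recovery: " << (retry_count >= max_retries) << '\n'; /// prints 1: part is marked FAILED
}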
4 changes: 3 additions & 1 deletion src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
@@ -17,7 +17,8 @@ class ExportPartFromPartitionExportTask : public IExecutableTask
     explicit ExportPartFromPartitionExportTask(
         StorageReplicatedMergeTree & storage_,
         const std::string & key_,
-        const MergeTreePartExportManifest & manifest_);
+        const MergeTreePartExportManifest & manifest_,
+        size_t max_retries_);
     bool executeStep() override;
     void onCompleted() override;
     StorageID getStorageID() const override;
@@ -30,6 +31,7 @@ class ExportPartFromPartitionExportTask : public IExecutableTask
     StorageReplicatedMergeTree & storage;
     std::string key;
     MergeTreePartExportManifest manifest;
+    size_t max_retries;
     std::shared_ptr<ExportPartTask> export_part_task;
 };
 
73 changes: 60 additions & 13 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -190,6 +190,49 @@ void ExportPartitionTaskScheduler::run()
                continue;
            }
 
+            Coordination::Stat processing_part_stat;
+            const auto processing_part_path = fs::path(storage.zookeeper_path) / "exports" / key / "processing" / zk_part_name;
+
+            std::string processing_part_string;
+
+            if (!zk->tryGet(processing_part_path, processing_part_string, &processing_part_stat))
+            {
+                LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get processing part, will not attempt to schedule export task");
+                continue;
+            }
+
+            /// todo arthur could this have been cached?
+            auto processing_part_entry = ExportReplicatedMergeTreePartitionProcessingPartEntry::fromJsonString(processing_part_string);
+
+            /// If the status is still PENDING but the retry count exceeds the limit, the last node that attempted
+            /// to export this part failed to set the task status to FAILED. Try to repair that state here.
+            if (processing_part_entry.retry_count > manifest.max_retries)
+            {
+                LOG_INFO(
+                    storage.log,
+                    "ExportPartition scheduler task: Retry count limit exceeded for part {} and it is not marked as failed. Trying to mark it as failed",
+                    zk_part_name);
+
+                processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED;
+                processing_part_entry.finished_by = "unknown";
+
+                Coordination::Requests ops;
+                const auto export_path = fs::path(storage.zookeeper_path) / "exports" / key;
+
+                ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), processing_part_stat.version));
+                ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1));
+
+                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
+                Coordination::Responses responses;
+                if (Coordination::Error::ZOK != zk->tryMulti(ops, responses))
+                {
+                    LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to mark part as failed, skipping");
+                    continue;
+                }
+                continue;
+            }
+
            LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduling part export: {}", zk_part_name);
 
            auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest);
@@ -215,7 +258,7 @@ void ExportPartitionTaskScheduler::run()
                handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result);
            });
 
-            part_export_manifest.task = std::make_shared<ExportPartFromPartitionExportTask>(storage, key, part_export_manifest);
+            part_export_manifest.task = std::make_shared<ExportPartFromPartitionExportTask>(storage, key, part_export_manifest, manifest.max_retries);
 
            /// todo arthur this might conflict with the standalone export part. what to do in this case?
            if (!storage.export_manifests.emplace(part_export_manifest).second)
@@ -237,13 +280,22 @@ void ExportPartitionTaskScheduler::run()
            {
                LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table");
 
-                LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name);
+                LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock and increment retry count for part: {}", zk_part_name);
 
+                processing_part_entry.retry_count++;
+
+                Coordination::Requests ops;
+
+                ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), processing_part_stat.version));
+                ops.emplace_back(zkutil::makeCreateRequest(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral));
+
                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
-                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
-                if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
+                ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
+
+                Coordination::Responses responses;
+                if (Coordination::Error::ZOK != zk->tryMulti(ops, responses))
                {
-                    LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name);
+                    LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to either lock or increment retry count for part {}, skipping", zk_part_name);
                    continue;
                }

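Both executeStep and the scheduler above now bundle the retry-count bump and the ephemeral lock into one atomic ZooKeeper multi request. A minimal sketch of that pattern, reusing only the zkutil helpers already visible in this diff (the function name and parameters are illustrative, not from the PR):

/// Illustrative helper, not from the PR: atomically bump the retry counter and
/// take the per-part ephemeral lock; either both operations apply or neither.
bool tryLockAndBumpRetryCount(
    const zkutil::ZooKeeperPtr & zk,
    const std::string & processing_path,   /// e.g. <zk>/exports/<key>/processing/<part>
    const std::string & lock_path,         /// e.g. <zk>/exports/<key>/locks/<part>
    const std::string & replica_name,
    const std::string & updated_entry_json,
    int processing_version)                /// Stat::version read together with the entry
{
    Coordination::Requests ops;
    /// Fails if someone modified the processing entry since we read it.
    ops.emplace_back(zkutil::makeSetRequest(processing_path, updated_entry_json, processing_version));
    /// Fails if another replica already holds the ephemeral lock.
    ops.emplace_back(zkutil::makeCreateRequest(lock_path, replica_name, zkutil::CreateMode::Ephemeral));

    Coordination::Responses responses;
    return zk->tryMulti(ops, responses) == Coordination::Error::ZOK;
}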
@@ -342,7 +394,7 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
    size_t max_retries
)
{
-    LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed, will now increment counters", part_name);
+    LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed", part_name);
 
    if (!exception)
    {
@@ -390,10 +442,7 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
    /// todo arthur could this have been cached?
    auto processing_part_entry = ExportReplicatedMergeTreePartitionProcessingPartEntry::fromJsonString(processing_part_string);
 
-    processing_part_entry.retry_count++;
-
    ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version));
-    ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1));
 
    LOG_INFO(storage.log, "ExportPartition scheduler task: Updating processing part entry for part {}, retry count: {}, max retries: {}", part_name, processing_part_entry.retry_count, max_retries);
 
@@ -406,10 +455,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
        ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1));
        LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name);
    }
-    else
-    {
-        LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit not exceeded for part {}, will increment retry count", part_name);
-    }
 
+    ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1));
+
    std::size_t num_exceptions = 0;
 
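Condensed across the three hunks above, the failure handler now releases the lock, fails the whole task once the cap is hit, and persists the entry whose retry count was already bumped at lock time. A sketch of that shape, not the actual function (the name, parameters, and the single tryMulti at the end are assumptions for illustration; "FAILED" matches what magic_enum::enum_name produces for the status enum):

/// Hypothetical condensed view of the failure path.
void finalizeFailedAttempt(
    const zkutil::ZooKeeperPtr & zk,
    const fs::path & export_path,
    const std::string & part_name,
    const std::string & entry_json,  /// re-serialized processing entry
    int lock_version,
    size_t retry_count,
    size_t max_retries)
{
    Coordination::Requests ops;
    /// Drop this replica's ephemeral lock so another replica may retry.
    ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, lock_version));
    /// Once the cap is reached, fail the whole partition export task.
    if (retry_count >= max_retries)
        ops.emplace_back(zkutil::makeSetRequest(export_path / "status", "FAILED", -1));
    /// Persist the entry unconditionally; the retry count was bumped at lock time.
    ops.emplace_back(zkutil::makeSetRequest(export_path / "processing" / part_name, entry_json, -1));

    Coordination::Responses responses;
    zk->tryMulti(ops, responses);
}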
84 changes: 84 additions & 0 deletions src/Storages/MergeTree/MergeTreeMutationsSnapshot.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include <Storages/MutationCommands.h>
+#include <Storages/MergeTree/IMergeTreeDataPart.h>
+#include <Storages/MergeTree/PatchParts/PatchPartInfo.h>
+#include <Storages/MergeTree/PatchParts/PatchPartsUtils.h>
+#include <Storages/MergeTree/AlterConversions.h>
+#include <Core/Types.h>
+#include <memory>
+#include <unordered_map>
+
+namespace DB
+{
+
+/// Type aliases for partition-related maps (also defined in MergeTreeData.h for backward compatibility)
+using PartitionIdToMinBlock = std::unordered_map<String, Int64>;
+using PartitionIdToMinBlockPtr = std::shared_ptr<const PartitionIdToMinBlock>;
+
+/// A snapshot of pending mutations that weren't applied to some of the parts yet
+/// and should be applied on the fly (i.e. when reading from the part).
+/// Mutations not supported by AlterConversions (isSupported*Mutation) can be omitted.
+struct IMutationsSnapshot
+{
+    /// Contains info that doesn't depend on state of mutations.
+    struct Params
+    {
+        Int64 metadata_version = -1;
+        Int64 min_part_metadata_version = -1;
+        PartitionIdToMinBlockPtr min_part_data_versions = nullptr;
+        PartitionIdToMaxBlockPtr max_mutation_versions = nullptr;
+        bool need_data_mutations = false;
+        bool need_alter_mutations = false;
+        bool need_patch_parts = false;
+    };
+
+    static Int64 getMinPartDataVersionForPartition(const Params & params, const String & partition_id);
+    static Int64 getMaxMutationVersionForPartition(const Params & params, const String & partition_id);
+
+    static bool needIncludeMutationToSnapshot(const Params & params, const MutationCommands & commands);
+
+    virtual ~IMutationsSnapshot() = default;
+    virtual void addPatches(DataPartsVector patches_) = 0;
+
+    /// Returns mutation commands that are required to be applied to the `part`.
+    /// @return list of mutation commands in order: oldest to newest.
+    virtual MutationCommands getOnFlyMutationCommandsForPart(const DataPartPtr & part) const = 0;
+    virtual PatchParts getPatchesForPart(const DataPartPtr & part) const = 0;
+    virtual std::shared_ptr<IMutationsSnapshot> cloneEmpty() const = 0;
+    virtual NameSet getAllUpdatedColumns() const = 0;
+
+    virtual bool hasPatchParts() const = 0;
+    virtual bool hasDataMutations() const = 0;
+    virtual bool hasAlterMutations() const = 0;
+    virtual bool hasMetadataMutations() const = 0;
+    bool hasAnyMutations() const { return hasDataMutations() || hasAlterMutations() || hasMetadataMutations(); }
+};
+
+struct MutationsSnapshotBase : public IMutationsSnapshot
+{
+public:
+    Params params;
+    MutationCounters counters;
+    PatchesByPartition patches_by_partition;
+
+    MutationsSnapshotBase() = default;
+    MutationsSnapshotBase(Params params_, MutationCounters counters_, DataPartsVector patches_);
+
+    void addPatches(DataPartsVector patches_) override;
+    PatchParts getPatchesForPart(const DataPartPtr & part) const final;
+
+    bool hasPatchParts() const final { return !patches_by_partition.empty(); }
+    bool hasDataMutations() const final { return counters.num_data > 0; }
+    bool hasAlterMutations() const final { return counters.num_alter > 0; }
+    bool hasMetadataMutations() const final { return counters.num_metadata > 0; }
+
+protected:
+    NameSet getColumnsUpdatedInPatches() const;
+    void addSupportedCommands(const MutationCommands & commands, UInt64 mutation_version, MutationCommands & result_commands) const;
+};
+
+using MutationsSnapshotPtr = std::shared_ptr<const IMutationsSnapshot>;
+
+}
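For context, here is a plausible body for one of the static lookup helpers declared on IMutationsSnapshot. This is a sketch only, not code from the PR: the -1 fallback mirrors the defaults in Params and is an assumption.

/// Sketch: consult the per-partition map from Params and fall back to a
/// sentinel when no map was captured or the partition is absent.
Int64 IMutationsSnapshot::getMinPartDataVersionForPartition(const Params & params, const String & partition_id)
{
    if (!params.min_part_data_versions)
        return -1; /// assumed sentinel: no snapshot-wide map was captured

    auto it = params.min_part_data_versions->find(partition_id);
    return it == params.min_part_data_versions->end() ? -1 : it->second;
}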