diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp index 90aef3bf00bf..c0706cc57b6c 100644 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp +++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp @@ -1,11 +1,16 @@ #include +#include +#include #include +#include namespace ProfileEvents { extern const Event ExportPartitionZooKeeperRequests; + extern const Event ExportPartitionZooKeeperGet; extern const Event ExportPartitionZooKeeperGetChildren; extern const Event ExportPartitionZooKeeperCreate; + extern const Event ExportPartitionZooKeeperMulti; } namespace DB { @@ -13,10 +18,12 @@ namespace DB ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask( StorageReplicatedMergeTree & storage_, const std::string & key_, - const MergeTreePartExportManifest & manifest_) + const MergeTreePartExportManifest & manifest_, + size_t max_retries_) : storage(storage_), key(key_), - manifest(manifest_) + manifest(manifest_), + max_retries(max_retries_) { export_part_task = std::make_shared(storage, manifest); } @@ -25,19 +32,58 @@ bool ExportPartFromPartitionExportTask::executeStep() { const auto zk = storage.getZooKeeper(); const auto part_name = manifest.data_part->name; + const auto processing_part_path = fs::path(storage.zookeeper_path) / "exports" / key / "processing" / part_name; - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name); + LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock and increment retry count for part: {}", part_name); + + Coordination::Stat processing_part_stat; + std::string processing_part_string; + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + if (!zk->tryGet(processing_part_path, processing_part_string, &processing_part_stat)) + { + LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to get processing part for {}, skipping", part_name); + return false; + } + + auto processing_part_entry = ExportReplicatedMergeTreePartitionProcessingPartEntry::fromJsonString(processing_part_string); + + /// If retry count already exceeds limit, mark as failed (recovery for stale state). + if (processing_part_entry.retry_count > max_retries) + { + LOG_INFO( + storage.log, + "ExportPartFromPartitionExportTask: Retry count limit exceeded for part {} and it is not marked as failed. Trying to mark it as failed", + part_name); + + processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED; + processing_part_entry.finished_by = "unknown"; + + const auto export_path = fs::path(storage.zookeeper_path) / "exports" / key; + zk->trySet(processing_part_path, processing_part_entry.toJsonString(), processing_part_stat.version); + zk->trySet(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1); + return false; + } + + processing_part_entry.retry_count++; + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeSetRequest(processing_part_path.string(), processing_part_entry.toJsonString(), processing_part_stat.version)); + ops.emplace_back(zkutil::makeCreateRequest((fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name).string(), storage.replica_name, zkutil::CreateMode::Ephemeral)); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); - if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses responses; + if (Coordination::Error::ZOK != zk->tryMulti(ops, responses)) { - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name); - export_part_task->executeStep(); + LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to either lock or increment retry count for part {}, skipping", part_name); return false; } - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name); + LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name); + export_part_task->executeStep(); return false; } diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h index e170b22b470d..1b74b6848880 100644 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h +++ b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h @@ -17,7 +17,8 @@ class ExportPartFromPartitionExportTask : public IExecutableTask explicit ExportPartFromPartitionExportTask( StorageReplicatedMergeTree & storage_, const std::string & key_, - const MergeTreePartExportManifest & manifest_); + const MergeTreePartExportManifest & manifest_, + size_t max_retries_); bool executeStep() override; void onCompleted() override; StorageID getStorageID() const override; @@ -30,6 +31,7 @@ class ExportPartFromPartitionExportTask : public IExecutableTask StorageReplicatedMergeTree & storage; std::string key; MergeTreePartExportManifest manifest; + size_t max_retries; std::shared_ptr export_part_task; }; diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 3beb67e1968f..9dec667e6dfb 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -190,6 +190,49 @@ void ExportPartitionTaskScheduler::run() continue; } + Coordination::Stat processing_part_stat; + const auto processing_part_path = fs::path(storage.zookeeper_path) / "exports" / key / "processing" / zk_part_name; + + std::string processing_part_string; + + if (!zk->tryGet(processing_part_path, processing_part_string, &processing_part_stat)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get processing part, will not attempt to schedule export task"); + continue; + } + + /// todo arthur could this have been cached? + auto processing_part_entry = ExportReplicatedMergeTreePartitionProcessingPartEntry::fromJsonString(processing_part_string); + + /// if the status is pending and the retry count is greater than the limit, then it means the last node that attempted to export this part failed to set the status of the task to failed + /// try to fix it manually. + if (processing_part_entry.retry_count > manifest.max_retries) + { + LOG_INFO( + storage.log, + "ExportPartition scheduler task: Retry count limit exceeded for part {} and it is not marked as failed. Trying to mark it as failed", + zk_part_name); + + processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED; + processing_part_entry.finished_by = "unknown"; + + Coordination::Requests ops; + const auto export_path = fs::path(storage.zookeeper_path) / "exports" / key; + + ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), processing_part_stat.version)); + ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1)); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + Coordination::Responses responses; + if (Coordination::Error::ZOK != zk->tryMulti(ops, responses)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to mark part as failed, skipping"); + continue; + } + continue; + } + LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduling part export: {}", zk_part_name); auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest); @@ -215,7 +258,7 @@ void ExportPartitionTaskScheduler::run() handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); }); - part_export_manifest.task = std::make_shared(storage, key, part_export_manifest); + part_export_manifest.task = std::make_shared(storage, key, part_export_manifest, manifest.max_retries); /// todo arthur this might conflict with the standalone export part. what to do in this case? if (!storage.export_manifests.emplace(part_export_manifest).second) @@ -237,13 +280,22 @@ void ExportPartitionTaskScheduler::run() { LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table"); - LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name); + LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock and increment retry count for part: {}", zk_part_name); + + processing_part_entry.retry_count++; + + Coordination::Requests ops; + + ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), processing_part_stat.version)); + ops.emplace_back(zkutil::makeCreateRequest(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); - if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses responses; + if (Coordination::Error::ZOK != zk->tryMulti(ops, responses)) { - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to either lock or increment retry count for part {}, skipping", zk_part_name); continue; } @@ -342,7 +394,7 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( size_t max_retries ) { - LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed, will now increment counters", part_name); + LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed", part_name); if (!exception) { @@ -390,10 +442,7 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( /// todo arthur could this have been cached? auto processing_part_entry = ExportReplicatedMergeTreePartitionProcessingPartEntry::fromJsonString(processing_part_string); - processing_part_entry.retry_count++; - ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version)); - ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1)); LOG_INFO(storage.log, "ExportPartition scheduler task: Updating processing part entry for part {}, retry count: {}, max retries: {}", part_name, processing_part_entry.retry_count, max_retries); @@ -406,10 +455,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( ops.emplace_back(zkutil::makeSetRequest(export_path / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), -1)); LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name); } - else - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit not exceeded for part {}, will increment retry count", part_name); - } + + ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1)); std::size_t num_exceptions = 0; diff --git a/src/Storages/MergeTree/MergeTreeMutationsSnapshot.h b/src/Storages/MergeTree/MergeTreeMutationsSnapshot.h new file mode 100644 index 000000000000..b2ab7e4ae8e1 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeMutationsSnapshot.h @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +/// Type aliases for partition-related maps (also defined in MergeTreeData.h for backward compatibility) +using PartitionIdToMinBlock = std::unordered_map; +using PartitionIdToMinBlockPtr = std::shared_ptr; + +/// A snapshot of pending mutations that weren't applied to some of the parts yet +/// and should be applied on the fly (i.e. when reading from the part). +/// Mutations not supported by AlterConversions (isSupported*Mutation) can be omitted. +struct IMutationsSnapshot +{ + /// Contains info that doesn't depend on state of mutations. + struct Params + { + Int64 metadata_version = -1; + Int64 min_part_metadata_version = -1; + PartitionIdToMinBlockPtr min_part_data_versions = nullptr; + PartitionIdToMaxBlockPtr max_mutation_versions = nullptr; + bool need_data_mutations = false; + bool need_alter_mutations = false; + bool need_patch_parts = false; + }; + + static Int64 getMinPartDataVersionForPartition(const Params & params, const String & partition_id); + static Int64 getMaxMutationVersionForPartition(const Params & params, const String & partition_id); + + static bool needIncludeMutationToSnapshot(const Params & params, const MutationCommands & commands); + + virtual ~IMutationsSnapshot() = default; + virtual void addPatches(DataPartsVector patches_) = 0; + + /// Returns mutation commands that are required to be applied to the `part`. + /// @return list of mutation commands in order: oldest to newest. + virtual MutationCommands getOnFlyMutationCommandsForPart(const DataPartPtr & part) const = 0; + virtual PatchParts getPatchesForPart(const DataPartPtr & part) const = 0; + virtual std::shared_ptr cloneEmpty() const = 0; + virtual NameSet getAllUpdatedColumns() const = 0; + + virtual bool hasPatchParts() const = 0; + virtual bool hasDataMutations() const = 0; + virtual bool hasAlterMutations() const = 0; + virtual bool hasMetadataMutations() const = 0; + bool hasAnyMutations() const { return hasDataMutations() || hasAlterMutations() || hasMetadataMutations(); } +}; + +struct MutationsSnapshotBase : public IMutationsSnapshot +{ +public: + Params params; + MutationCounters counters; + PatchesByPartition patches_by_partition; + + MutationsSnapshotBase() = default; + MutationsSnapshotBase(Params params_, MutationCounters counters_, DataPartsVector patches_); + + void addPatches(DataPartsVector patches_) override; + PatchParts getPatchesForPart(const DataPartPtr & part) const final; + + bool hasPatchParts() const final { return !patches_by_partition.empty(); } + bool hasDataMutations() const final { return counters.num_data > 0; } + bool hasAlterMutations() const final { return counters.num_alter > 0; } + bool hasMetadataMutations() const final { return counters.num_metadata > 0; } + +protected: + NameSet getColumnsUpdatedInPatches() const; + void addSupportedCommands(const MutationCommands & commands, UInt64 mutation_version, MutationCommands & result_commands) const; +}; + +using MutationsSnapshotPtr = std::shared_ptr; + +} +