Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/iceberg/catalog/rest/catalog_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,17 @@ Result<std::string_view> RestCatalogProperties::Uri() const {
return it->second;
}

Result<SnapshotMode> RestCatalogProperties::SnapshotLoadingMode() const {
auto mode_str = Get(kSnapshotLoadingMode);

if (mode_str == "ALL") {
return SnapshotMode::ALL;
} else if (mode_str == "REFS") {
return SnapshotMode::REFS;
} else {
return InvalidArgument(
"Invalid snapshot loading mode: '{}'. Expected 'ALL' or 'REFS'.", mode_str);
}
}

} // namespace iceberg::rest
15 changes: 15 additions & 0 deletions src/iceberg/catalog/rest/catalog_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@

namespace iceberg::rest {

/// \brief Snapshot loading mode for REST catalog.
enum class SnapshotMode {
/// Load all snapshots from the table metadata.
ALL,
/// Load only snapshots referenced in snapshot refs (branches/tags).
REFS
};

/// \brief Configuration class for a REST Catalog.
class ICEBERG_REST_EXPORT RestCatalogProperties
: public ConfigBase<RestCatalogProperties> {
Expand All @@ -47,6 +55,8 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
inline static Entry<std::string> kWarehouse{"warehouse", ""};
/// \brief The optional prefix for REST API paths.
inline static Entry<std::string> kPrefix{"prefix", ""};
/// \brief The snapshot loading mode (ALL or REFS).
inline static Entry<std::string> kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"};
/// \brief The prefix for HTTP headers.
inline static constexpr std::string_view kHeaderPrefix = "header.";

Expand All @@ -64,6 +74,11 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
/// \return The URI if configured, or an error if not set or empty.
Result<std::string_view> Uri() const;

/// \brief Get the snapshot loading mode.
/// \return SnapshotMode::ALL if configured as "ALL", SnapshotMode::REFS if "REFS",
/// or an error if the value is invalid.
Result<SnapshotMode> SnapshotLoadingMode() const;

private:
RestCatalogProperties() = default;
};
Expand Down
33 changes: 24 additions & 9 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,26 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
final_config->Get(RestCatalogProperties::kPrefix)));

// Get snapshot loading mode
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config->SnapshotLoadingMode());

return std::shared_ptr<RestCatalog>(
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),
std::move(endpoints)));
std::move(endpoints), snapshot_mode));
}

RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::shared_ptr<FileIO> file_io,
std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints)
std::unordered_set<Endpoint> endpoints,
SnapshotMode snapshot_mode)
: config_(std::move(config)),
file_io_(std::move(file_io)),
client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
paths_(std::move(paths)),
name_(config_->Get(RestCatalogProperties::kName)),
supported_endpoints_(std::move(endpoints)) {}
supported_endpoints_(std::move(endpoints)),
snapshot_mode_(snapshot_mode) {}

std::string_view RestCatalog::name() const { return name_; }

Expand Down Expand Up @@ -376,8 +381,8 @@ Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {

Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
if (!supported_endpoints_.contains(Endpoint::TableExists())) {
// Fall back to call LoadTable
return CaptureNoSuchTable(LoadTableInternal(identifier));
// Fall back to call LoadTable with ALL mode (just checking existence)
return CaptureNoSuchTable(LoadTableInternal(identifier, SnapshotMode::ALL));
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
Expand All @@ -398,21 +403,31 @@ Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifi
return {};
}

Result<std::string> RestCatalog::LoadTableInternal(
const TableIdentifier& identifier) const {
Result<std::string> RestCatalog::LoadTableInternal(const TableIdentifier& identifier,
SnapshotMode mode) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));

std::unordered_map<std::string, std::string> params;
if (mode == SnapshotMode::REFS) {
params["snapshots"] = "refs";
}

ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
return response.body();
}

Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier, snapshot_mode_));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));

// In REFS mode, the server filters snapshots in the response to reduce payload size.
// Unlike the Java implementation, we do not perform implicit lazy-loading of full
// snapshots. We store only the returned (filtered) snapshots. Users requiring full
// snapshot history must explicitly call LoadTable again with SnapshotMode::ALL.
return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
shared_from_this());
Expand Down
7 changes: 5 additions & 2 deletions src/iceberg/catalog/rest/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <unordered_set>

#include "iceberg/catalog.h"
#include "iceberg/catalog/rest/catalog_properties.h"
#include "iceberg/catalog/rest/endpoint.h"
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/type_fwd.h"
Expand Down Expand Up @@ -106,9 +107,10 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
private:
RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::shared_ptr<FileIO> file_io, std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints);
std::unordered_set<Endpoint> endpoints, SnapshotMode snapshot_mode);

Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
Result<std::string> LoadTableInternal(const TableIdentifier& identifier,
SnapshotMode mode) const;

Result<LoadTableResult> CreateTableInternal(
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
Expand All @@ -122,6 +124,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
std::unique_ptr<ResourcePaths> paths_;
std::string name_;
std::unordered_set<Endpoint> supported_endpoints_;
SnapshotMode snapshot_mode_;
};

} // namespace iceberg::rest
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ services:
- CATALOG_WAREHOUSE=file:///tmp/iceberg_warehouse
ports:
- "8181:8181"
volumes:
- ${ICEBERG_TEST_DATA_DIR}:/tmp
141 changes: 140 additions & 1 deletion src/iceberg/test/rest_catalog_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

#include <unistd.h>

#include <algorithm>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <memory>
#include <print>
#include <string>
Expand Down Expand Up @@ -687,4 +688,142 @@ TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
EXPECT_EQ(props.at("key1"), "value1");
}

TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotLoadingMode) {
auto catalog_result = CreateCatalog();
ASSERT_THAT(catalog_result, IsOk());
auto& catalog = catalog_result.value();

Namespace ns{.levels = {"test_snapshot_mode"}};
auto status = catalog->CreateNamespace(ns, {});
ASSERT_THAT(status, IsOk());

auto schema = CreateDefaultSchema();
auto partition_spec = PartitionSpec::Unpartitioned();
auto sort_order = SortOrder::Unsorted();

TableIdentifier table_id{.ns = ns, .name = "snapshot_mode_table"};
std::unordered_map<std::string, std::string> table_properties;
auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order,
"", table_properties);
ASSERT_THAT(table_result, IsOk());
auto& table = table_result.value();

std::string original_metadata_location(table->metadata_file_location());
status = catalog->DropTable(table_id, /*purge=*/false);
ASSERT_THAT(status, IsOk());

// Create a fake metadata JSON with 2 snapshots:
// - Snapshot 1: not referenced by any branch/tag (will be filtered in REFS mode)
// - Snapshot 2: referenced by main branch (will be loaded in both modes)
auto& test_data_dir = docker_compose_->test_data_dir();
auto metadata_filename = std::format("00000-{}.metadata.json", getpid());
auto metadata_path = test_data_dir / metadata_filename;
auto container_metadata_path = std::format("/tmp/{}", metadata_filename);
std::string fake_metadata_json = std::format(R"({{
"format-version": 2,
"table-uuid": "12345678-1234-5678-1234-123456789abc",
"location": "file:/tmp/iceberg_warehouse/{}",
"last-sequence-number": 2,
"last-updated-ms": 1602638573590,
"last-column-id": 2,
"current-schema-id": 0,
"schemas": [{{"type": "struct", "schema-id": 0, "fields": [
{{"id": 1, "name": "id", "required": true, "type": "int"}},
{{"id": 2, "name": "data", "required": false, "type": "string"}}
]}}],
"default-spec-id": 0,
"partition-specs": [{{"spec-id": 0, "fields": []}}],
"last-partition-id": 1000,
"default-sort-order-id": 0,
"sort-orders": [{{"order-id": 0, "fields": []}}],
"properties": {{}},
"current-snapshot-id": 2,
"snapshots": [
{{
"snapshot-id": 1,
"timestamp-ms": 1515100955770,
"sequence-number": 1,
"summary": {{"operation": "append"}},
"manifest-list": "file:/tmp/iceberg_warehouse/{}/metadata/snap-1.avro"
}},
{{
"snapshot-id": 2,
"parent-snapshot-id": 1,
"timestamp-ms": 1525100955770,
"sequence-number": 2,
"summary": {{"operation": "append"}},
"manifest-list": "file:/tmp/iceberg_warehouse/{}/metadata/snap-2.avro"
}}
],
"snapshot-log": [
{{"snapshot-id": 1, "timestamp-ms": 1515100955770}},
{{"snapshot-id": 2, "timestamp-ms": 1525100955770}}
],
"metadata-log": [],
"refs": {{
"main": {{
"snapshot-id": 2,
"type": "branch"
}}
}}
}})",
ns.levels[0], ns.levels[0], ns.levels[0]);

// Write metadata file and register the table
std::ofstream metadata_file(metadata_path.string());
metadata_file << fake_metadata_json;
metadata_file.close();
auto register_metadata_location = std::format("file:{}", container_metadata_path);
auto register_result = catalog->RegisterTable(table_id, register_metadata_location);
ASSERT_THAT(register_result, IsOk());

// Test with ALL mode (default)
auto config_all = RestCatalogProperties::default_properties();
config_all
->Set(RestCatalogProperties::kUri,
std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
.Set(RestCatalogProperties::kName, std::string(kCatalogName))
.Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName))
.Set(RestCatalogProperties::kSnapshotLoadingMode, std::string("ALL"));
auto catalog_all_result =
RestCatalog::Make(*config_all, std::make_shared<test::StdFileIO>());
ASSERT_THAT(catalog_all_result, IsOk());
auto& catalog_all = catalog_all_result.value();

// Load table with ALL mode and verify both snapshots are loaded
auto table_all_result = catalog_all->LoadTable(table_id);
ASSERT_THAT(table_all_result, IsOk());
auto& table_all = table_all_result.value();
EXPECT_EQ(table_all->metadata()->snapshots.size(), 2);

// Test with REFS mode
auto config_refs = RestCatalogProperties::default_properties();
config_refs
->Set(RestCatalogProperties::kUri,
std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
.Set(RestCatalogProperties::kName, std::string(kCatalogName))
.Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName))
.Set(RestCatalogProperties::kSnapshotLoadingMode, std::string("REFS"));
auto catalog_refs_result =
RestCatalog::Make(*config_refs, std::make_shared<test::StdFileIO>());
ASSERT_THAT(catalog_refs_result, IsOk());
auto& catalog_refs = catalog_refs_result.value();

// Load table with REFS mode and verify only referenced snapshot is loaded
auto table_refs_result = catalog_refs->LoadTable(table_id);
ASSERT_THAT(table_refs_result, IsOk());
auto& table_refs = table_refs_result.value();
EXPECT_EQ(table_refs->metadata()->snapshots.size(), 1);
EXPECT_EQ(table_refs->metadata()->snapshots[0]->snapshot_id, 2);

// Verify refs are preserved in both modes
EXPECT_EQ(table_all->metadata()->refs.size(), 1);
EXPECT_EQ(table_refs->metadata()->refs.size(), 1);
EXPECT_TRUE(table_all->metadata()->refs.contains("main"));
EXPECT_TRUE(table_refs->metadata()->refs.contains("main"));

// Clean up metadata file
std::filesystem::remove(metadata_path);
}

} // namespace iceberg::rest
37 changes: 35 additions & 2 deletions src/iceberg/test/util/docker_compose_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,38 @@

#include "iceberg/test/util/docker_compose_util.h"

#include <unistd.h>

#include <cctype>
#include <chrono>
#include <format>
#include <print>

#include <sys/stat.h>

#include "iceberg/test/util/cmd_util.h"

namespace iceberg {

namespace {
/// \brief Generate a unique test data directory path
std::filesystem::path GenerateTestDataDir() {
auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
auto pid = getpid();
return {std::format("/tmp/iceberg-test-{}-{}", timestamp, pid)};
}
} // namespace

DockerCompose::DockerCompose(std::string project_name,
std::filesystem::path docker_compose_dir)
: project_name_(std::move(project_name)),
docker_compose_dir_(std::move(docker_compose_dir)) {}
docker_compose_dir_(std::move(docker_compose_dir)),
test_data_dir_(GenerateTestDataDir()) {
std::filesystem::create_directories(test_data_dir_);
chmod(test_data_dir_.c_str(), 0777);
}

DockerCompose::~DockerCompose() { Down(); }

Expand All @@ -39,13 +61,24 @@ void DockerCompose::Up() {

void DockerCompose::Down() {
auto cmd = BuildDockerCommand({"down", "-v", "--remove-orphans"});
return cmd.RunCommand("docker compose down");
cmd.RunCommand("docker compose down");

// Clean up the test data directory
if (!test_data_dir_.empty() && std::filesystem::exists(test_data_dir_)) {
std::error_code ec;
std::filesystem::remove_all(test_data_dir_, ec);
if (!ec) {
std::println("[INFO] Cleaned up test data directory: {}", test_data_dir_.string());
}
}
}

Command DockerCompose::BuildDockerCommand(const std::vector<std::string>& args) const {
Command cmd("docker");
// Set working directory
cmd.CurrentDir(docker_compose_dir_);
// Set the test data directory environment variable
cmd.Env("ICEBERG_TEST_DATA_DIR", test_data_dir_.string());
// Use 'docker compose' subcommand with project name
cmd.Arg("compose").Arg("-p").Arg(project_name_).Args(args);
return cmd;
Expand Down
Loading
Loading