diff --git a/src/iceberg/catalog/rest/catalog_properties.cc b/src/iceberg/catalog/rest/catalog_properties.cc index 4d956837c..5287e57fa 100644 --- a/src/iceberg/catalog/rest/catalog_properties.cc +++ b/src/iceberg/catalog/rest/catalog_properties.cc @@ -48,4 +48,17 @@ Result RestCatalogProperties::Uri() const { return it->second; } +Result RestCatalogProperties::SnapshotLoadingMode() const { + auto mode_str = Get(kSnapshotLoadingMode); + + if (mode_str == "ALL") { + return SnapshotMode::ALL; + } else if (mode_str == "REFS") { + return SnapshotMode::REFS; + } else { + return InvalidArgument( + "Invalid snapshot loading mode: '{}'. Expected 'ALL' or 'REFS'.", mode_str); + } +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/catalog_properties.h b/src/iceberg/catalog/rest/catalog_properties.h index d351b50fc..542f1b5b2 100644 --- a/src/iceberg/catalog/rest/catalog_properties.h +++ b/src/iceberg/catalog/rest/catalog_properties.h @@ -32,6 +32,14 @@ namespace iceberg::rest { +/// \brief Snapshot loading mode for REST catalog. +enum class SnapshotMode { + /// Load all snapshots from the table metadata. + ALL, + /// Load only snapshots referenced in snapshot refs (branches/tags). + REFS +}; + /// \brief Configuration class for a REST Catalog. class ICEBERG_REST_EXPORT RestCatalogProperties : public ConfigBase { @@ -47,6 +55,8 @@ class ICEBERG_REST_EXPORT RestCatalogProperties inline static Entry kWarehouse{"warehouse", ""}; /// \brief The optional prefix for REST API paths. inline static Entry kPrefix{"prefix", ""}; + /// \brief The snapshot loading mode (ALL or REFS). + inline static Entry kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"}; /// \brief The prefix for HTTP headers. inline static constexpr std::string_view kHeaderPrefix = "header."; @@ -64,6 +74,11 @@ class ICEBERG_REST_EXPORT RestCatalogProperties /// \return The URI if configured, or an error if not set or empty. Result Uri() const; + /// \brief Get the snapshot loading mode. + /// \return SnapshotMode::ALL if configured as "ALL", SnapshotMode::REFS if "REFS", + /// or an error if the value is invalid. + Result SnapshotLoadingMode() const; + private: RestCatalogProperties() = default; }; diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index cc052e241..c973f461c 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -139,21 +139,26 @@ Result> RestCatalog::Make( paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)), final_config->Get(RestCatalogProperties::kPrefix))); + // Get snapshot loading mode + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config->SnapshotLoadingMode()); + return std::shared_ptr( new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths), - std::move(endpoints))); + std::move(endpoints), snapshot_mode)); } RestCatalog::RestCatalog(std::unique_ptr config, std::shared_ptr file_io, std::unique_ptr paths, - std::unordered_set endpoints) + std::unordered_set endpoints, + SnapshotMode snapshot_mode) : config_(std::move(config)), file_io_(std::move(file_io)), client_(std::make_unique(config_->ExtractHeaders())), paths_(std::move(paths)), name_(config_->Get(RestCatalogProperties::kName)), - supported_endpoints_(std::move(endpoints)) {} + supported_endpoints_(std::move(endpoints)), + snapshot_mode_(snapshot_mode) {} std::string_view RestCatalog::name() const { return name_; } @@ -376,8 +381,8 @@ Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) { Result RestCatalog::TableExists(const TableIdentifier& identifier) const { if (!supported_endpoints_.contains(Endpoint::TableExists())) { - // Fall back to call LoadTable - return CaptureNoSuchTable(LoadTableInternal(identifier)); + // Fall back to call LoadTable with ALL mode (just checking existence) + return CaptureNoSuchTable(LoadTableInternal(identifier, SnapshotMode::ALL)); } ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); @@ -398,21 +403,31 @@ Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifi return {}; } -Result RestCatalog::LoadTableInternal( - const TableIdentifier& identifier) const { +Result RestCatalog::LoadTableInternal(const TableIdentifier& identifier, + SnapshotMode mode) const { ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + + std::unordered_map params; + if (mode == SnapshotMode::REFS) { + params["snapshots"] = "refs"; + } + ICEBERG_ASSIGN_OR_RAISE( const auto response, - client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance())); + client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance())); return response.body(); } Result> RestCatalog::LoadTable(const TableIdentifier& identifier) { - ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier)); + ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier, snapshot_mode_)); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body)); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); + // In REFS mode, the server filters snapshots in the response to reduce payload size. + // Unlike the Java implementation, we do not perform implicit lazy-loading of full + // snapshots. We store only the returned (filtered) snapshots. Users requiring full + // snapshot history must explicitly call LoadTable again with SnapshotMode::ALL. return Table::Make(identifier, std::move(load_result.metadata), std::move(load_result.metadata_location), file_io_, shared_from_this()); diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 721df29d8..ad5f09798 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -24,6 +24,7 @@ #include #include "iceberg/catalog.h" +#include "iceberg/catalog/rest/catalog_properties.h" #include "iceberg/catalog/rest/endpoint.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/type_fwd.h" @@ -106,9 +107,10 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, private: RestCatalog(std::unique_ptr config, std::shared_ptr file_io, std::unique_ptr paths, - std::unordered_set endpoints); + std::unordered_set endpoints, SnapshotMode snapshot_mode); - Result LoadTableInternal(const TableIdentifier& identifier) const; + Result LoadTableInternal(const TableIdentifier& identifier, + SnapshotMode mode) const; Result CreateTableInternal( const TableIdentifier& identifier, const std::shared_ptr& schema, @@ -122,6 +124,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, std::unique_ptr paths_; std::string name_; std::unordered_set supported_endpoints_; + SnapshotMode snapshot_mode_; }; } // namespace iceberg::rest diff --git a/src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml b/src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml index 0a5c37ecb..3ae6a6da6 100644 --- a/src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml +++ b/src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml @@ -24,3 +24,5 @@ services: - CATALOG_WAREHOUSE=file:///tmp/iceberg_warehouse ports: - "8181:8181" + volumes: + - ${ICEBERG_TEST_DATA_DIR}:/tmp diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc index 20560979b..c7b5a388b 100644 --- a/src/iceberg/test/rest_catalog_test.cc +++ b/src/iceberg/test/rest_catalog_test.cc @@ -21,8 +21,9 @@ #include -#include #include +#include +#include #include #include #include @@ -687,4 +688,142 @@ TEST_F(RestCatalogIntegrationTest, StageCreateTable) { EXPECT_EQ(props.at("key1"), "value1"); } +TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotLoadingMode) { + auto catalog_result = CreateCatalog(); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); + + Namespace ns{.levels = {"test_snapshot_mode"}}; + auto status = catalog->CreateNamespace(ns, {}); + ASSERT_THAT(status, IsOk()); + + auto schema = CreateDefaultSchema(); + auto partition_spec = PartitionSpec::Unpartitioned(); + auto sort_order = SortOrder::Unsorted(); + + TableIdentifier table_id{.ns = ns, .name = "snapshot_mode_table"}; + std::unordered_map table_properties; + auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, + "", table_properties); + ASSERT_THAT(table_result, IsOk()); + auto& table = table_result.value(); + + std::string original_metadata_location(table->metadata_file_location()); + status = catalog->DropTable(table_id, /*purge=*/false); + ASSERT_THAT(status, IsOk()); + + // Create a fake metadata JSON with 2 snapshots: + // - Snapshot 1: not referenced by any branch/tag (will be filtered in REFS mode) + // - Snapshot 2: referenced by main branch (will be loaded in both modes) + auto& test_data_dir = docker_compose_->test_data_dir(); + auto metadata_filename = std::format("00000-{}.metadata.json", getpid()); + auto metadata_path = test_data_dir / metadata_filename; + auto container_metadata_path = std::format("/tmp/{}", metadata_filename); + std::string fake_metadata_json = std::format(R"({{ + "format-version": 2, + "table-uuid": "12345678-1234-5678-1234-123456789abc", + "location": "file:/tmp/iceberg_warehouse/{}", + "last-sequence-number": 2, + "last-updated-ms": 1602638573590, + "last-column-id": 2, + "current-schema-id": 0, + "schemas": [{{"type": "struct", "schema-id": 0, "fields": [ + {{"id": 1, "name": "id", "required": true, "type": "int"}}, + {{"id": 2, "name": "data", "required": false, "type": "string"}} + ]}}], + "default-spec-id": 0, + "partition-specs": [{{"spec-id": 0, "fields": []}}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{{"order-id": 0, "fields": []}}], + "properties": {{}}, + "current-snapshot-id": 2, + "snapshots": [ + {{ + "snapshot-id": 1, + "timestamp-ms": 1515100955770, + "sequence-number": 1, + "summary": {{"operation": "append"}}, + "manifest-list": "file:/tmp/iceberg_warehouse/{}/metadata/snap-1.avro" + }}, + {{ + "snapshot-id": 2, + "parent-snapshot-id": 1, + "timestamp-ms": 1525100955770, + "sequence-number": 2, + "summary": {{"operation": "append"}}, + "manifest-list": "file:/tmp/iceberg_warehouse/{}/metadata/snap-2.avro" + }} + ], + "snapshot-log": [ + {{"snapshot-id": 1, "timestamp-ms": 1515100955770}}, + {{"snapshot-id": 2, "timestamp-ms": 1525100955770}} + ], + "metadata-log": [], + "refs": {{ + "main": {{ + "snapshot-id": 2, + "type": "branch" + }} + }} +}})", + ns.levels[0], ns.levels[0], ns.levels[0]); + + // Write metadata file and register the table + std::ofstream metadata_file(metadata_path.string()); + metadata_file << fake_metadata_json; + metadata_file.close(); + auto register_metadata_location = std::format("file:{}", container_metadata_path); + auto register_result = catalog->RegisterTable(table_id, register_metadata_location); + ASSERT_THAT(register_result, IsOk()); + + // Test with ALL mode (default) + auto config_all = RestCatalogProperties::default_properties(); + config_all + ->Set(RestCatalogProperties::kUri, + std::format("{}:{}", kLocalhostUri, kRestCatalogPort)) + .Set(RestCatalogProperties::kName, std::string(kCatalogName)) + .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName)) + .Set(RestCatalogProperties::kSnapshotLoadingMode, std::string("ALL")); + auto catalog_all_result = + RestCatalog::Make(*config_all, std::make_shared()); + ASSERT_THAT(catalog_all_result, IsOk()); + auto& catalog_all = catalog_all_result.value(); + + // Load table with ALL mode and verify both snapshots are loaded + auto table_all_result = catalog_all->LoadTable(table_id); + ASSERT_THAT(table_all_result, IsOk()); + auto& table_all = table_all_result.value(); + EXPECT_EQ(table_all->metadata()->snapshots.size(), 2); + + // Test with REFS mode + auto config_refs = RestCatalogProperties::default_properties(); + config_refs + ->Set(RestCatalogProperties::kUri, + std::format("{}:{}", kLocalhostUri, kRestCatalogPort)) + .Set(RestCatalogProperties::kName, std::string(kCatalogName)) + .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName)) + .Set(RestCatalogProperties::kSnapshotLoadingMode, std::string("REFS")); + auto catalog_refs_result = + RestCatalog::Make(*config_refs, std::make_shared()); + ASSERT_THAT(catalog_refs_result, IsOk()); + auto& catalog_refs = catalog_refs_result.value(); + + // Load table with REFS mode and verify only referenced snapshot is loaded + auto table_refs_result = catalog_refs->LoadTable(table_id); + ASSERT_THAT(table_refs_result, IsOk()); + auto& table_refs = table_refs_result.value(); + EXPECT_EQ(table_refs->metadata()->snapshots.size(), 1); + EXPECT_EQ(table_refs->metadata()->snapshots[0]->snapshot_id, 2); + + // Verify refs are preserved in both modes + EXPECT_EQ(table_all->metadata()->refs.size(), 1); + EXPECT_EQ(table_refs->metadata()->refs.size(), 1); + EXPECT_TRUE(table_all->metadata()->refs.contains("main")); + EXPECT_TRUE(table_refs->metadata()->refs.contains("main")); + + // Clean up metadata file + std::filesystem::remove(metadata_path); +} + } // namespace iceberg::rest diff --git a/src/iceberg/test/util/docker_compose_util.cc b/src/iceberg/test/util/docker_compose_util.cc index da26da760..f14b1591b 100644 --- a/src/iceberg/test/util/docker_compose_util.cc +++ b/src/iceberg/test/util/docker_compose_util.cc @@ -19,16 +19,38 @@ #include "iceberg/test/util/docker_compose_util.h" +#include + #include +#include +#include +#include + +#include #include "iceberg/test/util/cmd_util.h" namespace iceberg { +namespace { +/// \brief Generate a unique test data directory path +std::filesystem::path GenerateTestDataDir() { + auto timestamp = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + auto pid = getpid(); + return {std::format("/tmp/iceberg-test-{}-{}", timestamp, pid)}; +} +} // namespace + DockerCompose::DockerCompose(std::string project_name, std::filesystem::path docker_compose_dir) : project_name_(std::move(project_name)), - docker_compose_dir_(std::move(docker_compose_dir)) {} + docker_compose_dir_(std::move(docker_compose_dir)), + test_data_dir_(GenerateTestDataDir()) { + std::filesystem::create_directories(test_data_dir_); + chmod(test_data_dir_.c_str(), 0777); +} DockerCompose::~DockerCompose() { Down(); } @@ -39,13 +61,24 @@ void DockerCompose::Up() { void DockerCompose::Down() { auto cmd = BuildDockerCommand({"down", "-v", "--remove-orphans"}); - return cmd.RunCommand("docker compose down"); + cmd.RunCommand("docker compose down"); + + // Clean up the test data directory + if (!test_data_dir_.empty() && std::filesystem::exists(test_data_dir_)) { + std::error_code ec; + std::filesystem::remove_all(test_data_dir_, ec); + if (!ec) { + std::println("[INFO] Cleaned up test data directory: {}", test_data_dir_.string()); + } + } } Command DockerCompose::BuildDockerCommand(const std::vector& args) const { Command cmd("docker"); // Set working directory cmd.CurrentDir(docker_compose_dir_); + // Set the test data directory environment variable + cmd.Env("ICEBERG_TEST_DATA_DIR", test_data_dir_.string()); // Use 'docker compose' subcommand with project name cmd.Arg("compose").Arg("-p").Arg(project_name_).Args(args); return cmd; diff --git a/src/iceberg/test/util/docker_compose_util.h b/src/iceberg/test/util/docker_compose_util.h index 63928eb8c..91870a074 100644 --- a/src/iceberg/test/util/docker_compose_util.h +++ b/src/iceberg/test/util/docker_compose_util.h @@ -46,6 +46,10 @@ class DockerCompose { /// \brief Get the docker project name. const std::string& project_name() const { return project_name_; } + /// \brief Get the test data directory path. + /// This directory is mounted into the container and is unique per test run. + const std::filesystem::path& test_data_dir() const { return test_data_dir_; } + /// \brief Executes 'docker-compose up' to start services. /// \note May throw an exception if the services fail to start. void Up(); @@ -57,6 +61,7 @@ class DockerCompose { private: std::string project_name_; std::filesystem::path docker_compose_dir_; + std::filesystem::path test_data_dir_; /// \brief Build a docker compose Command with proper environment. /// \param args Additional command line arguments.