Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions src/iceberg/catalog/rest/auth/auth_managers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

#include <algorithm>
#include <cctype>
#include <unordered_set>

#include "iceberg/catalog/rest/auth/auth_properties.h"
#include "iceberg/catalog/rest/auth/auth_session.h"
#include "iceberg/util/string_util.h"

namespace iceberg::rest::auth {
Expand All @@ -33,6 +35,17 @@ namespace {
using AuthManagerRegistry =
std::unordered_map<std::string, AuthManagerFactory, StringHash, StringEqual>;

/// \brief Known authentication types that are defined in the Iceberg spec.
const std::unordered_set<std::string, StringHash, StringEqual>& KnownAuthTypes() {
static const std::unordered_set<std::string, StringHash, StringEqual> types = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static const std::unordered_set<std::string, StringHash, StringEqual> types = {
static const std::unordered_set<std::string, StringHash, StringEqual> kAuthTypes = {

AuthProperties::kAuthTypeNone,
AuthProperties::kAuthTypeBasic,
AuthProperties::kAuthTypeOAuth2,
AuthProperties::kAuthTypeSigV4,
};
return types;
}

// Infer the authentication type from properties.
std::string InferAuthType(
const std::unordered_map<std::string, std::string>& properties) {
Expand All @@ -51,9 +64,29 @@ std::string InferAuthType(
return AuthProperties::kAuthTypeNone;
}

/// \brief Authentication manager that performs no authentication.
class NoopAuthManager : public AuthManager {
public:
Result<std::shared_ptr<AuthSession>> CatalogSession(
[[maybe_unused]] HttpClient& shared_client,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[[maybe_unused]] HttpClient& shared_client,
[[maybe_unused]] HttpClient& client,

Let's rename it to remove shared_ prefix as it is misleading.

[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties)
override {
return AuthSession::MakeDefault({});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check auth type from properties and return error for unimplemented auth types instead of blindly returning the noop impl?

}
};

// Get the global registry of auth manager factories.
AuthManagerRegistry& GetRegistry() {
static AuthManagerRegistry registry;
static AuthManagerRegistry registry = [] {
AuthManagerRegistry r;
r[AuthProperties::kAuthTypeNone] =
[]([[maybe_unused]] std::string_view name,
[[maybe_unused]] const std::unordered_map<std::string, std::string>& props)
-> Result<std::unique_ptr<AuthManager>> {
return std::make_unique<NoopAuthManager>();
};
return r;
}();
return registry;
}

Expand All @@ -71,8 +104,10 @@ Result<std::unique_ptr<AuthManager>> AuthManagers::Load(
auto& registry = GetRegistry();
auto it = registry.find(auth_type);
if (it == registry.end()) {
// TODO(Li Shuxu): Fallback to default auth manager implementations
return NotImplemented("Authentication type '{}' is not supported", auth_type);
if (KnownAuthTypes().contains(auth_type)) {
return NotImplemented("Authentication type '{}' is not yet supported", auth_type);
}
return InvalidArgument("Unknown authentication type: '{}'", auth_type);
}

return it->second(name, properties);
Expand Down
108 changes: 82 additions & 26 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

#include <nlohmann/json.hpp>

#include "iceberg/catalog/rest/auth/auth_manager.h"
#include "iceberg/catalog/rest/auth/auth_managers.h"
#include "iceberg/catalog/rest/auth/auth_session.h"
#include "iceberg/catalog/rest/catalog_properties.h"
#include "iceberg/catalog/rest/constant.h"
#include "iceberg/catalog/rest/endpoint.h"
Expand Down Expand Up @@ -65,13 +68,19 @@ std::unordered_set<Endpoint> GetDefaultEndpoints() {
};
}

/// \brief Fetch server config and merge it with client config
Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,
const RestCatalogProperties& current_config) {
/// \brief Fetch server configuration from the REST catalog server.
Result<CatalogConfig> FetchServerConfig(
const ResourcePaths& paths, const RestCatalogProperties& current_config,
const std::shared_ptr<auth::AuthSession>& session) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const std::shared_ptr<auth::AuthSession>& session) {
const auth::AuthSession& session) {

If the session cannot be null and it is read-only, let's use its reference directly.

ICEBERG_ASSIGN_OR_RAISE(auto config_path, paths.Config());
HttpClient client(current_config.ExtractHeaders());

// Get authentication headers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Get authentication headers

We don't need such comment

std::unordered_map<std::string, std::string> auth_headers;
ICEBERG_RETURN_UNEXPECTED(session->Authenticate(auth_headers));

ICEBERG_ASSIGN_OR_RAISE(const auto response,
client.Get(config_path, /*params=*/{}, /*headers=*/{},
client.Get(config_path, /*params=*/{}, auth_headers,
*DefaultErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
return CatalogConfigFromJson(json);
Expand Down Expand Up @@ -114,10 +123,21 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
if (!file_io) {
return InvalidArgument("FileIO is required to create RestCatalog");
}

std::string catalog_name = config.Get(RestCatalogProperties::kName);
ICEBERG_ASSIGN_OR_RAISE(auto auth_manager,
auth::AuthManagers::Load(catalog_name, config.configs()));

ICEBERG_ASSIGN_OR_RAISE(
auto paths, ResourcePaths::Make(std::string(TrimTrailingSlash(uri)),
config.Get(RestCatalogProperties::kPrefix)));
ICEBERG_ASSIGN_OR_RAISE(auto server_config, FetchServerConfig(*paths, config));

// Create init session for fetching server configuration
HttpClient init_client(config.ExtractHeaders());
ICEBERG_ASSIGN_OR_RAISE(auto init_session,
auth_manager->InitSession(init_client, config.configs()));
ICEBERG_ASSIGN_OR_RAISE(auto server_config,
FetchServerConfig(*paths, config, init_session));

std::unique_ptr<RestCatalogProperties> final_config = RestCatalogProperties::FromMap(
MergeConfigs(server_config.defaults, config.configs(), server_config.overrides));
Expand All @@ -139,27 +159,43 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
final_config->Get(RestCatalogProperties::kPrefix)));

return std::shared_ptr<RestCatalog>(
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),
std::move(endpoints)));
auto client = std::make_unique<HttpClient>(final_config->ExtractHeaders());
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
auth_manager->CatalogSession(*client, final_config->configs()));
return std::shared_ptr<RestCatalog>(new RestCatalog(
std::move(final_config), std::move(file_io), std::move(client), std::move(paths),
std::move(endpoints), std::move(auth_manager), std::move(catalog_session)));
}

RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::shared_ptr<FileIO> file_io,
std::unique_ptr<HttpClient> client,
std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints)
std::unordered_set<Endpoint> endpoints,
std::unique_ptr<auth::AuthManager> auth_manager,
std::shared_ptr<auth::AuthSession> catalog_session)
: config_(std::move(config)),
file_io_(std::move(file_io)),
client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
client_(std::move(client)),
paths_(std::move(paths)),
name_(config_->Get(RestCatalogProperties::kName)),
supported_endpoints_(std::move(endpoints)) {}
supported_endpoints_(std::move(endpoints)),
auth_manager_(std::move(auth_manager)),
catalog_session_(std::move(catalog_session)) {}

std::string_view RestCatalog::name() const { return name_; }

Result<std::unordered_map<std::string, std::string>> RestCatalog::AuthHeaders() const {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to add AuthSession* session as an optional input parameter to HttpClient::Get (and its friends)? Auth headers should be handled internally in the client instead of scattering them around here.

std::unordered_map<std::string, std::string> headers;
ICEBERG_RETURN_UNEXPECTED(catalog_session_->Authenticate(headers));
return headers;
}

Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

std::vector<Namespace> result;
std::string next_token;
while (true) {
Expand All @@ -172,7 +208,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, params, /*headers=*/{}, *NamespaceErrorHandler::Instance()));
client_->Get(path, params, auth_headers, *NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListNamespacesResponseFromJson(json));
result.insert(result.end(), list_response.namespaces.begin(),
Expand All @@ -189,10 +225,12 @@ Status RestCatalog::CreateNamespace(
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

CreateNamespaceRequest request{.namespace_ = ns, .properties = properties};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Post(path, json_request, /*headers=*/{},
client_->Post(path, json_request, auth_headers,
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto create_response, CreateNamespaceResponseFromJson(json));
Expand All @@ -203,8 +241,10 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
const Namespace& ns) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::GetNamespaceProperties());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Get(path, /*params=*/{}, /*headers=*/{},
client_->Get(path, /*params=*/{}, auth_headers,
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto get_response, GetNamespaceResponseFromJson(json));
Expand All @@ -214,8 +254,10 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
Status RestCatalog::DropNamespace(const Namespace& ns) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DropNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Delete(path, /*params=*/{}, /*headers=*/{},
client_->Delete(path, /*params=*/{}, auth_headers,
*DropNamespaceErrorHandler::Instance()));
return {};
}
Expand All @@ -227,21 +269,25 @@ Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

return CaptureNoSuchNamespace(
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance()));
client_->Head(path, auth_headers, *NamespaceErrorHandler::Instance()));
}

Status RestCatalog::UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->NamespaceProperties(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

UpdateNamespacePropertiesRequest request{
.removals = std::vector<std::string>(removals.begin(), removals.end()),
.updates = updates};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Post(path, json_request, /*headers=*/{},
client_->Post(path, json_request, auth_headers,
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto update_response,
Expand All @@ -251,8 +297,9 @@ Status RestCatalog::UpdateNamespaceProperties(

Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListTables());

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

std::vector<TableIdentifier> result;
std::string next_token;
while (true) {
Expand All @@ -262,7 +309,7 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Get(path, params, auth_headers, *TableErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListTablesResponseFromJson(json));
result.insert(result.end(), list_response.identifiers.begin(),
Expand All @@ -282,6 +329,7 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
const std::unordered_map<std::string, std::string>& properties, bool stage_create) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

CreateTableRequest request{
.name = identifier.name,
Expand All @@ -296,7 +344,7 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
return LoadTableResultFromJson(json);
Expand All @@ -320,6 +368,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

CommitTableRequest request{.identifier = identifier};
request.requirements.reserve(requirements.size());
Expand All @@ -334,7 +383,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto commit_response, CommitTableResponseFromJson(json));
Expand Down Expand Up @@ -363,14 +412,15 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DeleteTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

std::unordered_map<std::string, std::string> params;
if (purge) {
params["purgeRequested"] = "true";
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Delete(path, params, auth_headers, *TableErrorHandler::Instance()));
return {};
}

Expand All @@ -381,19 +431,22 @@ Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

return CaptureNoSuchTable(
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Head(path, auth_headers, *TableErrorHandler::Instance()));
}

Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RenameTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Rename());
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

RenameTableRequest request{.source = from, .destination = to};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));

return {};
}
Expand All @@ -402,9 +455,11 @@ Result<std::string> RestCatalog::LoadTableInternal(
const TableIdentifier& identifier) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Get(path, /*params=*/{}, auth_headers, *TableErrorHandler::Instance()));
return response.body();
}

Expand All @@ -422,6 +477,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RegisterTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Register(identifier.ns));
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());

RegisterTableRequest request{
.name = identifier.name,
Expand All @@ -431,7 +487,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
Expand Down
Loading
Loading