Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/tl/constants/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ inline constexpr uint32_t TL_INT_KEY_DICTIONARY_FIELD = 0x721ea8b9U;
inline constexpr uint32_t TL_LEFT = 0x0a29cd5dU;
inline constexpr uint32_t TL_LONG = 0x22076cbaU;
inline constexpr uint32_t TL_LONG_KEY_DICTIONARY = 0xb424d8f1U;
inline constexpr uint32_t TL_REQ_ERROR = 0xb527877dU;
inline constexpr uint32_t TL_REQ_RESULT_HEADER = 0x8cc84ce1U;
inline constexpr uint32_t TL_MAYBE_FALSE = 0x27930a7bU;
inline constexpr uint32_t TL_MAYBE_TRUE = 0x3f9c8ef8U;
Expand Down
32 changes: 32 additions & 0 deletions runtime-light/stdlib/rpc/rpc-exceptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,36 @@ inline auto make(std::string_view func_name, std::source_location loc = std::sou

} // namespace cant_store_function

namespace cant_fetch_error {

namespace details {

inline constexpr int64_t CODE = -5;
inline constexpr std::string_view DESCRIPTION = "can't fetch error";

} // namespace details

inline auto make(std::source_location loc = std::source_location::current()) noexcept {
return kphp::exception::make_throwable<C$Exception>(string{loc.file_name()}, loc.line(), details::CODE,
string{details::DESCRIPTION.data(), details::DESCRIPTION.size()});
}

} // namespace cant_fetch_error

namespace cant_fetch_header {

namespace details {

inline constexpr int64_t CODE = -6;
inline constexpr std::string_view DESCRIPTION = "can't fetch header";

} // namespace details

inline auto make(std::source_location loc = std::source_location::current()) noexcept {
return kphp::exception::make_throwable<C$Exception>(string{loc.file_name()}, loc.line(), details::CODE,
string{details::DESCRIPTION.data(), details::DESCRIPTION.size()});
}

} // namespace cant_fetch_header

} // namespace kphp::rpc::exception
79 changes: 25 additions & 54 deletions runtime-light/stdlib/rpc/rpc-tl-error.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,39 @@

#include "runtime-light/stdlib/rpc/rpc-tl-error.h"

#include <tuple>

#include "common/tl/constants/common.h"
#include "runtime-light/server/rpc/rpc-server-state.h"
#include "runtime-light/stdlib/diagnostics/exception-functions.h"
#include "runtime-light/stdlib/rpc/rpc-api.h"
#include "runtime-light/stdlib/rpc/rpc-tl-builtins.h"
#include "runtime-light/stdlib/rpc/rpc-exceptions.h"
#include "runtime-light/tl/tl-core.h"
#include "runtime-light/tl/tl-types.h"

bool TlRpcError::try_fetch() noexcept {
const auto backup_pos{tl_parse_save_pos()};
auto op{TRY_CALL(decltype(f$fetch_int()), bool, f$fetch_int())};
if (op == TL_REQ_RESULT_HEADER) {
fetch_and_skip_header();
op = TRY_CALL(decltype(f$fetch_int()), bool, f$fetch_int());
}
if (op != TL_RPC_REQ_ERROR) {
tl_parse_restore_pos(backup_pos);
auto& rpc_server_instance_state_fetcher{RpcServerInstanceState::get().tl_fetcher};
auto fetcher{rpc_server_instance_state_fetcher};
tl::magic magic{};
if (!magic.fetch(fetcher)) [[unlikely]] {
THROW_EXCEPTION(kphp::rpc::exception::not_enough_data_to_fetch::make());
return false;
}

std::ignore = TRY_CALL(decltype(f$fetch_long()), bool, f$fetch_long());
error_code = static_cast<int32_t>(TRY_CALL(decltype(f$fetch_int()), bool, f$fetch_int()));
error_msg = TRY_CALL(decltype(f$fetch_string()), bool, f$fetch_string());
return true;
}

void TlRpcError::fetch_and_skip_header() const noexcept {
const auto flags{static_cast<int32_t>(TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int()))};

if (flags & vk::tl::common::rpc_req_result_extra_flags::binlog_pos) {
std::ignore = TRY_CALL(decltype(f$fetch_long()), void, f$fetch_long());
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::binlog_time) {
std::ignore = TRY_CALL(decltype(f$fetch_long()), void, f$fetch_long());
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::engine_pid) {
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::request_size) {
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::response_size) {
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::failed_subqueries) {
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::compression_version) {
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::stats) {
const auto size{TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int())};
for (auto i = 0; i < size; ++i) {
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
if (magic.expect(TL_REQ_RESULT_HEADER)) {
tl::reqResultHeader req_result_header{};
if (!req_result_header.fetch(fetcher)) [[unlikely]] {
THROW_EXCEPTION(kphp::rpc::exception::cant_fetch_header::make());
return false;
}
fetcher = tl::fetcher{req_result_header.result};
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::epoch_number) {
std::ignore = TRY_CALL(decltype(f$fetch_long()), void, f$fetch_long());
if (!magic.expect(TL_RPC_REQ_ERROR)) {
return false;
}
if (flags & vk::tl::common::rpc_req_result_extra_flags::view_number) {
std::ignore = TRY_CALL(decltype(f$fetch_long()), void, f$fetch_long());
tl::rpcReqError rpc_req_error{};
if (!rpc_req_error.fetch(fetcher)) [[unlikely]] {
THROW_EXCEPTION(kphp::rpc::exception::cant_fetch_error::make());
return false;
}
error_code = rpc_req_error.error_code.value;
error_msg = {rpc_req_error.error.value.data(), static_cast<string::size_type>(rpc_req_error.error.value.size())};
rpc_server_instance_state_fetcher = std::move(fetcher);
return true;
}
3 changes: 0 additions & 3 deletions runtime-light/stdlib/rpc/rpc-tl-error.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ struct TlRpcError {
}

bool try_fetch() noexcept;

private:
void fetch_and_skip_header() const noexcept;
};

class RpcErrorFactory {
Expand Down
87 changes: 65 additions & 22 deletions runtime-light/tl/tl-types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,62 +218,105 @@ tl::mask rpcInvokeReqExtra::get_flags() const noexcept {
return flags;
}

bool rpcReqResultExtra::fetch(tl::fetcher& tlf, const tl::mask& flags) noexcept {
bool ok{true};
if (ok && static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
ok = opt_binlog_pos.emplace().fetch(tlf);
}
if (ok && static_cast<bool>(flags.value & BINLOG_TIME_FLAG)) {
ok = opt_binlog_time.emplace().fetch(tlf);
}
if (ok && static_cast<bool>(flags.value & ENGINE_PID_FLAG)) {
ok = opt_engine_pid.emplace().fetch(tlf);
}
if (ok && static_cast<bool>(flags.value & REQUEST_SIZE_FLAG)) {
kphp::log::assertion(static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG));
ok = opt_request_size.emplace().fetch(tlf) && opt_response_size.emplace().fetch(tlf);
}
if (ok && static_cast<bool>(flags.value & FAILED_SUBQUERIES_FLAG)) {
ok = opt_failed_subqueries.emplace().fetch(tlf);
}
if (ok && static_cast<bool>(flags.value & COMPRESSION_VERSION_FLAG)) {
ok = opt_compression_version.emplace().fetch(tlf);
}
if (ok && static_cast<bool>(flags.value & STATS_FLAG)) {
ok = opt_stats.emplace().fetch(tlf);
}
if (ok && static_cast<bool>(flags.value & EPOCH_NUMBER_FLAG)) {
kphp::log::assertion(static_cast<bool>(flags.value & VIEW_NUMBER_FLAG));
ok = opt_epoch_number.emplace().fetch(tlf) && opt_view_number.emplace().fetch(tlf);
}
return ok;
}

void rpcReqResultExtra::store(tl::storer& tls, const tl::mask& flags) const noexcept {
if (static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
binlog_pos.store(tls);
kphp::log::assertion(opt_binlog_pos.has_value());
opt_binlog_pos->store(tls);
}
if (static_cast<bool>(flags.value & BINLOG_TIME_FLAG)) {
binlog_time.store(tls);
kphp::log::assertion(opt_binlog_time.has_value());
opt_binlog_time->store(tls);
}
if (static_cast<bool>(flags.value) & ENGINE_PID_FLAG) {
engine_pid.store(tls);
if (static_cast<bool>(flags.value & ENGINE_PID_FLAG)) {
kphp::log::assertion(opt_engine_pid.has_value());
opt_engine_pid->store(tls);
}
if (static_cast<bool>(flags.value & REQUEST_SIZE_FLAG)) {
kphp::log::assertion(static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG));
request_size.store(tls), response_size.store(tls);
kphp::log::assertion(opt_request_size.has_value() && static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG) && opt_response_size.has_value());
opt_request_size->store(tls), opt_response_size->store(tls);
}
if (static_cast<bool>(flags.value & FAILED_SUBQUERIES_FLAG)) {
failed_subqueries.store(tls);
kphp::log::assertion(opt_failed_subqueries.has_value());
opt_failed_subqueries->store(tls);
}
if (static_cast<bool>(flags.value & COMPRESSION_VERSION_FLAG)) {
compression_version.store(tls);
kphp::log::assertion(opt_compression_version.has_value());
opt_compression_version->store(tls);
}
if (static_cast<bool>(flags.value & STATS_FLAG)) {
stats.store(tls);
kphp::log::assertion(opt_stats.has_value());
opt_stats->store(tls);
}
if (static_cast<bool>(flags.value & EPOCH_NUMBER_FLAG)) {
kphp::log::assertion(static_cast<bool>(flags.value & VIEW_NUMBER_FLAG));
epoch_number.store(tls), view_number.store(tls);
kphp::log::assertion(opt_epoch_number.has_value() && static_cast<bool>(flags.value & VIEW_NUMBER_FLAG) && opt_view_number.has_value());
opt_epoch_number->store(tls), opt_view_number->store(tls);
}
}

size_t rpcReqResultExtra::footprint(const tl::mask& flags) const noexcept {
size_t footprint{};
if (static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
footprint += binlog_pos.footprint();
kphp::log::assertion(opt_binlog_pos.has_value());
footprint += opt_binlog_pos->footprint();
}
if (static_cast<bool>(flags.value & BINLOG_TIME_FLAG)) {
footprint += binlog_time.footprint();
kphp::log::assertion(opt_binlog_time.has_value());
footprint += opt_binlog_time->footprint();
}
if (static_cast<bool>(flags.value) & ENGINE_PID_FLAG) {
footprint += engine_pid.footprint();
if (static_cast<bool>(flags.value & ENGINE_PID_FLAG)) {
kphp::log::assertion(opt_engine_pid.has_value());
footprint += opt_engine_pid->footprint();
}
if (static_cast<bool>(flags.value & REQUEST_SIZE_FLAG)) {
kphp::log::assertion(static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG));
footprint += request_size.footprint() + response_size.footprint();
kphp::log::assertion(opt_request_size.has_value() && static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG) && opt_response_size.has_value());
footprint += opt_request_size->footprint() + opt_response_size->footprint();
}
if (static_cast<bool>(flags.value & FAILED_SUBQUERIES_FLAG)) {
footprint += failed_subqueries.footprint();
kphp::log::assertion(opt_failed_subqueries.has_value());
footprint += opt_failed_subqueries->footprint();
}
if (static_cast<bool>(flags.value & COMPRESSION_VERSION_FLAG)) {
footprint += compression_version.footprint();
kphp::log::assertion(opt_compression_version.has_value());
footprint += opt_compression_version->footprint();
}
if (static_cast<bool>(flags.value & STATS_FLAG)) {
footprint += stats.footprint();
kphp::log::assertion(opt_stats.has_value());
footprint += opt_stats->footprint();
}
if (static_cast<bool>(flags.value & EPOCH_NUMBER_FLAG)) {
kphp::log::assertion(static_cast<bool>(flags.value & VIEW_NUMBER_FLAG));
footprint += epoch_number.footprint() + view_number.footprint();
kphp::log::assertion(opt_epoch_number.has_value() && static_cast<bool>(flags.value & VIEW_NUMBER_FLAG) && opt_view_number.has_value());
footprint += opt_epoch_number->footprint() + opt_view_number->footprint();
}
return footprint;
}
Expand Down
48 changes: 38 additions & 10 deletions runtime-light/tl/tl-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "common/tl/constants/common.h"
#include "runtime-common/core/allocator/script-allocator.h"
#include "runtime-common/core/std/containers.h"
#include "runtime-light/stdlib/diagnostics/logs.h"
#include "runtime-light/tl/tl-core.h"

namespace tl {
Expand Down Expand Up @@ -1142,17 +1143,18 @@ class rpcReqResultExtra final {
static constexpr uint32_t VIEW_NUMBER_FLAG = vk::tl::common::rpc_req_result_extra_flags::view_number;

public:
tl::i64 binlog_pos{};
tl::i64 binlog_time{};
tl::netPid engine_pid{};
tl::i32 request_size{};
tl::i32 response_size{};
tl::i32 failed_subqueries{};
tl::i32 compression_version{};
tl::dictionary<tl::string> stats{};
tl::i64 epoch_number{};
tl::i64 view_number{};
std::optional<tl::i64> opt_binlog_pos;
std::optional<tl::i64> opt_binlog_time;
std::optional<tl::netPid> opt_engine_pid;
std::optional<tl::i32> opt_request_size;
std::optional<tl::i32> opt_response_size;
std::optional<tl::i32> opt_failed_subqueries;
std::optional<tl::i32> opt_compression_version;
std::optional<tl::dictionary<tl::string>> opt_stats;
std::optional<tl::i64> opt_epoch_number;
std::optional<tl::i64> opt_view_number;

bool fetch(tl::fetcher& tlf, const tl::mask& flags) noexcept;
void store(tl::storer& tls, const tl::mask& flags) const noexcept;

size_t footprint(const tl::mask& flags) const noexcept;
Expand All @@ -1166,6 +1168,32 @@ struct RpcReqResultExtra final {
}
};

struct reqResultHeader final {
tl::mask flags{};
tl::rpcReqResultExtra extra{};
std::span<const std::byte> result;

bool fetch(tl::fetcher& tlf) noexcept {
if (!flags.fetch(tlf) || !extra.fetch(tlf, flags)) [[unlikely]] {
return false;
}
auto opt_result{tlf.fetch_bytes(tlf.remaining())};
kphp::log::assertion(opt_result.has_value());
result = *opt_result;
return true;
}
};

struct rpcReqError final {
tl::i64 query_id{};
tl::i32 error_code{};
tl::string error{};

bool fetch(tl::fetcher& tlf) noexcept {
return query_id.fetch(tlf) && error_code.fetch(tlf) && error.fetch(tlf);
}
};

struct k2RpcResponseError final {
tl::i32 error_code{};
tl::string error{};
Expand Down
Loading