Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/VecSim/algorithms/svs/svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
leanvec_dim{
svs_details::getOrDefault(params.leanvec_dim, SVS_VAMANA_DEFAULT_LEANVEC_DIM)},
epsilon{svs_details::getOrDefault(params.epsilon, SVS_VAMANA_DEFAULT_EPSILON)},
is_two_level_lvq{isTwoLevelLVQ(params.quantBits)}, threadpool_{this->logCallbackCtx},
impl_{nullptr} {
is_two_level_lvq{isTwoLevelLVQ(params.quantBits)},
threadpool_{this->allocator, this->logCallbackCtx}, impl_{nullptr} {
logger_ = makeLogger();
if (params.num_threads != 0) {
this->log(VecSimCommonStrings::LOG_WARNING_STRING,
Expand Down Expand Up @@ -429,8 +429,10 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

VecSimDebugInfoIterator *debugInfoIterator() const override {
VecSimIndexDebugInfo info = this->debugInfo();
// For readability. Update this number when needed.
size_t numberOfInfoFields = 23;
// Capacity hint for the iterator. Must equal the number of addInfoField()
// calls below (1 for ALGORITHM + 9 from addCommonInfoToIterator + 16 SVS-specific).
// Update this number when fields are added or removed.
size_t numberOfInfoFields = 26;
VecSimDebugInfoIterator *infoIterator =
new VecSimDebugInfoIterator(numberOfInfoFields, this->allocator);

Expand Down Expand Up @@ -517,6 +519,15 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
.fieldType = INFOFIELD_FLOAT64,
.fieldValue = {FieldValue{.floatingPointValue = info.svsInfo.epsilon}}});

// Bytes held by the shared SVS thread pool singleton (slot vector +
// per-slot ThreadSlot objects, allocated through the pool's tracked
// allocator). Always present (value may be 0).
infoIterator->addInfoField(VecSim_InfoField{
.fieldName = VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING,
.fieldType = INFOFIELD_UINT64,
.fieldValue = {
FieldValue{.uintegerValue = VecSimSVSThreadPool::getSharedAllocationSize()}}});

return infoIterator;
}

Expand Down
66 changes: 58 additions & 8 deletions src/VecSim/algorithms/svs/svs_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,35 +407,73 @@ class VecSimSVSThreadPoolImpl {
std::vector<ThreadSlot *> slots_;
};

// Allocator type for the slots vector.
using SlotPtr = std::shared_ptr<ThreadSlot>;
using SlotVecAllocator = VecsimSTLAllocator<SlotPtr>;

// Create a pool with `num_threads` total parallelism (including the calling thread).
// Spawns `num_threads - 1` worker OS threads. num_threads must be >= 1.
// In write-in-place mode, the pool is created with num_threads == 1 (0 worker threads,
// only the calling thread participates).
// Private — use instance() to access the shared singleton.
explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) {
explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1)
: allocator_(VecSimAllocator::newVecsimAllocator()), slots_(SlotVecAllocator(allocator_)) {
assert(num_threads && "VecSimSVSThreadPoolImpl should not be created with 0 threads");
slots_.reserve(num_threads - 1);
for (size_t i = 0; i < num_threads - 1; ++i) {
slots_.push_back(std::make_shared<ThreadSlot>());
slots_.push_back(
std::allocate_shared<ThreadSlot>(VecsimSTLAllocator<ThreadSlot>(allocator_)));
}
}

// Process-wide marker recording whether the shared pool singleton has been built.
// instance() flips it to true while constructing the singleton; readers (e.g., global
// stats reporting) can consult it without triggering the pool's lazy construction.
static std::atomic<bool> &initialized_flag() {
    static std::atomic<bool> pool_constructed{false};
    return pool_constructed;
}

public:
// Singleton accessor for the shared SVS thread pool.
// Always valid — initialized with size 1 (write-in-place mode: 0 worker threads,
// only the calling thread participates). Resized on VecSim_UpdateThreadPoolSize() calls.
static std::shared_ptr<VecSimSVSThreadPoolImpl> instance() {
static auto shared_pool = std::shared_ptr<VecSimSVSThreadPoolImpl>(
new VecSimSVSThreadPoolImpl(1), [](VecSimSVSThreadPoolImpl *) { /* leak at exit */ });
static auto shared_pool = [] {
auto p = std::shared_ptr<VecSimSVSThreadPoolImpl>(
new VecSimSVSThreadPoolImpl(1),
[](VecSimSVSThreadPoolImpl *) { /* leak at exit */ });
initialized_flag().store(true, std::memory_order_release);
return p;
}();
return shared_pool;
}

// Reports whether instance() has already constructed the singleton.
// Performs an acquire-load of the flag returned by initialized_flag().
static bool isInitialized() {
    const bool constructed = initialized_flag().load(std::memory_order_acquire);
    return constructed;
}

// Total parallelism of the pool: one per worker slot, plus one for the calling
// thread, which always participates. Reads slots_ under pool_mutex_.
size_t size() const {
    const std::lock_guard<std::mutex> guard(pool_mutex_);
    const size_t worker_count = slots_.size();
    return worker_count + 1;
}

// Bytes currently allocated through the pool's internal allocator (the slots vector
// and the ThreadSlot objects). Does not include allocations performed by SVS itself
// outside of the pool, nor per-index wrapper state.
size_t getAllocationSize() const { return allocator_->getAllocationSize(); }

// Bytes allocated by the shared pool singleton, or 0 when the singleton has never
// been constructed (e.g., no SVS index was ever created and
// VecSim_UpdateThreadPoolSize was never called). isInitialized() is consulted first,
// so this query never forces the singleton's lazy construction.
static size_t getSharedAllocationSize() {
    return isInitialized() ? instance()->getAllocationSize() : size_t{0};
}

// Physically resize the pool. Creates new OS threads on grow, shuts down idle threads
// on shrink. new_size is total parallelism including the calling thread (minimum 1).
// Occupied threads (held by renters) survive shrink via the deferred-resize protocol —
Expand Down Expand Up @@ -599,7 +637,8 @@ class VecSimSVSThreadPoolImpl {
// Grow (or same size): apply immediately, cancel any pending deferred shrink.
deferred_size_.reset();
for (size_t i = slots_.size(); i < target_workers; ++i) {
slots_.push_back(std::make_shared<ThreadSlot>());
slots_.push_back(
std::allocate_shared<ThreadSlot>(VecsimSTLAllocator<ThreadSlot>(allocator_)));
}
} else {
// Shrink.
Expand All @@ -615,8 +654,9 @@ class VecSimSVSThreadPoolImpl {
}
}

std::shared_ptr<VecSimAllocator> allocator_; // pool's own allocator for memory tracking
mutable std::mutex pool_mutex_;
std::vector<std::shared_ptr<ThreadSlot>> slots_;
std::vector<SlotPtr, SlotVecAllocator> slots_;
size_t pending_jobs_ = 0; // jobs currently scheduled / in-flight
std::optional<size_t> deferred_size_; // resize target deferred until pending_jobs_ == 0
};
Expand Down Expand Up @@ -646,9 +686,14 @@ class VecSimSVSThreadPool {
// parallelism_ starts at 1 (the calling thread always participates), matching the
// pool's minimum size. Safe for immediate use in write-in-place mode without an
// explicit setParallelism() call.
explicit VecSimSVSThreadPool(void *log_ctx = nullptr)
// parallelism_ is allocated through the provided VecsimAllocator so that the
// allocation is tracked by the index's memory accounting.
explicit VecSimSVSThreadPool(const std::shared_ptr<VecSimAllocator> &allocator,
void *log_ctx = nullptr)
: pool_(VecSimSVSThreadPoolImpl::instance()),
parallelism_(std::make_shared<std::atomic<size_t>>(1)), log_ctx_(log_ctx) {}
parallelism_(std::allocate_shared<std::atomic<size_t>>(
VecsimSTLAllocator<std::atomic<size_t>>(allocator), size_t{1})),
log_ctx_(log_ctx) {}

// Resizes the shared pool singleton by forwarding new_size to
// VecSimSVSThreadPoolImpl::instance()->resize().
static void resize(size_t new_size) {
    const auto shared_pool = VecSimSVSThreadPoolImpl::instance();
    shared_pool->resize(new_size);
}
Expand Down Expand Up @@ -677,6 +722,11 @@ class VecSimSVSThreadPool {
// Size of the shared pool singleton; scheduling uses it to decide how many reserve
// jobs to submit.
static size_t poolSize() {
    const auto shared_pool = VecSimSVSThreadPoolImpl::instance();
    return shared_pool->size();
}

// Convenience forwarder to VecSimSVSThreadPoolImpl::getSharedAllocationSize().
static size_t getSharedAllocationSize() {
    const size_t shared_bytes = VecSimSVSThreadPoolImpl::getSharedAllocationSize();
    return shared_bytes;
}

// Delegates to the shared pool's parallel_for, passing the per-index log context.
// n may be less than parallelism_ when the problem size is smaller than the
// thread count (SVS computes n = min(arg.size(), pool.size())).
Expand Down
20 changes: 20 additions & 0 deletions src/VecSim/index_factories/svs_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,30 @@
#include "VecSim/memory/vecsim_malloc.h"
#include "VecSim/vec_sim_index.h"
#include "VecSim/algorithms/svs/svs.h"
#include "VecSim/algorithms/svs/svs_utils.h"
#include "VecSim/index_factories/components/components_factory.h"
#include "VecSim/index_factories/factory_utils.h"
#include <atomic>

namespace SVSFactory {

namespace {

// Number of bytes a single std::allocate_shared<std::atomic<size_t>> costs when it goes
// through a VecSimAllocator -- the same kind of allocation VecSimSVSThreadPool makes for
// its per-index parallelism_ member.
// The size of the in-place shared_ptr control block is implementation-defined, so rather
// than hard-coding it, the value is measured against a throwaway VecSimAllocator. The
// measurement runs on the first call only; the result is cached in a function-local static.
size_t getThreadPoolParallelismAllocationSize() {
    static const size_t cached_bytes = []() {
        auto probe_allocator = VecSimAllocator::newVecsimAllocator();
        const size_t baseline = probe_allocator->getAllocationSize();
        // The shared_ptr must stay alive until the second reading so that its
        // allocation is still counted by the probe allocator.
        auto probe = std::allocate_shared<std::atomic<size_t>>(
            VecsimSTLAllocator<std::atomic<size_t>>(probe_allocator), size_t{1});
        const size_t with_probe = probe_allocator->getAllocationSize();
        return with_probe - baseline;
    }();
    return cached_bytes;
}

// NewVectorsImpl() is the chain of a template helper functions to create a new SVS index.
template <typename MetricType, typename DataType, size_t QuantBits, size_t ResidualBits,
bool IsLeanVec>
Expand Down Expand Up @@ -230,6 +247,9 @@ size_t EstimateInitialSize(const SVSParams *params, bool is_normalized) {
est += EstimateSVSIndexSize(params);
est += EstimateComponentsMemorySVS(params->type, params->metric, is_normalized);
est += sizeof(DataBlocksContainer) + allocations_overhead;
// Per-index VecSimSVSThreadPool::parallelism_ allocation tracked through
// the index's VecSimAllocator (see VecSimSVSThreadPool ctor).
est += getThreadPoolParallelismAllocationSize();
return est;
}

Expand Down
4 changes: 4 additions & 0 deletions src/VecSim/utils/vec_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ const char *VecSimCommonStrings::TIERED_SVS_UPDATE_THRESHOLD_STRING = "TIERED_SV
const char *VecSimCommonStrings::TIERED_SVS_THREADS_RESERVE_TIMEOUT_STRING =
"TIERED_SVS_THREADS_RESERVE_TIMEOUT";

const char *VecSimCommonStrings::GLOBAL_MEMORY_STRING = "GLOBAL_MEMORY";
const char *VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING =
"SHARED_SVS_THREADPOOL_MEMORY";

// Log levels
const char *VecSimCommonStrings::LOG_DEBUG_STRING = "debug";
const char *VecSimCommonStrings::LOG_VERBOSE_STRING = "verbose";
Expand Down
10 changes: 10 additions & 0 deletions src/VecSim/utils/vec_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ struct VecSimCommonStrings {
static const char *TIERED_SVS_UPDATE_THRESHOLD_STRING;
static const char *TIERED_SVS_THREADS_RESERVE_TIMEOUT_STRING;

// Total bytes returned by VecSim_GetGlobalMemory() — process-wide VecSim
// allocations not tied to any single index. Appended at the top level of
// every VECSIM_INFO response so callers don't need to know what specifically
// contributes to it.
static const char *GLOBAL_MEMORY_STRING;
// Bytes allocated by the shared (global) SVS thread pool singleton. A
// breakdown of GLOBAL_MEMORY exposed in the debug info of SVS indexes
// (emitted by SVSIndex::debugInfoIterator()), where the pool semantically belongs.
static const char *SHARED_SVS_THREADPOOL_MEMORY_STRING;

// Log levels
static const char *LOG_DEBUG_STRING;
static const char *LOG_VERBOSE_STRING;
Expand Down
16 changes: 15 additions & 1 deletion src/VecSim/vec_sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,17 @@ extern "C" VecSimIndexDebugInfo VecSimIndex_DebugInfo(VecSimIndex *index) {
}

extern "C" VecSimDebugInfoIterator *VecSimIndex_DebugInfoIterator(VecSimIndex *index) {
return index->debugInfoIterator();
auto *infoIterator = index->debugInfoIterator();
// Append VecSim_GetGlobalMemory() — process-wide VecSim memory not tied to
// any single index — at the top level of every algorithm's debug info.
// Always present (value may be 0); algorithm-specific breakdowns of this
// value (e.g. the SVS thread pool in SVS tiered) live in their respective
// debugInfoIterator() overrides.
infoIterator->addInfoField(
VecSim_InfoField{.fieldName = VecSimCommonStrings::GLOBAL_MEMORY_STRING,
.fieldType = INFOFIELD_UINT64,
.fieldValue = {FieldValue{.uintegerValue = VecSim_GetGlobalMemory()}}});
return infoIterator;
}

extern "C" VecSimIndexBasicInfo VecSimIndex_BasicInfo(VecSimIndex *index) {
Expand All @@ -390,6 +400,10 @@ extern "C" VecSimIndexStatsInfo VecSimIndex_StatsInfo(VecSimIndex *index) {
return index->statisticInfo();
}

extern "C" size_t VecSim_GetGlobalMemory(void) {
    // The value is taken from the shared SVS thread pool's allocation size.
    const size_t shared_pool_bytes = VecSimSVSThreadPool::getSharedAllocationSize();
    return shared_pool_bytes;
}

extern "C" VecSimBatchIterator *VecSimBatchIterator_New(VecSimIndex *index, const void *queryBlob,
VecSimQueryParams *queryParams) {
return index->newBatchIterator(queryBlob, queryParams);
Expand Down
12 changes: 12 additions & 0 deletions src/VecSim/vec_sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ VecSimIndexBasicInfo VecSimIndex_BasicInfo(VecSimIndex *index);
*/
VecSimIndexStatsInfo VecSimIndex_StatsInfo(VecSimIndex *index);

/**
 * @brief Return the process-wide VecSim memory that is not tied to any single index.
 * Currently this is the memory used by the shared SVS thread pool singleton.
 * Safe to call without holding any index lock; does not force initialization of the
 * shared SVS pool (returns 0 if the singleton has not been constructed yet).
 *
 * @return Total bytes currently allocated by VecSim outside any single index
 *         (e.g. the shared SVS thread pool singleton). 0 if no such allocations
 *         have been made.
 */
size_t VecSim_GetGlobalMemory(void);

/**
* @brief Returns an info iterator for generic reply purposes.
*
Expand Down
5 changes: 4 additions & 1 deletion src/VecSim/vec_sim_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,10 @@ typedef struct {
* production without worrying about performance
*/
typedef struct {
size_t memory;
size_t memory; // Memory tracked by the index's own allocator. Does NOT include
// process-wide allocations such as the shared SVS thread pool;
// those are reported via VecSim_GetGlobalMemory() so callers
// that aggregate across indexes don't double-count them.
size_t numberOfMarkedDeleted; // The number of vectors that are marked as deleted (HNSW/tiered
// only).
size_t directHNSWInsertions; // Count of vectors inserted directly into HNSW by main thread
Expand Down
54 changes: 54 additions & 0 deletions tests/unit/test_svs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3362,6 +3362,60 @@ TEST(SVSTest, NumThreadsParamIgnored) {
VecSimIndexInterface::logCallback = nullptr;
}

// SVS debug info exposes both:
// * GLOBAL_MEMORY — top-level field appended by VecSimIndex_DebugInfoIterator
// (mirrors VecSim_GetGlobalMemory()).
// * SHARED_SVS_THREADPOOL_MEMORY — emitted by SVSIndex::debugInfoIterator().
// They are sourced from the same VecSimSVSThreadPool::getSharedAllocationSize()
// (the only contributor to global memory today), so their values must match.
TYPED_TEST(SVSTest, debugInfoGlobalMemoryEqualsSharedSVSThreadPoolMemory) {
    // Grow the shared SVS thread pool up front so that it holds tracked allocations;
    // the test requires the reported global memory to be non-zero before continuing.
    VecSim_UpdateThreadPoolSize(2);
    ASSERT_GT(VecSim_GetGlobalMemory(), 0u);

    size_t dim = 4;
    SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2};
    VecSimIndex *index = this->CreateNewIndex(params);
    ASSERT_INDEX(index);

    VecSimDebugInfoIterator *fields = VecSimIndex_DebugInfoIterator(index);

    // Single pass over every field: capture the two values of interest and fail if
    // either name shows up twice or is not reported as UINT64.
    bool found_global = false;
    bool found_shared = false;
    uint64_t global_bytes = 0;
    uint64_t shared_bytes = 0;
    while (VecSimDebugInfoIterator_HasNextField(fields)) {
        VecSim_InfoField *field = VecSimDebugInfoIterator_NextField(fields);
        const bool is_global_field =
            strcmp(field->fieldName, VecSimCommonStrings::GLOBAL_MEMORY_STRING) == 0;
        if (is_global_field) {
            ASSERT_FALSE(found_global) << "GLOBAL_MEMORY appears more than once";
            ASSERT_EQ(field->fieldType, INFOFIELD_UINT64);
            global_bytes = field->fieldValue.uintegerValue;
            found_global = true;
        } else if (strcmp(field->fieldName,
                          VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING) == 0) {
            ASSERT_FALSE(found_shared) << "SHARED_SVS_THREADPOOL_MEMORY appears more than once";
            ASSERT_EQ(field->fieldType, INFOFIELD_UINT64);
            shared_bytes = field->fieldValue.uintegerValue;
            found_shared = true;
        }
    }

    // Both fields must be present, agree with each other, and agree with the C API.
    EXPECT_TRUE(found_global) << "GLOBAL_MEMORY field missing from SVS debug info";
    EXPECT_TRUE(found_shared) << "SHARED_SVS_THREADPOOL_MEMORY field missing from SVS debug info";
    EXPECT_EQ(global_bytes, shared_bytes)
        << "GLOBAL_MEMORY and SHARED_SVS_THREADPOOL_MEMORY should report the same bytes "
           "(only the SVS thread pool contributes to VecSim global memory today)";
    EXPECT_EQ(global_bytes, VecSim_GetGlobalMemory());

    VecSimDebugInfoIterator_Free(fields);
    VecSimIndex_Free(index);

    // Put the shared singleton pool back to size 1 so later tests start from the
    // default. VecSimSVSThreadPool::resize(1) is called directly (as other
    // thread-pool tests do) to avoid the write-mode side effect that
    // VecSim_UpdateThreadPoolSize(0) carries.
    VecSimSVSThreadPool::resize(1);
}

#else // HAVE_SVS

TEST(SVSTest, svs_not_supported) {
Expand Down
Loading
Loading