diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index ffe40d2c1..06d384037 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -361,8 +361,8 @@ class SVSIndex : public VecSimIndexAbstract, fl leanvec_dim{ svs_details::getOrDefault(params.leanvec_dim, SVS_VAMANA_DEFAULT_LEANVEC_DIM)}, epsilon{svs_details::getOrDefault(params.epsilon, SVS_VAMANA_DEFAULT_EPSILON)}, - is_two_level_lvq{isTwoLevelLVQ(params.quantBits)}, threadpool_{this->logCallbackCtx}, - impl_{nullptr} { + is_two_level_lvq{isTwoLevelLVQ(params.quantBits)}, + threadpool_{this->allocator, this->logCallbackCtx}, impl_{nullptr} { logger_ = makeLogger(); if (params.num_threads != 0) { this->log(VecSimCommonStrings::LOG_WARNING_STRING, @@ -429,8 +429,10 @@ class SVSIndex : public VecSimIndexAbstract, fl VecSimDebugInfoIterator *debugInfoIterator() const override { VecSimIndexDebugInfo info = this->debugInfo(); - // For readability. Update this number when needed. - size_t numberOfInfoFields = 23; + // Capacity hint for the iterator. Must equal the number of addInfoField() + // calls below (1 for ALGORITHM + 9 from addCommonInfoToIterator + 16 SVS-specific). + // Update this number when fields are added or removed. + size_t numberOfInfoFields = 26; VecSimDebugInfoIterator *infoIterator = new VecSimDebugInfoIterator(numberOfInfoFields, this->allocator); @@ -517,6 +519,15 @@ class SVSIndex : public VecSimIndexAbstract, fl .fieldType = INFOFIELD_FLOAT64, .fieldValue = {FieldValue{.floatingPointValue = info.svsInfo.epsilon}}}); + // Bytes held by the shared SVS thread pool singleton (slot vector + + // per-slot ThreadSlot objects, allocated through the pool's tracked + // allocator). Always present (value may be 0). + infoIterator->addInfoField(VecSim_InfoField{ + .fieldName = VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING, + .fieldType = INFOFIELD_UINT64, + .fieldValue = { + FieldValue{.uintegerValue = VecSimSVSThreadPool::getSharedAllocationSize()}}}); + return infoIterator; } diff --git a/src/VecSim/algorithms/svs/svs_utils.h b/src/VecSim/algorithms/svs/svs_utils.h index 3e93faa79..0ec36849c 100644 --- a/src/VecSim/algorithms/svs/svs_utils.h +++ b/src/VecSim/algorithms/svs/svs_utils.h @@ -407,35 +407,73 @@ class VecSimSVSThreadPoolImpl { std::vector slots_; }; + // Allocator type for the slots vector. + using SlotPtr = std::shared_ptr; + using SlotVecAllocator = VecsimSTLAllocator; + // Create a pool with `num_threads` total parallelism (including the calling thread). // Spawns `num_threads - 1` worker OS threads. num_threads must be >= 1. // In write-in-place mode, the pool is created with num_threads == 1 (0 worker threads, // only the calling thread participates). // Private — use instance() to access the shared singleton. - explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) { + explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) + : allocator_(VecSimAllocator::newVecsimAllocator()), slots_(SlotVecAllocator(allocator_)) { assert(num_threads && "VecSimSVSThreadPoolImpl should not be created with 0 threads"); slots_.reserve(num_threads - 1); for (size_t i = 0; i < num_threads - 1; ++i) { - slots_.push_back(std::make_shared()); + slots_.push_back( + std::allocate_shared(VecsimSTLAllocator(allocator_))); } } + // Set to true the first time instance() constructs the singleton. Allows other + // code paths (e.g., global stats reporting) to query whether the pool has been + // touched without forcing its lazy construction. + static std::atomic &initialized_flag() { + static std::atomic flag{false}; + return flag; + } + public: // Singleton accessor for the shared SVS thread pool. // Always valid — initialized with size 1 (write-in-place mode: 0 worker threads, // only the calling thread participates). Resized on VecSim_UpdateThreadPoolSize() calls. static std::shared_ptr instance() { - static auto shared_pool = std::shared_ptr( - new VecSimSVSThreadPoolImpl(1), [](VecSimSVSThreadPoolImpl *) { /* leak at exit */ }); + static auto shared_pool = [] { + auto p = std::shared_ptr( + new VecSimSVSThreadPoolImpl(1), + [](VecSimSVSThreadPoolImpl *) { /* leak at exit */ }); + initialized_flag().store(true, std::memory_order_release); + return p; + }(); return shared_pool; } + // Returns true iff instance() has ever been called (singleton constructed). + static bool isInitialized() { return initialized_flag().load(std::memory_order_acquire); } + // Total parallelism: worker slots + 1 (the calling thread always participates). size_t size() const { std::lock_guard lock{pool_mutex_}; return slots_.size() + 1; } + // Bytes currently allocated through the pool's internal allocator (the slots vector + // and the ThreadSlot objects). Does not include allocations performed by SVS itself + // outside of the pool, nor per-index wrapper state. + size_t getAllocationSize() const { return allocator_->getAllocationSize(); } + + // Bytes allocated by the shared pool singleton. Returns 0 if the singleton has + // never been constructed (e.g., no SVS index was ever created and + // VecSim_UpdateThreadPoolSize was never called). Safe to call from any context; + // does not force singleton construction. + static size_t getSharedAllocationSize() { + if (!isInitialized()) { + return 0; + } + return instance()->getAllocationSize(); + } + // Physically resize the pool. Creates new OS threads on grow, shuts down idle threads // on shrink. new_size is total parallelism including the calling thread (minimum 1). // Occupied threads (held by renters) survive shrink via the deferred-resize protocol — @@ -599,7 +637,8 @@ class VecSimSVSThreadPoolImpl { // Grow (or same size): apply immediately, cancel any pending deferred shrink. deferred_size_.reset(); for (size_t i = slots_.size(); i < target_workers; ++i) { - slots_.push_back(std::make_shared()); + slots_.push_back( + std::allocate_shared(VecsimSTLAllocator(allocator_))); } } else { // Shrink. @@ -615,8 +654,9 @@ class VecSimSVSThreadPoolImpl { } } + std::shared_ptr allocator_; // pool's own allocator for memory tracking mutable std::mutex pool_mutex_; - std::vector> slots_; + std::vector slots_; size_t pending_jobs_ = 0; // jobs currently scheduled / in-flight std::optional deferred_size_; // resize target deferred until pending_jobs_ == 0 }; @@ -646,9 +686,14 @@ class VecSimSVSThreadPool { // parallelism_ starts at 1 (the calling thread always participates), matching the // pool's minimum size. Safe for immediate use in write-in-place mode without an // explicit setParallelism() call. - explicit VecSimSVSThreadPool(void *log_ctx = nullptr) + // parallelism_ is allocated through the provided VecsimAllocator so that the + // allocation is tracked by the index's memory accounting. + explicit VecSimSVSThreadPool(const std::shared_ptr &allocator, + void *log_ctx = nullptr) : pool_(VecSimSVSThreadPoolImpl::instance()), - parallelism_(std::make_shared>(1)), log_ctx_(log_ctx) {} + parallelism_(std::allocate_shared>( + VecsimSTLAllocator>(allocator), size_t{1})), + log_ctx_(log_ctx) {} // Resize the shared pool singleton. Delegates to VecSimSVSThreadPoolImpl::instance(). static void resize(size_t new_size) { VecSimSVSThreadPoolImpl::instance()->resize(new_size); } @@ -677,6 +722,11 @@ class VecSimSVSThreadPool { // Shared pool size — used by scheduling to decide how many reserve jobs to submit. static size_t poolSize() { return VecSimSVSThreadPoolImpl::instance()->size(); } + // See VecSimSVSThreadPoolImpl::getSharedAllocationSize(). + static size_t getSharedAllocationSize() { + return VecSimSVSThreadPoolImpl::getSharedAllocationSize(); + } + // Delegates to the shared pool's parallel_for, passing the per-index log context. // n may be less than parallelism_ when the problem size is smaller than the // thread count (SVS computes n = min(arg.size(), pool.size())). diff --git a/src/VecSim/index_factories/svs_factory.cpp b/src/VecSim/index_factories/svs_factory.cpp index b0fbaadb1..f793ce1bf 100644 --- a/src/VecSim/index_factories/svs_factory.cpp +++ b/src/VecSim/index_factories/svs_factory.cpp @@ -13,13 +13,30 @@ #include "VecSim/memory/vecsim_malloc.h" #include "VecSim/vec_sim_index.h" #include "VecSim/algorithms/svs/svs.h" +#include "VecSim/algorithms/svs/svs_utils.h" #include "VecSim/index_factories/components/components_factory.h" #include "VecSim/index_factories/factory_utils.h" +#include namespace SVSFactory { namespace { +// Bytes consumed by the per-index VecSimSVSThreadPool::parallelism_ allocation +// (a std::allocate_shared> through the index's allocator). +// The exact size of the inplace shared_ptr control block is implementation-defined, +// so measure the allocation delta once at startup against a throwaway VecSimAllocator. +size_t getThreadPoolParallelismAllocationSize() { + static const size_t size = []() { + auto allocator = VecSimAllocator::newVecsimAllocator(); + size_t before = allocator->getAllocationSize(); + auto p = std::allocate_shared>( + VecsimSTLAllocator>(allocator), size_t{1}); + return allocator->getAllocationSize() - before; + }(); + return size; +} + // NewVectorsImpl() is the chain of a template helper functions to create a new SVS index. template @@ -230,6 +247,9 @@ size_t EstimateInitialSize(const SVSParams *params, bool is_normalized) { est += EstimateSVSIndexSize(params); est += EstimateComponentsMemorySVS(params->type, params->metric, is_normalized); est += sizeof(DataBlocksContainer) + allocations_overhead; + // Per-index VecSimSVSThreadPool::parallelism_ allocation tracked through + // the index's VecSimAllocator (see VecSimSVSThreadPool ctor). + est += getThreadPoolParallelismAllocationSize(); return est; } diff --git a/src/VecSim/utils/vec_utils.cpp b/src/VecSim/utils/vec_utils.cpp index c2688d0e3..c013b2030 100644 --- a/src/VecSim/utils/vec_utils.cpp +++ b/src/VecSim/utils/vec_utils.cpp @@ -89,6 +89,10 @@ const char *VecSimCommonStrings::TIERED_SVS_UPDATE_THRESHOLD_STRING = "TIERED_SV const char *VecSimCommonStrings::TIERED_SVS_THREADS_RESERVE_TIMEOUT_STRING = "TIERED_SVS_THREADS_RESERVE_TIMEOUT"; +const char *VecSimCommonStrings::GLOBAL_MEMORY_STRING = "GLOBAL_MEMORY"; +const char *VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING = + "SHARED_SVS_THREADPOOL_MEMORY"; + // Log levels const char *VecSimCommonStrings::LOG_DEBUG_STRING = "debug"; const char *VecSimCommonStrings::LOG_VERBOSE_STRING = "verbose"; diff --git a/src/VecSim/utils/vec_utils.h b/src/VecSim/utils/vec_utils.h index 3c7c2f8bb..0186e03de 100644 --- a/src/VecSim/utils/vec_utils.h +++ b/src/VecSim/utils/vec_utils.h @@ -87,6 +87,16 @@ struct VecSimCommonStrings { static const char *TIERED_SVS_UPDATE_THRESHOLD_STRING; static const char *TIERED_SVS_THREADS_RESERVE_TIMEOUT_STRING; + // Total bytes returned by VecSim_GetGlobalMemory() — process-wide VecSim + // allocations not tied to any single index. Appended at the top level of + // every VECSIM_INFO response so callers don't need to know what specifically + // contributes to it. + static const char *GLOBAL_MEMORY_STRING; + // Bytes allocated by the shared (global) SVS thread pool singleton. A + // breakdown of GLOBAL_MEMORY exposed only in SVS tiered indexes' debug + // info, since that's where the pool semantically belongs. + static const char *SHARED_SVS_THREADPOOL_MEMORY_STRING; + // Log levels static const char *LOG_DEBUG_STRING; static const char *LOG_VERBOSE_STRING; diff --git a/src/VecSim/vec_sim.cpp b/src/VecSim/vec_sim.cpp index 485ecf950..6e9699bd5 100644 --- a/src/VecSim/vec_sim.cpp +++ b/src/VecSim/vec_sim.cpp @@ -379,7 +379,17 @@ extern "C" VecSimIndexDebugInfo VecSimIndex_DebugInfo(VecSimIndex *index) { } extern "C" VecSimDebugInfoIterator *VecSimIndex_DebugInfoIterator(VecSimIndex *index) { - return index->debugInfoIterator(); + auto *infoIterator = index->debugInfoIterator(); + // Append VecSim_GetGlobalMemory() — process-wide VecSim memory not tied to + // any single index — at the top level of every algorithm's debug info. + // Always present (value may be 0); algorithm-specific breakdowns of this + // value (e.g. the SVS thread pool in SVS tiered) live in their respective + // debugInfoIterator() overrides. + infoIterator->addInfoField( + VecSim_InfoField{.fieldName = VecSimCommonStrings::GLOBAL_MEMORY_STRING, + .fieldType = INFOFIELD_UINT64, + .fieldValue = {FieldValue{.uintegerValue = VecSim_GetGlobalMemory()}}}); + return infoIterator; } extern "C" VecSimIndexBasicInfo VecSimIndex_BasicInfo(VecSimIndex *index) { @@ -390,6 +400,10 @@ extern "C" VecSimIndexStatsInfo VecSimIndex_StatsInfo(VecSimIndex *index) { return index->statisticInfo(); } +extern "C" size_t VecSim_GetGlobalMemory(void) { + return VecSimSVSThreadPool::getSharedAllocationSize(); +} + extern "C" VecSimBatchIterator *VecSimBatchIterator_New(VecSimIndex *index, const void *queryBlob, VecSimQueryParams *queryParams) { return index->newBatchIterator(queryBlob, queryParams); diff --git a/src/VecSim/vec_sim.h b/src/VecSim/vec_sim.h index 14e1bf65d..0f9be256f 100644 --- a/src/VecSim/vec_sim.h +++ b/src/VecSim/vec_sim.h @@ -183,6 +183,18 @@ VecSimIndexBasicInfo VecSimIndex_BasicInfo(VecSimIndex *index); */ VecSimIndexStatsInfo VecSimIndex_StatsInfo(VecSimIndex *index); +/** + * @brief Return process-wide VecSim statistics that are not tied to any single index. + * Currently exposes the memory used by the shared SVS thread pool singleton. + * Safe to call without holding any index lock; does not force initialization of the + * shared SVS pool (returns 0 in fields whose backing singleton has not been touched). + * + * @return Total bytes currently allocated by VecSim outside any single index + * (e.g. the shared SVS thread pool singleton). 0 if no such allocations + * have been made. + */ +size_t VecSim_GetGlobalMemory(void); + /** * @brief Returns an info iterator for generic reply purposes. * diff --git a/src/VecSim/vec_sim_common.h b/src/VecSim/vec_sim_common.h index ecf6c33b9..afa3bbd45 100644 --- a/src/VecSim/vec_sim_common.h +++ b/src/VecSim/vec_sim_common.h @@ -355,7 +355,10 @@ typedef struct { * production without worrying about performance */ typedef struct { - size_t memory; + size_t memory; // Memory tracked by the index's own allocator. Does NOT include + // process-wide allocations such as the shared SVS thread pool; + // those are reported via VecSim_GetGlobalMemory() so callers + // that aggregate across indexes don't double-count them. size_t numberOfMarkedDeleted; // The number of vectors that are marked as deleted (HNSW/tiered // only). size_t directHNSWInsertions; // Count of vectors inserted directly into HNSW by main thread diff --git a/tests/unit/test_svs.cpp b/tests/unit/test_svs.cpp index ab47e4a45..0ff7c1f0e 100644 --- a/tests/unit/test_svs.cpp +++ b/tests/unit/test_svs.cpp @@ -3362,6 +3362,60 @@ TEST(SVSTest, NumThreadsParamIgnored) { VecSimIndexInterface::logCallback = nullptr; } +// SVS debug info exposes both: +// * GLOBAL_MEMORY — top-level field appended by VecSimIndex_DebugInfoIterator +// (mirrors VecSim_GetGlobalMemory()). +// * SHARED_SVS_THREADPOOL_MEMORY — emitted by SVSIndex::debugInfoIterator(). +// They are sourced from the same VecSimSVSThreadPool::getSharedAllocationSize() +// (the only contributor to global memory today), so their values must match. +TYPED_TEST(SVSTest, debugInfoGlobalMemoryEqualsSharedSVSThreadPoolMemory) { + // Ensure the shared SVS thread pool singleton has allocated memory so both + // fields report a non-zero value. resize() always lazy-initializes the singleton. + VecSim_UpdateThreadPoolSize(2); + ASSERT_GT(VecSim_GetGlobalMemory(), 0u); + + size_t dim = 4; + SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2}; + VecSimIndex *index = this->CreateNewIndex(params); + ASSERT_INDEX(index); + + VecSimDebugInfoIterator *infoIterator = VecSimIndex_DebugInfoIterator(index); + + bool seen_global = false; + bool seen_shared = false; + uint64_t global_value = 0; + uint64_t shared_value = 0; + while (VecSimDebugInfoIterator_HasNextField(infoIterator)) { + VecSim_InfoField *f = VecSimDebugInfoIterator_NextField(infoIterator); + if (!strcmp(f->fieldName, VecSimCommonStrings::GLOBAL_MEMORY_STRING)) { + ASSERT_FALSE(seen_global) << "GLOBAL_MEMORY appears more than once"; + ASSERT_EQ(f->fieldType, INFOFIELD_UINT64); + global_value = f->fieldValue.uintegerValue; + seen_global = true; + } else if (!strcmp(f->fieldName, + VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING)) { + ASSERT_FALSE(seen_shared) << "SHARED_SVS_THREADPOOL_MEMORY appears more than once"; + ASSERT_EQ(f->fieldType, INFOFIELD_UINT64); + shared_value = f->fieldValue.uintegerValue; + seen_shared = true; + } + } + EXPECT_TRUE(seen_global) << "GLOBAL_MEMORY field missing from SVS debug info"; + EXPECT_TRUE(seen_shared) << "SHARED_SVS_THREADPOOL_MEMORY field missing from SVS debug info"; + EXPECT_EQ(global_value, shared_value) + << "GLOBAL_MEMORY and SHARED_SVS_THREADPOOL_MEMORY should report the same bytes " + "(only the SVS thread pool contributes to VecSim global memory today)"; + EXPECT_EQ(global_value, VecSim_GetGlobalMemory()); + + VecSimDebugInfoIterator_Free(infoIterator); + VecSimIndex_Free(index); + + // Reset the shared singleton pool to size 1 so the next test is not affected. + // Use VecSimSVSThreadPool::resize(1) directly (matching other thread-pool tests) + // to avoid the write-mode side effect that VecSim_UpdateThreadPoolSize(0) carries. + VecSimSVSThreadPool::resize(1); +} + #else // HAVE_SVS TEST(SVSTest, svs_not_supported) { diff --git a/tests/unit/test_svs_threadpool.cpp b/tests/unit/test_svs_threadpool.cpp index 0f228b45e..aac389336 100644 --- a/tests/unit/test_svs_threadpool.cpp +++ b/tests/unit/test_svs_threadpool.cpp @@ -44,6 +44,9 @@ class SVSThreadPoolTest : public ::testing::Test { // don't assert on nullptr log_ctx (we don't have an index context). saved_callback_ = VecSimIndexInterface::logCallback; VecSimIndexInterface::logCallback = nullptr; + // Reset the shared singleton pool to size 1 — earlier test suites may have + // resized it via VecSim_UpdateThreadPoolSize() and left it in that state. + VecSimSVSThreadPool::resize(1); } void TearDown() override { // Reset the shared singleton pool to size 1 so tests don't leak state. @@ -51,6 +54,9 @@ class SVSThreadPoolTest : public ::testing::Test { VecSimIndexInterface::logCallback = saved_callback_; } + // Allocator used by VecSimSVSThreadPool wrappers constructed in tests. + std::shared_ptr allocator_ = VecSimAllocator::newVecsimAllocator(); + private: logCallbackFunction saved_callback_ = nullptr; }; @@ -116,7 +122,7 @@ TEST_F(SVSThreadPoolTest, ShrinkWhileRented) { ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 5); // Wrapper A uses parallelism 3 → rents 2 workers (s0, s1). - VecSimSVSThreadPool wrapperA; + VecSimSVSThreadPool wrapperA{allocator_}; wrapperA.setParallelism(3); std::latch hold(1); // blocks rented workers @@ -154,7 +160,7 @@ TEST_F(SVSThreadPoolTest, ShrinkWhileRented) { // While wrapperA's threads are still alive (blocked on latch), run // parallel_for on the shrunk pool with a second wrapper using a free slot. - VecSimSVSThreadPool wrapperB; + VecSimSVSThreadPool wrapperB{allocator_}; // Parallelism 2 = 1 rented worker + calling thread. The pool has 3 slots // [s0, s1, s2] after shrink; s0 and s1 are occupied by wrapperA, so the // single rented worker will get s2 (the only free slot). @@ -183,7 +189,7 @@ TEST_F(SVSThreadPoolTest, GrowWhileRented) { ASSERT_EQ(VecSimSVSThreadPool::poolSize(), 3); // Wrapper A uses parallelism 3 → rents 2 workers (s0, s1). - VecSimSVSThreadPool wrapperA; + VecSimSVSThreadPool wrapperA{allocator_}; wrapperA.setParallelism(3); std::latch hold(1); // blocks rented workers @@ -219,7 +225,7 @@ TEST_F(SVSThreadPoolTest, GrowWhileRented) { // Wrapper B uses parallelism 3 → rents 2 workers. s0, s1 are occupied by // wrapperA, so it gets the 2 newly created slots s2, s3... but we only // need 2 of the 3 free slots (s2, s3 are free, only need 2). - VecSimSVSThreadPool wrapperB; + VecSimSVSThreadPool wrapperB{allocator_}; wrapperB.setParallelism(3); std::atomic_int resultB{0}; wrapperB.parallel_for([&](size_t) { resultB++; }, 3); @@ -250,7 +256,7 @@ TEST_F(SVSThreadPoolTest, GrowWhileRented) { TEST_F(SVSThreadPoolTest, ParallelismPropagationAcrossCopies) { VecSimSVSThreadPool::resize(8); - VecSimSVSThreadPool original; + VecSimSVSThreadPool original{allocator_}; original.setParallelism(2); ASSERT_EQ(original.size(), 2); @@ -282,8 +288,8 @@ TEST_F(SVSThreadPoolTest, ParallelismPropagationAcrossCopies) { TEST_F(SVSThreadPoolTest, TwoIndexesIndependentParallelism) { VecSimSVSThreadPool::resize(8); - VecSimSVSThreadPool wrapperA; - VecSimSVSThreadPool wrapperB; + VecSimSVSThreadPool wrapperA{allocator_}; + VecSimSVSThreadPool wrapperB{allocator_}; wrapperA.setParallelism(2); wrapperB.setParallelism(5); @@ -366,9 +372,9 @@ TEST_F(SVSThreadPoolTest, ConcurrentRentalFromTwoIndexes) { // Pool size 8: wrappers A (4) and B (4) sum to exactly 8. VecSimSVSThreadPool::resize(8); - VecSimSVSThreadPool wrapperA; + VecSimSVSThreadPool wrapperA{allocator_}; wrapperA.setParallelism(4); - VecSimSVSThreadPool wrapperB; + VecSimSVSThreadPool wrapperB{allocator_}; wrapperB.setParallelism(4); std::atomic_int resultA{0}; @@ -445,7 +451,7 @@ TEST_F(SVSThreadPoolTest, AllThreadsOccupied) { // Pool size 4 (3 worker slots). Wrapper A rents all 3. VecSimSVSThreadPool::resize(4); - VecSimSVSThreadPool wrapperA; + VecSimSVSThreadPool wrapperA{allocator_}; wrapperA.setParallelism(4); std::latch hold(1); @@ -471,7 +477,7 @@ TEST_F(SVSThreadPoolTest, AllThreadsOccupied) { << resultA << ", pool_size=" << wrapperA.poolSize(); // All 3 worker slots are occupied. Wrapper B tries to rent 1 worker. - VecSimSVSThreadPool wrapperB; + VecSimSVSThreadPool wrapperB{allocator_}; wrapperB.setParallelism(2); #ifdef NDEBUG diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index 40942a49a..66a73aa14 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -3931,7 +3931,8 @@ TEST(SVSTieredIndexTest, testThreadPool) { // Test VecSimSVSThreadPool with shared pool const size_t num_threads = 4; VecSimSVSThreadPool::resize(num_threads); - VecSimSVSThreadPool pool; + auto allocator = VecSimAllocator::newVecsimAllocator(); + VecSimSVSThreadPool pool{allocator}; ASSERT_EQ(pool.poolSize(), num_threads); ASSERT_EQ(pool.size(), 1); // parallelism starts at 1 (calling thread) ASSERT_EQ(pool.getParallelism(), 1); @@ -3974,7 +3975,7 @@ TEST(SVSTieredIndexTest, testThreadPool) { // Test write-in-place mode (pool with size 1) VecSimSVSThreadPool::resize(1); - VecSimSVSThreadPool inplace_pool; + VecSimSVSThreadPool inplace_pool{allocator}; inplace_pool.setParallelism(1); ASSERT_EQ(inplace_pool.size(), 1); ASSERT_EQ(inplace_pool.poolSize(), 1); @@ -3984,7 +3985,7 @@ TEST(SVSTieredIndexTest, testThreadPool) { // parallel_for works immediately with default parallelism 1 VecSimSVSThreadPool::resize(num_threads); - VecSimSVSThreadPool default_pool; + VecSimSVSThreadPool default_pool{allocator}; counter = 0; default_pool.parallel_for(task, 1); ASSERT_EQ(counter, 1); // 0+1 = 1 diff --git a/tests/unit/unit_test_utils.cpp b/tests/unit/unit_test_utils.cpp index 5c79167f8..65ef3c647 100644 --- a/tests/unit/unit_test_utils.cpp +++ b/tests/unit/unit_test_utils.cpp @@ -15,15 +15,21 @@ #include "VecSim/types/bfloat16.h" #include "VecSim/types/float16.h" #include "VecSim/algorithms/hnsw/hnsw_tiered.h" +#include "VecSim/algorithms/svs/svs_utils.h" using bfloat16 = vecsim_types::bfloat16; using float16 = vecsim_types::float16; -// Map index types to their expected number of debug iterator fields +// Expected number of fields per debug iterator, as emitted by the **C++** method +// (index->debugInfoIterator()). +// The **C** API wrapper VecSimIndex_DebugInfoIterator +// adds one extra GLOBAL_MEMORY field on top — call sites that compare against +// the C API iterator pass expect_global_memory=true so the comparator asserts +// COUNT + 1. namespace DebugInfoIteratorFieldCount { constexpr size_t FLAT = 11; constexpr size_t HNSW = 18; -constexpr size_t SVS = 25; +constexpr size_t SVS = 26; constexpr size_t TIERED_HNSW = 16; constexpr size_t TIERED_SVS = 18; } // namespace DebugInfoIteratorFieldCount @@ -274,8 +280,11 @@ template void runRangeTieredIndexSearchTest( const std::function &, size_t, VecSimQueryReply_Order, VecSimQueryParams *); -void compareFlatIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter) { - ASSERT_EQ(DebugInfoIteratorFieldCount::FLAT, VecSimDebugInfoIterator_NumberOfFields(infoIter)); +void compareFlatIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter, + bool expect_global_memory) { + size_t extra = expect_global_memory ? 1 : 0; + ASSERT_EQ(DebugInfoIteratorFieldCount::FLAT + extra, + VecSimDebugInfoIterator_NumberOfFields(infoIter)); while (VecSimDebugInfoIterator_HasNextField(infoIter)) { VecSim_InfoField *infoField = VecSimDebugInfoIterator_NextField(infoIter); if (!strcmp(infoField->fieldName, VecSimCommonStrings::ALGORITHM_STRING)) { @@ -326,14 +335,22 @@ void compareFlatIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIt // Memory. ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); ASSERT_EQ(infoField->fieldValue.uintegerValue, info.commonInfo.memory); + } else if (!strcmp(infoField->fieldName, VecSimCommonStrings::GLOBAL_MEMORY_STRING)) { + // Top-level field unconditionally appended by VecSimIndex_DebugInfoIterator. + ASSERT_TRUE(expect_global_memory); + ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); + ASSERT_EQ(infoField->fieldValue.uintegerValue, VecSim_GetGlobalMemory()); } else { FAIL(); } } } -void compareHNSWIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter) { - ASSERT_EQ(DebugInfoIteratorFieldCount::HNSW, VecSimDebugInfoIterator_NumberOfFields(infoIter)); +void compareHNSWIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter, + bool expect_global_memory) { + size_t extra = expect_global_memory ? 1 : 0; + ASSERT_EQ(DebugInfoIteratorFieldCount::HNSW + extra, + VecSimDebugInfoIterator_NumberOfFields(infoIter)); while (VecSimDebugInfoIterator_HasNextField(infoIter)) { VecSim_InfoField *infoField = VecSimDebugInfoIterator_NextField(infoIter); if (!strcmp(infoField->fieldName, VecSimCommonStrings::ALGORITHM_STRING)) { @@ -414,6 +431,11 @@ void compareHNSWIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIt ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); ASSERT_EQ(infoField->fieldValue.uintegerValue, info.hnswInfo.numberOfMarkedDeletedNodes); + } else if (!strcmp(infoField->fieldName, VecSimCommonStrings::GLOBAL_MEMORY_STRING)) { + // Top-level field unconditionally appended by VecSimIndex_DebugInfoIterator. + ASSERT_TRUE(expect_global_memory); + ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); + ASSERT_EQ(infoField->fieldValue.uintegerValue, VecSim_GetGlobalMemory()); } else { FAIL(); } @@ -424,6 +446,11 @@ void compareTieredIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimIndexDebugInfo frontendIndexInfo, VecSimIndexDebugInfo backendIndexInfo, VecSimDebugInfoIterator *infoIterator) { + // Iterators passed here are produced by the C++ debugInfoIterator() method, not the + // VecSimIndex_DebugInfoIterator C API, so the top-level GLOBAL_MEMORY field is + // never appended at this level. For SVS-backed tiered indexes the + // SHARED_SVS_THREADPOOL_MEMORY field is emitted by SVSIndex::debugInfoIterator() and + // therefore appears inside the nested BACKEND_INDEX iterator, not at this level. VecSimAlgo backendAlgo = backendIndexInfo.commonInfo.basicInfo.algo; if (backendAlgo == VecSimAlgo_HNSWLIB) { ASSERT_EQ(DebugInfoIteratorFieldCount::TIERED_HNSW, @@ -490,7 +517,8 @@ void compareTieredIndexInfoToIterator(VecSimIndexDebugInfo info, ASSERT_EQ(infoField->fieldValue.integerValue, info.tieredInfo.backgroundIndexing); } else if (!strcmp(infoField->fieldName, VecSimCommonStrings::FRONTEND_INDEX_STRING)) { ASSERT_EQ(infoField->fieldType, INFOFIELD_ITERATOR); - compareFlatIndexInfoToIterator(frontendIndexInfo, infoField->fieldValue.iteratorValue); + compareFlatIndexInfoToIterator(frontendIndexInfo, infoField->fieldValue.iteratorValue, + /*expect_global_memory=*/false); } else if (!strcmp(infoField->fieldName, VecSimCommonStrings::BACKEND_INDEX_STRING)) { ASSERT_EQ(infoField->fieldType, INFOFIELD_ITERATOR); chooseCompareIndexInfoToIterator(backendIndexInfo, infoField->fieldValue.iteratorValue); @@ -534,20 +562,28 @@ void compareTieredIndexInfoToIterator(VecSimIndexDebugInfo info, static void chooseCompareIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter) { + // Nested iterator built by the backend index directly (not via the C API), + // so the top-level GLOBAL_MEMORY field is not appended here. switch (info.commonInfo.basicInfo.algo) { case VecSimAlgo_HNSWLIB: - compareHNSWIndexInfoToIterator(info, infoIter); + compareHNSWIndexInfoToIterator(info, infoIter, /*expect_global_memory=*/false); break; case VecSimAlgo_SVS: - compareSVSIndexInfoToIterator(info, infoIter); + compareSVSIndexInfoToIterator(info, infoIter, /*expect_global_memory=*/false); break; default: FAIL(); } } -void compareSVSIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter) { - ASSERT_EQ(DebugInfoIteratorFieldCount::SVS, VecSimDebugInfoIterator_NumberOfFields(infoIter)); +void compareSVSIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter, + bool expect_global_memory) { + // GLOBAL_MEMORY is always appended by the C API VecSimIndex_DebugInfoIterator; + // nested backend iterators (built via the C++ method, e.g. when comparing the + // SVS backend of a tiered index) never have it appended. + size_t extra = expect_global_memory ? 1 : 0; + ASSERT_EQ(DebugInfoIteratorFieldCount::SVS + extra, + VecSimDebugInfoIterator_NumberOfFields(infoIter)); while (VecSimDebugInfoIterator_HasNextField(infoIter)) { VecSim_InfoField *infoField = VecSimDebugInfoIterator_NextField(infoIter); if (!strcmp(infoField->fieldName, VecSimCommonStrings::ALGORITHM_STRING)) { @@ -659,6 +695,18 @@ void compareSVSIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIte // SVS epsilon parameter. ASSERT_EQ(infoField->fieldType, INFOFIELD_FLOAT64); ASSERT_EQ(infoField->fieldValue.floatingPointValue, info.svsInfo.epsilon); + } else if (!strcmp(infoField->fieldName, + VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING)) { + // Always emitted by SVSIndex::debugInfoIterator(). + ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); + ASSERT_EQ(infoField->fieldValue.uintegerValue, + VecSimSVSThreadPool::getSharedAllocationSize()); + } else if (!strcmp(infoField->fieldName, VecSimCommonStrings::GLOBAL_MEMORY_STRING)) { + // Top-level field unconditionally appended by the C API + // VecSimIndex_DebugInfoIterator; mirrors VecSim_GetGlobalMemory(). + ASSERT_TRUE(expect_global_memory); + ASSERT_EQ(infoField->fieldType, INFOFIELD_UINT64); + ASSERT_EQ(infoField->fieldValue.uintegerValue, VecSim_GetGlobalMemory()); } else { FAIL(); } @@ -756,6 +804,7 @@ std::vector getFlatFields() { auto commonFields = getCommonFields(); fields.insert(fields.end(), commonFields.begin(), commonFields.end()); fields.push_back(VecSimCommonStrings::BLOCK_SIZE_STRING); // BLOCK_SIZE + fields.push_back(VecSimCommonStrings::GLOBAL_MEMORY_STRING); return fields; } @@ -774,10 +823,12 @@ std::vector getHNSWFields() { fields.push_back(VecSimCommonStrings::HNSW_ENTRYPOINT); fields.push_back(VecSimCommonStrings::EPSILON_STRING); fields.push_back(VecSimCommonStrings::NUM_MARKED_DELETED); + fields.push_back(VecSimCommonStrings::GLOBAL_MEMORY_STRING); return fields; } -// Imitates SVSIndex::debugInfoIterator() +// Imitates SVSIndex::debugInfoIterator() followed by the +// top-level GLOBAL_MEMORY field appended by VecSimIndex_DebugInfoIterator. std::vector getSVSFields() { std::vector fields; fields.push_back(VecSimCommonStrings::ALGORITHM_STRING); // ALGORITHM @@ -799,6 +850,8 @@ std::vector getSVSFields() { fields.push_back(VecSimCommonStrings::SVS_SEARCH_BC_STRING); fields.push_back(VecSimCommonStrings::SVS_LEANVEC_DIM_STRING); fields.push_back(VecSimCommonStrings::EPSILON_STRING); + fields.push_back(VecSimCommonStrings::SHARED_SVS_THREADPOOL_MEMORY_STRING); + fields.push_back(VecSimCommonStrings::GLOBAL_MEMORY_STRING); return fields; } @@ -818,28 +871,29 @@ std::vector getTieredCommonFields() { return fields; } -// Imitates TieredSVSIndex::debugInfoIterator() +// Imitates TieredSVSIndex::debugInfoIterator() followed by +// the top-level GLOBAL_MEMORY field appended by VecSimIndex_DebugInfoIterator. +// SHARED_SVS_THREADPOOL_MEMORY is emitted by SVSIndex::debugInfoIterator() and +// therefore appears inside the BACKEND_INDEX nested iterator, not at this level. std::vector getTieredSVSFields() { auto fields = getTieredCommonFields(); // Add SVS tiered-specific fields: - fields.push_back( - VecSimCommonStrings::TIERED_SVS_TRAINING_THRESHOLD_STRING); // 15. - // TIERED_SVS_TRAINING_THRESHOLD - fields.push_back( - VecSimCommonStrings::TIERED_SVS_UPDATE_THRESHOLD_STRING); // 16. TIERED_SVS_UPDATE_THRESHOLD - fields.push_back( - VecSimCommonStrings:: - TIERED_SVS_THREADS_RESERVE_TIMEOUT_STRING); // 17. TIERED_SVS_THREADS_RESERVE_TIMEOUT + fields.push_back(VecSimCommonStrings::TIERED_SVS_TRAINING_THRESHOLD_STRING); + fields.push_back(VecSimCommonStrings::TIERED_SVS_UPDATE_THRESHOLD_STRING); + fields.push_back(VecSimCommonStrings::TIERED_SVS_THREADS_RESERVE_TIMEOUT_STRING); + fields.push_back(VecSimCommonStrings::GLOBAL_MEMORY_STRING); return fields; } -// Imitates TieredHNSWIndex::debugInfoIterator() +// Imitates TieredHNSWIndex::debugInfoIterator() followed by +// the top-level GLOBAL_MEMORY field appended by VecSimIndex_DebugInfoIterator. std::vector getTieredHNSWFields() { auto fields = getTieredCommonFields(); // Add HNSW tiered-specific field: fields.push_back( VecSimCommonStrings:: TIERED_HNSW_SWAP_JOBS_THRESHOLD_STRING); // 15. TIERED_HNSW_SWAP_JOBS_THRESHOLD + fields.push_back(VecSimCommonStrings::GLOBAL_MEMORY_STRING); return fields; } diff --git a/tests/unit/unit_test_utils.h b/tests/unit/unit_test_utils.h index 519a130f1..eaaba55ab 100644 --- a/tests/unit/unit_test_utils.h +++ b/tests/unit/unit_test_utils.h @@ -186,16 +186,26 @@ void compareSVSInfo(svsInfoStruct info1, svsInfoStruct info2); void validateSVSIndexAttributesInfo(svsInfoStruct info, SVSParams params); -void compareFlatIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter); - -void compareHNSWIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter); - +// expect_global_memory: when true, the iterator is the top-level one returned +// by the C API VecSimIndex_DebugInfoIterator, which always appends the GLOBAL_MEMORY +// field. Pass false when the iterator is a nested backend iterator built directly via +// the C++ debugInfoIterator() method (no GLOBAL_MEMORY appended at that level). +void compareFlatIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter, + bool expect_global_memory = true); + +void compareHNSWIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter, + bool expect_global_memory = true); + +// Always called with the C++ debugInfoIterator() iterator (no GLOBAL_MEMORY field at +// this level); the field-count assertion at the top of the function will fail loudly +// if a C API iterator is ever passed. void compareTieredIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimIndexDebugInfo frontendIndexInfo, VecSimIndexDebugInfo backendIndexInfo, VecSimDebugInfoIterator *infoIterator); -void compareSVSIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter); +void compareSVSIndexInfoToIterator(VecSimIndexDebugInfo info, VecSimDebugInfoIterator *infoIter, + bool expect_global_memory = true); void runRangeQueryTest(VecSimIndex *index, const void *query, double radius, const std::function &ResCB,