From cd60e824b8a9016e9e625b859180a8db3374214d Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 12 Feb 2026 11:08:03 +0000
Subject: [PATCH 1/4] Rename any cdb files outside of the manifest (#486)

* Rename any cdb files outside of the manifest

Rename them as *.bak files. This aligns with the ledger, which does the
same thing at startup.

These files are expected when a stop or crash happens during a journal
compaction. In this case, files may be created - but not added to the
manifest. These orphaned files will be in the post_compact folder. It is
less expected for files in the non-compact area, but can still happen
when there is a crash during the delete_pending state.

The garbage is not collected (deleted) - but this just makes it easier
for OS admins to clear it, as they don't need Erlang functions: *.bak
files can be assumed to be garbage.

* Update src/leveled_inker.erl

Co-authored-by: Thomas Arts

* Add comment following review

* Format correctly

---------

Co-authored-by: Thomas Arts
---
 src/leveled_inker.erl              | 114 ++++++++++++++++++++---------
 src/leveled_log.erl                |   3 +
 test/end_to_end/recovery_SUITE.erl |  45 +++++++++++-
 3 files changed, 126 insertions(+), 36 deletions(-)

diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index ec5c7dfd..2542c22d 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -137,6 +137,9 @@
 -define(WASTE_FP, "waste").
 -define(JOURNAL_FILEX, "cdb").
 -define(PENDING_FILEX, "pnd").
+-define(ARCHIVE_FILEX, "bak").
+% Note that archive means "no longer active", it is an indication of
+% removable waste not of backup.
 -define(TEST_KC, {[], infinity}).
 -define(SHUTDOWN_LOOPS, 10).
 -define(SHUTDOWN_PAUSE, 10000).
@@ -682,7 +685,7 @@ handle_call(roll, _From, State = #state{is_snapshot = Snap}) when
         }}
     end;
 handle_call(
-    {backup, BackupPath}, _from, State
+    {backup, BackupPath}, _From, State
 ) when
     State#state.is_snapshot == true
 ->
@@ -699,20 +702,21 @@ handle_call(
                     ExtendedBaseFN = BaseFN ++ "." ++ ?JOURNAL_FILEX,
                     BackupName = filename:join(BackupJFP, BaseFN),
                     true = leveled_cdb:finished_rolling(PidR),
-                    case
+                    Link =
                         file:make_link(
                             FN ++ "." ++ ?JOURNAL_FILEX,
                             BackupName ++ "." ++ ?JOURNAL_FILEX
-                        )
-                    of
+                        ),
+                    case Link of
                         ok ->
                             ok;
                         {error, eexist} ->
                             ok
                     end,
-                    {[{SQN, BackupName, PidR, LastKey} | ManAcc], [
-                        ExtendedBaseFN | FTRAcc
-                    ]};
+                    {
+                        [{SQN, BackupName, PidR, LastKey} | ManAcc],
+                        [ExtendedBaseFN | FTRAcc]
+                    };
                 false ->
                     ?STD_LOG(i0021, [FN, SQN, State#state.journal_sqn]),
                     {ManAcc, FTRAcc}
@@ -1263,7 +1267,7 @@ close_allmanifest([H | ManifestT]) ->
 ) ->
     leveled_imanifest:manifest().
 %% @doc
-%% Open all the files in the manifets, and updating the manifest with the PIDs
+%% Open all the files in the manifest, and update the manifest with the PIDs
 %% of the opened files
 open_all_manifest([], RootPath, CDBOpts) ->
     ?STD_LOG(i0011, []),
@@ -1273,10 +1277,12 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
         true
     );
 open_all_manifest(Man0, RootPath, CDBOpts) ->
+    OnDiskJournalSet =
+        sets:from_list(get_all_completejournals(RootPath), [{version, 2}]),
     Man1 = leveled_imanifest:to_list(Man0),
    [{HeadSQN, HeadFN, _IgnorePid, HeadLK} | ManifestTail] = Man1,
     OpenJournalFun =
-        fun(ManEntry) ->
+        fun(ManEntry, Acc) ->
             {LowSQN, FN, _, LK_RO} = ManEntry,
             CFN = FN ++ "." ++ ?JOURNAL_FILEX,
             PFN = FN ++ "." 
++ ?PENDING_FILEX, @@ -1284,40 +1290,80 @@ open_all_manifest(Man0, RootPath, CDBOpts) -> true -> {ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN, LK_RO, CDBOpts), - {LowSQN, FN, Pid, LK_RO}; + { + {LowSQN, FN, Pid, LK_RO}, + sets:del_element(CFN, Acc) + }; false -> - W = leveled_cdb:cdb_open_writer(PFN, CDBOpts), - {ok, Pid} = W, + {ok, Pid} = leveled_cdb:cdb_open_writer(PFN, CDBOpts), ok = leveled_cdb:cdb_roll(Pid), LK_WR = leveled_cdb:cdb_lastkey(Pid), - {LowSQN, FN, Pid, LK_WR} + { + {LowSQN, FN, Pid, LK_WR}, + Acc + } end end, - OpenedTailAsList = lists:map(OpenJournalFun, ManifestTail), + { + OpenedTailAsList, + FilteredOnDiskJournalSet + } = + lists:mapfoldl(OpenJournalFun, OnDiskJournalSet, ManifestTail), OpenedTail = leveled_imanifest:from_list(OpenedTailAsList), CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX, PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX, - case filelib:is_file(CompleteHeadFN) of - true -> - ?STD_LOG(i0012, [HeadFN]), - {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), - LastKey = {LastSQN, _, _} = leveled_cdb:cdb_lastkey(HeadR), - ManToHead = + StartedManifest = + case filelib:is_file(CompleteHeadFN) of + true -> + ?STD_LOG(i0012, [HeadFN]), + {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), + LastKey = {LastSQN, _, _} = leveled_cdb:cdb_lastkey(HeadR), + ManToHead = + leveled_imanifest:add_entry( + OpenedTail, + {HeadSQN, HeadFN, HeadR, LastKey}, + true + ), + NewManEntry = + start_new_activejournal(LastSQN + 1, RootPath, CDBOpts), + leveled_imanifest:add_entry(ManToHead, NewManEntry, true); + false -> + {ok, HeadW} = + leveled_cdb:cdb_open_writer(PendingHeadFN, CDBOpts), leveled_imanifest:add_entry( - OpenedTail, - {HeadSQN, HeadFN, HeadR, LastKey}, - true - ), - NewManEntry = - start_new_activejournal(LastSQN + 1, RootPath, CDBOpts), - leveled_imanifest:add_entry(ManToHead, NewManEntry, true); - false -> - {ok, HeadW} = - leveled_cdb:cdb_open_writer(PendingHeadFN, CDBOpts), - leveled_imanifest:add_entry( - OpenedTail, {HeadSQN, HeadFN, HeadW, HeadLK}, true - ) - end. + OpenedTail, {HeadSQN, HeadFN, HeadW, HeadLK}, true + ) + end, + lists:foreach( + fun(FN) -> + NewName = + filename:flatten([filename:rootname(FN), "." ++ ?ARCHIVE_FILEX]), + ?STD_LOG(i0029, [FN]), + file:rename(FN, NewName) + end, + sets:to_list( + sets:del_element(CompleteHeadFN, FilteredOnDiskJournalSet) + ) + ), + StartedManifest. + +-spec get_all_completejournals(string()) -> list(file:filename()). +get_all_completejournals(RootPath) -> + JFiles = list_dir(filepath(RootPath, journal_dir)), + CFiles = list_dir(filepath(RootPath, journal_compact_dir)), + lists:filter( + fun(FN) -> + filename:extension(FN) == ("." ++ ?JOURNAL_FILEX) + end, + JFiles ++ CFiles + ). + +-spec list_dir(string()) -> list(file:filename()). +list_dir(Path) -> + {ok, Files} = file:list_dir(Path), + lists:map( + fun(FN) -> filename:join(Path, FN) end, Files + ). 
 start_new_activejournal(SQN, RootPath, CDBOpts) ->
     Filename = filepath(RootPath, SQN, new_journal),
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index 4f055d11..9d25a70b 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -311,6 +311,9 @@
         {debug, <<"Shutdown complete for cloned Inker for reason ~w">>},
     i0028 =>
         {debug, <<"Shutdown complete for Inker for reason ~w">>},
+    i0029 =>
+        {warning,
+            <<"Journal with FN=~s renamed to backup as not active in manifest">>},
     ic001 =>
         {info, <<"Closed for reason ~w so maybe leaving garbage">>},
     ic002 =>
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index c7cfe8fe..877ac1a8 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -859,8 +859,16 @@ aae_missingjournal(_Config) ->
     {async, AllHeadF2} =
         leveled_bookie:book_returnfolder(
             Bookie2,
-            {foldheads_allkeys, ?RIAK_TAG, FoldHeadsFun, true, true, false,
-                false, false}
+            {
+                foldheads_allkeys,
+                ?RIAK_TAG,
+                FoldHeadsFun,
+                true,
+                true,
+                false,
+                false,
+                false
+            }
         ),
     HeadL2 = length(AllHeadF2()),
     io:format("Fold head returned ~w objects~n", [HeadL2]),
@@ -868,6 +876,39 @@ aae_missingjournal(_Config) ->
     true = HeadL2 > 0,
 
     ok = leveled_bookie:book_close(Bookie2),
+
+    % Add extra journal files - check they get switched to .bak
+    FNXJ = RootPath ++ "/journal/journal_files/extra_file",
+    FNXC = RootPath ++ "/journal/journal_files/post_compact/extra_file",
+    ok = file:write_file(FNXJ ++ ".cdb", <<"NotaCDB">>),
+    ok = file:write_file(FNXC ++ ".cdb", <<"NotaCDB">>),
+    {ok, _} = file:read_file_info(FNXJ ++ ".cdb"),
+    {ok, _} = file:read_file_info(FNXC ++ ".cdb"),
+
+    {ok, Bookie3} = leveled_bookie:book_start(StartOpts),
+    {async, AllHeadF3} =
+        leveled_bookie:book_returnfolder(
+            Bookie3,
+            {
+                foldheads_allkeys,
+                ?RIAK_TAG,
+                FoldHeadsFun,
+                true,
+                true,
+                false,
+                false,
+                false
+            }
+        ),
+    HeadL3 = length(AllHeadF3()),
+    true = HeadL3 == HeadL2,
+
+    ok = leveled_bookie:book_close(Bookie3),
+
+    {error, enoent} = file:read_file_info(FNXJ ++ ".cdb"),
+    {error, enoent} = file:read_file_info(FNXC ++ ".cdb"),
+    {ok, <<"NotaCDB">>} = file:read_file(FNXJ ++ ".bak"),
+    {ok, <<"NotaCDB">>} = file:read_file(FNXC ++ ".bak"),
     testutil:reset_filestructure().
 
 simple_cachescoring(_Config) ->

From 8bf9070d6fb5b476d9b5f891bd3c6c54193831b2 Mon Sep 17 00:00:00 2001
From: Andriy Zavada
Date: Thu, 12 Feb 2026 13:02:59 +0200
Subject: [PATCH 2/4] leveled_bookie:status/1 (#17)

Return a map of status information about the bookie (when the bookie is
not a snapshot). The content of the status may change in future
releases.

---------

Co-authored-by: Andriy Zavada
Co-authored-by: Martin Sumner
---
 src/leveled_bookie.erl          |  48 ++++++-
 src/leveled_cdb.erl             |  12 ++
 src/leveled_iclerk.erl          | 125 +++++++++++-------
 src/leveled_monitor.erl         | 220 ++++++++++++++++++++++++++++++--
 src/leveled_pclerk.erl          |  18 ++-
 src/leveled_penciller.erl       |  26 +++-
 src/leveled_sst.erl             |  12 ++
 test/end_to_end/basic_SUITE.erl | 196 +++++++++++++++++++++++++++-
 8 files changed, 593 insertions(+), 64 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 0c43c0f3..4e6dae7b 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -75,7 +75,8 @@
     book_loglevel/2,
     book_addlogs/2,
     book_removelogs/2,
-    book_headstatus/1
+    book_headstatus/1,
+    book_status/1
 ]).
 
 %% folding API
@@ -1316,6 +1317,24 @@ book_removelogs(Pid, ForcedLogs) ->
 book_headstatus(Pid) ->
     gen_server:call(Pid, head_status, infinity).
 
+-spec book_status(pid()) -> map().
+%% @doc
+%% Return a map containing the following items:
+%% * current size of the ledger cache;
+%% * number of active journal files;
+%% * average compaction score for the journal;
+%% * current distribution of files across the ledger (e.g. count of files by level);
+%% * current size of the penciller in-memory cache;
+%% * penciller work backlog status;
+%% * last merge time (penciller);
+%% * last compaction time (journal);
+%% * last compaction result (journal) e.g. files compacted and compaction score;
+%% * ratio of metadata to object size (recent PUTs);
+%% * PUT/GET/HEAD recent time/count metrics;
+%% * mean level for recent fetches.
+book_status(Pid) ->
+    gen_server:call(Pid, status, infinity).
+
 %%%============================================================================
 %%% gen_server callbacks
 %%%============================================================================
 
@@ -1475,7 +1494,8 @@ handle_call(
                 State#state.cache_size,
                 State#state.cache_multiple,
                 Cache0,
-                State#state.penciller
+                State#state.penciller,
+                State#state.monitor
             )
         of
             {ok, Cache} ->
@@ -1509,7 +1529,8 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) when
             State#state.cache_size,
             State#state.cache_multiple,
             Cache0,
-            State#state.penciller
+            State#state.penciller,
+            State#state.monitor
         )
     of
         {ok, Cache} ->
@@ -1686,7 +1707,8 @@ handle_call({compact_journal, Timeout}, From, State) when
                 State#state.cache_size,
                 State#state.cache_multiple,
                 State#state.ledger_cache,
-                State#state.penciller
+                State#state.penciller,
+                State#state.monitor
             )
         of
             {_, NewCache} ->
@@ -1740,6 +1762,8 @@ handle_call(return_actors, _From, State) ->
     {reply, {ok, State#state.inker, State#state.penciller}, State};
 handle_call(head_status, _From, State) ->
     {reply, {State#state.head_only, State#state.head_lookup}, State};
+handle_call(status, _From, State) ->
+    {reply, status(State), State};
 handle_call(Msg, _From, State) ->
     {reply, {unsupported_message, element(1, Msg)}, State}.
 
@@ -2877,7 +2901,11 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
     end.
 
 -spec maybepush_ledgercache(
-    pos_integer(), pos_integer(), ledger_cache(), pid()
+    pos_integer(),
+    pos_integer(),
+    ledger_cache(),
+    pid(),
+    leveled_monitor:monitor()
 ) ->
     {ok | returned, ledger_cache()}.
 %% @doc
@@ -2890,9 +2918,12 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
 %% in the reply. Try again later when it isn't busy (and also potentially
 %% implement a slow_offer state to slow down the pace at which PUTs are being
 %% received)
-maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) ->
+maybepush_ledgercache(
+    MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _}
+) ->
     Tab = Cache#ledger_cache.mem,
     CacheSize = ets:info(Tab, size),
+    leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}),
     TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult),
     if
         TimeToPush ->
@@ -3048,6 +3079,11 @@ maybelog_snap_timing({Pid, _StatsFreq}, BookieTime, PCLTime) when
 maybelog_snap_timing(_Monitor, _, _) ->
     ok.
 
+status(#state{monitor = {no_monitor, 0}}) ->
+    #{};
+status(#state{monitor = {Monitor, _}}) ->
+    leveled_monitor:get_bookie_status(Monitor).
+ %%%============================================================================ %%% Test %%%============================================================================ diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 16590485..fe94d482 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -494,6 +494,8 @@ starting({call, From}, {open_writer, Filename}, State) -> starting({call, From}, {open_reader, Filename}, State) -> leveled_log:save(State#state.log_options), ?STD_LOG(cdb02, [Filename]), + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}), {Handle, Index, LastKey} = open_for_readonly(Filename, false), State0 = State#state{ handle = Handle, @@ -505,6 +507,8 @@ starting({call, From}, {open_reader, Filename}, State) -> starting({call, From}, {open_reader, Filename, LastKey}, State) -> leveled_log:save(State#state.log_options), ?STD_LOG(cdb02, [Filename]), + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}), {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), State0 = State#state{ handle = Handle, @@ -650,6 +654,8 @@ writer( ) when ?IS_DEF(LP) -> + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}), ok = leveled_iclerk:clerk_hashtablecalc( State#state.hashtree, LP, self() @@ -880,6 +886,8 @@ delete_pending( ) when ?IS_DEF(FN), ?IS_DEF(IO) -> + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}), ?STD_LOG(cdb04, [FN, State#state.delete_point]), close_pendingdelete(IO, FN, State#state.waste_path), {stop, normal}; @@ -906,6 +914,10 @@ delete_pending( ), {keep_state_and_data, [?DELETE_TIMEOUT]}; false -> + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat( + Monitor, {n_active_journal_files_update, -1} + ), ?STD_LOG(cdb04, [FN, ManSQN]), close_pendingdelete(IO, FN, State#state.waste_path), {stop, normal} diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index d47c4516..29989368 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -421,46 +421,69 @@ handle_cast( CloseFun = ScoringState#scoring_state.close_fun, SW = ScoringState#scoring_state.start_time, ScoreParams = - {MaxRunLength, State#state.maxrunlength_compactionperc, - State#state.singlefile_compactionperc}, + { + MaxRunLength, + State#state.maxrunlength_compactionperc, + State#state.singlefile_compactionperc + }, {BestRun0, Score} = assess_candidates(Candidates, ScoreParams), ?TMR_LOG(ic003, [Score, length(BestRun0)], SW), - case Score > 0.0 of - true -> - BestRun1 = sort_run(BestRun0), - print_compaction_run(BestRun1, ScoreParams), - ManifestSlice = - compact_files( - BestRun1, - CDBopts, - FilterFun, - FilterServer, - MaxSQN, - State#state.reload_strategy, - State#state.compression_method - ), - FilesToDelete = - lists:map( - fun(C) -> - { - C#candidate.low_sqn, - C#candidate.filename, - C#candidate.journal, - undefined - } - end, - BestRun1 - ), - ?STD_LOG(ic002, [length(FilesToDelete)]), - ok = CloseFun(FilterServer), - ok = - leveled_inker:ink_clerkcomplete( - State#state.inker, ManifestSlice, FilesToDelete - ); - false -> - ok = CloseFun(FilterServer), - ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []) - end, + LRL = + case Score > 0.0 of + true -> + BestRun1 = sort_run(BestRun0), + print_compaction_run(BestRun1, ScoreParams), + ManifestSlice = + compact_files( + BestRun1, + CDBopts, + FilterFun, + FilterServer, + 
MaxSQN,
+                        State#state.reload_strategy,
+                        State#state.compression_method
+                    ),
+                FilesToDelete =
+                    lists:map(
+                        fun(C) ->
+                            {
+                                C#candidate.low_sqn,
+                                C#candidate.filename,
+                                C#candidate.journal,
+                                undefined
+                            }
+                        end,
+                        BestRun1
+                    ),
+                ?STD_LOG(ic002, [length(FilesToDelete)]),
+                ok = CloseFun(FilterServer),
+                ok =
+                    leveled_inker:ink_clerkcomplete(
+                        State#state.inker, ManifestSlice, FilesToDelete
+                    ),
+                length(BestRun0);
+            false ->
+                ok = CloseFun(FilterServer),
+                ok =
+                    leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
+                0
+        end,
+    {Monitor, _} = CDBopts#cdb_options.monitor,
+    {MaxScore, MeanScore} = calc_run_stats(Candidates),
+    {MegaST, SecST, MicroST} = ScoringState#scoring_state.start_time,
+    StartTimeMilli = (MegaST * 1000000 + SecST) * 1000 + (MicroST div 1000),
+    leveled_monitor:add_stat(
+        Monitor,
+        {
+            journal_compaction,
+            MaxScore,
+            MeanScore,
+            Score,
+            LRL,
+            os:system_time(millisecond) - StartTimeMilli,
+            StartTimeMilli
+        }
+    ),
     {noreply, State#state{scoring_state = undefined}, hibernate};
 handle_cast(
     {trim, PersistedSQN, ManifestAsList}, State = #state{inker = Ink}
@@ -584,6 +607,18 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
 %%% Internal functions
 %%%============================================================================
 
+-spec calc_run_stats(list(candidate())) -> {float(), float()}.
+calc_run_stats(Candidates) ->
+    case lists:map(fun(C) -> C#candidate.compaction_perc end, Candidates) of
+        L when length(L) > 0 ->
+            {
+                lists:max(L),
+                lists:sum(L) / length(L)
+            };
+        _ ->
+            {0.0, 0.0}
+    end.
+
 -spec check_single_file(
     pid(),
     leveled_inker:filterfun(),
@@ -714,7 +749,7 @@ fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
 %%
 %% Although this requires many loops over the list of the candidate, as the
 %% file scores have already been calculated the cost per loop should not be
-%% a high burden. Reducing the maximum run length, will reduce the cost of
+%% a high burden. Reducing the maximum run length will reduce the cost if
 %% this exercise should be a problem.
 %%
 %% The score parameters are used to produce the score of the compaction run,
@@ -756,7 +791,7 @@ assess_candidates(AllCandidates, Params) ->
     {list(candidate()), float()}.
 %% @doc
 %% For a given run length, calculate the scores for all consecutive runs of
-%% files, comparing the score with the best run which has beens een so far.
+%% files, comparing the score with the best run which has been seen so far.
 %% The best is a tuple of the actual run of candidates, along with the score
 %% achieved for that run
 assess_for_runlength(RunLength, AllCandidates, Params, Best) ->
@@ -775,7 +810,7 @@ assess_for_runlength(RunLength, AllCandidates, Params, Best) ->
 -spec score_run(list(candidate()), score_parameters()) -> float().
 %% @doc
 %% Score a run. Caluclate the avergae score across all the files in the run,
-%% and deduct that from a target score. Good candidate runs for comapction
+%% and deduct that from a target score. Good candidate runs for compaction
 %% have larger (positive) scores. Bad candidate runs for compaction have
 %% negative scores.
score_run([], _Params) -> @@ -1265,7 +1300,10 @@ check_single_file_test() -> Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), ?assertMatch(37.5, Score1), LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end, - Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS), + Score2 = + check_single_file( + CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS + ), ?assertMatch(100.0, Score2), Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS), ?assertMatch(37.5, Score3), @@ -1414,7 +1452,8 @@ compact_empty_file_test() -> {3, {o, "Bucket", "Key3", null}} ], LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end, - Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), + Score1 = + check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), ?assert((+0.0 =:= Score1) orelse (-0.0 =:= Score1)), ok = leveled_cdb:cdb_deletepending(CDB2), ok = leveled_cdb:cdb_destroy(CDB2). diff --git a/src/leveled_monitor.erl b/src/leveled_monitor.erl index 34702c6e..792384d9 100644 --- a/src/leveled_monitor.erl +++ b/src/leveled_monitor.erl @@ -40,7 +40,8 @@ log_level/2, log_add/2, log_remove/2, - get_defaults/0 + get_defaults/0, + get_bookie_status/1 ]). -define(LOG_LIST, [ @@ -54,6 +55,39 @@ ]). -define(LOG_FREQUENCY_SECONDS, 30). +-define(INITIAL_BOOKIE_STATUS, #{ + fetch_count_by_level => + #{ + not_found => #{count => 0, time => 0}, + mem => #{count => 0, time => 0}, + lower => #{count => 0, time => 0}, + '0' => #{count => 0, time => 0}, + '1' => #{count => 0, time => 0}, + '2' => #{count => 0, time => 0}, + '3' => #{count => 0, time => 0} + }, + get_body_time => 0, + get_sample_count => 0, + head_rsp_time => 0, + head_sample_count => 0, + journal_last_compaction_time => undefined, + journal_last_compaction_duration => undefined, + journal_last_compaction_score => undefined, + journal_last_compaction_max => undefined, + journal_last_compaction_mean => undefined, + journal_last_compaction_runlength => undefined, + ledger_cache_size => undefined, + level_files_count => #{}, + n_active_journal_files => 1, + penciller_inmem_cache_size => undefined, + penciller_last_merge_time => undefined, + penciller_work_backlog_status => undefined, + put_ink_time => 0, + put_mem_time => 0, + put_prep_time => 0, + put_sample_count => 0 +}). + -record(bookie_get_timings, { sample_count = 0 :: non_neg_integer(), head_time = 0 :: non_neg_integer(), @@ -66,7 +100,6 @@ sample_count = 0 :: non_neg_integer(), cache_count = 0 :: non_neg_integer(), found_count = 0 :: non_neg_integer(), - cache_hits = 0 :: non_neg_integer(), fetch_ledger_time = 0 :: non_neg_integer(), fetch_ledgercache_time = 0 :: non_neg_integer(), rsp_time = 0 :: non_neg_integer(), @@ -130,6 +163,40 @@ sample_start_time = os:timestamp() :: erlang:timestamp() }). 
+-type bookie_status() :: #{ + ledger_cache_size => undefined | non_neg_integer(), + n_active_journal_files => pos_integer(), + level_files_count => #{non_neg_integer() => non_neg_integer()}, + penciller_inmem_cache_size => undefined | pos_integer(), + penciller_work_backlog_status => + undefined | {non_neg_integer(), boolean(), boolean()}, + penciller_last_merge_time => undefined | integer(), + journal_last_compaction_time => undefined | pos_integer(), + journal_last_compaction_duration => undefined | non_neg_integer(), + journal_last_compaction_score => undefined | float(), + journal_last_compaction_max => undefined | float(), + journal_last_compaction_mean => undefined | float(), + journal_last_compaction_runlength => undefined | non_neg_integer(), + fetch_count_by_level => + undefined + | #{ + reporting_fetch_level() => #{ + count => non_neg_integer(), + time => non_neg_integer() + } + }, + get_body_time => undefined | non_neg_integer(), + get_sample_count => non_neg_integer(), + head_rsp_time => undefined | non_neg_integer(), + head_sample_count => non_neg_integer(), + put_ink_time => undefined | non_neg_integer(), + put_mem_time => undefined | non_neg_integer(), + put_prep_time => undefined | non_neg_integer(), + put_sample_count => non_neg_integer() +}. +-type reporting_fetch_level() :: + not_found | mem | '0' | '1' | '2' | '3' | lower. + -record(state, { bookie_get_timings = #bookie_get_timings{} :: bookie_get_timings(), bookie_head_timings = #bookie_head_timings{} :: bookie_head_timings(), @@ -139,7 +206,8 @@ sst_fetch_timings = [] :: list(sst_fetch_timings()), cdb_get_timings = #cdb_get_timings{} :: cdb_get_timings(), log_frequency = ?LOG_FREQUENCY_SECONDS :: pos_integer(), - log_order = [] :: list(log_type()) + log_order = [] :: list(log_type()), + bookie_status :: bookie_status() }). -type bookie_get_timings() :: #bookie_get_timings{}. @@ -181,6 +249,26 @@ microsecs()}. -type cdb_get_update() :: {cdb_get_update, pos_integer(), microsecs(), microsecs()}. +-type bookie_status_update() :: + {ledger_cache_size_update, pos_integer()} + | {n_active_journal_files_update, integer()} + | {avg_compaction_score_update, float()} + | {level_files_count_update, #{non_neg_integer() => pos_integer()}, + TS :: non_neg_integer()} + | {penciller_inmem_cache_size_update, pos_integer()} + | {penciller_work_backlog_status_update, { + non_neg_integer(), boolean(), boolean() + }} + | { + journal_compaction, + float(), + float(), + float(), + pos_integer(), + non_neg_integer(), + pos_integer() + } + | {metadata_objsize_ratio_update, not_implemented}. -type statistic() :: bookie_get_update() | bookie_head_update() @@ -188,7 +276,8 @@ | bookie_snap_update() | pcl_fetch_update() | sst_fetch_update() - | cdb_get_update(). + | cdb_get_update() + | bookie_status_update(). -export_type([monitor/0, timing/0, sst_fetch_type/0, log_type/0]). @@ -204,7 +293,9 @@ monitor_start(LogFreq, LogOrder) -> ), {ok, Monitor}. --spec add_stat(pid(), statistic()) -> ok. +-spec add_stat(no_monitor | pid(), statistic()) -> ok. +add_stat(no_monitor, _Statistic) -> + ok; add_stat(Watcher, Statistic) -> gen_server:cast(Watcher, Statistic). @@ -230,6 +321,10 @@ log_add(Pid, ForcedLogs) -> log_remove(Pid, ForcedLogs) -> gen_server:cast(Pid, {log_remove, ForcedLogs}). +-spec get_bookie_status(pid()) -> bookie_status(). +get_bookie_status(Pid) -> + gen_server:call(Pid, get_bookie_status). + -spec maybe_time(monitor()) -> erlang:timestamp() | no_timing. 
maybe_time({_Pid, TimingProbability}) -> case rand:uniform(100) of @@ -272,8 +367,66 @@ init([LogOpts, LogFrequency, LogOrder]) -> ), InitialJitter = rand:uniform(2 * 1000 * LogFrequency), erlang:send_after(InitialJitter, self(), report_next_stats), - {ok, #state{log_frequency = LogFrequency, log_order = RandomLogOrder}}. - + {ok, #state{ + log_frequency = LogFrequency, + log_order = RandomLogOrder, + bookie_status = ?INITIAL_BOOKIE_STATUS + }}. + +handle_call( + get_bookie_status, + _From, + #state{ + bookie_status = BS, + bookie_get_timings = GT, + bookie_put_timings = PT, + bookie_head_timings = HT, + pcl_fetch_timings = PFT + } = State +) -> + FCL = #{ + not_found => #{ + count => PFT#pcl_fetch_timings.notfound_count, + time => PFT#pcl_fetch_timings.notfound_time + }, + mem => #{ + count => PFT#pcl_fetch_timings.foundmem_count, + time => PFT#pcl_fetch_timings.foundmem_time + }, + '0' => #{ + count => PFT#pcl_fetch_timings.found0_count, + time => PFT#pcl_fetch_timings.found0_time + }, + '1' => #{ + count => PFT#pcl_fetch_timings.found1_count, + time => PFT#pcl_fetch_timings.found1_time + }, + '2' => #{ + count => PFT#pcl_fetch_timings.found2_count, + time => PFT#pcl_fetch_timings.found2_time + }, + '3' => #{ + count => PFT#pcl_fetch_timings.found3_count, + time => PFT#pcl_fetch_timings.found3_time + }, + lower => #{ + count => PFT#pcl_fetch_timings.foundlower_count, + time => PFT#pcl_fetch_timings.foundlower_time + } + }, + StatusEnriched = + BS#{ + get_sample_count => GT#bookie_get_timings.sample_count, + get_body_time => GT#bookie_get_timings.body_time, + head_sample_count => HT#bookie_head_timings.sample_count, + head_rsp_time => HT#bookie_head_timings.rsp_time, + put_sample_count => PT#bookie_put_timings.sample_count, + put_prep_time => PT#bookie_put_timings.prep_time, + put_ink_time => PT#bookie_put_timings.ink_time, + put_mem_time => PT#bookie_put_timings.mem_time, + fetch_count_by_level => FCL + }, + {reply, StatusEnriched, State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. @@ -633,7 +786,51 @@ handle_cast({log_add, ForcedLogs}, State) -> {noreply, State}; handle_cast({log_remove, ForcedLogs}, State) -> ok = leveled_log:remove_forcedlogs(ForcedLogs), - {noreply, State}. 
+ {noreply, State}; +handle_cast({ledger_cache_size_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{ledger_cache_size => A}}}; +handle_cast( + {n_active_journal_files_update, Delta}, State = #state{bookie_status = BS0} +) -> + A = maps:get(n_active_journal_files, BS0), + BS = maps:put(n_active_journal_files, A + Delta, BS0), + {noreply, State#state{bookie_status = BS}}; +handle_cast( + {level_files_count_update, U, TS}, State = #state{bookie_status = BS0} +) -> + A = maps:get(level_files_count, BS0), + BS1 = maps:put(level_files_count, maps:merge(A, U), BS0), + BS2 = maps:put(penciller_last_merge_time, TS, BS1), + {noreply, State#state{bookie_status = BS2}}; +handle_cast( + {penciller_inmem_cache_size_update, A}, State = #state{bookie_status = BS} +) -> + {noreply, State#state{bookie_status = BS#{penciller_inmem_cache_size => A}}}; +handle_cast( + {penciller_work_backlog_status_update, A}, + State = #state{bookie_status = BS} +) -> + {noreply, State#state{ + bookie_status = BS#{penciller_work_backlog_status => A} + }}; +handle_cast( + {journal_compaction, MaxScore, MeanScore, Score, LRL, Duration, StartTime}, + State = #state{bookie_status = BS} +) -> + { + noreply, + State#state{ + bookie_status = + BS#{ + journal_last_compaction_time => StartTime, + journal_last_compaction_duration => Duration, + journal_last_compaction_score => Score, + journal_last_compaction_max => MaxScore, + journal_last_compaction_mean => MeanScore, + journal_last_compaction_runlength => LRL + } + } + }. handle_info(report_next_stats, State) -> erlang:send_after( @@ -664,7 +861,12 @@ code_change(_OldVsn, State, _Extra) -> coverage_cheat_test() -> {ok, M} = monitor_start(1, []), timer:sleep(2000), - {ok, _State1} = code_change(null, #state{}, null), + {ok, _State1} = + code_change( + null, + #state{bookie_status = ?INITIAL_BOOKIE_STATUS}, + null + ), ok = add_stat(M, {pcl_fetch_update, 4, 100}), ok = report_stats(M, pcl_fetch), % Can close, so empty log_order hasn't crashed diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 61e4313c..197ea21d 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -379,15 +379,27 @@ merge_limit(SrcLevel, SinkListLength, MMB) when is_integer(MMB) -> list(leveled_sst:sst_pointer()) }. 
do_merge( - [], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max + [], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, SSTOpts, Additions, _Max ) -> + {Monitor, _} = SSTOpts#sst_options.monitor, + leveled_monitor:add_stat( + Monitor, + {level_files_count_update, #{SinkLevel => length(Additions)}, + os:system_time(millisecond)} + ), ?STD_LOG(pc011, [NewSQN, SinkLevel, length(Additions), full]), {lists:reverse(Additions), [], []}; do_merge( - KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max + KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, SSTOpts, Additions, Max ) when length(Additions) >= Max -> + {Monitor, _} = SSTOpts#sst_options.monitor, + leveled_monitor:add_stat( + Monitor, + {level_files_count_update, #{SinkLevel => length(Additions)}, + os:system_time(millisecond)} + ), ?STD_LOG(pc011, [NewSQN, SinkLevel, length(Additions), partial]), FNSrc = leveled_penciller:sst_filename( @@ -400,7 +412,7 @@ do_merge( {ExpandedKL1, []} = split_unexpanded_files(KL1), {ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2), TS1 = os:timestamp(), - InfOpts = OptsSST#sst_options{max_sstslots = infinity}, + InfOpts = SSTOpts#sst_options{max_sstslots = infinity}, % Need to be careful to make sure all the remainder goes in one file, % could be situations whereby the max_sstslots has been changed between % restarts - and so there is too much data for one file in the diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 38b435a4..745e05bd 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -790,6 +790,10 @@ handle_call( State#state.levelzero_index, length(State#state.levelzero_cache) + 1 ), + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat( + Monitor, {penciller_inmem_cache_size_update, NewL0Size} + ), Subs = [NewL0Size, true, true, MinSQN, MaxSQN], ?RND_LOG(p0031, Subs, SW, 0.1), {reply, ok, State#state{ @@ -1249,7 +1253,12 @@ handle_cast( }}; handle_cast( work_for_clerk, - State = #state{manifest = Man, levelzero_cache = L0Cache, clerk = Clerk} + State = #state{ + manifest = Man, + levelzero_cache = L0Cache, + clerk = Clerk, + monitor = {Monitor, _} + } ) when ?IS_DEF(Man), ?IS_DEF(L0Cache), ?IS_DEF(Clerk) -> @@ -1281,9 +1290,19 @@ handle_cast( {WL, WC} = leveled_pmanifest:check_for_work(Man), case {WC, (CacheAlreadyFull or CacheOverSize)} of {0, false} -> + leveled_monitor:add_stat( + Monitor, + {penciller_work_backlog_status_update, + {0, false, false}} + ), % No work required {noreply, State#state{work_backlog = false}}; {WC, true} when WC < ?WORKQUEUE_BACKLOG_TOLERANCE -> + leveled_monitor:add_stat( + Monitor, + {penciller_work_backlog_status_update, + {WC, false, true}} + ), % Rolling the memory to create a new Level Zero file % Must not do this if there is a work backlog beyond the % tolerance, as then the backlog may never be addressed. 
@@ -1309,6 +1328,11 @@ handle_cast( % L0 work to do, or because the backlog has grown beyond % tolerance Backlog = WC >= ?WORKQUEUE_BACKLOG_TOLERANCE, + leveled_monitor:add_stat( + Monitor, + {penciller_work_backlog_status_update, + {WC, Backlog, L0Full}} + ), ?STD_LOG(p0024, [WC, Backlog, L0Full]), [TL | _Tail] = WL, ok = leveled_pclerk:clerk_push(Clerk, {TL, Man}), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 65faf960..40c60d58 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -655,6 +655,15 @@ starting( Level ), Summary = UpdState#state.summary, + + if + Level == 0 -> + leveled_monitor:add_stat( + element(1, Monitor), {penciller_inmem_cache_size_update, 0} + ); + el /= se -> + noop + end, ?TMR_LOG(sst08, [ActualFilename, Level, Summary#summary.max_sqn], SW), erlang:send_after(?STARTUP_TIMEOUT, self(), start_complete), {next_state, reader, @@ -753,6 +762,9 @@ starting(cast, complete_l0startup, State) -> Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), + leveled_monitor:add_stat( + element(1, Monitor), {penciller_inmem_cache_size_update, 0} + ), ?TMR_LOG(sst08, [ActualFilename, 0, Summary#summary.max_sqn], SW0), ?STD_LOG(sst11, [Time0, Time1, Time2, Time3, Time4]), diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 1b903c20..65c3c7f1 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -15,7 +15,8 @@ bigsst_littlesst/1, safereaderror_startup/1, remove_journal_test/1, - bigpcl_bucketlist/1 + bigpcl_bucketlist/1, + bookie_status_report/1 ]). all() -> @@ -33,7 +34,8 @@ all() -> bigsst_littlesst, safereaderror_startup, remove_journal_test, - bigpcl_bucketlist + bigpcl_bucketlist, + bookie_status_report ]. init_per_suite(Config) -> @@ -43,6 +45,196 @@ init_per_suite(Config) -> end_per_suite(Config) -> testutil:end_per_suite(Config). 
+bookie_status_report(_Config) ->
+    RootPath = testutil:reset_filestructure(),
+    StartOpts =
+        [
+            {root_path, RootPath},
+            {sync_strategy, testutil:sync_strategy()},
+            {log_level, info},
+            %% ensure all stats are always collected
+            {stats_percentage, 100},
+            %% to trigger penciller merge events sooner
+            {max_pencillercachesize, 16000},
+            %% to create more journal files (exactly 8 for the 80K
+            %% keys loaded in the test)
+            {max_journalobjectcount, 10000},
+            {forced_logs, []}
+        ],
+    {ok, Bookie} = leveled_bookie:book_start(StartOpts),
+
+    InitialReport =
+        #{
+            ledger_cache_size => undefined,
+            n_active_journal_files => 1,
+            level_files_count => #{},
+            penciller_inmem_cache_size => undefined,
+            penciller_work_backlog_status => {0, false, false},
+            penciller_last_merge_time => undefined,
+            journal_last_compaction_time => undefined,
+            journal_last_compaction_duration => undefined,
+            journal_last_compaction_score => undefined,
+            journal_last_compaction_max => undefined,
+            journal_last_compaction_mean => undefined,
+            journal_last_compaction_runlength => undefined,
+            fetch_count_by_level =>
+                #{
+                    not_found => #{count => 0, time => 0},
+                    mem => #{count => 0, time => 0},
+                    lower => #{count => 0, time => 0},
+                    '0' => #{count => 0, time => 0},
+                    '1' => #{count => 0, time => 0},
+                    '2' => #{count => 0, time => 0},
+                    '3' => #{count => 0, time => 0}
+                },
+            get_sample_count => 0,
+            get_body_time => 0,
+            head_sample_count => 0,
+            head_rsp_time => 0,
+            put_sample_count => 0,
+            put_prep_time => 0,
+            put_ink_time => 0,
+            put_mem_time => 0
+        },
+    InitialReport = leveled_bookie:book_status(Bookie),
+    io:format("\nInitial report, before any IO\n~p\n", [InitialReport]),
+    check_n_journal_files(RootPath, InitialReport),
+
+    {TObj, TSpec} = testutil:generate_testobject(),
+    ok = testutil:book_riakput(Bookie, TObj, TSpec),
+
+    Rep1 = leveled_bookie:book_status(Bookie),
+    1 = maps:get(ledger_cache_size, Rep1),
+    1 = maps:get(put_sample_count, Rep1),
+    GoodPutPrepTime = 10000,
+    GoodPutInkTime = 10000,
+    GoodPutMemTime = 100,
+    within_range(1, GoodPutPrepTime, maps:get(put_prep_time, Rep1)),
+    within_range(1, GoodPutInkTime, maps:get(put_ink_time, Rep1)),
+    within_range(1, GoodPutMemTime, maps:get(put_mem_time, Rep1)),
+    #{} = maps:get(level_files_count, Rep1),
+    check_n_journal_files(RootPath, Rep1),
+
+    {r_object, TBkt, TKey, _, _, _, _} = TObj,
+    {ok, _} = testutil:book_riakget(Bookie, TBkt, TKey),
+    Rep2 = leveled_bookie:book_status(Bookie),
+    io:format("\nReport after a single PUT+GET\n~p\n", [Rep2]),
+    1 = maps:get(get_sample_count, Rep2),
+    GoodGetBodyTime = 500,
+    within_range(1, GoodGetBodyTime, maps:get(get_body_time, Rep2)),
+    #{} = maps:get(level_files_count, Rep2),
+    check_n_journal_files(RootPath, Rep2),
+
+    io:format("Prompt journal compaction~n"),
+    CompactionStarted1 = os:system_time(millisecond),
+    ok = leveled_bookie:book_compactjournal(Bookie, 30000),
+    testutil:wait_for_compaction(Bookie),
+
+    Rep3 = leveled_bookie:book_status(Bookie),
+    io:format("\nReport after first compaction:\n~p\n", [Rep3]),
+    +0.0 = maps:get(journal_last_compaction_score, Rep3),
+    within_range(
+        CompactionStarted1,
+        os:system_time(millisecond),
+        maps:get(journal_last_compaction_time, Rep3)
+    ),
+    #{} = maps:get(level_files_count, Rep3),
+    undefined = maps:get(penciller_inmem_cache_size, Rep3),
+    undefined = maps:get(penciller_last_merge_time, Rep3),
+    check_n_journal_files(RootPath, Rep3),
+
+    io:format("Load 80K objects and then delete them~n"),
+    testutil:load_objects(
+        20000,
+        [binary_uuid, binary_uuid, 
binary_uuid, binary_uuid],
+        Bookie,
+        no_check,
+        fun testutil:generate_compressibleobjects/2
+    ),
+    FoldKeysFun = fun(B, K, Acc) -> [{B, K} | Acc] end,
+    {async, F1} =
+        leveled_bookie:book_keylist(Bookie, o_rkv, {FoldKeysFun, []}),
+    KL1 = F1(),
+    lists:foreach(
+        fun({Bucket, Key}) ->
+            testutil:book_riakdelete(Bookie, Bucket, Key, [])
+        end,
+        KL1
+    ),
+
+    Rep4 = leveled_bookie:book_status(Bookie),
+    io:format("\nReport after loading and deleting 80K objects:\n~p\n", [Rep4]),
+    %% we have reduced max penciller cache size to make it certain a
+    %% merge occurs after so many PUTs
+    within_range(
+        CompactionStarted1,
+        os:system_time(millisecond),
+        maps:get(penciller_last_merge_time, Rep4)
+    ),
+    check_n_journal_files(RootPath, Rep4),
+
+    io:format("Prompt journal compaction again~n"),
+    CompactionStarted2 = os:system_time(millisecond),
+    ok = leveled_bookie:book_compactjournal(Bookie, 30000),
+    testutil:wait_for_compaction(Bookie),
+
+    Rep5 = leveled_bookie:book_status(Bookie),
+    io:format("\nReport after second compaction:\n~p\n", [Rep5]),
+    #{1 := 1} = maps:get(level_files_count, Rep5),
+    within_range(
+        CompactionStarted2,
+        os:system_time(millisecond),
+        maps:get(journal_last_compaction_time, Rep5)
+    ),
+    JLCRScore = maps:get(journal_last_compaction_score, Rep5),
+    within_range(0.0, 100.0, JLCRScore),
+    within_range(20.0, 90.0, maps:get(journal_last_compaction_mean, Rep5)),
+    true = 100.0 >= maps:get(journal_last_compaction_max, Rep5),
+    JLCRRL = maps:get(journal_last_compaction_runlength, Rep5),
+    within_range(7, 9, JLCRRL),
+    within_range(
+        0,
+        40000,
+        maps:get(penciller_inmem_cache_size, Rep5)
+    ),
+    check_n_journal_files(RootPath, Rep5),
+
+    io:format("Sleeping 11s to check that the 8 files are actually deleted\n", []),
+    timer:sleep(_DELETE_TIMEOUT = 10_000 + 1_000),
+
+    Rep6 = leveled_bookie:book_status(Bookie),
+    check_n_journal_files(RootPath, Rep6),
+
+    SnapOpts = [{snapshot_bookie, Bookie}],
+    {ok, BookSnap} = leveled_bookie:book_start(SnapOpts),
+    #{} = leveled_bookie:book_status(BookSnap),
+    ok = leveled_bookie:book_close(BookSnap),
+
+    io:format("\nClosing book and reopening\n", []),
+    ok = leveled_bookie:book_close(Bookie),
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts),
+    Rep7 = leveled_bookie:book_status(Bookie2),
+    check_n_journal_files(RootPath, Rep7),
+
+    ok = leveled_bookie:book_destroy(Bookie2).
+
+within_range(Min, Max, V) ->
+    true = Min =< V,
+    true = Max >= V.
+
+check_n_journal_files(RootPath, Rep) ->
+    A = length(
+        filelib:wildcard(RootPath ++ "/journal/journal_files/*.{cdb,pnd}")
+    ),
+    B = length(
+        filelib:wildcard(
+            RootPath ++ "/journal/journal_files/post_compact/*.{cdb,pnd}"
+        )
+    ),
+    C = maps:get(n_active_journal_files, Rep),
+    io:format("journal files: ~b (reported: ~b)\n", [A + B, C]),
+    C = A + B.
+
 simple_put_fetch_head_delete(_Config) ->
     io:format("simple test with info and no forced logs~n"),
     simple_test_withlog(info, []),

From 490ec56520bd36ca4cf9e84b52d97cbcddd7708c Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Fri, 13 Feb 2026 13:33:11 +0000
Subject: [PATCH 3/4] Handle coverage issue

Issue with cdb_checkhashtable/1 completing immediately. So add some more
keys, so that it won't.
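
With 1000 entries rather than 2, the hashtable calculation kicked off by
cdb_roll should still be in flight when the test checks for it. For
reference, a rough sketch of the zero-padded key format the new dummy
entries use (dummy_kv/1 is a hypothetical helper for illustration, not
part of this change; the "DummmyK" spelling matches the test data):

    %% Illustrative helper, not in the patch. ~6..0w left-pads the
    %% integer to width 6 with zeros, e.g.
    %% dummy_kv(42) -> {"DummmyK000042", "TestValue000042"}
    dummy_kv(I) ->
        {
            lists:flatten(io_lib:format("DummmyK~6..0w", [I])),
            lists:flatten(io_lib:format("TestValue~6..0w", [I]))
        }.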
--- src/leveled_inker.erl | 57 +++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 2542c22d..728e6b65 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -1576,18 +1576,33 @@ build_dummy_journal(KeyConvertF) -> ok = filelib:ensure_dir(ManifestFP), F1 = filename:join(JournalFP, "nursery_1.pnd"), {ok, J1} = leveled_cdb:cdb_open_writer(F1), + %% Load some dummmy keys to avoid timing issues when scenarios are not + %% triggered due to hashtable being calculated too fast + lists:foreach( + fun(I) -> + DK = lists:flatten(io_lib:format("DummmyK~6..0w", [I])), + DV = lists:flatten(io_lib:format("TestValue~6..0w", [I])), + leveled_cdb:cdb_put( + J1, + {I, stnd, KeyConvertF(DK)}, + create_value_for_journal({DV, ?TEST_KC}, false) + ) + end, + lists:seq(1, 1000) + ), + {K1, V1} = {KeyConvertF("Key1"), "TestValue1"}, {K2, V2} = {KeyConvertF("Key2"), "TestValue2"}, ok = leveled_cdb:cdb_put( J1, - {1, stnd, K1}, + {1001, stnd, K1}, create_value_for_journal({V1, ?TEST_KC}, false) ), ok = leveled_cdb:cdb_put( J1, - {2, stnd, K2}, + {1002, stnd, K2}, create_value_for_journal({V2, ?TEST_KC}, false) ), ok = leveled_cdb:cdb_roll(J1), @@ -1618,20 +1633,20 @@ build_dummy_journal(KeyConvertF) -> ok = leveled_cdb:cdb_put( J2, - {3, stnd, K1}, + {1003, stnd, K1}, create_value_for_journal({V3, ?TEST_KC}, false) ), ok = leveled_cdb:cdb_put( J2, - {4, stnd, K4}, + {1004, stnd, K4}, create_value_for_journal({V4, ?TEST_KC}, false) ), LK2 = leveled_cdb:cdb_lastkey(J2), ok = leveled_cdb:cdb_close(J2), Manifest = [ {1, "test/test_area/journal/journal_files/nursery_1", "pid1", LK1}, - {3, "test/test_area/journal/journal_files/nursery_3", "pid2", LK2} + {1003, "test/test_area/journal/journal_files/nursery_3", "pid2", LK2} ], ManifestBin = term_to_binary(Manifest), {ok, MF1} = file:open( @@ -1671,12 +1686,12 @@ simple_inker_test() -> compression_method = native, compress_on_receipt = true }), - Obj1 = ink_get(Ink1, key_converter("Key1"), 1), - ?assertMatch(Obj1, {{1, key_converter("Key1")}, {"TestValue1", ?TEST_KC}}), - Obj3 = ink_get(Ink1, key_converter("Key1"), 3), - ?assertMatch(Obj3, {{3, key_converter("Key1")}, {"TestValue3", ?TEST_KC}}), - Obj4 = ink_get(Ink1, key_converter("Key4"), 4), - ?assertMatch(Obj4, {{4, key_converter("Key4")}, {"TestValue4", ?TEST_KC}}), + Obj1 = ink_get(Ink1, key_converter("Key1"), 1001), + ?assertMatch(Obj1, {{1001, key_converter("Key1")}, {"TestValue1", ?TEST_KC}}), + Obj3 = ink_get(Ink1, key_converter("Key1"), 1003), + ?assertMatch(Obj3, {{1003, key_converter("Key1")}, {"TestValue3", ?TEST_KC}}), + Obj4 = ink_get(Ink1, key_converter("Key4"), 1004), + ?assertMatch(Obj4, {{1004, key_converter("Key4")}, {"TestValue4", ?TEST_KC}}), ink_close(Ink1), clean_testdir(RootPath). @@ -1697,10 +1712,10 @@ simple_inker_completeactivejournal_test() -> compression_method = native, compress_on_receipt = true }), - Obj1 = ink_get(Ink1, key_converter("Key1"), 1), - ?assertMatch(Obj1, {{1, key_converter("Key1")}, {"TestValue1", ?TEST_KC}}), - Obj2 = ink_get(Ink1, key_converter("Key4"), 4), - ?assertMatch(Obj2, {{4, key_converter("Key4")}, {"TestValue4", ?TEST_KC}}), + Obj1 = ink_get(Ink1, key_converter("Key1"), 1001), + ?assertMatch(Obj1, {{1001, key_converter("Key1")}, {"TestValue1", ?TEST_KC}}), + Obj2 = ink_get(Ink1, key_converter("Key4"), 1004), + ?assertMatch(Obj2, {{1004, key_converter("Key4")}, {"TestValue4", ?TEST_KC}}), ink_close(Ink1), clean_testdir(RootPath). 
@@ -1738,12 +1753,12 @@ compact_journal_testto(WRP, ExpectedFiles) -> {[], infinity}, true ), - ?assertMatch(NewSQN1, 5), + ?assertMatch(NewSQN1, 1005), ok = ink_printmanifest(Ink1), - R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5), + R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 1005), ?assertMatch( R0, - {{5, test_ledgerkey("KeyAA")}, {"TestValueAA", {[], infinity}}} + {{1005, test_ledgerkey("KeyAA")}, {"TestValueAA", {[], infinity}}} ), FunnyLoop = lists:seq(1, 48), Checker = lists:map( @@ -1767,14 +1782,14 @@ compact_journal_testto(WRP, ExpectedFiles) -> {[], infinity}, true ), - ?assertMatch(NewSQN2, 54), + ?assertMatch(NewSQN2, 1054), ActualManifest = ink_getmanifest(Ink1), ok = ink_printmanifest(Ink1), ?assertMatch(3, length(ActualManifest)), {ok, _ICL1} = ink_compactjournal( Ink1, Checker, - fun(X) -> {X, 55} end, + fun(X) -> {X, 1055} end, fun(_F) -> ok end, fun(L, K, SQN) -> case lists:member({SQN, K}, L) of @@ -1791,7 +1806,7 @@ compact_journal_testto(WRP, ExpectedFiles) -> {ok, _ICL2} = ink_compactjournal( Ink1, Checker2, - fun(X) -> {X, 55} end, + fun(X) -> {X, 1055} end, fun(_F) -> ok end, fun(L, K, SQN) -> case lists:member({SQN, K}, L) of From 0fe60b2ef5b1d6edfbdbde4f07c8dbdaf5157830 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 13 Feb 2026 13:34:17 +0000 Subject: [PATCH 4/4] On cdb_roll and delete_pending There should still be a delete timeout if delete_pending occurs during cdb_roll --- src/leveled_cdb.erl | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index fe94d482..a825df28 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -724,7 +724,12 @@ rolling( }, case State#state.deferred_delete of true -> - {next_state, delete_pending, State0, [{reply, From, ok}]}; + { + next_state, + delete_pending, + State0, + [{reply, From, ok}, ?DELETE_TIMEOUT] + }; false -> ?TMR_LOG(cdb18, [], SW), {next_state, reader, State0, [{reply, From, ok}, hibernate]} @@ -3091,6 +3096,35 @@ pendingdelete_test() -> % No issues destroying even though the file has already been removed ok = cdb_destroy(P2). +deletewhenrolling_test_() -> + {timeout, 60000, fun deletewhenrolling_tester/0}. + +deletewhenrolling_tester() -> + F1 = "test/test_area/deleterolling_test.pnd", + file:delete(F1), + {ok, P1} = cdb_open_writer(F1, #cdb_options{binary_mode = false}), + KVList = generate_sequentialkeys(5000, []), + ok = cdb_mput(P1, KVList), + ok = cdb_roll(P1), + SpawnFakeInker = spawn(fun() -> ok end), + ok = cdb_deletepending(P1, 5000, SpawnFakeInker), + ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")), + ?assertMatch({"Key1000", "Value1000"}, cdb_get(P1, "Key1000")), + ?assertMatch({"Key4000", "Value4000"}, cdb_get(P1, "Key4000")), + timer:sleep(?DELETE_TIMEOUT - 1000), + lists:foreach( + fun(_I) -> + case is_process_alive(P1) of + true -> + timer:sleep(1000); + false -> + ok + end + end, + lists:seq(1, 10) + ), + ?assertMatch(false, is_process_alive(P1)). + getpositions_sample_test() -> % what if we try and get positions with a file with o(1000) entries F1 = "test/test_area/getpos_sample_test.pnd",