48 changes: 42 additions & 6 deletions src/leveled_bookie.erl
@@ -75,7 +75,8 @@
book_loglevel/2,
book_addlogs/2,
book_removelogs/2,
book_headstatus/1
book_headstatus/1,
book_status/1
]).

%% folding API
@@ -1316,6 +1317,24 @@ book_removelogs(Pid, ForcedLogs) ->
book_headstatus(Pid) ->
gen_server:call(Pid, head_status, infinity).

-spec book_status(pid()) -> map().
%% @doc
%% Return a map containing the following items:
%% * current size of the ledger cache;
%% * number of active journal files;
%% * average compaction score for the journal;
%% * current distribution of files across the ledger (e.g. count of files by level);
%% * current size of the penciller in-memory cache;
%% * penciller work backlog status;
%% * last merge time (penciller);
%% * last compaction time (journal);
%% * last compaction result (journal) e.g. files compacted and compaction score;
%% * ratio of metadata to object size (recent PUTs);
%% * PUT/GET/HEAD recent time/count metrics;
%% * mean level for recent fetches.
book_status(Pid) ->
gen_server:call(Pid, status, infinity).
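
A hedged usage sketch (not part of the diff): the spec only promises a map, and, as the status/1 clauses further down show, an empty map means no monitor is attached.

%% Illustrative only - log_bookie_status/1 is a hypothetical helper.
log_bookie_status(Bookie) ->
    case leveled_bookie:book_status(Bookie) of
        Status when map_size(Status) =:= 0 ->
            io:format("bookie statistics disabled (no monitor)~n");
        Status ->
            io:format("bookie status: ~p~n", [Status])
    end.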

%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@@ -1475,7 +1494,8 @@ handle_call(
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{ok, Cache} ->
@@ -1509,7 +1529,8 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) when
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{ok, Cache} ->
@@ -1686,7 +1707,8 @@ handle_call({compact_journal, Timeout}, From, State) when
State#state.cache_size,
State#state.cache_multiple,
State#state.ledger_cache,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{_, NewCache} ->
@@ -1740,6 +1762,8 @@ handle_call(return_actors, _From, State) ->
{reply, {ok, State#state.inker, State#state.penciller}, State};
handle_call(head_status, _From, State) ->
{reply, {State#state.head_only, State#state.head_lookup}, State};
handle_call(status, _From, State) ->
{reply, status(State), State};
handle_call(Msg, _From, State) ->
{reply, {unsupported_message, element(1, Msg)}, State}.

@@ -2877,7 +2901,11 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
end.

-spec maybepush_ledgercache(
pos_integer(), pos_integer(), ledger_cache(), pid()
pos_integer(),
pos_integer(),
ledger_cache(),
pid(),
leveled_monitor:monitor()
) ->
{ok | returned, ledger_cache()}.
%% @doc
@@ -2890,9 +2918,12 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
%% in the reply. Try again later when it isn't busy (and also potentially
%% implement a slow_offer state to slow down the pace at which PUTs are being
%% received)
maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) ->
maybepush_ledgercache(
MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _}
) ->
Tab = Cache#ledger_cache.mem,
CacheSize = ets:info(Tab, size),
leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult),
if
TimeToPush ->
@@ -3048,6 +3079,11 @@ maybelog_snap_timing({Pid, _StatsFreq}, BookieTime, PCLTime) when
maybelog_snap_timing(_Monitor, _, _) ->
ok.

status(#state{monitor = {no_monitor, 0}}) ->
#{};
status(#state{monitor = {Monitor, _}}) ->
leveled_monitor:get_bookie_status(Monitor).
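
The collapsed hunks above hide how call sites react to maybepush_ledgercache/5 in full; below is a hypothetical caller-side sketch consistent with its spec and @doc (handle_push/5 is illustrative, not part of the module; Monitor is the bookie's monitor tuple).

%% A returned reply means the penciller was too busy to accept the push;
%% the caller keeps the updated cache and should retry later, perhaps
%% via a slow_offer state that throttles incoming PUTs.
handle_push(MaxSize, MaxMult, Cache, Penciller, Monitor) ->
    case maybepush_ledgercache(MaxSize, MaxMult, Cache, Penciller, Monitor) of
        {ok, Cache0} ->
            {pushed, Cache0};
        {returned, Cache0} ->
            {busy, Cache0}
    end.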

%%%============================================================================
%%% Test
%%%============================================================================
48 changes: 47 additions & 1 deletion src/leveled_cdb.erl
@@ -494,6 +494,8 @@ starting({call, From}, {open_writer, Filename}, State) ->
starting({call, From}, {open_reader, Filename}, State) ->
leveled_log:save(State#state.log_options),
?STD_LOG(cdb02, [Filename]),
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
State0 = State#state{
handle = Handle,
@@ -505,6 +507,8 @@ starting({call, From}, {open_reader, Filename}, State) ->
starting({call, From}, {open_reader, Filename, LastKey}, State) ->
leveled_log:save(State#state.log_options),
?STD_LOG(cdb02, [Filename]),
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
State0 = State#state{
handle = Handle,
@@ -650,6 +654,8 @@ writer(
) when
?IS_DEF(LP)
->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
ok =
leveled_iclerk:clerk_hashtablecalc(
State#state.hashtree, LP, self()
@@ -718,7 +724,12 @@ rolling(
},
case State#state.deferred_delete of
true ->
{next_state, delete_pending, State0, [{reply, From, ok}]};
{
next_state,
delete_pending,
State0,
[{reply, From, ok}, ?DELETE_TIMEOUT]
};
false ->
?TMR_LOG(cdb18, [], SW),
{next_state, reader, State0, [{reply, From, ok}, hibernate]}
@@ -880,6 +891,8 @@ delete_pending(
) when
?IS_DEF(FN), ?IS_DEF(IO)
->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}),
?STD_LOG(cdb04, [FN, State#state.delete_point]),
close_pendingdelete(IO, FN, State#state.waste_path),
{stop, normal};
@@ -906,6 +919,10 @@ delete_pending(
),
{keep_state_and_data, [?DELETE_TIMEOUT]};
false ->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(
Monitor, {n_active_journal_files_update, -1}
),
?STD_LOG(cdb04, [FN, ManSQN]),
close_pendingdelete(IO, FN, State#state.waste_path),
{stop, normal}
@@ -3079,6 +3096,35 @@ pendingdelete_test() ->
% No issues destroying even though the file has already been removed
ok = cdb_destroy(P2).

deletewhenrolling_test_() ->
{timeout, 60000, fun deletewhenrolling_tester/0}.

deletewhenrolling_tester() ->
F1 = "test/test_area/deleterolling_test.pnd",
file:delete(F1),
{ok, P1} = cdb_open_writer(F1, #cdb_options{binary_mode = false}),
KVList = generate_sequentialkeys(5000, []),
ok = cdb_mput(P1, KVList),
ok = cdb_roll(P1),
SpawnFakeInker = spawn(fun() -> ok end),
ok = cdb_deletepending(P1, 5000, SpawnFakeInker),
?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")),
?assertMatch({"Key1000", "Value1000"}, cdb_get(P1, "Key1000")),
?assertMatch({"Key4000", "Value4000"}, cdb_get(P1, "Key4000")),
timer:sleep(?DELETE_TIMEOUT - 1000),
lists:foreach(
fun(_I) ->
case is_process_alive(P1) of
true ->
timer:sleep(1000);
false ->
ok
end
end,
lists:seq(1, 10)
),
?assertMatch(false, is_process_alive(P1)).
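
The test above relies on the ?DELETE_TIMEOUT action added to the rolling state's deferred-delete branch. A minimal gen_statem sketch of that mechanism (a hypothetical module; assuming ?DELETE_TIMEOUT is a plain integer of milliseconds, as the timer:sleep arithmetic above suggests): a bare integer in the actions list arms an event timeout, so the state callback runs again even when no further messages arrive.

%% Hypothetical illustration of gen_statem event timeouts.
-module(timeout_sketch).
-behaviour(gen_statem).
-export([start_link/0, init/1, callback_mode/0, pending/3]).

start_link() ->
    gen_statem:start_link(?MODULE, [], []).

callback_mode() ->
    state_functions.

init([]) ->
    %% a bare integer action arms an event timeout (here 5s)
    {ok, pending, undefined, [5000]}.

pending(timeout, _Content, Data) ->
    %% fires only if no other event arrived within the window -
    %% re-check whatever work is pending, then re-arm
    {keep_state, Data, [5000]};
pending({call, From}, ping, Data) ->
    %% any incoming event cancels the event timeout, so re-arm on reply
    {keep_state, Data, [{reply, From, pong}, 5000]}.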

getpositions_sample_test() ->
% what if we try and get positions with a file with o(1000) entries
F1 = "test/test_area/getpos_sample_test.pnd",
125 changes: 82 additions & 43 deletions src/leveled_iclerk.erl
@@ -421,46 +421,69 @@ handle_cast(
CloseFun = ScoringState#scoring_state.close_fun,
SW = ScoringState#scoring_state.start_time,
ScoreParams =
{MaxRunLength, State#state.maxrunlength_compactionperc,
State#state.singlefile_compactionperc},
{
MaxRunLength,
State#state.maxrunlength_compactionperc,
State#state.singlefile_compactionperc
},
{BestRun0, Score} = assess_candidates(Candidates, ScoreParams),
?TMR_LOG(ic003, [Score, length(BestRun0)], SW),
case Score > 0.0 of
true ->
BestRun1 = sort_run(BestRun0),
print_compaction_run(BestRun1, ScoreParams),
ManifestSlice =
compact_files(
BestRun1,
CDBopts,
FilterFun,
FilterServer,
MaxSQN,
State#state.reload_strategy,
State#state.compression_method
),
FilesToDelete =
lists:map(
fun(C) ->
{
C#candidate.low_sqn,
C#candidate.filename,
C#candidate.journal,
undefined
}
end,
BestRun1
),
?STD_LOG(ic002, [length(FilesToDelete)]),
ok = CloseFun(FilterServer),
ok =
leveled_inker:ink_clerkcomplete(
State#state.inker, ManifestSlice, FilesToDelete
);
false ->
ok = CloseFun(FilterServer),
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], [])
end,
LRL =
case Score > 0.0 of
true ->
BestRun1 = sort_run(BestRun0),
print_compaction_run(BestRun1, ScoreParams),
ManifestSlice =
compact_files(
BestRun1,
CDBopts,
FilterFun,
FilterServer,
MaxSQN,
State#state.reload_strategy,
State#state.compression_method
),
FilesToDelete =
lists:map(
fun(C) ->
{
C#candidate.low_sqn,
C#candidate.filename,
C#candidate.journal,
undefined
}
end,
BestRun1
),
?STD_LOG(ic002, [length(FilesToDelete)]),
ok = CloseFun(FilterServer),
ok =
leveled_inker:ink_clerkcomplete(
State#state.inker, ManifestSlice, FilesToDelete
),
length(BestRun0);
false ->
ok = CloseFun(FilterServer),
ok =
leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
0
end,
{Monitor, _} = CDBopts#cdb_options.monitor,
{MaxScore, MeanScore} = calc_run_stats(Candidates),
{MegaST, SecST, MicroST} = ScoringState#scoring_state.start_time,
StartTimeMilli = (MegaST * 1000000 + SecST) * 1000 + (MicroST div 1000),
leveled_monitor:add_stat(
Monitor,
{
journal_compaction,
MaxScore,
MeanScore,
Score,
LRL,
os:system_time(millisecond) - StartTimeMilli,
StartTimeMilli
}
),
{noreply, State#state{scoring_state = undefined}, hibernate};
handle_cast(
{trim, PersistedSQN, ManifestAsList}, State = #state{inker = Ink}
@@ -584,6 +607,18 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%% Internal functions
%%%============================================================================

-spec calc_run_stats(list(candidate())) -> {float(), float()}.
calc_run_stats(Candidates) ->
case lists:map(fun(C) -> C#candidate.compaction_perc end, Candidates) of
L when length(L) > 0 ->
{
lists:max(L),
lists:sum(L) / length(L)
};
_ ->
{0.0, 0.0}
end.
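
A hedged illustration of the new helper, written in the style of this module's existing EUnit tests (the values and test name are arbitrary; it would belong in the test section further down):

calc_run_stats_example_test() ->
    %% the maximum of the compaction percentages is 75.0, the mean 50.0
    Candidates =
        [#candidate{compaction_perc = P} || P <- [25.0, 50.0, 75.0]],
    ?assertMatch({75.0, 50.0}, calc_run_stats(Candidates)),
    %% an empty candidate list yields the {0.0, 0.0} default
    ?assertEqual({0.0, 0.0}, calc_run_stats([])).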

-spec check_single_file(
pid(),
leveled_inker:filterfun(),
@@ -714,7 +749,7 @@ fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
%%
%% Although this requires many loops over the list of the candidate, as the
%% file scores have already been calculated the cost per loop should not be
%% a high burden. Reducing the maximum run length, will reduce the cost of
%% a high burden. Reducing the maximum run length will reduce the cost if
%% this exercise should be a problem.
%%
%% The score parameters are used to produce the score of the compaction run,
@@ -756,7 +791,7 @@ assess_candidates(AllCandidates, Params) ->
{list(candidate()), float()}.
%% @doc
%% For a given run length, calculate the scores for all consecutive runs of
%% files, comparing the score with the best run which has beens een so far.
%% files, comparing the score with the best run which has been seen so far.
%% The best is a tuple of the actual run of candidates, along with the score
%% achieved for that run
assess_for_runlength(RunLength, AllCandidates, Params, Best) ->
@@ -775,7 +810,7 @@ assess_for_runlength(RunLength, AllCandidates, Params, Best) ->
-spec score_run(list(candidate()), score_parameters()) -> float().
%% @doc
%% Score a run. Calculate the average score across all the files in the run,
%% and deduct that from a target score. Good candidate runs for comapction
%% and deduct that from a target score. Good candidate runs for compaction
%% have larger (positive) scores. Bad candidate runs for compaction have
%% negative scores.
score_run([], _Params) ->
@@ -1265,7 +1300,10 @@ check_single_file_test() ->
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS),
Score2 =
check_single_file(
CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS
),
?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS),
?assertMatch(37.5, Score3),
@@ -1414,7 +1452,8 @@ compact_empty_file_test() ->
{3, {o, "Bucket", "Key3", null}}
],
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
Score1 =
check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assert((+0.0 =:= Score1) orelse (-0.0 =:= Score1)),
ok = leveled_cdb:cdb_deletepending(CDB2),
ok = leveled_cdb:cdb_destroy(CDB2).