Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions src/riak_core_node_worker_pool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
%% -------------------------------------------------------------------
-module(riak_core_node_worker_pool_sup).
-behaviour(supervisor).
-include_lib("kernel/include/logger.hrl").
-export([start_link/0, init/1]).
-export([start_pool/5]).
-export([hard_reset_dscp_pool/5]).

%% Helper macro for declaring children of supervisor
-define(CHILD(I, PoolType, Args, Type, Timeout),
Expand All @@ -37,9 +39,9 @@ init([]) ->
{ok, {{one_for_one, 5, 10}, []}}.

%% @doc
%% Start a node_worker_pool - can be either assuredforwardng_pool or
%% a besteffort_pool (which will also be registered as a node_worker_pool for
%% backwards compatability)
%% Start a node_worker_pool - can be either an assured forwarding pool or
%% a best effort pool (which will also be registered as a node_worker_pool for
%% backwards compatibility)
-spec start_pool(atom(), pos_integer(), list(), list(), worker_pool()) ->
ok | {error, Reason::term()}.
start_pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) ->
Expand All @@ -58,3 +60,96 @@ pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) ->
[WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType],
worker).


%% Pool names that hard_reset_dscp_pool/5 expects the supervisor to be running
%% (in any order) before it will perform a reset.
%% Presumably be = best effort and af1..af4 = assured forwarding classes -
%% TODO confirm against the pool configuration.
-define(DSCP_POOLS, [be_pool,af4_pool,af3_pool,af2_pool,af1_pool]).

%% @doc
%% Hard reset the dscp node worker pools so that they run with new worker
%% counts.
%%
%% A hard reset of a dscp pool is tested only on a cluster not running active
%% queries.  All query runners should be killed, as well as the existing pool
%% managers.
%%
%% The pool supervisor is killed and, once it is seen registered and alive
%% again, a new set of pools is started with the new worker counts (AF1..AF4
%% for af1_pool..af4_pool, BE for be_pool).  A pool that fails to start is
%% logged as an error; the remaining pools are still started.
%%
%% Returns `{error, unexpected_state}', without forcing the hard reset, if the
%% supervisor is not currently running exactly the dscp pools.  Crashes with a
%% badmatch if the supervisor is not seen alive again within the polling
%% window (10 checks, 1ms apart).
-spec hard_reset_dscp_pool(
    pos_integer(),
    pos_integer(),
    pos_integer(),
    pos_integer(),
    pos_integer()
) ->
    ok|{error, unexpected_state}.
hard_reset_dscp_pool(AF1, AF2, AF3, AF4, BE) ->
    %% NOTE(review): element 4 of the supervisor's state is assumed to hold
    %% its children as {Ids, ChildMap}.  This is an OTP-internal layout -
    %% confirm against the OTP release in use.
    {PoolList, PoolMap} =
        element(4, sys:get_state(riak_core_node_worker_pool_sup)),
    case lists:sort(PoolList) =:= lists:sort(?DSCP_POOLS) of
        true ->
            ?LOG_INFO(
                "Attempting hard reset of dscp node worker pool to sizes: "
                "AF1 ~w AF2 ~w AF3 ~w AF4 ~w BE ~w",
                [AF1, AF2, AF3, AF4, BE]
            ),
            ?LOG_WARNING(
                "Attempt to hard reset dscp pool will cancel will all running "
                "query contributions on this node"
            ),
            true =
                exit(
                    whereis(riak_core_node_worker_pool_sup),
                    kill
                ),
            %% Poll until the supervisor name resolves to a live process
            %% again - presumably a fresh instance restarted by the parent
            %% supervisor (verify against the supervision tree).
            true =
                wait_until(
                    fun() ->
                        case whereis(riak_core_node_worker_pool_sup) of
                            undefined ->
                                false;
                            Pid when is_pid(Pid) ->
                                is_process_alive(Pid)
                        end
                    end,
                    1,
                    10
                ),
            lists:foreach(
                fun({PoolName, PoolSize}) ->
                    %% Do not silently drop a failed pool start: report it,
                    %% but carry on so the other pools are still started.
                    case
                        riak_core_node_worker_pool_sup:start_pool(
                            riak_kv_worker,
                            PoolSize,
                            [],
                            [],
                            PoolName
                        )
                    of
                        ok ->
                            ok;
                        StartResult ->
                            ?LOG_ERROR(
                                "Failed to start dscp pool ~w with size ~w "
                                "result ~0p",
                                [PoolName, PoolSize, StartResult]
                            )
                    end
                end,
                [
                    {af1_pool, AF1},
                    {af2_pool, AF2},
                    {af3_pool, AF3},
                    {af4_pool, AF4},
                    {be_pool, BE}
                ]
            ),
            {_UpdlList, UpdPoolMap} =
                element(4, sys:get_state(riak_core_node_worker_pool_sup)),
            ?LOG_INFO("Pool update complete configuration ~0p", [UpdPoolMap]),
            ok;
        false ->
            ?LOG_ERROR("Pool state not expected for DSCP Pool ~0p", [PoolMap]),
            {error, unexpected_state}
    end.

%% Poll Check up to Remaining times, sleeping Interval milliseconds after each
%% unsuccessful attempt.  Returns true as soon as Check() returns true, and
%% false once the attempts are exhausted (Check is never called when the
%% attempt count is 0).
-spec wait_until(
    fun(() -> boolean()), pos_integer(), non_neg_integer()
) ->
    boolean().
wait_until(Check, Interval, Remaining) when Remaining =/= 0 ->
    %% Anything other than exactly true counts as "not yet".
    Check() =:= true orelse
        begin
            timer:sleep(Interval),
            wait_until(Check, Interval, Remaining - 1)
        end;
wait_until(_Check, _Interval, 0) ->
    false.
Loading