diff --git a/src/riak_core_node_worker_pool_sup.erl b/src/riak_core_node_worker_pool_sup.erl index 46e0abb..f020aa3 100644 --- a/src/riak_core_node_worker_pool_sup.erl +++ b/src/riak_core_node_worker_pool_sup.erl @@ -17,8 +17,10 @@ %% ------------------------------------------------------------------- -module(riak_core_node_worker_pool_sup). -behaviour(supervisor). +-include_lib("kernel/include/logger.hrl"). -export([start_link/0, init/1]). -export([start_pool/5]). +-export([hard_reset_dscp_pool/5]). %% Helper macro for declaring children of supervisor -define(CHILD(I, PoolType, Args, Type, Timeout), @@ -37,9 +39,9 @@ init([]) -> {ok, {{one_for_one, 5, 10}, []}}. %% @doc -%% Start a node_worker_pool - can be either assuredforwardng_pool or -%% a besteffort_pool (which will also be registered as a node_worker_pool for -%% backwards compatability) +%% Start a node_worker_pool - can be either assured forwarding pool or +%% a best effort_pool (which will also be registered as a node_worker_pool for +%% backwards compatibility) -spec start_pool(atom(), pos_integer(), list(), list(), worker_pool()) -> ok | {error, Reason::term()}. start_pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) -> @@ -58,3 +60,96 @@ pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) -> [WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType], worker). + +-define(DSCP_POOLS, [be_pool,af4_pool,af3_pool,af2_pool,af1_pool]). + +% @doc +% A hard reset of a dscp pool is tested only on a cluster not running active +% queries. All query runners should be killed as well the existing pool +% managers +% +% A new set of pools will be started with the new worker counts. +% +% This will exit if a dscp pool strategy is not in use, without forcing the +% hard reset. +-spec hard_reset_dscp_pool( + pos_integer(), + pos_integer(), + pos_integer(), + pos_integer(), + pos_integer() +) -> + ok|{error, unexpected_state}. +hard_reset_dscp_pool(AF1, AF2, AF3, AF4, BE) -> + {PoolList, PoolMap} = + element(4, sys:get_state(riak_core_node_worker_pool_sup)), + case lists:sort(PoolList) == lists:sort(?DSCP_POOLS) of + true -> + ?LOG_INFO( + "Attempting hard reset of dscp node worker pool to sizes: " + "AF1 ~w AF2 ~w AF3 ~w AF4 ~w BE ~w", + [AF1, AF2, AF3, AF4, BE] + ), + ?LOG_WARNING( + "Attempt to hard reset dscp pool will cancel will all running " + "query contributions on this node" + ), + true = + exit( + whereis(riak_core_node_worker_pool_sup), + kill + ), + true = + wait_until( + fun() -> + case whereis(riak_core_node_worker_pool_sup) of + undefined -> + false; + Pid when is_pid(Pid) -> + is_process_alive(Pid) + end + end, + 1, + 10 + ), + lists:foreach( + fun({PoolName, PoolSize}) -> + riak_core_node_worker_pool_sup:start_pool( + riak_kv_worker, + PoolSize, + [], + [], + PoolName + ) + end, + [ + {af1_pool, AF1}, + {af2_pool, AF2}, + {af3_pool, AF3}, + {af4_pool, AF4}, + {be_pool, BE} + ] + ), + {_UpdlList, UpdPoolMap} = + element(4, sys:get_state(riak_core_node_worker_pool_sup)), + ?LOG_INFO("Pool update complete configuration ~0p", [UpdPoolMap]), + ok; + false -> + ?LOG_ERROR("Pool state not expected for DSCP Pool ~0p", [PoolMap]), + {error, unexpected_state} + end. + +-spec wait_until( + fun(() -> boolean()), pos_integer(), non_neg_integer() +) -> + boolean(). +wait_until(_Fun, _Delay, 0) -> + false; +wait_until(Fun, Delay, Count) -> + case Fun() of + true -> + true; + _ -> + timer:sleep(Delay), + wait_until(Fun, Delay, Count - 1) + end.