diff --git a/src/emqx_kernel_sup.erl b/src/emqx_kernel_sup.erl index 40ec7cfd7..6289437ac 100644 --- a/src/emqx_kernel_sup.erl +++ b/src/emqx_kernel_sup.erl @@ -25,7 +25,7 @@ start_link() -> init([]) -> {ok, {{one_for_one, 10, 100}, - [child_spec(emqx_pool, supervisor), + [child_spec(emqx_pool_sup, supervisor), child_spec(emqx_alarm_mgr, worker), child_spec(emqx_hooks, worker), child_spec(emqx_stats, worker), @@ -41,6 +41,7 @@ child_spec(M, worker) -> shutdown => 5000, type => worker, modules => [M]}; + child_spec(M, supervisor) -> #{id => M, start => {M, start_link, []}, diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 7b12bea69..e90cdda6b 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -16,9 +16,14 @@ -behaviour(gen_server). --export([start_link/0, start_link/2]). +-include("logger.hrl"). + +-export([start_link/2]). -export([submit/1, submit/2]). -export([async_submit/1, async_submit/2]). +-ifdef(TEST). +-export([worker/0]). +-endif. %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -28,10 +33,6 @@ -type(task() :: fun() | mfa() | {fun(), Args :: list(any())}). -%% @doc Start pooler supervisor. -start_link() -> - emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}). - %% @doc Start pool. -spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> @@ -80,22 +81,22 @@ handle_call({submit, Task}, _From, State) -> {reply, catch run(Task), State}; handle_call(Req, _From, State) -> - emqx_logger:error("[Pool] unexpected call: ~p", [Req]), + ?ERROR("[Pool] unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({async_submit, Task}, State) -> try run(Task) catch _:Error:Stacktrace -> - emqx_logger:error("[Pool] error: ~p, ~p", [Error, Stacktrace]) + ?ERROR("[Pool] error: ~p, ~p", [Error, Stacktrace]) end, {noreply, State}; handle_cast(Msg, State) -> - emqx_logger:error("[Pool] unexpected cast: ~p", [Msg]), + ?ERROR("[Pool] unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - emqx_logger:error("[Pool] unexpected info: ~p", [Info]), + ?ERROR("[Pool] unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index eb81233f9..58a6a08cd 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -16,10 +16,13 @@ -behaviour(supervisor). --export([spec/1, spec/2, start_link/3, start_link/4]). +-export([spec/1, spec/2]). +-export([start_link/0, start_link/3, start_link/4]). -export([init/1]). +-define(POOL, emqx_pool). + -spec(spec(list()) -> supervisor:child_spec()). spec(Args) -> spec(pool_sup, Args). @@ -33,6 +36,10 @@ spec(ChildId, Args) -> type => supervisor, modules => [?MODULE]}. +%% @doc Start the default pool supervisor. +start_link() -> + start_link(?POOL, random, {?POOL, start_link, []}). + -spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, MFA) -> start_link(Pool, Type, emqx_vm:schedulers(), MFA). diff --git a/test/emqx_pool_SUITE.erl b/test/emqx_pool_SUITE.erl index 3d7d0f7e5..36ad7c507 100644 --- a/test/emqx_pool_SUITE.erl +++ b/test/emqx_pool_SUITE.erl @@ -15,22 +15,22 @@ -module(emqx_pool_SUITE). -compile(export_all). - -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). - -include_lib("eunit/include/eunit.hrl"). -all() -> [ - {group, submit_case}, - {group, async_submit_case} - ]. +all() -> + [ + {group, submit_case}, + {group, async_submit_case}, + t_unexpected + ]. groups() -> [ {submit_case, [sequence], [submit_mfa, submit_fa]}, - {async_submit_case, [sequence], [async_submit_mfa]} + {async_submit_case, [sequence], [async_submit_mfa, async_submit_ex]} ]. init_per_suite(Config) -> @@ -40,26 +40,39 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. +init_per_testcase(_, Config) -> + {ok, Sup} = emqx_pool_sup:start_link(), + [{pool_sup, Sup}|Config]. + +end_per_testcase(_, Config) -> + Sup = proplists:get_value(pool_sup, Config), + exit(Sup, normal). + submit_mfa(_Config) -> - erlang:process_flag(trap_exit, true), - {ok, Pid} = emqx_pool:start_link(), Result = emqx_pool:submit({?MODULE, test_mfa, []}), - ?assertEqual(15, Result), - gen_server:stop(Pid, normal, 3000), - ok. + ?assertEqual(15, Result). submit_fa(_Config) -> - {ok, Pid} = emqx_pool:start_link(), Fun = fun(X) -> case X rem 2 of 0 -> {true, X div 2}; _ -> false end end, Result = emqx_pool:submit(Fun, [2]), - ?assertEqual({true, 1}, Result), - exit(Pid, normal). + ?assertEqual({true, 1}, Result). + +async_submit_mfa(_Config) -> + emqx_pool:async_submit({?MODULE, test_mfa, []}), + emqx_pool:async_submit(fun ?MODULE:test_mfa/0, []). + +async_submit_ex(_) -> + emqx_pool:async_submit(fun error_fun/0). + +t_unexpected(_) -> + Pid = emqx_pool:worker(), + ?assertEqual(ignored, gen_server:call(Pid, bad_request)), + ?assertEqual(ok, gen_server:cast(Pid, bad_msg)), + Pid ! bad_info, + ok = gen_server:stop(Pid). test_mfa() -> lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]). -async_submit_mfa(_Config) -> - {ok, Pid} = emqx_pool:start_link(), - emqx_pool:async_submit({?MODULE, test_mfa, []}), - exit(Pid, normal). +error_fun() -> error(test_error).