diff --git a/rebar.config b/rebar.config index 2370a9b87..fe785724a 100644 --- a/rebar.config +++ b/rebar.config @@ -7,7 +7,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.9"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.10"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index ae5c1e481..2c0d887f8 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,4 +1,4 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- {VSN, [ @@ -92,6 +92,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, @@ -110,6 +111,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[6-7]">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, @@ -120,8 +122,11 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, - {<<"4.2.6">>, [ - {load_module, emqx_channel, brutal_purge, soft_purge, []} + {<<"4.2.8">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], @@ -216,6 +221,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, @@ -234,6 +240,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[6-7]">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, @@ -244,8 +251,11 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, - {<<"4.2.6">>, [ - {load_module, emqx_channel, brutal_purge, soft_purge, []} + {<<"4.2.8">>, [ + {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index e0789382a..1b20753aa 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -945,8 +945,11 @@ handle_info({sock_closed, Reason}, Channel = Shutdown -> Shutdown end; -handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?LOG(error, "Unexpected sock_closed: ~p", [Reason]), +handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> + %% Since sock_closed messages can be generated multiple times, + %% we can simply ignore errors of this type in the disconnected state. + %% e.g. when the socket send function returns an error, there is already + %% a tcp_closed delivered to the process mailbox {ok, Channel}; handle_info(clean_acl_cache, Channel) -> diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 36a280714..2e917ca79 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -70,7 +70,10 @@ ]). %% Internal export --export([stats_fun/0]). +-export([stats_fun/0, clean_down/1]). + +%% Test export +-export([register_channel_/3]). -type(chan_pid() :: pid()). @@ -88,11 +91,13 @@ %% Batch drain -define(BATCH_SIZE, 100000). --define(T_TAKEOVER, 15000). - %% Server name -define(CM, ?MODULE). +-define(T_KICK, 5000). +-define(T_GET_INFO, 5000). +-define(T_TAKEOVER, 15000). + %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). start_link() -> @@ -161,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_info(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Update infos of the channel. -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()). @@ -186,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Set channel's stats. -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()). @@ -254,7 +259,7 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), + ?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -266,78 +271,111 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> + %% TODO: if takeover times out, maybe kill the old? Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, ConnMod, ChanPid, Session} end; - takeover_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]). + rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). %% @doc Discard all the sessions identified by the ClientId. -spec(discard_session(emqx_types:clientid()) -> ok). discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; - ChanPids -> - lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -do_discard_session(ClientId, Pid) -> +%% @private Kick a local stale session to force it step down. +%% If failed to kick (e.g. timeout) force a kill. +%% Keeping the stale pid around, or returning error or raise an exception +%% benefits nobody. +-spec kick_or_kill(kick | discard, module(), pid()) -> ok. +kick_or_kill(Action, ConnMod, Pid) -> try - discard_session(ClientId, Pid) + %% this is essentailly a gen_server:call implemented in emqx_connection + %% and emqx_ws_connection. + %% the handle_call is implemented in emqx_channel + ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) catch _ : noproc -> % emqx_ws_connection: call - ?LOG(debug, "session_already_gone: ~p", [Pid]), + ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]), ok; _ : {noproc, _} -> % emqx_connection: gen_server:call - ?LOG(debug, "session_already_gone: ~p", [Pid]), + ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]), ok; - _ : {'EXIT', {noproc, _}} -> % rpc_call/3 - ?LOG(debug, "session_already_gone: ~p", [Pid]), + _ : {shutdown, _} -> + ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]), ok; _ : {{shutdown, _}, _} -> - ?LOG(debug, "session_already_shutdown: ~p", [Pid]), + ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]), ok; - _ : Error : St -> - ?LOG(debug, "failed_to_discard_session: ~p, " - "error: ~p, stacktrace: ~0p", [Pid, Error, St]) + _ : {timeout, {gen_server, call, _}} -> + ?LOG(warning, "session_kick_timeout: ~p, action: ~p, " + "stale_channel: ~p", + [Pid, Action, stale_channel_info(Pid)]), + ok = force_kill(Pid); + _ : Error -> + ?LOG(error, "session_kick_exception: ~p, action: ~p, " + "reason: ~p, stacktrace: ~p, stale_channel: ~p", + [Pid, Action, Error, erlang:get_stacktrace(), stale_channel_info(Pid)]), + ok = force_kill(Pid) end. -discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chann_conn_mod(ClientId, ChanPid) of - undefined -> ok; - ConnMod when is_atom(ConnMod) -> - ConnMod:call(ChanPid, discard, ?T_TAKEOVER) - end; +force_kill(Pid) -> + exit(Pid, kill), + ok. + +stale_channel_info(Pid) -> + process_info(Pid, [status, message_queue_len, current_stacktrace]). discard_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]). + kick_session(discard, ClientId, ChanPid). + +kick_session(ClientId, ChanPid) -> + kick_session(kick, ClientId, ChanPid). + +%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). +kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> + case get_chann_conn_mod(ClientId, ChanPid) of + undefined -> + %% already deregistered + ok; + ConnMod when is_atom(ConnMod) -> + ok = kick_or_kill(Action, ConnMod, ChanPid) + end; +kick_session(Action, ClientId, ChanPid) -> + %% call remote node on the old APIs because we do not know if they have upgraded + %% to have kick_session/3 + Function = case Action of + discard -> discard_session; + kick -> kick_session + end, + try + rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK) + catch + Error : Reason -> + %% This should mostly be RPC failures. + %% However, if the node is still running the old version + %% code (prior to emqx app 4.3.10) some of the RPC handler + %% exceptions may get propagated to a new version node + ?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p", + [node(ChanPid), Action, Error, Reason]) + end. kick_session(ClientId) -> case lookup_channels(ClientId) of - [] -> {error, not_found}; - [ChanPid] -> - kick_session(ClientId, ChanPid); + [] -> + ?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]), + ok; ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), - lists:foreach(fun(StalePid) -> - catch discard_session(ClientId, StalePid) - end, StalePids), - kick_session(ClientId, ChanPid) + case length(ChanPids) > 1 of + true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]); + false -> ok + end, + lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids) end. -kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chan_info(ClientId, ChanPid) of - #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(ChanPid, kick, ?T_TAKEOVER); - undefined -> - {error, not_found} - end; - -kick_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]). - %% @doc Is clean start? % is_clean_start(#{clean_start := false}) -> false; % is_clean_start(_Attrs) -> true. @@ -373,10 +411,16 @@ lookup_channels(local, ClientId) -> [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)]. %% @private -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of - {badrpc, Reason} -> error(Reason); - Res -> Res +rpc_call(Node, Fun, Args, Timeout) -> + case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of + {badrpc, Reason} -> + %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler + %% should catch all exceptions and always return 'ok'. + %% This leaves 'badrpc' only possible when there is problem + %% calling the remote node. + error({badrpc, Reason}); + Res -> + Res end. %% @private @@ -409,7 +453,7 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), - ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), + ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> @@ -445,5 +489,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chann_conn_mod(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 69337adf2..ffec9055e 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -685,7 +685,10 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) -> end; handle_info({sock_error, Reason}, State) -> - Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]), + case Reason =/= closed andalso Reason =/= einval of + true -> ?LOG(warning, "socket_error: ~p", [Reason]); + false -> ok + end, handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State) -> diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 94fbda4b7..7caa92ea8 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -67,6 +67,8 @@ version => ?MQTT_PROTO_V4 }). +-define(MULTIPLIER_MAX, 16#200000). + -dialyzer({no_match, [serialize_utf8_string/2]}). %%-------------------------------------------------------------------- @@ -142,7 +144,7 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 2, Options); parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options) - when Multiplier > 2097152 -> + when Multiplier > ?MULTIPLIER_MAX -> error(malformed_variable_byte_integer); parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); @@ -411,6 +413,9 @@ parse_property(<<16#2A, Val, Bin/binary>>, Props) -> parse_variable_byte_integer(Bin) -> parse_variable_byte_integer(Bin, 1, 0). +parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value) + when Multiplier > ?MULTIPLIER_MAX -> + error(malformed_variable_byte_integer); parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index b32cdaf23..ec660429a 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -31,6 +31,12 @@ conn_mod => emqx_connection, receive_maximum => 100}}). +-define(WAIT(PATTERN, TIMEOUT, RET), + fun() -> + receive PATTERN -> RET + after TIMEOUT -> error({timeout, ?LINE}) end + end()). + %%-------------------------------------------------------------------- %% CT callbacks %%-------------------------------------------------------------------- @@ -151,19 +157,98 @@ t_open_session_race_condition(_) -> exit(Pid, kill), timer:sleep(100), ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)). -t_discard_session(_) -> - ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end), - ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = meck:unload(emqx_connection). +t_kick_session_discard_normal(_) -> + test_kick_session(discard, normal). + +t_kick_session_discard_shutdown(_) -> + test_kick_session(discard, shutdown). + +t_kick_session_discard_shutdown_with_reason(_) -> + test_kick_session(discard, {shutdown, discard}). + +t_kick_session_discard_timeout(_) -> + test_kick_session(discard, timeout). + +t_kick_session_discard_noproc(_) -> + test_kick_session(discard, noproc). + +t_kick_session_kick_normal(_) -> + test_kick_session(discard, normal). + +t_kick_session_kick_shutdown(_) -> + test_kick_session(discard, shutdown). + +t_kick_session_kick_shutdown_with_reason(_) -> + test_kick_session(discard, {shutdown, discard}). + +t_kick_session_kick_timeout(_) -> + test_kick_session(discard, timeout). + +t_kick_session_kick_noproc(_) -> + test_kick_session(discard, noproc). + +test_kick_session(Action, Reason) -> + ClientId = rand_client_id(), + #{conninfo := ConnInfo} = ?ChanInfo, + FakeSessionFun = + fun Loop() -> + receive + {'$gen_call', From, A} when A =:= kick orelse + A =:= discard -> + case Reason of + normal -> + gen_server:reply(From, ok); + timeout -> + %% no response to the call + Loop(); + _ -> + exit(Reason) + end; + Msg -> + ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), + Loop() + end + end, + {Pid1, _} = spawn_monitor(FakeSessionFun), + {Pid2, _} = spawn_monitor(FakeSessionFun), + ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel_(ClientId, Pid2, ConnInfo), + ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))), + case Reason of + noproc -> exit(Pid1, kill), exit(Pid2, kill); + _ -> ok + end, + ok = case Action of + kick -> emqx_cm:kick_session(ClientId); + discard -> emqx_cm:discard_session(ClientId) + end, + case Reason =:= timeout orelse Reason =:= noproc of + true -> + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)), + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R)); + false -> + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)), + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R)) + end, + ok = flush_emqx_pool(), + ?assertEqual([], emqx_cm:lookup_channels(ClientId)). + +rand_client_id() -> + list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())). + +%% Channel deregistration is delegated to emqx_pool as a sync tasks. +%% The emqx_pool is pool of workers, and there is no way to know +%% which worker was picked for the last deregistration task. +%% This help function creates a large enough number of async tasks +%% to sync with the pool workers. +%% The number of tasks should be large enough to ensure all workers have +%% the chance to work on at least one of the tasks. +flush_emqx_pool() -> + Self = self(), + L = lists:seq(1, 1000), + lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), + lists:foreach(fun(I) -> receive {done, I} -> ok end end, L). t_takeover_session(_) -> {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), @@ -178,21 +263,6 @@ t_takeover_session(_) -> {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>). -t_kick_session(_) -> - ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end), - {error, not_found} = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), - test = emqx_cm:kick_session(<<"clientid">>), - erlang:spawn(fun() -> - ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), - timer:sleep(1000) - end), - ct:sleep(100), - test = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = meck:unload(emqx_connection). - t_all_channels(_) -> ?assertEqual(true, is_list(emqx_cm:all_channels())).