From ef19e8a08b3243fcdc1c455c79673b4d6911417e Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Mon, 30 Nov 2020 17:28:03 +0800 Subject: [PATCH 1/9] feat(listener): add depth for ssl listener --- .gitignore | 1 + etc/emqx.conf | 5 +++++ priv/emqx.schema | 6 ++++++ 3 files changed, 12 insertions(+) diff --git a/.gitignore b/.gitignore index aaec950d4..2e19823c3 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ erlang.mk *.coverdata etc/emqx.conf.rendered Mnesia.*/ +.stamp diff --git a/etc/emqx.conf b/etc/emqx.conf index 24a3a3060..ddc1ed755 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1317,6 +1317,11 @@ listener.ssl.external.access.1 = allow all ## Value: Duration listener.ssl.external.handshake_timeout = 15s +## Maximum number of non-self-issued intermediate certificates that can follow the peer certificate in a valid certification path. +## +## Value: Number +#listener.ssl.external.depth = 10 + ## Path to the file containing the user's private PEM-encoded key. ## ## See: http://erlang.org/doc/man/ssl.html diff --git a/priv/emqx.schema b/priv/emqx.schema index fd5badb68..ccecd8315 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1368,6 +1368,11 @@ end}. {datatype, {duration, ms}} ]}. +{mapping, "listener.ssl.$name.depth", "emqx.listeners", [ + {default, 10}, + {datatype, integer} +]}. + {mapping, "listener.ssl.$name.dhfile", "emqx.listeners", [ {datatype, string} ]}. @@ -1878,6 +1883,7 @@ end}. {ciphers, Ciphers}, {user_lookup_fun, UserLookupFun}, {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, + {depth, cuttlefish:conf_get(Prefix ++ ".depth", Conf, undefined)}, {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, From 3f924631e4aab5c31627f04bbd9483b82d4e6cd3 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 2 Dec 2020 14:07:55 +0800 Subject: [PATCH 2/9] refactor(limiter): not saving anonymous function --- src/emqx_limiter.erl | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index e3ff7512f..26172f761 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -35,7 +35,7 @@ -type(checker() :: #{ name := name() , capacity := non_neg_integer() , interval := non_neg_integer() - , consumer := function() | esockd_rate_limit:bucket() + , consumer := esockd_rate_limit:bucket() | emqx_zone:zone() }). -type(name() :: conn_bytes_in @@ -84,7 +84,7 @@ do_init_checker(Zone, {Name, {Capacity, Interval}}) -> _ -> esockd_limiter:create({Zone, Name}, Capacity, Interval) end, - Ck#{consumer => fun(I) -> esockd_limiter:consume({Zone, Name}, I) end}; + Ck#{consumer => Zone}; _ -> Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)} end. @@ -126,7 +126,13 @@ consume(Pubs, Bytes, #{name := Name, consumer := Cons}) -> _ -> case is_overall_limiter(Name) of true -> - {_, Intv} = Cons(Tokens), + {_, Intv} = case erlang:is_function(Cons) of + true -> %% Compatible with hot-upgrade from e4.2.0, e4.2.1. + %% It should be removed after 4.3.0 + {env, [Zone|_]} = erlang:fun_info(Cons, env), + esockd_limiter:consume({Zone, Name}, Tokens); + _ -> esockd_limiter:consume({Cons, Name}, Tokens) + end, {Intv, Cons}; _ -> esockd_rate_limit:check(Tokens, Cons) From 3b1074d11f25624d34da05bc3dd184c8b8a3faac Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 2 Dec 2020 17:00:23 +0800 Subject: [PATCH 3/9] refactor(conn): not saving anonymous func --- src/emqx.appup.src | 53 +++------------- src/emqx_connection.erl | 10 +-- src/emqx_frame.erl | 124 +++++++++++++++++++++++++++++++++++++ src/emqx_limiter.erl | 2 + src/emqx_ws_connection.erl | 10 +-- test/emqx_frame_SUITE.erl | 15 ++++- 6 files changed, 157 insertions(+), 57 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index c36611f85..87ca0c005 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,48 +1,9 @@ %% -*-: erlang -*- -{DefaultLen, DefaultSize} = - case WordSize = erlang:system_info(wordsize) of - 8 -> % arch_64 - {10000, cuttlefish_bytesize:parse("64MB")}; - 4 -> % arch_32 - {1000, cuttlefish_bytesize:parse("32MB")} - end, -{"4.2.3", - [ - {"4.2.2", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.2.1", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]}, - {"4.2.0", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []}, - {apply, {application, set_env, - [emqx, force_shutdown_policy, - #{message_queue_len => DefaultLen, - max_heap_size => DefaultSize div WordSize}]}} - ]} - ], - [ - {"4.2.2", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.2.1", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]}, - {"4.2.0", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]} - ] +{VSN, + [ + {<<".*">>, []} + ], + [ + {<<".*">>, []} + ] }. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 2ab7f6c1e..8e8f7117d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -80,8 +80,8 @@ limit_timer :: maybe(reference()), %% Parse State parse_state :: emqx_frame:parse_state(), - %% Serialize function - serialize :: emqx_frame:serialize_fun(), + %% Serialize options + serialize :: emqx_frame:serialize_opts(), %% Channel State channel :: emqx_channel:channel(), %% GC State @@ -203,7 +203,7 @@ init_state(Transport, Socket, Options) -> Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), - Serialize = emqx_frame:serialize_fun(), + Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Options), GcState = emqx_zone:init_gc_state(Zone), StatsTimer = emqx_zone:stats_timer(Zone), @@ -337,7 +337,7 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #state{idle_timer = IdleTimer}) -> ok = emqx_misc:cancel_timer(IdleTimer), - Serialize = emqx_frame:serialize_fun(ConnPkt), + Serialize = emqx_frame:serialize_opts(ConnPkt), NState = State#state{serialize = Serialize, idle_timer = undefined }, @@ -578,7 +578,7 @@ handle_outgoing(Packet, State) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case Serialize(Packet) of + case emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 1c27548f7..cd41a935c 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -31,26 +31,53 @@ , serialize/2 ]). +%% The new version APIs to avoid saving +%% anonymous func +-export([ parse2/1 + , parse2/2 + , serialize_opts/0 + , serialize_opts/1 + , serialize_pkt/2 + ]). + -export_type([ options/0 , parse_state/0 , parse_result/0 , serialize_fun/0 ]). +-export_type([ parse_state2/0 + , parse_result2/0 + , serialize_opts/0 + ]). + -type(options() :: #{strict_mode => boolean(), max_size => 1..?MAX_PACKET_SIZE, version => emqx_types:version() }). -type(parse_state() :: {none, options()} | cont_fun()). +-type(parse_state2() :: {none, options()} | {cont_state(), options()}). -type(parse_result() :: {more, cont_fun()} | {ok, emqx_types:packet(), binary(), parse_state()}). +-type(parse_result2() :: {more, parse_state()} + | {ok, emqx_types:packet(), binary(), parse_state()}). + -type(cont_fun() :: fun((binary()) -> parse_result())). +-type(cont_state() :: {Stage :: len | body, + State :: #{hdr := #mqtt_packet_header{}, + len := {pos_integer(), non_neg_integer()} | non_neg_integer(), + rest => binary() + } + }). + -type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())). +-type(serialize_opts() :: options()). + -define(none(Options), {none, Options}). -define(DEFAULT_OPTIONS, @@ -81,6 +108,89 @@ merge_opts(Options) -> %% Parse MQTT Frame %%-------------------------------------------------------------------- +-spec(parse2(binary()) -> parse_result2()). +parse2(Bin) -> + parse2(Bin, initial_parse_state()). + +-spec(parse2(binary(), parse_state()) -> parse_result2()). +parse2(<<>>, {none, Options}) -> + {more, {none, Options}}; +parse2(<>, + {none, Options = #{strict_mode := StrictMode}}) -> + %% Validate header if strict mode. + StrictMode andalso validate_header(Type, Dup, QoS, Retain), + Header = #mqtt_packet_header{type = Type, + dup = bool(Dup), + qos = QoS, + retain = bool(Retain) + }, + Header1 = case fixqos(Type, QoS) of + QoS -> Header; + FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} + end, + parse_remaining_len2(Rest, Header1, Options); + +parse2(Bin, {{len, #{hdr := Header, + len := {Multiplier, Length}} + }, Options}) when is_binary(Bin) -> + parse_remaining_len2(Bin, Header, Multiplier, Length, Options); +parse2(Bin, {{body, #{hdr := Header, + len := Length, + rest := Rest} + }, Options}) when is_binary(Bin) -> + parse_frame2(<>, Header, Length, Options). + +parse_remaining_len2(<<>>, Header, Options) -> + {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; +parse_remaining_len2(Rest, Header, Options) -> + parse_remaining_len2(Rest, Header, 1, 0, Options). + +parse_remaining_len2(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) + when Length > MaxSize -> + error(frame_too_large); +parse_remaining_len2(<<>>, Header, Multiplier, Length, Options) -> + {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; +%% Match DISCONNECT without payload +parse_remaining_len2(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> + Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), + {ok, Packet, Rest, ?none(Options)}; +%% Match PINGREQ. +parse_remaining_len2(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> + parse_frame2(Rest, Header, 0, Options); +%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... +parse_remaining_len2(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> + parse_frame2(Rest, Header, 2, Options); +parse_remaining_len2(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> + parse_remaining_len2(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); +parse_remaining_len2(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, + Options = #{max_size := MaxSize}) -> + FrameLen = Value + Len * Multiplier, + if + FrameLen > MaxSize -> error(frame_too_large); + true -> parse_frame2(Rest, Header, FrameLen, Options) + end. + +parse_frame2(Bin, Header, 0, Options) -> + {ok, packet(Header), Bin, ?none(Options)}; + +parse_frame2(Bin, Header, Length, Options) -> + case Bin of + <> -> + case parse_packet(Header, FrameBin, Options) of + {Variable, Payload} -> + {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; + Variable = #mqtt_packet_connect{proto_ver = Ver} -> + {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; + Variable -> + {ok, packet(Header, Variable), Rest, ?none(Options)} + end; + TooShortBin -> + {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} + end. + +%% Deprecated parse funcs +%% It should be removed after 4.2.x + -spec(parse(binary()) -> parse_result()). parse(Bin) -> parse(Bin, initial_parse_state()). @@ -443,6 +553,20 @@ serialize_fun(#{version := Ver, max_size := MaxSize}) -> end end. +serialize_opts() -> + ?DEFAULT_OPTIONS. + +serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> + MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), + #{version => ProtoVer, max_size => MaxSize}. + +serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) -> + IoData = serialize(Packet, Ver), + case is_too_large(IoData, MaxSize) of + true -> <<>>; + false -> IoData + end. + -spec(serialize(emqx_types:packet()) -> iodata()). serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4). diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index 26172f761..fad897d92 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -53,6 +53,8 @@ -type(limiter() :: #limiter{}). +-dialyzer({nowarn_function, [consume/3]}). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 33b882768..ee0e24dbb 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -70,8 +70,8 @@ limit_timer :: maybe(reference()), %% Parse State parse_state :: emqx_frame:parse_state(), - %% Serialize Fun - serialize :: emqx_frame:serialize_fun(), + %% Serialize options + serialize :: emqx_frame:serialize_opts(), %% Channel channel :: emqx_channel:channel(), %% GC State @@ -231,7 +231,7 @@ websocket_init([Req, Opts]) -> MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), - Serialize = emqx_frame:serialize_fun(), + Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Opts), GcState = emqx_zone:init_gc_state(Zone), StatsTimer = emqx_zone:stats_timer(Zone), @@ -292,7 +292,7 @@ websocket_info({cast, Msg}, State) -> handle_info(Msg, State); websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> - Serialize = emqx_frame:serialize_fun(ConnPkt), + Serialize = emqx_frame:serialize_opts(ConnPkt), NState = State#state{serialize = Serialize}, handle_incoming(Packet, cancel_idle_timer(NState)); @@ -544,7 +544,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case Serialize(Packet) of + case emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 5d4e146db..7ef60bc69 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -26,6 +26,7 @@ all() -> [{group, parse}, + {group, parse2}, {group, connect}, {group, connack}, {group, publish}, @@ -44,6 +45,8 @@ groups() -> [t_parse_cont, t_parse_frame_too_large ]}, + {parse2, [parallel], + [t_parse_cont2]}, {connect, [parallel], [t_serialize_parse_v3_connect, t_serialize_parse_v4_connect, @@ -129,6 +132,16 @@ t_parse_frame_too_large(_) -> ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})), ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). +t_parse_cont2(_) -> + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}), + ParseState = emqx_frame:initial_parse_state(), + <> = serialize_to_binary(Packet), + {more, ContParse} = emqx_frame:parse2(<<>>, ParseState), + {more, ContParse1} = emqx_frame:parse2(HdrBin, ContParse), + {more, ContParse2} = emqx_frame:parse2(LenBin, ContParse1), + {more, ContParse3} = emqx_frame:parse2(<<>>, ContParse2), + {ok, Packet, <<>>, _} = emqx_frame:parse2(RestBin, ContParse3). + t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, @@ -509,7 +522,7 @@ parse_serialize(Packet, Opts) when is_map(Opts) -> Ver = maps:get(version, Opts, ?MQTT_PROTO_V4), Bin = iolist_to_binary(emqx_frame:serialize(Packet, Ver)), ParseState = emqx_frame:initial_parse_state(Opts), - {ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState), + {ok, NPacket, <<>>, _} = emqx_frame:parse2(Bin, ParseState), NPacket. serialize_to_binary(Packet) -> From f1b3bbd7bc81b936dbdd635e6a1e447e8e0c73d0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 2 Dec 2020 17:52:36 +0800 Subject: [PATCH 4/9] chore: supply the code_change logic --- rebar.config | 2 +- src/emqx_connection.erl | 56 ++++++++++++++++ src/emqx_frame.erl | 133 ++++++-------------------------------- src/emqx_limiter.erl | 8 +-- test/emqx_frame_SUITE.erl | 15 +---- test/emqx_vm_SUITE.erl | 2 +- 6 files changed, 78 insertions(+), 138 deletions(-) diff --git a/rebar.config b/rebar.config index df71f9762..fc3a95ce2 100644 --- a/rebar.config +++ b/rebar.config @@ -40,7 +40,7 @@ [{plugins, [{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "github"}}}]}, {deps, [{bbmustache, "1.7.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}, + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}} ]}, {erl_opts, [debug_info]} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 8e8f7117d..ad3694812 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -441,6 +441,62 @@ system_continue(Parent, _Debug, State) -> system_terminate(Reason, _Parent, _Debug, State) -> terminate(Reason, State). +system_code_change(State, _Mod, {down, Vsn}, _Extra) + when Vsn == "4.2.0"; + Vsn == "4.2.1" -> + Channel = State#state.channel, + NSerialize = emqx_frame:serialize_fun(State#state.serialize), + case {State#state.parse_state, element(10, Channel)} of + {{none, _}, undefined} -> + {ok, State#state{serialize = NSerialize}}; + {Ps, Quota} -> + %% BACKW: e4.2.0-e4.2.1 + %% We can't recover/reconstruct anonymous function state for + %% Parser or Quota consumer. So just close it. + ?LOG(error, "Unsupport downgrade connection ~0p, peername: ~0p." + " Due to it have an incomplete frame or unsafed quota counter," + " parser_state: ~0p, quota: ~0p." + " Force close it now!!!", [self(), State#state.peername, Ps, Quota]), + self() ! {close, unsupported_downgrade_connection_state}, + {ok, State#state{serialize = NSerialize}} + end; + +system_code_change(State, _Mod, Vsn, _Extra) + when Vsn == "4.2.0"; + Vsn == "4.2.1" -> + Channel = State#state.channel, + NChannel = + case element(10, Channel) of + undefined -> Channel; + Quoter -> + Zone = element(2, Quoter), + Cks = element(3, Quoter), + NCks = [case Name == overall_messages_routing of + true -> Ck#{consumer => Zone}; + _ -> Ck + end || Ck = #{name := Name} <- Cks], + setelement(10, Channel, setelement(3, Quoter, NCks)) + end, + + NParseState = + case State#state.parse_state of + Ps = {none, _} -> Ps; + Ps when is_function(Ps) -> + case erlang:fun_info(Ps, env) of + {_, [Hdr, Opts]} -> + {{len, #{hdr => Hdr, len => {1,0}}}, Opts}; + {_, [Bin, Hdr, Len, Opts]} when is_binary(Bin) -> + {{body, #{hdr => Hdr, len => Len, rest => Bin}}, Opts}; + {_, [Hdr, Multip, Len, Opts]} -> + {{len, #{hdr => Hdr, len => {Multip, Len}}}, Opts} + end + end, + + {_, [Ver, MaxSize]} = erlang:fun_info(State#state.serialize, env), + NSerialize = #{version => Ver, max_size => MaxSize}, + + {ok, State#state{channel = NChannel, parse_state = NParseState, serialize = NSerialize}}; + system_code_change(State, _Mod, _OldVsn, _Extra) -> {ok, State}. diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index cd41a935c..d09b8416e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -27,27 +27,16 @@ , parse/2 , serialize_fun/0 , serialize_fun/1 - , serialize/1 - , serialize/2 - ]). - -%% The new version APIs to avoid saving -%% anonymous func --export([ parse2/1 - , parse2/2 , serialize_opts/0 , serialize_opts/1 , serialize_pkt/2 + , serialize/1 + , serialize/2 ]). -export_type([ options/0 , parse_state/0 , parse_result/0 - , serialize_fun/0 - ]). - --export_type([ parse_state2/0 - , parse_result2/0 , serialize_opts/0 ]). @@ -56,17 +45,11 @@ version => emqx_types:version() }). --type(parse_state() :: {none, options()} | cont_fun()). --type(parse_state2() :: {none, options()} | {cont_state(), options()}). +-type(parse_state() :: {none, options()} | {cont_state(), options()}). --type(parse_result() :: {more, cont_fun()} +-type(parse_result() :: {more, parse_state()} | {ok, emqx_types:packet(), binary(), parse_state()}). --type(parse_result2() :: {more, parse_state()} - | {ok, emqx_types:packet(), binary(), parse_state()}). - --type(cont_fun() :: fun((binary()) -> parse_result())). - -type(cont_state() :: {Stage :: len | body, State :: #{hdr := #mqtt_packet_header{}, len := {pos_integer(), non_neg_integer()} | non_neg_integer(), @@ -74,8 +57,6 @@ } }). --type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())). - -type(serialize_opts() :: options()). -define(none(Options), {none, Options}). @@ -108,96 +89,13 @@ merge_opts(Options) -> %% Parse MQTT Frame %%-------------------------------------------------------------------- --spec(parse2(binary()) -> parse_result2()). -parse2(Bin) -> - parse2(Bin, initial_parse_state()). - --spec(parse2(binary(), parse_state()) -> parse_result2()). -parse2(<<>>, {none, Options}) -> - {more, {none, Options}}; -parse2(<>, - {none, Options = #{strict_mode := StrictMode}}) -> - %% Validate header if strict mode. - StrictMode andalso validate_header(Type, Dup, QoS, Retain), - Header = #mqtt_packet_header{type = Type, - dup = bool(Dup), - qos = QoS, - retain = bool(Retain) - }, - Header1 = case fixqos(Type, QoS) of - QoS -> Header; - FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} - end, - parse_remaining_len2(Rest, Header1, Options); - -parse2(Bin, {{len, #{hdr := Header, - len := {Multiplier, Length}} - }, Options}) when is_binary(Bin) -> - parse_remaining_len2(Bin, Header, Multiplier, Length, Options); -parse2(Bin, {{body, #{hdr := Header, - len := Length, - rest := Rest} - }, Options}) when is_binary(Bin) -> - parse_frame2(<>, Header, Length, Options). - -parse_remaining_len2(<<>>, Header, Options) -> - {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; -parse_remaining_len2(Rest, Header, Options) -> - parse_remaining_len2(Rest, Header, 1, 0, Options). - -parse_remaining_len2(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) - when Length > MaxSize -> - error(frame_too_large); -parse_remaining_len2(<<>>, Header, Multiplier, Length, Options) -> - {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; -%% Match DISCONNECT without payload -parse_remaining_len2(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> - Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), - {ok, Packet, Rest, ?none(Options)}; -%% Match PINGREQ. -parse_remaining_len2(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> - parse_frame2(Rest, Header, 0, Options); -%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... -parse_remaining_len2(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> - parse_frame2(Rest, Header, 2, Options); -parse_remaining_len2(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> - parse_remaining_len2(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); -parse_remaining_len2(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, - Options = #{max_size := MaxSize}) -> - FrameLen = Value + Len * Multiplier, - if - FrameLen > MaxSize -> error(frame_too_large); - true -> parse_frame2(Rest, Header, FrameLen, Options) - end. - -parse_frame2(Bin, Header, 0, Options) -> - {ok, packet(Header), Bin, ?none(Options)}; - -parse_frame2(Bin, Header, Length, Options) -> - case Bin of - <> -> - case parse_packet(Header, FrameBin, Options) of - {Variable, Payload} -> - {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; - Variable = #mqtt_packet_connect{proto_ver = Ver} -> - {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; - Variable -> - {ok, packet(Header, Variable), Rest, ?none(Options)} - end; - TooShortBin -> - {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} - end. - -%% Deprecated parse funcs -%% It should be removed after 4.2.x - -spec(parse(binary()) -> parse_result()). parse(Bin) -> parse(Bin, initial_parse_state()). -spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> - {more, fun(Bin) -> parse(Bin, {none, Options}) end}; + {more, {none, Options}}; parse(<>, {none, Options = #{strict_mode := StrictMode}}) -> %% Validate header if strict mode. @@ -212,11 +110,19 @@ parse(<>, FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} end, parse_remaining_len(Rest, Header1, Options); -parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> - Cont(Bin). + +parse(Bin, {{len, #{hdr := Header, + len := {Multiplier, Length}} + }, Options}) when is_binary(Bin) -> + parse_remaining_len(Bin, Header, Multiplier, Length, Options); +parse(Bin, {{body, #{hdr := Header, + len := Length, + rest := Rest} + }, Options}) when is_binary(Bin) -> + parse_frame(<>, Header, Length, Options). parse_remaining_len(<<>>, Header, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; + {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(Rest, Header, 1, 0, Options). @@ -224,7 +130,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> error(frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; + {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; %% Match DISCONNECT without payload parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), @@ -260,9 +166,7 @@ parse_frame(Bin, Header, Length, Options) -> {ok, packet(Header, Variable), Rest, ?none(Options)} end; TooShortBin -> - {more, fun(BinMore) -> - parse_frame(<>, Header, Length, Options) - end} + {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} end. -compile({inline, [packet/1, packet/2, packet/3]}). @@ -870,4 +774,3 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. - diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index fad897d92..447e04fea 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -128,13 +128,7 @@ consume(Pubs, Bytes, #{name := Name, consumer := Cons}) -> _ -> case is_overall_limiter(Name) of true -> - {_, Intv} = case erlang:is_function(Cons) of - true -> %% Compatible with hot-upgrade from e4.2.0, e4.2.1. - %% It should be removed after 4.3.0 - {env, [Zone|_]} = erlang:fun_info(Cons, env), - esockd_limiter:consume({Zone, Name}, Tokens); - _ -> esockd_limiter:consume({Cons, Name}, Tokens) - end, + {_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens), {Intv, Cons}; _ -> esockd_rate_limit:check(Tokens, Cons) diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 7ef60bc69..5d4e146db 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -26,7 +26,6 @@ all() -> [{group, parse}, - {group, parse2}, {group, connect}, {group, connack}, {group, publish}, @@ -45,8 +44,6 @@ groups() -> [t_parse_cont, t_parse_frame_too_large ]}, - {parse2, [parallel], - [t_parse_cont2]}, {connect, [parallel], [t_serialize_parse_v3_connect, t_serialize_parse_v4_connect, @@ -132,16 +129,6 @@ t_parse_frame_too_large(_) -> ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})), ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). -t_parse_cont2(_) -> - Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}), - ParseState = emqx_frame:initial_parse_state(), - <> = serialize_to_binary(Packet), - {more, ContParse} = emqx_frame:parse2(<<>>, ParseState), - {more, ContParse1} = emqx_frame:parse2(HdrBin, ContParse), - {more, ContParse2} = emqx_frame:parse2(LenBin, ContParse1), - {more, ContParse3} = emqx_frame:parse2(<<>>, ContParse2), - {ok, Packet, <<>>, _} = emqx_frame:parse2(RestBin, ContParse3). - t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, @@ -522,7 +509,7 @@ parse_serialize(Packet, Opts) when is_map(Opts) -> Ver = maps:get(version, Opts, ?MQTT_PROTO_V4), Bin = iolist_to_binary(emqx_frame:serialize(Packet, Ver)), ParseState = emqx_frame:initial_parse_state(Opts), - {ok, NPacket, <<>>, _} = emqx_frame:parse2(Bin, ParseState), + {ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState), NPacket. serialize_to_binary(Packet) -> diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index 19545f0ea..4bc231ad0 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -80,7 +80,7 @@ t_get_port_info(_Config) -> {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]), emqx_vm:get_port_info(), ok = gen_tcp:close(Sock), - [Port | _] = erlang:ports(). + [_Port | _] = erlang:ports(). t_transform_port(_Config) -> [Port | _] = erlang:ports(), From ae9449a0049c15a141710715ef08f1ab60c505ec Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 3 Dec 2020 17:13:06 +0800 Subject: [PATCH 5/9] chore: eliminate diaylzer warnings --- src/emqx_connection.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ad3694812..d2a6f2afa 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -108,6 +108,7 @@ , init_state/3 , run_loop/2 , system_terminate/4 + , system_code_change/4 ]}). -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) From 505257b25bf70ba68e60859fa080835971f6bc35 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 5 Dec 2020 17:22:44 +0800 Subject: [PATCH 6/9] chore(appup): remeove the code_change codes --- src/emqx_connection.erl | 56 ----------------------------------------- 1 file changed, 56 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index d2a6f2afa..cdd57e752 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -442,62 +442,6 @@ system_continue(Parent, _Debug, State) -> system_terminate(Reason, _Parent, _Debug, State) -> terminate(Reason, State). -system_code_change(State, _Mod, {down, Vsn}, _Extra) - when Vsn == "4.2.0"; - Vsn == "4.2.1" -> - Channel = State#state.channel, - NSerialize = emqx_frame:serialize_fun(State#state.serialize), - case {State#state.parse_state, element(10, Channel)} of - {{none, _}, undefined} -> - {ok, State#state{serialize = NSerialize}}; - {Ps, Quota} -> - %% BACKW: e4.2.0-e4.2.1 - %% We can't recover/reconstruct anonymous function state for - %% Parser or Quota consumer. So just close it. - ?LOG(error, "Unsupport downgrade connection ~0p, peername: ~0p." - " Due to it have an incomplete frame or unsafed quota counter," - " parser_state: ~0p, quota: ~0p." - " Force close it now!!!", [self(), State#state.peername, Ps, Quota]), - self() ! {close, unsupported_downgrade_connection_state}, - {ok, State#state{serialize = NSerialize}} - end; - -system_code_change(State, _Mod, Vsn, _Extra) - when Vsn == "4.2.0"; - Vsn == "4.2.1" -> - Channel = State#state.channel, - NChannel = - case element(10, Channel) of - undefined -> Channel; - Quoter -> - Zone = element(2, Quoter), - Cks = element(3, Quoter), - NCks = [case Name == overall_messages_routing of - true -> Ck#{consumer => Zone}; - _ -> Ck - end || Ck = #{name := Name} <- Cks], - setelement(10, Channel, setelement(3, Quoter, NCks)) - end, - - NParseState = - case State#state.parse_state of - Ps = {none, _} -> Ps; - Ps when is_function(Ps) -> - case erlang:fun_info(Ps, env) of - {_, [Hdr, Opts]} -> - {{len, #{hdr => Hdr, len => {1,0}}}, Opts}; - {_, [Bin, Hdr, Len, Opts]} when is_binary(Bin) -> - {{body, #{hdr => Hdr, len => Len, rest => Bin}}, Opts}; - {_, [Hdr, Multip, Len, Opts]} -> - {{len, #{hdr => Hdr, len => {Multip, Len}}}, Opts} - end - end, - - {_, [Ver, MaxSize]} = erlang:fun_info(State#state.serialize, env), - NSerialize = #{version => Ver, max_size => MaxSize}, - - {ok, State#state{channel = NChannel, parse_state = NParseState, serialize = NSerialize}}; - system_code_change(State, _Mod, _OldVsn, _Extra) -> {ok, State}. From d00ea4875270bb06111aae92df8ad8670feb4592 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 7 Dec 2020 14:35:38 +0800 Subject: [PATCH 7/9] feature(tcp): alarm when tcp connection congested --- etc/emqx.conf | 7 ++++++ priv/emqx.schema | 11 +++++++++ src/emqx_alarm.erl | 2 ++ src/emqx_connection.erl | 41 +++++++++++++++++++++++++++++++--- test/emqx_connection_SUITE.erl | 5 +++++ 5 files changed, 63 insertions(+), 3 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index ddc1ed755..daa69d713 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1139,6 +1139,13 @@ listener.tcp.external.send_timeout_close = on ## Value: on | off ## listener.tcp.external.tune_buffer = off +## The socket is set to a busy state when the amount of data queued internally +## by the ERTS socket implementation reaches this limit. +## +## Value: on | off +## Defaults to 1MB +## listener.tcp.external.high_watermark = 1MB + ## The TCP_NODELAY flag for MQTT connections. Small amounts of data are ## sent immediately if the option is enabled. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index ccecd8315..3abd63f76 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1244,6 +1244,11 @@ end}. hidden ]}. +{mapping, "listener.tcp.$name.high_watermark", "emqx.listeners", [ + {datatype, bytesize}, + {default, "1MB"} + ]}. + {mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [ {datatype, flag}, hidden @@ -1336,6 +1341,11 @@ end}. hidden ]}. +{mapping, "listener.ssl.$name.high_watermark", "emqx.listeners", [ + {datatype, bytesize}, + {default, "1MB"} + ]}. + {mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [ {datatype, flag}, hidden @@ -1844,6 +1854,7 @@ end}. {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {high_watermark, cuttlefish:conf_get(Prefix ++ ".high_watermark", Conf, undefined)}, {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) end, diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 37d6fe1e8..f224f5c03 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -344,6 +344,8 @@ normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); +normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) -> + list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId])); normalize_message(_Name, _UnknownDetails) -> <<"Unknown alarm">>. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index cdd57e752..0714d1798 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -103,6 +103,9 @@ -define(ENABLED(X), (X =/= undefined)). +-define(ALARM_TCP_CONGEST(Channel), + list_to_binary(io_lib:format("mqtt_conn/congested/~s", [emqx_channel:info(clientid, Channel)]))). + -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [ init/4 , init_state/3 @@ -429,6 +432,7 @@ handle_msg(Msg, State) -> terminate(Reason, State = #state{channel = Channel}) -> ?LOG(debug, "Terminated due to ~p", [Reason]), + emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)), emqx_channel:terminate(Reason, Channel), close_socket(State), exit(Reason). @@ -595,11 +599,12 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> %% Send data -spec(send(iodata(), state()) -> ok). -send(IoData, #state{transport = Transport, socket = Socket}) -> +send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) -> Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), emqx_pd:inc_counter(outgoing_bytes, Oct), - case Transport:async_send(Socket, IoData) of + maybe_warn_congestion(Socket, Transport, Channel), + case Transport:async_send(Socket, IoData, [nosuspend]) of ok -> ok; Error = {error, _Reason} -> %% Send an inet_reply to postpone handling the error @@ -607,6 +612,36 @@ send(IoData, #state{transport = Transport, socket = Socket}) -> ok end. +maybe_warn_congestion(Socket, Transport, Channel) -> + IsCongestAlarmSet = is_congestion_alarm_set(), + case is_congested(Socket, Transport) of + true when not IsCongestAlarmSet -> + {ok, Stat} = Transport:getstat(Socket, [recv_cnt, recv_oct, send_cnt, send_oct]), + {ok, Opts} = Transport:getopts(Socket, [high_watermark,high_msgq_watermark, sndbuf, recbuf, buffer]), + ok = set_congestion_alarm(), + emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), maps:from_list(Stat++Opts)); + false when IsCongestAlarmSet -> + ok = clear_congestion_alarm(), + emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)); + _ -> ok + end. + +is_congested(Socket, Transport) -> + case Transport:getstat(Socket, [send_pend]) of + {ok, [{send_pend, N}]} when N > 0 -> true; + _ -> false + end. + +is_congestion_alarm_set() -> + case erlang:get(conn_congested) of + true -> true; + _ -> false + end. +set_congestion_alarm() -> + erlang:put(conn_congested, true), ok. +clear_congestion_alarm() -> + erlang:put(conn_congested, false), ok. + %%-------------------------------------------------------------------- %% Handle Info @@ -622,7 +657,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) -> end; handle_info({sock_error, Reason}, State) -> - ?LOG(debug, "Socket error: ~p", [Reason]), + Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State) -> diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index b908fe4ab..2538aeecb 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -52,6 +52,9 @@ init_per_suite(Config) -> ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end), + ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end), + ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end), + Config. end_per_suite(_Config) -> @@ -62,6 +65,7 @@ end_per_suite(_Config) -> ok = meck:unload(emqx_pd), ok = meck:unload(emqx_metrics), ok = meck:unload(emqx_hooks), + ok = meck:unload(emqx_alarm), ok. init_per_testcase(_TestCase, Config) -> @@ -77,6 +81,7 @@ init_per_testcase(_TestCase, Config) -> {ok, [{K, 0} || K <- Options]} end), ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end), + ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end), ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end), Config. From 0ee489a9bec7354537108bf08e5695f6de5cdb72 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 7 Dec 2020 15:05:20 +0800 Subject: [PATCH 8/9] fix(congestion): change the conn congestion alarm msg body --- src/emqx_channel.erl | 14 ++++++++++++++ src/emqx_connection.erl | 29 +++++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 71b89f153..d9356ccd7 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -131,6 +131,20 @@ info(zone, #channel{clientinfo = #{zone := Zone}}) -> Zone; info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; +info(username, #channel{clientinfo = #{username := Username}}) -> + Username; +info(socktype, #channel{conninfo = #{socktype := SockType}}) -> + SockType; +info(peername, #channel{conninfo = #{peername := Peername}}) -> + Peername; +info(sockname, #channel{conninfo = #{sockname := Sockname}}) -> + Sockname; +info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) -> + ProtoName; +info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) -> + ProtoVer; +info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) -> + ConnectedAt; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 0714d1798..f5635a147 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -104,7 +104,16 @@ -define(ENABLED(X), (X =/= undefined)). -define(ALARM_TCP_CONGEST(Channel), - list_to_binary(io_lib:format("mqtt_conn/congested/~s", [emqx_channel:info(clientid, Channel)]))). + list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s", + [emqx_channel:info(clientid, Channel), + emqx_channel:info(username, Channel)]))). + +-define(ALARM_CONN_INFO_KEYS, [ + socktype, sockname, peername, + clientid, username, proto_name, proto_ver, connected_at +]). +-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). +-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [ init/4 @@ -616,10 +625,9 @@ maybe_warn_congestion(Socket, Transport, Channel) -> IsCongestAlarmSet = is_congestion_alarm_set(), case is_congested(Socket, Transport) of true when not IsCongestAlarmSet -> - {ok, Stat} = Transport:getstat(Socket, [recv_cnt, recv_oct, send_cnt, send_oct]), - {ok, Opts} = Transport:getopts(Socket, [high_watermark,high_msgq_watermark, sndbuf, recbuf, buffer]), ok = set_congestion_alarm(), - emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), maps:from_list(Stat++Opts)); + emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), + tcp_congestion_alarm_details(Socket, Transport, Channel)); false when IsCongestAlarmSet -> ok = clear_congestion_alarm(), emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)); @@ -642,6 +650,19 @@ set_congestion_alarm() -> clear_congestion_alarm() -> erlang:put(conn_congested, false), ok. +tcp_congestion_alarm_details(Socket, Transport, Channel) -> + {ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS), + {ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS), + SockInfo = maps:from_list(Stat ++ Opts), + ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]), + maps:merge(ConnInfo, SockInfo). + +conn_info(Key, Channel) when Key =:= sockname; Key =:= peername -> + {IPStr, Port} = emqx_channel:info(Key, Channel), + {Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])}; +conn_info(Key, Channel) -> + {Key, emqx_channel:info(Key, Channel)}. + %%-------------------------------------------------------------------- %% Handle Info From 28b0e874f6a08ab1d68744d2797d18db3030fc36 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 7 Dec 2020 15:53:22 +0800 Subject: [PATCH 9/9] fix(tests): test cases for receiving will msgs --- test/mqtt_protocol_v5_SUITE.erl | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 50fc9c04d..33d7d923d 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -348,16 +348,19 @@ t_connect_will_delay_interval(_) -> {will_topic, Topic}, {will_payload, Payload}, {will_props, #{'Will-Delay-Interval' => 3}}, - {properties, #{'Session-Expiry-Interval' => 7200}}, - {keepalive, 2} + {properties, #{'Session-Expiry-Interval' => 7200}} ]), {ok, _} = emqtt:connect(Client2), - - timer:sleep(5000), + %% terminate the client without sending the DISCONNECT + emqtt:stop(Client2), + %% should not get the will msg in 2.5s + timer:sleep(1500), ?assertEqual(0, length(receive_messages(1))), - timer:sleep(7000), + %% should get the will msg in 4.5s + timer:sleep(1000), ?assertEqual(1, length(receive_messages(1))), + %% try again, but let the session expire quickly {ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_will_delay_interval">>}, {proto_ver, v5}, @@ -367,14 +370,16 @@ t_connect_will_delay_interval(_) -> {will_topic, Topic}, {will_payload, Payload}, {will_props, #{'Will-Delay-Interval' => 7200}}, - {properties, #{'Session-Expiry-Interval' => 3}}, - {keepalive, 2} + {properties, #{'Session-Expiry-Interval' => 3}} ]), {ok, _} = emqtt:connect(Client3), - - timer:sleep(5000), + %% terminate the client without sending the DISCONNECT + emqtt:stop(Client3), + %% should not get the will msg in 2.5s + timer:sleep(1500), ?assertEqual(0, length(receive_messages(1))), - timer:sleep(7000), + %% should get the will msg in 4.5s + timer:sleep(1000), ?assertEqual(1, length(receive_messages(1))), ok = emqtt:disconnect(Client1),