diff --git a/.gitignore b/.gitignore index 410abf79c..84f92705f 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,4 @@ Mnesia.*/ _checkouts rebar.config.rendered /rebar3 -rebar.lock +rebar.lock \ No newline at end of file diff --git a/etc/emqx.conf b/etc/emqx.conf index 24a3a3060..daa69d713 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1139,6 +1139,13 @@ listener.tcp.external.send_timeout_close = on ## Value: on | off ## listener.tcp.external.tune_buffer = off +## The socket is set to a busy state when the amount of data queued internally +## by the ERTS socket implementation reaches this limit. +## +## Value: on | off +## Defaults to 1MB +## listener.tcp.external.high_watermark = 1MB + ## The TCP_NODELAY flag for MQTT connections. Small amounts of data are ## sent immediately if the option is enabled. ## @@ -1317,6 +1324,11 @@ listener.ssl.external.access.1 = allow all ## Value: Duration listener.ssl.external.handshake_timeout = 15s +## Maximum number of non-self-issued intermediate certificates that can follow the peer certificate in a valid certification path. +## +## Value: Number +#listener.ssl.external.depth = 10 + ## Path to the file containing the user's private PEM-encoded key. ## ## See: http://erlang.org/doc/man/ssl.html diff --git a/priv/emqx.schema b/priv/emqx.schema index fd5badb68..3abd63f76 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1244,6 +1244,11 @@ end}. hidden ]}. +{mapping, "listener.tcp.$name.high_watermark", "emqx.listeners", [ + {datatype, bytesize}, + {default, "1MB"} + ]}. + {mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [ {datatype, flag}, hidden @@ -1336,6 +1341,11 @@ end}. hidden ]}. +{mapping, "listener.ssl.$name.high_watermark", "emqx.listeners", [ + {datatype, bytesize}, + {default, "1MB"} + ]}. + {mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [ {datatype, flag}, hidden @@ -1368,6 +1378,11 @@ end}. {datatype, {duration, ms}} ]}. +{mapping, "listener.ssl.$name.depth", "emqx.listeners", [ + {default, 10}, + {datatype, integer} +]}. + {mapping, "listener.ssl.$name.dhfile", "emqx.listeners", [ {datatype, string} ]}. @@ -1839,6 +1854,7 @@ end}. {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {high_watermark, cuttlefish:conf_get(Prefix ++ ".high_watermark", Conf, undefined)}, {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) end, @@ -1878,6 +1894,7 @@ end}. {ciphers, Ciphers}, {user_lookup_fun, UserLookupFun}, {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, + {depth, cuttlefish:conf_get(Prefix ++ ".depth", Conf, undefined)}, {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, diff --git a/src/emqx.appup.src b/src/emqx.appup.src index c36611f85..87ca0c005 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,48 +1,9 @@ %% -*-: erlang -*- -{DefaultLen, DefaultSize} = - case WordSize = erlang:system_info(wordsize) of - 8 -> % arch_64 - {10000, cuttlefish_bytesize:parse("64MB")}; - 4 -> % arch_32 - {1000, cuttlefish_bytesize:parse("32MB")} - end, -{"4.2.3", - [ - {"4.2.2", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.2.1", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]}, - {"4.2.0", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []}, - {apply, {application, set_env, - [emqx, force_shutdown_policy, - #{message_queue_len => DefaultLen, - max_heap_size => DefaultSize div WordSize}]}} - ]} - ], - [ - {"4.2.2", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.2.1", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]}, - {"4.2.0", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]} - ] +{VSN, + [ + {<<".*">>, []} + ], + [ + {<<".*">>, []} + ] }. diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 37d6fe1e8..f224f5c03 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -344,6 +344,8 @@ normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); +normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) -> + list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId])); normalize_message(_Name, _UnknownDetails) -> <<"Unknown alarm">>. diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 71b89f153..d9356ccd7 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -131,6 +131,20 @@ info(zone, #channel{clientinfo = #{zone := Zone}}) -> Zone; info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; +info(username, #channel{clientinfo = #{username := Username}}) -> + Username; +info(socktype, #channel{conninfo = #{socktype := SockType}}) -> + SockType; +info(peername, #channel{conninfo = #{peername := Peername}}) -> + Peername; +info(sockname, #channel{conninfo = #{sockname := Sockname}}) -> + Sockname; +info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) -> + ProtoName; +info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) -> + ProtoVer; +info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) -> + ConnectedAt; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 2ab7f6c1e..f5635a147 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -80,8 +80,8 @@ limit_timer :: maybe(reference()), %% Parse State parse_state :: emqx_frame:parse_state(), - %% Serialize function - serialize :: emqx_frame:serialize_fun(), + %% Serialize options + serialize :: emqx_frame:serialize_opts(), %% Channel State channel :: emqx_channel:channel(), %% GC State @@ -103,11 +103,24 @@ -define(ENABLED(X), (X =/= undefined)). +-define(ALARM_TCP_CONGEST(Channel), + list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s", + [emqx_channel:info(clientid, Channel), + emqx_channel:info(username, Channel)]))). + +-define(ALARM_CONN_INFO_KEYS, [ + socktype, sockname, peername, + clientid, username, proto_name, proto_ver, connected_at +]). +-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). +-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). + -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [ init/4 , init_state/3 , run_loop/2 , system_terminate/4 + , system_code_change/4 ]}). -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) @@ -203,7 +216,7 @@ init_state(Transport, Socket, Options) -> Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), - Serialize = emqx_frame:serialize_fun(), + Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Options), GcState = emqx_zone:init_gc_state(Zone), StatsTimer = emqx_zone:stats_timer(Zone), @@ -337,7 +350,7 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #state{idle_timer = IdleTimer}) -> ok = emqx_misc:cancel_timer(IdleTimer), - Serialize = emqx_frame:serialize_fun(ConnPkt), + Serialize = emqx_frame:serialize_opts(ConnPkt), NState = State#state{serialize = Serialize, idle_timer = undefined }, @@ -428,6 +441,7 @@ handle_msg(Msg, State) -> terminate(Reason, State = #state{channel = Channel}) -> ?LOG(debug, "Terminated due to ~p", [Reason]), + emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)), emqx_channel:terminate(Reason, Channel), close_socket(State), exit(Reason). @@ -578,7 +592,7 @@ handle_outgoing(Packet, State) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case Serialize(Packet) of + case emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), @@ -594,11 +608,12 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> %% Send data -spec(send(iodata(), state()) -> ok). -send(IoData, #state{transport = Transport, socket = Socket}) -> +send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) -> Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), emqx_pd:inc_counter(outgoing_bytes, Oct), - case Transport:async_send(Socket, IoData) of + maybe_warn_congestion(Socket, Transport, Channel), + case Transport:async_send(Socket, IoData, [nosuspend]) of ok -> ok; Error = {error, _Reason} -> %% Send an inet_reply to postpone handling the error @@ -606,6 +621,48 @@ send(IoData, #state{transport = Transport, socket = Socket}) -> ok end. +maybe_warn_congestion(Socket, Transport, Channel) -> + IsCongestAlarmSet = is_congestion_alarm_set(), + case is_congested(Socket, Transport) of + true when not IsCongestAlarmSet -> + ok = set_congestion_alarm(), + emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), + tcp_congestion_alarm_details(Socket, Transport, Channel)); + false when IsCongestAlarmSet -> + ok = clear_congestion_alarm(), + emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)); + _ -> ok + end. + +is_congested(Socket, Transport) -> + case Transport:getstat(Socket, [send_pend]) of + {ok, [{send_pend, N}]} when N > 0 -> true; + _ -> false + end. + +is_congestion_alarm_set() -> + case erlang:get(conn_congested) of + true -> true; + _ -> false + end. +set_congestion_alarm() -> + erlang:put(conn_congested, true), ok. +clear_congestion_alarm() -> + erlang:put(conn_congested, false), ok. + +tcp_congestion_alarm_details(Socket, Transport, Channel) -> + {ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS), + {ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS), + SockInfo = maps:from_list(Stat ++ Opts), + ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]), + maps:merge(ConnInfo, SockInfo). + +conn_info(Key, Channel) when Key =:= sockname; Key =:= peername -> + {IPStr, Port} = emqx_channel:info(Key, Channel), + {Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])}; +conn_info(Key, Channel) -> + {Key, emqx_channel:info(Key, Channel)}. + %%-------------------------------------------------------------------- %% Handle Info @@ -621,7 +678,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) -> end; handle_info({sock_error, Reason}, State) -> - ?LOG(debug, "Socket error: ~p", [Reason]), + Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State) -> diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 1c27548f7..d09b8416e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -27,6 +27,9 @@ , parse/2 , serialize_fun/0 , serialize_fun/1 + , serialize_opts/0 + , serialize_opts/1 + , serialize_pkt/2 , serialize/1 , serialize/2 ]). @@ -34,7 +37,7 @@ -export_type([ options/0 , parse_state/0 , parse_result/0 - , serialize_fun/0 + , serialize_opts/0 ]). -type(options() :: #{strict_mode => boolean(), @@ -42,14 +45,19 @@ version => emqx_types:version() }). --type(parse_state() :: {none, options()} | cont_fun()). +-type(parse_state() :: {none, options()} | {cont_state(), options()}). --type(parse_result() :: {more, cont_fun()} +-type(parse_result() :: {more, parse_state()} | {ok, emqx_types:packet(), binary(), parse_state()}). --type(cont_fun() :: fun((binary()) -> parse_result())). +-type(cont_state() :: {Stage :: len | body, + State :: #{hdr := #mqtt_packet_header{}, + len := {pos_integer(), non_neg_integer()} | non_neg_integer(), + rest => binary() + } + }). --type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())). +-type(serialize_opts() :: options()). -define(none(Options), {none, Options}). @@ -87,7 +95,7 @@ parse(Bin) -> -spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> - {more, fun(Bin) -> parse(Bin, {none, Options}) end}; + {more, {none, Options}}; parse(<>, {none, Options = #{strict_mode := StrictMode}}) -> %% Validate header if strict mode. @@ -102,11 +110,19 @@ parse(<>, FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} end, parse_remaining_len(Rest, Header1, Options); -parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> - Cont(Bin). + +parse(Bin, {{len, #{hdr := Header, + len := {Multiplier, Length}} + }, Options}) when is_binary(Bin) -> + parse_remaining_len(Bin, Header, Multiplier, Length, Options); +parse(Bin, {{body, #{hdr := Header, + len := Length, + rest := Rest} + }, Options}) when is_binary(Bin) -> + parse_frame(<>, Header, Length, Options). parse_remaining_len(<<>>, Header, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; + {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(Rest, Header, 1, 0, Options). @@ -114,7 +130,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> error(frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; + {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; %% Match DISCONNECT without payload parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), @@ -150,9 +166,7 @@ parse_frame(Bin, Header, Length, Options) -> {ok, packet(Header, Variable), Rest, ?none(Options)} end; TooShortBin -> - {more, fun(BinMore) -> - parse_frame(<>, Header, Length, Options) - end} + {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} end. -compile({inline, [packet/1, packet/2, packet/3]}). @@ -443,6 +457,20 @@ serialize_fun(#{version := Ver, max_size := MaxSize}) -> end end. +serialize_opts() -> + ?DEFAULT_OPTIONS. + +serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> + MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), + #{version => ProtoVer, max_size => MaxSize}. + +serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) -> + IoData = serialize(Packet, Ver), + case is_too_large(IoData, MaxSize) of + true -> <<>>; + false -> IoData + end. + -spec(serialize(emqx_types:packet()) -> iodata()). serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4). @@ -746,4 +774,3 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. - diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index e3ff7512f..447e04fea 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -35,7 +35,7 @@ -type(checker() :: #{ name := name() , capacity := non_neg_integer() , interval := non_neg_integer() - , consumer := function() | esockd_rate_limit:bucket() + , consumer := esockd_rate_limit:bucket() | emqx_zone:zone() }). -type(name() :: conn_bytes_in @@ -53,6 +53,8 @@ -type(limiter() :: #limiter{}). +-dialyzer({nowarn_function, [consume/3]}). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -84,7 +86,7 @@ do_init_checker(Zone, {Name, {Capacity, Interval}}) -> _ -> esockd_limiter:create({Zone, Name}, Capacity, Interval) end, - Ck#{consumer => fun(I) -> esockd_limiter:consume({Zone, Name}, I) end}; + Ck#{consumer => Zone}; _ -> Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)} end. @@ -126,7 +128,7 @@ consume(Pubs, Bytes, #{name := Name, consumer := Cons}) -> _ -> case is_overall_limiter(Name) of true -> - {_, Intv} = Cons(Tokens), + {_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens), {Intv, Cons}; _ -> esockd_rate_limit:check(Tokens, Cons) diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 33b882768..ee0e24dbb 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -70,8 +70,8 @@ limit_timer :: maybe(reference()), %% Parse State parse_state :: emqx_frame:parse_state(), - %% Serialize Fun - serialize :: emqx_frame:serialize_fun(), + %% Serialize options + serialize :: emqx_frame:serialize_opts(), %% Channel channel :: emqx_channel:channel(), %% GC State @@ -231,7 +231,7 @@ websocket_init([Req, Opts]) -> MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), - Serialize = emqx_frame:serialize_fun(), + Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Opts), GcState = emqx_zone:init_gc_state(Zone), StatsTimer = emqx_zone:stats_timer(Zone), @@ -292,7 +292,7 @@ websocket_info({cast, Msg}, State) -> handle_info(Msg, State); websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> - Serialize = emqx_frame:serialize_fun(ConnPkt), + Serialize = emqx_frame:serialize_opts(ConnPkt), NState = State#state{serialize = Serialize}, handle_incoming(Packet, cancel_idle_timer(NState)); @@ -544,7 +544,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case Serialize(Packet) of + case emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index b908fe4ab..2538aeecb 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -52,6 +52,9 @@ init_per_suite(Config) -> ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end), + ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end), + ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end), + Config. end_per_suite(_Config) -> @@ -62,6 +65,7 @@ end_per_suite(_Config) -> ok = meck:unload(emqx_pd), ok = meck:unload(emqx_metrics), ok = meck:unload(emqx_hooks), + ok = meck:unload(emqx_alarm), ok. init_per_testcase(_TestCase, Config) -> @@ -77,6 +81,7 @@ init_per_testcase(_TestCase, Config) -> {ok, [{K, 0} || K <- Options]} end), ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end), + ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end), ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end), Config. diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index 19545f0ea..4bc231ad0 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -80,7 +80,7 @@ t_get_port_info(_Config) -> {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]), emqx_vm:get_port_info(), ok = gen_tcp:close(Sock), - [Port | _] = erlang:ports(). + [_Port | _] = erlang:ports(). t_transform_port(_Config) -> [Port | _] = erlang:ports(), diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 50fc9c04d..33d7d923d 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -348,16 +348,19 @@ t_connect_will_delay_interval(_) -> {will_topic, Topic}, {will_payload, Payload}, {will_props, #{'Will-Delay-Interval' => 3}}, - {properties, #{'Session-Expiry-Interval' => 7200}}, - {keepalive, 2} + {properties, #{'Session-Expiry-Interval' => 7200}} ]), {ok, _} = emqtt:connect(Client2), - - timer:sleep(5000), + %% terminate the client without sending the DISCONNECT + emqtt:stop(Client2), + %% should not get the will msg in 2.5s + timer:sleep(1500), ?assertEqual(0, length(receive_messages(1))), - timer:sleep(7000), + %% should get the will msg in 4.5s + timer:sleep(1000), ?assertEqual(1, length(receive_messages(1))), + %% try again, but let the session expire quickly {ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_will_delay_interval">>}, {proto_ver, v5}, @@ -367,14 +370,16 @@ t_connect_will_delay_interval(_) -> {will_topic, Topic}, {will_payload, Payload}, {will_props, #{'Will-Delay-Interval' => 7200}}, - {properties, #{'Session-Expiry-Interval' => 3}}, - {keepalive, 2} + {properties, #{'Session-Expiry-Interval' => 3}} ]), {ok, _} = emqtt:connect(Client3), - - timer:sleep(5000), + %% terminate the client without sending the DISCONNECT + emqtt:stop(Client3), + %% should not get the will msg in 2.5s + timer:sleep(1500), ?assertEqual(0, length(receive_messages(1))), - timer:sleep(7000), + %% should get the will msg in 4.5s + timer:sleep(1000), ?assertEqual(1, length(receive_messages(1))), ok = emqtt:disconnect(Client1),