diff --git a/Makefile b/Makefile index c4e0dc020..9aa90f94c 100644 --- a/Makefile +++ b/Makefile @@ -39,8 +39,6 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message -CT_SUITES = emqx_portal - CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index fbcfcc059..48c8c0e1e 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -35,7 +35,6 @@ -export([pubcomp/2, pubcomp/3, pubcomp/4]). -export([subscriptions/1]). -export([info/1, stop/1]). --export([next_packet_id/1, next_packet_id/2]). %% For test cases -export([pause/1, resume/1]). @@ -390,7 +389,7 @@ publish(Client, Topic, Properties, Payload, Opts) props = Properties, payload = iolist_to_binary(Payload)}). --spec(publish(client(), #mqtt_msg{} | [#mqtt_msg{}]) -> ok | {ok, packet_id()} | {error, term()}). +-spec(publish(client(), #mqtt_msg{}) -> ok | {ok, packet_id()} | {error, term()}). publish(Client, Msg) -> gen_statem:call(Client, {publish, Msg}). @@ -422,19 +421,6 @@ disconnect(Client, ReasonCode) -> disconnect(Client, ReasonCode, Properties) -> gen_statem:call(Client, {disconnect, ReasonCode, Properties}). --spec next_packet_id(packet_id()) -> packet_id(). -next_packet_id(?MAX_PACKET_ID) -> 1; -next_packet_id(Id) -> Id + 1. - --spec next_packet_id(packet_id(), integer()) -> packet_id(). -next_packet_id(Id, Bump) -> - true = (Bump < ?MAX_PACKET_ID div 2), %% assert - N = Id + Bump, - case N > ?MAX_PACKET_ID of - true -> N - ?MAX_PACKET_ID; - false -> N - end. - %%------------------------------------------------------------------------------ %% For test cases %%------------------------------------------------------------------------------ @@ -801,26 +787,23 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) -> {stop_and_reply, Reason, [{reply, From, Error}]} end; -connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, State) - when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> - connected({call, From}, {publish, [Msg]}, State); - -%% when publishing a batch, {ok, BasePacketId} is returned, -%% following packet ids for the batch tail are mod (1 bsl 16) consecutive -connected({call, From}, {publish, Msgs}, - State = #state{inflight = Inflight, last_packet_id = PacketId}) when is_list(Msgs) -> - %% NOTE: to ensure API call atomicity, inflight buffer may overflow - case emqx_inflight:is_full(Inflight) of - true -> - {keep_state, State, [{reply, From, {error, inflight_full}}]}; - false -> - case send_batch(assign_packet_id(Msgs, PacketId), State) of - {ok, NewState} -> - {keep_state, ensure_retry_timer(NewState), [{reply, From, {ok, PacketId}}]}; - {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} - end - end; +connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, + State = #state{inflight = Inflight, last_packet_id = PacketId}) + when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> + case emqx_inflight:is_full(Inflight) of + true -> + {keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]}; + false -> + Msg1 = Msg#mqtt_msg{packet_id = PacketId}, + case send(Msg1, State) of + {ok, NewState} -> + Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight), + {keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}), + [{reply, From, {ok, PacketId}}]}; + {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} + end + end; connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics}, State = #state{last_packet_id = PacketId}) -> @@ -1349,24 +1332,13 @@ send_puback(Packet, State) -> {error, Reason} -> {stop, {shutdown, Reason}} end. -send_batch([], State) -> {ok, State}; -send_batch([Msg = #mqtt_msg{packet_id = PacketId} | Rest], - State = #state{inflight = Inflight}) -> - case send(Msg, State) of - {ok, NewState} -> - Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight), - send_batch(Rest, NewState#state{inflight = Inflight1}); - {error, Reason} -> - {error, Reason} - end. - send(Msg, State) when is_record(Msg, mqtt_msg) -> send(msg_to_packet(Msg), State); send(Packet, State = #state{socket = Sock, proto_ver = Ver}) when is_record(Packet, mqtt_packet) -> Data = emqx_frame:serialize(Packet, #{version => Ver}), - emqx_logger:debug("SEND Data: ~p", [Data]), + emqx_logger:debug("SEND Data: ~1000p", [Packet]), case emqx_client_sock:send(Sock, Data) of ok -> {ok, bump_last_packet_id(State)}; Error -> Error @@ -1402,15 +1374,12 @@ next_events(Packets) -> [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)]. %%------------------------------------------------------------------------------ -%% packet_id generation and assignment - -assign_packet_id(Msg = #mqtt_msg{}, Id) -> - Msg#mqtt_msg{packet_id = Id}; -assign_packet_id([H | T], Id) -> - [assign_packet_id(H, Id) | assign_packet_id(T, next_packet_id(Id))]; -assign_packet_id([], _Id) -> - []. +%% packet_id generation bump_last_packet_id(State = #state{last_packet_id = Id}) -> State#state{last_packet_id = next_packet_id(Id)}. +-spec next_packet_id(packet_id()) -> packet_id(). +next_packet_id(?MAX_PACKET_ID) -> 1; +next_packet_id(Id) -> Id + 1. + diff --git a/src/portal/emqx_portal_mqtt.erl b/src/portal/emqx_portal_mqtt.erl index de5bf6042..6c718e9e4 100644 --- a/src/portal/emqx_portal_mqtt.erl +++ b/src/portal/emqx_portal_mqtt.erl @@ -34,7 +34,8 @@ %% Messages towards ack collector process -define(RANGE(Min, Max), {Min, Max}). --define(SENT(PktIdRange), {sent, PktIdRange}). +-define(REF_IDS(Ref, Ids), {Ref, Ids}). +-define(SENT(RefIds), {sent, RefIds}). -define(ACKED(AnyPktId), {acked, AnyPktId}). -define(STOP(Ref), {stop, Ref}). @@ -49,8 +50,6 @@ start(Config) -> {ok, _} -> try subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])), - %% ack collector is always a new pid every reconnect. - %% use it as a connection reference {ok, Ref, #{ack_collector => AckCollector, client_pid => Pid}} catch @@ -100,16 +99,21 @@ safe_stop(Pid, StopF, Timeout) -> exit(Pid, kill) end. -send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, Batch) -> - case emqx_client:publish(ClientPid, Batch) of - {ok, BasePktId} -> - LastPktId = emqx_client:next_packet_id(BasePktId, length(Batch) - 1), - AckCollector ! ?SENT(?RANGE(BasePktId, LastPktId)), - %% return last pakcet id as batch reference - {ok, LastPktId}; +send(Conn, Batch) -> + send(Conn, Batch, []). + +send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest] = Batch, Acc) -> + case emqx_client:publish(ClientPid, Msg) of + {ok, PktId} when Rest =:= [] -> + %% last one sent + Ref = make_ref(), + AckCollector ! ?SENT(?REF_IDS(Ref, lists:reverse([PktId | Acc]))), + {ok, Ref}; + {ok, PktId} -> + send(Conn, Rest, [PktId | Acc]); {error, {_PacketId, inflight_full}} -> timer:sleep(100), - send(Conn, Batch); + send(Conn, Batch, Acc); {error, Reason} -> %% NOTE: There is no partial sucess of a batch and recover from the middle %% only to retry all messages in one batch @@ -126,9 +130,9 @@ ack_collector(Parent, ConnRef, Acked, Sent) -> exit(normal); ?ACKED(PktId) -> match_acks(Parent, queue:in(PktId, Acked), Sent); - ?SENT(Range) -> + ?SENT(RefIds) -> %% this message only happens per-batch, hence ++ is ok - match_acks(Parent, Acked, Sent ++ [Range]) + match_acks(Parent, Acked, Sent ++ [RefIds]) after 200 -> {Acked, Sent} @@ -140,12 +144,14 @@ match_acks(Parent, Acked, Sent) -> match_acks_1(Parent, queue:out(Acked), Sent). match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent}; -match_acks_1(Parent, {{value, PktId}, Acked}, [?RANGE(PktId, PktId) | Sent]) -> +match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId]) | Sent]) -> %% batch finished - ok = emqx_portal:handle_ack(Parent, PktId), + ok = emqx_portal:handle_ack(Parent, Ref), match_acks(Parent, Acked, Sent); -match_acks_1(Parent, {{value, PktId}, Acked}, [?RANGE(PktId, Max) | Sent]) -> - match_acks(Parent, Acked, [?RANGE(emqx_client:next_packet_id(PktId), Max) | Sent]). +match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId | RestIds]) | Sent]) -> + %% one message finished, but not the whole batch + match_acks(Parent, Acked, [?REF_IDS(Ref, RestIds) | Sent]). + %% When puback for QoS-1 message is received from remote MQTT broker %% NOTE: no support for QoS-2 diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_portal_SUITE.erl index f42a9e7b1..3c380d684 100644 --- a/test/emqx_portal_SUITE.erl +++ b/test/emqx_portal_SUITE.erl @@ -39,7 +39,7 @@ init_per_suite(Config) -> _ -> ok end, - emqx_ct_broker_helpers:run_setup_steps([{log_leve, info} | Config]). + emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). diff --git a/test/emqx_portal_mqtt_tests.erl b/test/emqx_portal_mqtt_tests.erl index 8f513a853..311554a2f 100644 --- a/test/emqx_portal_mqtt_tests.erl +++ b/test/emqx_portal_mqtt_tests.erl @@ -18,7 +18,6 @@ send_and_ack_test() -> %% delegate from gen_rpc to rpc for unit test - Tester = self(), meck:new(emqx_client, [passthrough, no_history]), meck:expect(emqx_client, start_link, 1, fun(#{msg_handler := Hdlr}) -> @@ -28,14 +27,13 @@ send_and_ack_test() -> meck:expect(emqx_client, stop, 1, fun(Pid) -> Pid ! stop end), meck:expect(emqx_client, publish, 2, - fun(_Conn, Msgs) -> - case rand:uniform(100) of + fun(Client, Msg) -> + case rand:uniform(200) of 1 -> {error, {dummy, inflight_full}}; _ -> - BaseId = hd(Msgs), - Tester ! {published, Msgs}, - {ok, BaseId} + Client ! {publish, Msg}, + {ok, Msg} %% as packet id end end), try @@ -44,38 +42,19 @@ send_and_ack_test() -> {ok, Ref, Conn} = emqx_portal_mqtt:start(#{}), %% return last packet id as batch reference {ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch), - %% as if the remote broker replied with puback - ok = fake_pubacks(Conn), %% expect batch ack - AckRef1= receive {batch_ack, Id} -> Id end, - %% asset received ack matches the batch ref returned in send API - ?assertEqual(AckRef, AckRef1), + receive {batch_ack, AckRef} -> ok end, ok = emqx_portal_mqtt:stop(Ref, Conn) after meck:unload(emqx_client) end. -fake_pubacks(#{client_pid := Client}) -> - #{puback := PubAckCallback} = get_hdlr(Client), +fake_client(#{puback := PubAckCallback} = Hdlr) -> receive - {published, Msgs} -> - lists:foreach( - fun(Id) -> - PubAckCallback(#{packet_id => Id, reason_code => ?RC_SUCCESS}) - end, Msgs) - end. - -get_hdlr(Client) -> - Client ! {get_hdlr, self()}, - receive {hdr, Hdlr} -> Hdlr end. - -fake_client(Hdlr) -> - receive - {get_hdlr, Pid} -> - Pid ! {hdr, Hdlr}, + {publish, PktId} -> + PubAckCallback(#{packet_id => PktId, reason_code => ?RC_SUCCESS}), fake_client(Hdlr); stop -> exit(normal) end. - diff --git a/test/emqx_portal_tests.erl b/test/emqx_portal_tests.erl index 3b2879b12..b44e02732 100644 --- a/test/emqx_portal_tests.erl +++ b/test/emqx_portal_tests.erl @@ -71,7 +71,7 @@ disturbance_test() -> %% buffer should continue taking in messages when disconnected buffer_when_disconnected_test_() -> - {timeout, 5000, fun test_buffer_when_disconnected/0}. + {timeout, 10000, fun test_buffer_when_disconnected/0}. test_buffer_when_disconnected() -> Ref = make_ref(), @@ -92,7 +92,7 @@ test_buffer_when_disconnected() -> Receiver ! {portal, Pid}, ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), Pid ! {disconnected, Ref, test}, - ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 2000), + ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000), ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), ok = emqx_portal:stop(?PORTAL_REG_NAME).