diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 84f55e762..856210331 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -241,8 +241,10 @@ info(mqueue_dropped, _Session) -> %% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); info(awaiting_rel_max, #{props := Conf}) -> maps:get(max_awaiting_rel, Conf); -info(await_rel_timeout, #{props := Conf}) -> - maps:get(await_rel_timeout, Conf). +info(await_rel_timeout, #{props := _Conf}) -> + %% TODO: currently this setting is ignored: + %% maps:get(await_rel_timeout, Conf). + 0. -spec stats(session()) -> emqx_types:stats(). stats(Session) -> @@ -438,9 +440,13 @@ pubcomp(_ClientInfo, PacketId, Session0) -> -spec deliver(clientinfo(), [emqx_types:deliver()], session()) -> {ok, replies(), session()}. -deliver(_ClientInfo, _Delivers, Session) -> - %% TODO: system messages end up here. - {ok, [], Session}. +deliver(ClientInfo, Delivers, Session0) -> + %% Durable sessions still have to handle some transient messages. + %% For example, retainer sends messages to the session directly. + Session = lists:foldl( + fun(Msg, Acc) -> enqueue_transient(ClientInfo, Msg, Acc) end, Session0, Delivers + ), + {ok, [], pull_now(Session)}. -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. @@ -481,8 +487,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s := S = emqx_persistent_session_ds_state:commit(S0), From ! Ref, {ok, [], Session#{s => S}}; -handle_timeout(_ClientInfo, expire_awaiting_rel, Session) -> - %% TODO: stub +handle_timeout(_ClientInfo, Timeout, Session) -> + ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}), {ok, [], Session}. bump_last_alive(S0) -> @@ -871,6 +877,54 @@ process_batch( IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight ). +%%-------------------------------------------------------------------- +%% Transient messages +%%-------------------------------------------------------------------- + +enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) -> + %% TODO: Such messages won't be retransmitted, should the session + %% reconnect before transient messages are acked. + %% + %% Proper solution could look like this: session publishes + %% transient messages to a separate DS DB that serves as a queue, + %% then subscribes to a special system topic that contains the + %% queued messages. Since streams in this DB are exclusive to the + %% session, messages from the queue can be dropped as soon as they + %% are acked. + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), + Msgs = [ + Msg + || SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []), + Msg <- begin + #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), + emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) + end + ], + lists:foldl(fun do_enqueue_transient/2, Session, Msgs). + +do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) -> + case Qos of + ?QOS_0 -> + S = S0, + Inflight = emqx_persistent_session_ds_inflight:push({undefined, Msg}, Inflight0); + ?QOS_1 -> + SeqNo = inc_seqno( + ?QOS_1, emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0) + ), + S = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), SeqNo, S0), + Inflight = emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0); + ?QOS_2 -> + SeqNo = inc_seqno( + ?QOS_2, emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0) + ), + S = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), SeqNo, S0), + Inflight = emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0) + end, + Session#{ + inflight => Inflight, + s => S + }. + %%-------------------------------------------------------------------- %% Buffer drain %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index f8ee11c08..a5c171f67 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -53,7 +53,7 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), - TCsNonGeneric = [t_choose_impl], + TCsNonGeneric = [t_choose_impl, t_transient], TCGroups = [{group, tcp}, {group, quic}, {group, ws}], [ {persistence_disabled, TCGroups}, @@ -265,7 +265,15 @@ messages(Topic, Payloads) -> messages(Topic, Payloads, ?QOS_2). messages(Topic, Payloads, QoS) -> - [#mqtt_msg{topic = Topic, payload = P, qos = QoS} || P <- Payloads]. + lists:map( + fun + (Bin) when is_binary(Bin) -> + #mqtt_msg{topic = Topic, payload = Bin, qos = QoS}; + (Msg = #mqtt_msg{}) -> + Msg#mqtt_msg{topic = Topic} + end, + Payloads + ). publish(Topic, Payload) -> publish(Topic, Payload, ?QOS_2). @@ -1103,6 +1111,93 @@ t_unsubscribe_replay(Config) -> ), ok = emqtt:disconnect(Sub1). +%% This testcase verifies that persistent sessions handle "transient" +%% mesages correctly. +%% +%% Transient messages are delivered to the channel directly, bypassing +%% the broker code that decides whether the messages should be +%% persisted or not, and therefore they are not persisted. +%% +%% `emqx_retainer' is an example of application that uses this +%% mechanism. +%% +%% This testcase creates the conditions when the transient messages +%% appear in the middle of the replay, to make sure the durable +%% session doesn't get confused and/or stuck if retained messages are +%% changed while the session was down. +t_transient(Config) -> + ConnFun = ?config(conn_fun, Config), + TopicPrefix = ?config(topic, Config), + ClientId = atom_to_binary(?FUNCTION_NAME), + ClientOpts = [ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 100}}, + {max_inflight, 100} + | Config + ], + Deliver = fun(Topic, Payload, QoS) -> + [Pid] = emqx_cm:lookup_channels(ClientId), + Msg = emqx_message:make(_From = <<"test">>, QoS, Topic, Payload), + Pid ! {deliver, Topic, Msg} + end, + Topic1 = <>, + Topic2 = <>, + Topic3 = <>, + %% 1. Start the client and subscribe to the topic: + {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]), + ?assertMatch({ok, _}, emqtt:ConnFun(Sub)), + ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, <>, qos2)), + %% 2. Publish regular messages: + publish(Topic1, <<"1">>, ?QOS_1), + publish(Topic1, <<"2">>, ?QOS_2), + Msgs1 = receive_messages(2), + [#{payload := <<"1">>, packet_id := PI1}, #{payload := <<"2">>, packet_id := PI2}] = Msgs1, + %% 3. Publish and recieve transient messages: + Deliver(Topic2, <<"3">>, ?QOS_0), + Deliver(Topic2, <<"4">>, ?QOS_1), + Deliver(Topic2, <<"5">>, ?QOS_2), + Msgs2 = receive_messages(3), + ?assertMatch( + [ + #{payload := <<"3">>, qos := ?QOS_0}, + #{payload := <<"4">>, qos := ?QOS_1}, + #{payload := <<"5">>, qos := ?QOS_2} + ], + Msgs2 + ), + %% 4. Publish more regular messages: + publish(Topic3, <<"6">>, ?QOS_1), + publish(Topic3, <<"7">>, ?QOS_2), + Msgs3 = receive_messages(2), + [#{payload := <<"6">>, packet_id := PI6}, #{payload := <<"7">>, packet_id := PI7}] = Msgs3, + %% 5. Reconnect the client: + ok = emqtt:disconnect(Sub), + {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]), + ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)), + %% 6. Recieve the historic messages and check that their packet IDs didn't change: + %% Note: durable session currenty WON'T replay transient messages. + ProcessMessage = fun(#{payload := P, packet_id := ID}) -> {ID, P} end, + ?assertMatch( + #{ + Topic1 := [{PI1, <<"1">>}, {PI2, <<"2">>}], + Topic3 := [{PI6, <<"6">>}, {PI7, <<"7">>}] + }, + maps:groups_from_list(fun get_msgpub_topic/1, ProcessMessage, receive_messages(7, 5_000)) + ), + %% 7. Finish off by sending messages to all the topics to make + %% sure none of the streams are blocked: + [publish(T, <<"fin">>, ?QOS_2) || T <- [Topic1, Topic2, Topic3]], + ?assertMatch( + #{ + Topic1 := [<<"fin">>], + Topic2 := [<<"fin">>], + Topic3 := [<<"fin">>] + }, + get_topicwise_order(receive_messages(3)) + ), + ok = emqtt:disconnect(Sub1). + t_multiple_subscription_matches(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config),