From 839f9dbedbded958ac9162e4f7e0d66acd49a002 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 24 Nov 2023 14:52:29 -0300 Subject: [PATCH 1/4] feat(ds): session expiry Fixes https://emqx.atlassian.net/browse/EMQX-11048 --- .../emqx_persistent_session_ds_SUITE.erl | 73 ++++++++++- apps/emqx/src/emqx_channel.erl | 6 +- apps/emqx/src/emqx_persistent_session_ds.erl | 117 +++++++++++++----- apps/emqx/src/emqx_persistent_session_ds.hrl | 3 +- apps/emqx/src/emqx_session.erl | 8 +- apps/emqx/src/emqx_session_mem.erl | 6 +- .../test/emqx_persistent_session_SUITE.erl | 41 +++++- 7 files changed, 209 insertions(+), 45 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 56246e743..7937c2fd4 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -221,9 +221,10 @@ t_session_subscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), + ConnInfo = #{}, ?assertMatch( #{subscriptions := #{SubTopicFilter := #{}}}, - erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) + erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]) ) end ), @@ -294,9 +295,10 @@ t_session_unsubscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), + ConnInfo = #{}, ?assertMatch( #{subscriptions := Subs = #{}} when map_size(Subs) =:= 0, - erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) + erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]) ), ok end @@ -387,3 +389,70 @@ do_t_session_discard(Params) -> end ), ok. + +t_session_expiration1(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Opts = #{ + clientid => ClientId, + sequence => [ + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}}, + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 1}}, #{}}, + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}} + ] + }, + do_t_session_expiration(Config, Opts). + +t_session_expiration2(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Opts = #{ + clientid => ClientId, + sequence => [ + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}}, + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{ + 'Session-Expiry-Interval' => 1 + }}, + {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}} + ] + }, + do_t_session_expiration(Config, Opts). + +do_t_session_expiration(_Config, Opts) -> + #{ + clientid := ClientId, + sequence := [ + {FirstConn, FirstDisconn}, + {SecondConn, SecondDisconn}, + {ThirdConn, ThirdDisconn} + ] + } = Opts, + CommonParams = #{proto_ver => v5, clientid => ClientId}, + ?check_trace( + begin + Params0 = maps:merge(CommonParams, FirstConn), + Client0 = start_client(Params0), + {ok, _} = emqtt:connect(Client0), + Info0 = maps:from_list(emqtt:info(Client0)), + ?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}), + emqtt:disconnect(Client0, ?RC_NORMAL_DISCONNECTION, FirstDisconn), + + Params1 = maps:merge(CommonParams, SecondConn), + Client1 = start_client(Params1), + {ok, _} = emqtt:connect(Client1), + Info1 = maps:from_list(emqtt:info(Client1)), + ?assertEqual(1, maps:get(session_present, Info1), #{info => Info1}), + emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn), + + ct:sleep(1_500), + + Params2 = maps:merge(CommonParams, ThirdConn), + Client2 = start_client(Params2), + {ok, _} = emqtt:connect(Client2), + Info2 = maps:from_list(emqtt:info(Client2)), + ?assertEqual(0, maps:get(session_present, Info2), #{info => Info2}), + emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 306341700..dd519568f 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1204,12 +1204,13 @@ handle_info( #channel{ conn_state = ConnState, clientinfo = ClientInfo, + conninfo = ConnInfo, session = Session } ) when ConnState =:= connected orelse ConnState =:= reauthenticating -> - {Intent, Session1} = emqx_session:disconnect(ClientInfo, Session), + {Intent, Session1} = emqx_session:disconnect(ClientInfo, ConnInfo, Session), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), Channel2 = Channel1#channel{session = Session1}, case maybe_shutdown(Reason, Intent, Channel2) of @@ -1321,7 +1322,8 @@ handle_timeout( {ok, Replies, NSession} -> handle_out(publish, Replies, Channel#channel{session = NSession}) end; -handle_timeout(_TRef, expire_session, Channel) -> +handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> + ok = emqx_session:destroy(Session), shutdown(expired, Channel); handle_timeout( _TRef, diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 76b54e34a..3d38d5e60 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -56,7 +56,7 @@ deliver/3, replay/3, handle_timeout/3, - disconnect/1, + disconnect/2, terminate/2 ]). @@ -74,7 +74,7 @@ -ifdef(TEST). -export([ - session_open/1, + session_open/2, list_all_sessions/0, list_all_subscriptions/0, list_all_streams/0, @@ -98,19 +98,22 @@ id := id(), %% When the session was created created_at := timestamp(), - %% When the session should expire - expires_at := timestamp() | never, + %% When the client last disconnected + disconnected_at := timestamp() | never, %% Client’s Subscriptions. subscriptions := #{topic_filter() => subscription()}, %% Inflight messages inflight := emqx_persistent_message_ds_replayer:inflight(), %% Receive maximum receive_maximum := pos_integer(), + %% Connection Info + conninfo := emqx_types:conninfo(), %% props := map() }. -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). +-type millisecond() :: non_neg_integer(). -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). @@ -123,6 +126,12 @@ next_pkt_id ]). +-define(IS_EXPIRED(NOW_MS, DISCONNECTED_AT, EI), + (is_number(DisconnectedAt) andalso + is_number(EI) andalso + (NowMS >= DisconnectedAt + EI)) +). + -export_type([id/0]). %% @@ -146,7 +155,7 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> ok = emqx_cm:discard_session(ClientID), case maps:get(clean_start, ConnInfo, false) of false -> - case session_open(ClientID) of + case session_open(ClientID, ConnInfo) of Session0 = #{} -> ensure_timers(), ReceiveMaximum = receive_maximum(ConnInfo), @@ -161,9 +170,13 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> end. ensure_session(ClientID, ConnInfo, Conf) -> - Session = session_ensure_new(ClientID, Conf), + Session = session_ensure_new(ClientID, ConnInfo, Conf), ReceiveMaximum = receive_maximum(ConnInfo), - Session#{subscriptions => #{}, receive_maximum => ReceiveMaximum}. + Session#{ + conninfo => ConnInfo, + receive_maximum => ReceiveMaximum, + subscriptions => #{} + }. -spec destroy(session() | clientinfo()) -> ok. destroy(#{id := ClientID}) -> @@ -399,8 +412,9 @@ replay(_ClientInfo, [], Session = #{inflight := Inflight0}) -> %%-------------------------------------------------------------------- --spec disconnect(session()) -> {shutdown, session()}. -disconnect(Session = #{}) -> +-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. +disconnect(Session0, ConnInfo) -> + Session = session_set_disconnected_at_trans(Session0, ConnInfo, now_ms()), {shutdown, Session}. -spec terminate(Reason :: term(), session()) -> ok. @@ -530,47 +544,80 @@ storage() -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(id()) -> +-spec session_open(id(), emqx_types:conninfo()) -> session() | false. -session_open(SessionId) -> - ro_transaction(fun() -> +session_open(SessionId, NewConnInfo) -> + NowMS = now_ms(), + transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of - [Record = #session{}] -> - Session = export_session(Record), - DSSubs = session_read_subscriptions(SessionId), - Subscriptions = export_subscriptions(DSSubs), - Inflight = emqx_persistent_message_ds_replayer:open(SessionId), - Session#{ - subscriptions => Subscriptions, - inflight => Inflight - }; - [] -> + [Record0 = #session{disconnected_at = DisconnectedAt, conninfo = ConnInfo}] -> + EI = expiry_interval(ConnInfo), + case ?IS_EXPIRED(NowMS, DisconnectedAt, EI) of + true -> + %% Should we drop the session now, or leave it to session GC? + false; + false -> + %% new connection being established + Record1 = Record0#session{conninfo = NewConnInfo}, + Record = session_set_disconnected_at(Record1, never), + Session = export_session(Record), + DSSubs = session_read_subscriptions(SessionId), + Subscriptions = export_subscriptions(DSSubs), + Inflight = emqx_persistent_message_ds_replayer:open(SessionId), + Session#{ + conninfo => NewConnInfo, + inflight => Inflight, + subscriptions => Subscriptions + } + end; + _ -> false end end). --spec session_ensure_new(id(), _Props :: map()) -> +-spec session_ensure_new(id(), emqx_types:conninfo(), _Props :: map()) -> session(). -session_ensure_new(SessionId, Props) -> +session_ensure_new(SessionId, ConnInfo, Props) -> transaction(fun() -> ok = session_drop_subscriptions(SessionId), - Session = export_session(session_create(SessionId, Props)), + Session = export_session(session_create(SessionId, ConnInfo, Props)), Session#{ subscriptions => #{}, inflight => emqx_persistent_message_ds_replayer:new() } end). -session_create(SessionId, Props) -> +session_create(SessionId, ConnInfo, Props) -> Session = #session{ id = SessionId, - created_at = erlang:system_time(millisecond), - expires_at = never, + created_at = now_ms(), + disconnected_at = never, + conninfo = ConnInfo, props = Props }, ok = mnesia:write(?SESSION_TAB, Session, write), Session. +session_set_disconnected_at_trans(Session, NewConnInfo, DisconnectedAt) -> + #{id := SessionId} = Session, + transaction(fun() -> + case mnesia:read(?SESSION_TAB, SessionId, write) of + [#session{} = SessionRecord0] -> + SessionRecord = SessionRecord0#session{conninfo = NewConnInfo}, + _ = session_set_disconnected_at(SessionRecord, DisconnectedAt), + ok; + _ -> + %% log and crash? + ok + end + end), + Session#{conninfo := NewConnInfo, disconnected_at := DisconnectedAt}. + +session_set_disconnected_at(SessionRecord0, DisconnectedAt) -> + SessionRecord = SessionRecord0#session{disconnected_at = DisconnectedAt}, + ok = mnesia:write(?SESSION_TAB, SessionRecord, write), + SessionRecord. + %% @doc Called when a client reconnects with `clean session=true' or %% during session GC -spec session_drop(id()) -> ok. @@ -673,7 +720,7 @@ session_read_pubranges(DSSessionId, LockKind) -> new_subscription_id(DSSessionId, TopicFilter) -> %% Note: here we use _milliseconds_ to match with the timestamp %% field of `#message' record. - NowMS = erlang:system_time(millisecond), + NowMS = now_ms(), DSSubId = {DSSessionId, TopicFilter}, {DSSubId, NowMS}. @@ -681,6 +728,9 @@ new_subscription_id(DSSessionId, TopicFilter) -> subscription_id_to_topic_filter({_DSSessionId, TopicFilter}) -> TopicFilter. +now_ms() -> + erlang:system_time(millisecond). + %%-------------------------------------------------------------------- %% RPC targets (v1) %%-------------------------------------------------------------------- @@ -800,7 +850,7 @@ export_subscriptions(DSSubs) -> ). export_session(#session{} = Record) -> - export_record(Record, #session.id, [id, created_at, expires_at, props], #{}). + export_record(Record, #session.id, [id, created_at, disconnected_at, conninfo, props], #{}). export_subscription(#ds_sub{} = Record) -> export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}). @@ -832,11 +882,16 @@ receive_maximum(ConnInfo) -> %% indicates that it's optional. maps:get(receive_maximum, ConnInfo, 65_535). +-spec expiry_interval(conninfo()) -> millisecond(). +expiry_interval(ConnInfo) -> + maps:get(expiry_interval, ConnInfo, 0). + -ifdef(TEST). list_all_sessions() -> DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB), + ConnInfo = #{}, Sessions = lists:map( - fun(SessionID) -> {SessionID, session_open(SessionID)} end, + fun(SessionID) -> {SessionID, session_open(SessionID, ConnInfo)} end, DSSessionIds ), maps:from_list(Sessions). diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 653ac444a..cbdc00c09 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -73,7 +73,8 @@ id :: emqx_persistent_session_ds:id(), %% creation time created_at :: _Millisecond :: non_neg_integer(), - expires_at = never :: _Millisecond :: non_neg_integer() | never, + disconnected_at = never :: _Millisecond :: non_neg_integer() | never, + conninfo :: emqx_types:conninfo(), %% for future usage props = #{} :: map() }). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 64ef2e30d..108e8ec09 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -84,7 +84,7 @@ -export([ deliver/3, handle_timeout/3, - disconnect/2, + disconnect/3, terminate/3 ]). @@ -503,10 +503,10 @@ cancel_timer(Name, Timers) -> %%-------------------------------------------------------------------- --spec disconnect(clientinfo(), t()) -> +-spec disconnect(clientinfo(), eqmx_types:conninfo(), t()) -> {idle | shutdown, t()}. -disconnect(_ClientInfo, Session) -> - ?IMPL(Session):disconnect(Session). +disconnect(_ClientInfo, ConnInfo, Session) -> + ?IMPL(Session):disconnect(Session, ConnInfo). -spec terminate(clientinfo(), Reason :: term(), t()) -> ok. diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index d609435c0..178c71e12 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -87,7 +87,7 @@ deliver/3, replay/3, handle_timeout/3, - disconnect/1, + disconnect/2, terminate/2 ]). @@ -725,8 +725,8 @@ append(L1, L2) -> L1 ++ L2. %%-------------------------------------------------------------------- --spec disconnect(session()) -> {idle, session()}. -disconnect(Session = #session{}) -> +-spec disconnect(session(), emqx_types:conninfo()) -> {idle, session()}. +disconnect(Session = #session{}, _ConnInfo) -> % TODO: isolate expiry timer / timeout handling here? {idle, Session}. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 1be929c7f..b4946b7d3 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -347,8 +347,6 @@ t_connect_discards_existing_client(Config) -> end. %% [MQTT-3.1.2-23] -t_connect_session_expiry_interval(init, Config) -> skip_ds_tc(Config); -t_connect_session_expiry_interval('end', _Config) -> ok. t_connect_session_expiry_interval(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config), @@ -356,6 +354,45 @@ t_connect_session_expiry_interval(Config) -> Payload = <<"test message">>, ClientId = ?config(client_id, Config), + {ok, Client1} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client1, STopic, ?QOS_1), + ok = emqtt:disconnect(Client1), + + maybe_kill_connection_process(ClientId, Config), + + publish(Topic, Payload, ?QOS_1), + + {ok, Client2} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), + [Msg | _] = receive_messages(1), + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), + ?assertEqual({ok, ?QOS_1}, maps:find(qos, Msg)), + ok = emqtt:disconnect(Client2). + +%% [MQTT-3.1.2-23] +%% TODO: un-skip after QoS 2 support is implemented in DS. +t_connect_session_expiry_interval_qos2(init, Config) -> skip_ds_tc(Config); +t_connect_session_expiry_interval_qos2('end', _Config) -> ok. +t_connect_session_expiry_interval_qos2(Config) -> + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payload = <<"test message">>, + ClientId = ?config(client_id, Config), + {ok, Client1} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5}, From bd7a84fe3ed18e7e38179771bf1c0aaddb9418ec Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 26 Nov 2023 19:18:59 +0100 Subject: [PATCH 2/4] revert(ds): Don't duplicate the clean start in session_ds --- apps/emqx/src/emqx_persistent_session_ds.erl | 29 ++++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 3d38d5e60..e0be4eefc 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -153,19 +153,13 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), - case maps:get(clean_start, ConnInfo, false) of + case session_open(ClientID, ConnInfo) of + Session0 = #{} -> + ensure_timers(), + ReceiveMaximum = receive_maximum(ConnInfo), + Session = Session0#{receive_maximum => ReceiveMaximum}, + {true, Session, []}; false -> - case session_open(ClientID, ConnInfo) of - Session0 = #{} -> - ensure_timers(), - ReceiveMaximum = receive_maximum(ConnInfo), - Session = Session0#{receive_maximum => ReceiveMaximum}, - {true, Session, []}; - false -> - false - end; - true -> - session_drop(ClientID), false end. @@ -554,7 +548,7 @@ session_open(SessionId, NewConnInfo) -> EI = expiry_interval(ConnInfo), case ?IS_EXPIRED(NowMS, DisconnectedAt, EI) of true -> - %% Should we drop the session now, or leave it to session GC? + session_drop(SessionId), false; false -> %% new connection being established @@ -831,8 +825,13 @@ session_drop_pubranges(DSSessionId) -> %%-------------------------------------------------------------------------------- transaction(Fun) -> - {atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun), - Res. + case mnesia:is_transaction() of + true -> + Fun(); + false -> + {atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun), + Res + end. ro_transaction(Fun) -> {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun), From 09c4e40511aa5591cf231e90f075d57b35e2eec1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Nov 2023 11:48:44 -0300 Subject: [PATCH 3/4] refactor(ds): rename `disconnected_at` to `last_alive_at`, add more assertions --- .../emqx_persistent_session_ds_SUITE.erl | 13 ++++++++ apps/emqx/src/emqx_persistent_session_ds.erl | 32 +++++++++---------- apps/emqx/src/emqx_persistent_session_ds.hrl | 2 +- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 7937c2fd4..05c1eb8f2 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -428,9 +429,13 @@ do_t_session_expiration(_Config, Opts) -> CommonParams = #{proto_ver => v5, clientid => ClientId}, ?check_trace( begin + Topic = <<"some/topic">>, Params0 = maps:merge(CommonParams, FirstConn), Client0 = start_client(Params0), {ok, _} = emqtt:connect(Client0), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, Topic, ?QOS_2), + Subs0 = emqx_persistent_session_ds:list_all_subscriptions(), + ?assertEqual(1, map_size(Subs0), #{subs => Subs0}), Info0 = maps:from_list(emqtt:info(Client0)), ?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}), emqtt:disconnect(Client0, ?RC_NORMAL_DISCONNECTION, FirstDisconn), @@ -440,6 +445,8 @@ do_t_session_expiration(_Config, Opts) -> {ok, _} = emqtt:connect(Client1), Info1 = maps:from_list(emqtt:info(Client1)), ?assertEqual(1, maps:get(session_present, Info1), #{info => Info1}), + Subs1 = emqtt:subscriptions(Client1), + ?assertEqual([], Subs1), emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn), ct:sleep(1_500), @@ -449,6 +456,12 @@ do_t_session_expiration(_Config, Opts) -> {ok, _} = emqtt:connect(Client2), Info2 = maps:from_list(emqtt:info(Client2)), ?assertEqual(0, maps:get(session_present, Info2), #{info => Info2}), + Subs2 = emqtt:subscriptions(Client2), + ?assertEqual([], Subs2), + emqtt:publish(Client2, Topic, <<"payload">>), + ?assertNotReceive({publish, #{topic := Topic}}), + %% ensure subscriptions are absent from table. + ?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()), emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn), ok diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index e0be4eefc..1429d6e97 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -98,8 +98,8 @@ id := id(), %% When the session was created created_at := timestamp(), - %% When the client last disconnected - disconnected_at := timestamp() | never, + %% When the client was last considered alive + last_alive_at := timestamp(), %% Client’s Subscriptions. subscriptions := #{topic_filter() => subscription()}, %% Inflight messages @@ -126,10 +126,10 @@ next_pkt_id ]). --define(IS_EXPIRED(NOW_MS, DISCONNECTED_AT, EI), - (is_number(DisconnectedAt) andalso +-define(IS_EXPIRED(NOW_MS, LAST_ALIVE_AT, EI), + (is_number(LAST_ALIVE_AT) andalso is_number(EI) andalso - (NowMS >= DisconnectedAt + EI)) + (NOW_MS >= LAST_ALIVE_AT + EI)) ). -export_type([id/0]). @@ -408,7 +408,7 @@ replay(_ClientInfo, [], Session = #{inflight := Inflight0}) -> -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. disconnect(Session0, ConnInfo) -> - Session = session_set_disconnected_at_trans(Session0, ConnInfo, now_ms()), + Session = session_set_last_alive_at_trans(Session0, ConnInfo, now_ms()), {shutdown, Session}. -spec terminate(Reason :: term(), session()) -> ok. @@ -544,16 +544,16 @@ session_open(SessionId, NewConnInfo) -> NowMS = now_ms(), transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of - [Record0 = #session{disconnected_at = DisconnectedAt, conninfo = ConnInfo}] -> + [Record0 = #session{last_alive_at = LastAliveAt, conninfo = ConnInfo}] -> EI = expiry_interval(ConnInfo), - case ?IS_EXPIRED(NowMS, DisconnectedAt, EI) of + case ?IS_EXPIRED(NowMS, LastAliveAt, EI) of true -> session_drop(SessionId), false; false -> %% new connection being established Record1 = Record0#session{conninfo = NewConnInfo}, - Record = session_set_disconnected_at(Record1, never), + Record = session_set_last_alive_at(Record1, never), Session = export_session(Record), DSSubs = session_read_subscriptions(SessionId), Subscriptions = export_subscriptions(DSSubs), @@ -585,30 +585,30 @@ session_create(SessionId, ConnInfo, Props) -> Session = #session{ id = SessionId, created_at = now_ms(), - disconnected_at = never, + last_alive_at = now_ms(), conninfo = ConnInfo, props = Props }, ok = mnesia:write(?SESSION_TAB, Session, write), Session. -session_set_disconnected_at_trans(Session, NewConnInfo, DisconnectedAt) -> +session_set_last_alive_at_trans(Session, NewConnInfo, LastAliveAt) -> #{id := SessionId} = Session, transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of [#session{} = SessionRecord0] -> SessionRecord = SessionRecord0#session{conninfo = NewConnInfo}, - _ = session_set_disconnected_at(SessionRecord, DisconnectedAt), + _ = session_set_last_alive_at(SessionRecord, LastAliveAt), ok; _ -> %% log and crash? ok end end), - Session#{conninfo := NewConnInfo, disconnected_at := DisconnectedAt}. + Session#{conninfo := NewConnInfo, last_alive_at := LastAliveAt}. -session_set_disconnected_at(SessionRecord0, DisconnectedAt) -> - SessionRecord = SessionRecord0#session{disconnected_at = DisconnectedAt}, +session_set_last_alive_at(SessionRecord0, LastAliveAt) -> + SessionRecord = SessionRecord0#session{last_alive_at = LastAliveAt}, ok = mnesia:write(?SESSION_TAB, SessionRecord, write), SessionRecord. @@ -849,7 +849,7 @@ export_subscriptions(DSSubs) -> ). export_session(#session{} = Record) -> - export_record(Record, #session.id, [id, created_at, disconnected_at, conninfo, props], #{}). + export_record(Record, #session.id, [id, created_at, last_alive_at, conninfo, props], #{}). export_subscription(#ds_sub{} = Record) -> export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}). diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index cbdc00c09..375bea97f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -73,7 +73,7 @@ id :: emqx_persistent_session_ds:id(), %% creation time created_at :: _Millisecond :: non_neg_integer(), - disconnected_at = never :: _Millisecond :: non_neg_integer() | never, + last_alive_at :: _Millisecond :: non_neg_integer(), conninfo :: emqx_types:conninfo(), %% for future usage props = #{} :: map() From d88deb9ceb5805da3895f1547c3c5da65815ee6a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Nov 2023 11:56:35 -0300 Subject: [PATCH 4/4] feat(ds): add session timer to bump last alive at timestamp --- apps/emqx/src/emqx_persistent_session_ds.erl | 22 ++++++++++++++++---- apps/emqx/src/emqx_schema.erl | 8 +++++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 1429d6e97..97825e728 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -117,6 +117,7 @@ -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). +-type timer() :: pull | get_streams | bump_last_alive_at. -define(STATS_KEYS, [ subscriptions_cnt, @@ -396,6 +397,11 @@ handle_timeout( handle_timeout(_ClientInfo, get_streams, Session) -> renew_streams(Session), ensure_timer(get_streams), + {ok, [], Session}; +handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> + NowMS = now_ms(), + Session = session_set_last_alive_at_trans(Session0, NowMS), + ensure_timer(bump_last_alive_at), {ok, [], Session}. -spec replay(clientinfo(), [], session()) -> @@ -553,7 +559,7 @@ session_open(SessionId, NewConnInfo) -> false -> %% new connection being established Record1 = Record0#session{conninfo = NewConnInfo}, - Record = session_set_last_alive_at(Record1, never), + Record = session_set_last_alive_at(Record1, NowMS), Session = export_session(Record), DSSubs = session_read_subscriptions(SessionId), Subscriptions = export_subscriptions(DSSubs), @@ -592,6 +598,10 @@ session_create(SessionId, ConnInfo, Props) -> ok = mnesia:write(?SESSION_TAB, Session, write), Session. +session_set_last_alive_at_trans(Session, LastAliveAt) -> + #{conninfo := ConnInfo} = Session, + session_set_last_alive_at_trans(Session, ConnInfo, LastAliveAt). + session_set_last_alive_at_trans(Session, NewConnInfo, LastAliveAt) -> #{id := SessionId} = Session, transaction(fun() -> @@ -863,13 +873,17 @@ export_record(_, _, [], Acc) -> %% effects. Add `CBM:init' callback to the session behavior? ensure_timers() -> ensure_timer(pull), - ensure_timer(get_streams). + ensure_timer(get_streams), + ensure_timer(bump_last_alive_at). --spec ensure_timer(pull | get_streams) -> ok. +-spec ensure_timer(timer()) -> ok. +ensure_timer(bump_last_alive_at = Type) -> + BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), + ensure_timer(Type, BumpInterval); ensure_timer(Type) -> ensure_timer(Type, 100). --spec ensure_timer(pull | get_streams, non_neg_integer()) -> ok. +-spec ensure_timer(timer(), non_neg_integer()) -> ok. ensure_timer(Type, Timeout) -> _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), ok. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 5c3f2e72f..e10160e4c 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1781,6 +1781,14 @@ fields("session_persistence") -> desc => ?DESC(session_ds_idle_poll_interval) } )}, + {"last_alive_update_interval", + sc( + timeout_duration(), + #{ + default => <<"5000ms">>, + desc => ?DESC(session_ds_last_alive_update_interval) + } + )}, {"force_persistence", sc( boolean(),