diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index d92e3cc24..16b6db8a9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -209,7 +209,7 @@ info(created_at, #{s := S}) -> info(is_persistent, #{}) -> true; info(subscriptions, #{s := S}) -> - subs_to_map(S); + emqx_persistent_session_ds_subs:to_map(S); info(subscriptions_cnt, #{s := S}) -> emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S)); info(subscriptions_max, #{props := Conf}) -> @@ -280,7 +280,7 @@ subscribe( SubOpts, Session = #{id := ID, s := S0} ) -> - case subs_lookup(TopicFilter, S0) of + case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of undefined -> %% TODO: max subscriptions @@ -322,7 +322,7 @@ subscribe( IsNew = false, S1 = S0 end, - S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S1), + S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1), ?tp(persistent_session_ds_subscription_added, #{ topic_filter => TopicFilter, sub => Subscription, is_new => IsNew }), @@ -334,7 +334,7 @@ unsubscribe( TopicFilter, Session = #{id := ID, s := S0} ) -> - case subs_lookup(TopicFilter, S0) of + case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; Subscription = #{props := SubOpts} -> @@ -344,13 +344,8 @@ unsubscribe( -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) -> - %% Note: we cannot delete the subscription immediately, since its - %% metadata can be used during replay (see `process_batch'). We - %% instead mark it as deleted, and let `subscription_gc' function - %% dispatch it later: - SubMeta = SubMeta0#{deleted => true}, - S1 = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], SubMeta, S0), +do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) -> + S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0), ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, topic_filter => TopicFilter }), @@ -365,7 +360,7 @@ do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) -> -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. get_subscription(TopicFilter, #{s := S}) -> - case subs_lookup(TopicFilter, S) of + case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of _Subscription = #{props := SubOpts} -> SubOpts; undefined -> @@ -465,7 +460,7 @@ handle_timeout( Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> - S1 = subscription_gc(S0), + S1 = emqx_persistent_session_ds_subs:gc(S0), S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), Interval = emqx_config:get([session_persistence, renew_streams_interval]), Session = emqx_session:ensure_timer( @@ -509,7 +504,6 @@ replay(ClientInfo, [], Session0 = #{s := S0}) -> Session0, Streams ), - logger:error("Replay streams: ~p~n~p", [Streams, Session]), %% Note: we filled the buffer with the historical messages, and %% from now on we'll rely on the normal inflight/flow control %% mechanisms to replay them: @@ -687,7 +681,7 @@ session_drop(ID, Reason) -> case emqx_persistent_session_ds_state:open(ID) of {ok, S0} -> ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}), - _S = subs_fold( + _S = emqx_persistent_session_ds_subs:fold( fun(TopicFilter, Subscription, S) -> do_unsubscribe(ID, TopicFilter, Subscription, S) end, @@ -905,74 +899,6 @@ do_drain_buffer(Inflight0, S0, Acc) -> %%-------------------------------------------------------------------------------- -%% @doc Remove subscriptions that have been marked for deletion, and -%% that don't have any unacked messages: -subscription_gc(S0) -> - subs_fold_all( - fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> - case Deleted andalso has_no_unacked_streams(SubId, S0) of - true -> - emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); - false -> - Acc - end - end, - S0, - S0 - ). - -has_no_unacked_streams(SubId, S) -> - emqx_persistent_session_ds_state:fold_streams( - fun - ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> - emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; - (_StreamKey, _Srs, Acc) -> - Acc - end, - true, - S - ). - -%% @doc It only returns subscriptions that haven't been marked for deletion: -subs_lookup(TopicFilter, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of - #{deleted := true} -> - undefined; - Sub -> - Sub - end. - -subs_to_map(S) -> - subs_fold( - fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, - #{}, - S - ). - -subs_fold(Fun, AccIn, S) -> - subs_fold_all( - fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> - case Deleted of - true -> Acc; - false -> Fun(TopicFilter, Sub, Acc) - end - end, - AccIn, - S - ). - -%% @doc Iterate over all subscriptions, including the deleted ones: -subs_fold_all(Fun, AccIn, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, - AccIn, - Subs - ). - -%%-------------------------------------------------------------------------------- - %% TODO: find a more reliable way to perform actions that have side %% effects. Add `CBM:init' callback to the session behavior? -spec ensure_timers(session()) -> session(). diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index ae15f2bd6..286d32ef4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -126,10 +126,10 @@ find_new_streams(S) -> renew_streams(S0) -> S1 = remove_unsubscribed_streams(S0), S2 = remove_fully_replayed_streams(S1), - emqx_topic_gbt:fold( + emqx_persistent_session_ds_subs:fold( fun - (Key, _Subscription = #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> - TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)), + (Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> + TopicFilter = emqx_topic:words(Key), Streams = select_streams( SubId, emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), @@ -146,7 +146,7 @@ renew_streams(S0) -> Acc end, S2, - emqx_persistent_session_ds_state:get_subscriptions(S2) + S2 ). -spec on_unsubscribe( diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl new file mode 100644 index 000000000..92f17b108 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -0,0 +1,154 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This module encapsulates the data related to the client's +%% subscriptions. It tries to reppresent the subscriptions as if they +%% were a simple key-value map. +%% +%% In reality, however, the session has to retain old the +%% subscriptions for longer to ensure the consistency of message +%% replay. +-module(emqx_persistent_session_ds_subs). + +%% API: +-export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]). + +-export_type([]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%%================================================================================ +%% API functions +%%================================================================================ + +%% @doc Process a new subscription +-spec on_subscribe( + emqx_persistent_session_ds:topic_filter(), + emqx_persistent_session_ds:subscription(), + emqx_persistent_session_ds_state:t() +) -> + emqx_persistent_session_ds_state:t(). +on_subscribe(TopicFilter, Subscription, S) -> + emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S). + +%% @doc Process UNSUBSCRIBE +-spec on_unsubscribe( + emqx_persistent_session_ds:topic_filter(), + emqx_persistent_session_ds:subscription(), + emqx_persistent_session_ds_state:t() +) -> + emqx_persistent_session_ds_state:t(). +on_unsubscribe(TopicFilter, Subscription0, S0) -> + %% Note: we cannot delete the subscription immediately, since its + %% metadata can be used during replay (see `process_batch'). We + %% instead mark it as deleted, and let `subscription_gc' function + %% dispatch it later: + Subscription = Subscription0#{deleted => true}, + emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0). + +%% @doc Remove subscriptions that have been marked for deletion, and +%% that don't have any unacked messages: +-spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). +gc(S0) -> + fold_all( + fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> + case Deleted andalso has_no_unacked_streams(SubId, S0) of + true -> + emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); + false -> + Acc + end + end, + S0, + S0 + ). + +%% @doc Fold over active subscriptions: +-spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds:subscription() | undefined. +lookup(TopicFilter, S) -> + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), + case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of + #{deleted := true} -> + undefined; + Sub -> + Sub + end. + +%% @doc Convert active subscriptions to a map, for information +%% purpose: +-spec to_map(emqx_persistent_session_ds_state:t()) -> map(). +to_map(S) -> + fold( + fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, + #{}, + S + ). + +%% @doc Fold over active subscriptions: +-spec fold( + fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), + Acc, + emqx_persistent_session_ds_state:t() +) -> + Acc. +fold(Fun, AccIn, S) -> + fold_all( + fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> + case Deleted of + true -> Acc; + false -> Fun(TopicFilter, Sub, Acc) + end + end, + AccIn, + S + ). + +%% @doc Fold over all subscriptions, including inactive ones: +-spec fold_all( + fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), + Acc, + emqx_persistent_session_ds_state:t() +) -> + Acc. +fold_all(Fun, AccIn, S) -> + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), + emqx_topic_gbt:fold( + fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, + AccIn, + Subs + ). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +-spec has_no_unacked_streams( + emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() +) -> boolean(). +has_no_unacked_streams(SubId, S) -> + emqx_persistent_session_ds_state:fold_streams( + fun + ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> + emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; + (_StreamKey, _Srs, Acc) -> + Acc + end, + true, + S + ).