diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl deleted file mode 100644 index 1053978dc..000000000 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ /dev/null @@ -1,795 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc This module implements the routines for replaying streams of -%% messages. --module(emqx_persistent_message_ds_replayer). - -%% API: --export([new/0, open/1, next_packet_id/1, n_inflight/1]). - --export([poll/4, replay/2, commit_offset/4]). - --export([seqno_to_packet_id/1, packet_id_to_seqno/2]). - --export([committed_until/2]). - -%% internal exports: --export([]). - --export_type([inflight/0, seqno/0]). - --include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("emqx_utils/include/emqx_message.hrl"). --include("emqx_persistent_session_ds.hrl"). - --ifdef(TEST). --include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). --endif. - --define(EPOCH_SIZE, 16#10000). - --define(ACK, 0). --define(COMP, 1). - --define(TRACK_FLAG(WHICH), (1 bsl WHICH)). --define(TRACK_FLAGS_ALL, ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)). --define(TRACK_FLAGS_NONE, 0). - -%%================================================================================ -%% Type declarations -%%================================================================================ - -%% Note: sequence numbers are monotonic; they don't wrap around: --type seqno() :: non_neg_integer(). - --type track() :: ack | comp. --type commit_type() :: rec. - --record(inflight, { - next_seqno = 1 :: seqno(), - commits = #{ack => 1, comp => 1, rec => 1} :: #{track() | commit_type() => seqno()}, - %% Ranges are sorted in ascending order of their sequence numbers. - offset_ranges = [] :: [ds_pubrange()] -}). - --opaque inflight() :: #inflight{}. - --type message() :: emqx_types:message(). --type replies() :: [emqx_session:reply()]. - --type preproc_fun() :: fun((message()) -> message() | [message()]). - -%%================================================================================ -%% API funcions -%%================================================================================ - --spec new() -> inflight(). -new() -> - #inflight{}. - --spec open(emqx_persistent_session_ds:id()) -> inflight(). -open(SessionId) -> - {Ranges, RecUntil} = ro_transaction( - fun() -> {get_ranges(SessionId), get_committed_offset(SessionId, rec)} end - ), - {Commits, NextSeqno} = compute_inflight_range(Ranges), - #inflight{ - commits = Commits#{rec => RecUntil}, - next_seqno = NextSeqno, - offset_ranges = Ranges - }. - --spec next_packet_id(inflight()) -> {emqx_types:packet_id(), inflight()}. -next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqno}) -> - Inflight = Inflight0#inflight{next_seqno = next_seqno(LastSeqno)}, - {seqno_to_packet_id(LastSeqno), Inflight}. - --spec n_inflight(inflight()) -> non_neg_integer(). -n_inflight(#inflight{offset_ranges = Ranges}) -> - %% TODO - %% This is not very efficient. Instead, we can take the maximum of - %% `range_size(AckedUntil, NextSeqno)` and `range_size(CompUntil, NextSeqno)`. - %% This won't be exact number but a pessimistic estimate, but this way we - %% will penalize clients that PUBACK QoS 1 messages but don't PUBCOMP QoS 2 - %% messages for some reason. For that to work, we need to additionally track - %% actual `AckedUntil` / `CompUntil` during `commit_offset/4`. - lists:foldl( - fun - (#ds_pubrange{type = ?T_CHECKPOINT}, N) -> - N; - (#ds_pubrange{type = ?T_INFLIGHT} = Range, N) -> - N + range_size(Range) - end, - 0, - Ranges - ). - --spec replay(preproc_fun(), inflight()) -> {emqx_session:replies(), inflight()}. -replay(PreprocFunFun, Inflight0 = #inflight{offset_ranges = Ranges0, commits = Commits}) -> - {Ranges, Replies} = lists:mapfoldr( - fun(Range, Acc) -> - replay_range(PreprocFunFun, Commits, Range, Acc) - end, - [], - Ranges0 - ), - Inflight = Inflight0#inflight{offset_ranges = Ranges}, - {Replies, Inflight}. - --spec commit_offset(emqx_persistent_session_ds:id(), Offset, emqx_types:packet_id(), inflight()) -> - {_IsValidOffset :: boolean(), inflight()} -when - Offset :: track() | commit_type(). -commit_offset( - SessionId, - Track, - PacketId, - Inflight0 = #inflight{commits = Commits} -) when Track == ack orelse Track == comp -> - case validate_commit(Track, PacketId, Inflight0) of - CommitUntil when is_integer(CommitUntil) -> - %% TODO - %% We do not preserve `CommitUntil` in the database. Instead, we discard - %% fully acked ranges from the database. In effect, this means that the - %% most recent `CommitUntil` the client has sent may be lost in case of a - %% crash or client loss. - Inflight1 = Inflight0#inflight{commits = Commits#{Track := CommitUntil}}, - Inflight = discard_committed(SessionId, Inflight1), - {true, Inflight}; - false -> - {false, Inflight0} - end; -commit_offset( - SessionId, - CommitType = rec, - PacketId, - Inflight0 = #inflight{commits = Commits} -) -> - case validate_commit(CommitType, PacketId, Inflight0) of - CommitUntil when is_integer(CommitUntil) -> - update_committed_offset(SessionId, CommitType, CommitUntil), - Inflight = Inflight0#inflight{commits = Commits#{CommitType := CommitUntil}}, - {true, Inflight}; - false -> - {false, Inflight0} - end. - --spec poll(preproc_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> - {emqx_session:replies(), inflight()}. -poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE -> - MinBatchSize = emqx_config:get([session_persistence, min_batch_size]), - FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)), - FreeSpace = WindowSize - n_inflight(Inflight0), - case FreeSpace >= FetchThreshold of - false -> - %% TODO: this branch is meant to avoid fetching data from - %% the DB in chunks that are too small. However, this - %% logic is not exactly good for the latency. Can the - %% client get stuck even? - {[], Inflight0}; - true -> - %% TODO: Wrap this in `mria:async_dirty/2`? - Checkpoints = find_checkpoints(Inflight0#inflight.offset_ranges), - StreamGroups = group_streams(get_streams(SessionId)), - {Publihes, Inflight} = - fetch(PreprocFun, SessionId, Inflight0, Checkpoints, StreamGroups, FreeSpace, []), - %% Discard now irrelevant QoS0-only ranges, if any. - {Publihes, discard_committed(SessionId, Inflight)} - end. - -%% Which seqno this track is committed until. -%% "Until" means this is first seqno that is _not yet committed_ for this track. --spec committed_until(track() | commit_type(), inflight()) -> seqno(). -committed_until(Track, #inflight{commits = Commits}) -> - maps:get(Track, Commits). - --spec seqno_to_packet_id(seqno()) -> emqx_types:packet_id() | 0. -seqno_to_packet_id(Seqno) -> - Seqno rem ?EPOCH_SIZE. - -%% Reconstruct session counter by adding most significant bits from -%% the current counter to the packet id. --spec packet_id_to_seqno(emqx_types:packet_id(), inflight()) -> seqno(). -packet_id_to_seqno(PacketId, #inflight{next_seqno = NextSeqno}) -> - packet_id_to_seqno_(NextSeqno, PacketId). - -%%================================================================================ -%% Internal exports -%%================================================================================ - -%%================================================================================ -%% Internal functions -%%================================================================================ - -compute_inflight_range([]) -> - {#{ack => 1, comp => 1}, 1}; -compute_inflight_range(Ranges) -> - _RangeLast = #ds_pubrange{until = LastSeqno} = lists:last(Ranges), - AckedUntil = find_committed_until(ack, Ranges), - CompUntil = find_committed_until(comp, Ranges), - Commits = #{ - ack => emqx_maybe:define(AckedUntil, LastSeqno), - comp => emqx_maybe:define(CompUntil, LastSeqno) - }, - {Commits, LastSeqno}. - -find_committed_until(Track, Ranges) -> - RangesUncommitted = lists:dropwhile( - fun(Range) -> - case Range of - #ds_pubrange{type = ?T_CHECKPOINT} -> - true; - #ds_pubrange{type = ?T_INFLIGHT, tracks = Tracks} -> - not has_track(Track, Tracks) - end - end, - Ranges - ), - case RangesUncommitted of - [#ds_pubrange{id = {_, CommittedUntil, _StreamRef}} | _] -> - CommittedUntil; - [] -> - undefined - end. - --spec get_ranges(emqx_persistent_session_ds:id()) -> [ds_pubrange()]. -get_ranges(SessionId) -> - Pat = erlang:make_tuple( - record_info(size, ds_pubrange), - '_', - [{1, ds_pubrange}, {#ds_pubrange.id, {SessionId, '_', '_'}}] - ), - mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read). - -fetch(PreprocFun, SessionId, Inflight0, CPs, Groups, N, Acc) when N > 0, Groups =/= [] -> - #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0, - {Stream, Groups2} = get_the_first_stream(Groups), - case get_next_n_messages_from_stream(Stream, CPs, N) of - [] -> - fetch(PreprocFun, SessionId, Inflight0, CPs, Groups2, N, Acc); - {ItBegin, ItEnd, Messages} -> - %% We need to preserve the iterator pointing to the beginning of the - %% range, so that we can replay it if needed. - {Publishes, UntilSeqno} = publish_fetch(PreprocFun, FirstSeqno, Messages), - Size = range_size(FirstSeqno, UntilSeqno), - Range0 = #ds_pubrange{ - id = {SessionId, FirstSeqno, Stream#ds_stream.ref}, - type = ?T_INFLIGHT, - tracks = compute_pub_tracks(Publishes), - until = UntilSeqno, - iterator = ItBegin - }, - ok = preserve_range(Range0), - %% ...Yet we need to keep the iterator pointing past the end of the - %% range, so that we can pick up where we left off: it will become - %% `ItBegin` of the next range for this stream. - Range = keep_next_iterator(ItEnd, Range0), - Inflight = Inflight0#inflight{ - next_seqno = UntilSeqno, - offset_ranges = Ranges ++ [Range] - }, - fetch(PreprocFun, SessionId, Inflight, CPs, Groups2, N - Size, [Publishes | Acc]) - end; -fetch(_ReplyFun, _SessionId, Inflight, _CPs, _Groups, _N, Acc) -> - Publishes = lists:append(lists:reverse(Acc)), - {Publishes, Inflight}. - -discard_committed( - SessionId, - Inflight0 = #inflight{commits = Commits, offset_ranges = Ranges0} -) -> - %% TODO: This could be kept and incrementally updated in the inflight state. - Checkpoints = find_checkpoints(Ranges0), - %% TODO: Wrap this in `mria:async_dirty/2`? - Ranges = discard_committed_ranges(SessionId, Commits, Checkpoints, Ranges0), - Inflight0#inflight{offset_ranges = Ranges}. - -find_checkpoints(Ranges) -> - lists:foldl( - fun(#ds_pubrange{id = {_SessionId, _, StreamRef}} = Range, Acc) -> - %% For each stream, remember the last range over this stream. - Acc#{StreamRef => Range} - end, - #{}, - Ranges - ). - -discard_committed_ranges( - SessionId, - Commits, - Checkpoints, - Ranges = [Range = #ds_pubrange{id = {_SessionId, _, StreamRef}} | Rest] -) -> - case discard_committed_range(Commits, Range) of - discard -> - %% This range has been fully committed. - %% Either discard it completely, or preserve the iterator for the next range - %% over this stream (i.e. a checkpoint). - RangeKept = - case maps:get(StreamRef, Checkpoints) of - Range -> - [checkpoint_range(Range)]; - _Previous -> - discard_range(Range), - [] - end, - %% Since we're (intentionally) not using transactions here, it's important to - %% issue database writes in the same order in which ranges are stored: from - %% the oldest to the newest. This is also why we need to compute which ranges - %% should become checkpoints before we start writing anything. - RangeKept ++ discard_committed_ranges(SessionId, Commits, Checkpoints, Rest); - keep -> - %% This range has not been fully committed. - [Range | discard_committed_ranges(SessionId, Commits, Checkpoints, Rest)]; - keep_all -> - %% The rest of ranges (if any) still have uncommitted messages. - Ranges; - TracksLeft -> - %% Only some track has been committed. - %% Preserve the uncommitted tracks in the database. - RangeKept = Range#ds_pubrange{tracks = TracksLeft}, - preserve_range(restore_first_iterator(RangeKept)), - [RangeKept | discard_committed_ranges(SessionId, Commits, Checkpoints, Rest)] - end; -discard_committed_ranges(_SessionId, _Commits, _Checkpoints, []) -> - []. - -discard_committed_range(_Commits, #ds_pubrange{type = ?T_CHECKPOINT}) -> - discard; -discard_committed_range( - #{ack := AckedUntil, comp := CompUntil}, - #ds_pubrange{until = Until} -) when Until > AckedUntil andalso Until > CompUntil -> - keep_all; -discard_committed_range(Commits, #ds_pubrange{until = Until, tracks = Tracks}) -> - case discard_tracks(Commits, Until, Tracks) of - 0 -> - discard; - Tracks -> - keep; - TracksLeft -> - TracksLeft - end. - -discard_tracks(#{ack := AckedUntil, comp := CompUntil}, Until, Tracks) -> - TAck = - case Until > AckedUntil of - true -> ?TRACK_FLAG(?ACK) band Tracks; - false -> 0 - end, - TComp = - case Until > CompUntil of - true -> ?TRACK_FLAG(?COMP) band Tracks; - false -> 0 - end, - TAck bor TComp. - -replay_range( - PreprocFun, - Commits, - Range0 = #ds_pubrange{ - type = ?T_INFLIGHT, id = {_, First, _StreamRef}, until = Until, iterator = It - }, - Acc -) -> - Size = range_size(First, Until), - {ok, ItNext, MessagesUnacked} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, Size), - %% Asserting that range is consistent with the message storage state. - {Replies, Until} = publish_replay(PreprocFun, Commits, First, MessagesUnacked), - %% Again, we need to keep the iterator pointing past the end of the - %% range, so that we can pick up where we left off. - Range = keep_next_iterator(ItNext, Range0), - {Range, Replies ++ Acc}; -replay_range(_PreprocFun, _Commits, Range0 = #ds_pubrange{type = ?T_CHECKPOINT}, Acc) -> - {Range0, Acc}. - -validate_commit( - Track, - PacketId, - Inflight = #inflight{commits = Commits, next_seqno = NextSeqno} -) -> - Seqno = packet_id_to_seqno_(NextSeqno, PacketId), - CommittedUntil = maps:get(Track, Commits), - CommitNext = get_commit_next(Track, Inflight), - case Seqno >= CommittedUntil andalso Seqno < CommitNext of - true -> - next_seqno(Seqno); - false -> - ?SLOG(warning, #{ - msg => "out-of-order_commit", - track => Track, - packet_id => PacketId, - commit_seqno => Seqno, - committed_until => CommittedUntil, - commit_next => CommitNext - }), - false - end. - -get_commit_next(ack, #inflight{next_seqno = NextSeqno}) -> - NextSeqno; -get_commit_next(rec, #inflight{next_seqno = NextSeqno}) -> - NextSeqno; -get_commit_next(comp, #inflight{commits = Commits}) -> - maps:get(rec, Commits). - -publish_fetch(PreprocFun, FirstSeqno, Messages) -> - flatmapfoldl( - fun({_DSKey, MessageIn}, Acc) -> - Message = PreprocFun(MessageIn), - publish_fetch(Message, Acc) - end, - FirstSeqno, - Messages - ). - -publish_fetch(#message{qos = ?QOS_0} = Message, Seqno) -> - {{undefined, Message}, Seqno}; -publish_fetch(#message{} = Message, Seqno) -> - PacketId = seqno_to_packet_id(Seqno), - {{PacketId, Message}, next_seqno(Seqno)}; -publish_fetch(Messages, Seqno) -> - flatmapfoldl(fun publish_fetch/2, Seqno, Messages). - -publish_replay(PreprocFun, Commits, FirstSeqno, Messages) -> - #{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits, - flatmapfoldl( - fun({_DSKey, MessageIn}, Acc) -> - Message = PreprocFun(MessageIn), - publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc) - end, - FirstSeqno, - Messages - ). - -publish_replay(#message{qos = ?QOS_0}, _, _, _, Seqno) -> - %% QoS 0 (at most once) messages should not be replayed. - {[], Seqno}; -publish_replay(#message{qos = Qos} = Message, AckedUntil, CompUntil, RecUntil, Seqno) -> - case Qos of - ?QOS_1 when Seqno < AckedUntil -> - %% This message has already been acked, so we can skip it. - %% We still need to advance seqno, because previously we assigned this message - %% a unique Packet Id. - {[], next_seqno(Seqno)}; - ?QOS_2 when Seqno < CompUntil -> - %% This message's flow has already been fully completed, so we can skip it. - %% We still need to advance seqno, because previously we assigned this message - %% a unique Packet Id. - {[], next_seqno(Seqno)}; - ?QOS_2 when Seqno < RecUntil -> - %% This message's flow has been partially completed, we need to resend a PUBREL. - PacketId = seqno_to_packet_id(Seqno), - Pub = {pubrel, PacketId}, - {Pub, next_seqno(Seqno)}; - _ -> - %% This message flow hasn't been acked and/or received, we need to resend it. - PacketId = seqno_to_packet_id(Seqno), - Pub = {PacketId, emqx_message:set_flag(dup, true, Message)}, - {Pub, next_seqno(Seqno)} - end; -publish_replay([], _, _, _, Seqno) -> - {[], Seqno}; -publish_replay(Messages, AckedUntil, CompUntil, RecUntil, Seqno) -> - flatmapfoldl( - fun(Message, Acc) -> - publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc) - end, - Seqno, - Messages - ). - --spec compute_pub_tracks(replies()) -> non_neg_integer(). -compute_pub_tracks(Pubs) -> - compute_pub_tracks(Pubs, ?TRACK_FLAGS_NONE). - -compute_pub_tracks(_Pubs, Tracks = ?TRACK_FLAGS_ALL) -> - Tracks; -compute_pub_tracks([Pub | Rest], Tracks) -> - Track = - case Pub of - {_PacketId, #message{qos = ?QOS_1}} -> ?TRACK_FLAG(?ACK); - {_PacketId, #message{qos = ?QOS_2}} -> ?TRACK_FLAG(?COMP); - {pubrel, _PacketId} -> ?TRACK_FLAG(?COMP); - _ -> ?TRACK_FLAGS_NONE - end, - compute_pub_tracks(Rest, Track bor Tracks); -compute_pub_tracks([], Tracks) -> - Tracks. - -keep_next_iterator(ItNext, Range = #ds_pubrange{iterator = ItFirst, misc = Misc}) -> - Range#ds_pubrange{ - iterator = ItNext, - %% We need to keep the first iterator around, in case we need to preserve - %% this range again, updating still uncommitted tracks it's part of. - misc = Misc#{iterator_first => ItFirst} - }. - -restore_first_iterator(Range = #ds_pubrange{misc = Misc = #{iterator_first := ItFirst}}) -> - Range#ds_pubrange{ - iterator = ItFirst, - misc = maps:remove(iterator_first, Misc) - }. - --spec preserve_range(ds_pubrange()) -> ok. -preserve_range(Range = #ds_pubrange{type = ?T_INFLIGHT}) -> - mria:dirty_write(?SESSION_PUBRANGE_TAB, Range). - -has_track(ack, Tracks) -> - (?TRACK_FLAG(?ACK) band Tracks) > 0; -has_track(comp, Tracks) -> - (?TRACK_FLAG(?COMP) band Tracks) > 0. - --spec discard_range(ds_pubrange()) -> ok. -discard_range(#ds_pubrange{id = RangeId}) -> - mria:dirty_delete(?SESSION_PUBRANGE_TAB, RangeId). - --spec checkpoint_range(ds_pubrange()) -> ds_pubrange(). -checkpoint_range(Range0 = #ds_pubrange{type = ?T_INFLIGHT}) -> - Range = Range0#ds_pubrange{type = ?T_CHECKPOINT, misc = #{}}, - ok = mria:dirty_write(?SESSION_PUBRANGE_TAB, Range), - Range; -checkpoint_range(Range = #ds_pubrange{type = ?T_CHECKPOINT}) -> - %% This range should have been checkpointed already. - Range. - -get_last_iterator(Stream = #ds_stream{ref = StreamRef}, Checkpoints) -> - case maps:get(StreamRef, Checkpoints, none) of - none -> - Stream#ds_stream.beginning; - #ds_pubrange{iterator = ItNext} -> - ItNext - end. - --spec get_streams(emqx_persistent_session_ds:id()) -> [ds_stream()]. -get_streams(SessionId) -> - mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId). - --spec get_committed_offset(emqx_persistent_session_ds:id(), _Name) -> seqno(). -get_committed_offset(SessionId, Name) -> - case mnesia:read(?SESSION_COMMITTED_OFFSET_TAB, {SessionId, Name}) of - [] -> - 1; - [#ds_committed_offset{until = Seqno}] -> - Seqno - end. - --spec update_committed_offset(emqx_persistent_session_ds:id(), _Name, seqno()) -> ok. -update_committed_offset(SessionId, Name, Until) -> - mria:dirty_write(?SESSION_COMMITTED_OFFSET_TAB, #ds_committed_offset{ - id = {SessionId, Name}, until = Until - }). - -next_seqno(Seqno) -> - NextSeqno = Seqno + 1, - case seqno_to_packet_id(NextSeqno) of - 0 -> - %% We skip sequence numbers that lead to PacketId = 0 to - %% simplify math. Note: it leads to occasional gaps in the - %% sequence numbers. - NextSeqno + 1; - _ -> - NextSeqno - end. - -packet_id_to_seqno_(NextSeqno, PacketId) -> - Epoch = NextSeqno bsr 16, - case (Epoch bsl 16) + PacketId of - N when N =< NextSeqno -> - N; - N -> - N - ?EPOCH_SIZE - end. - -range_size(#ds_pubrange{id = {_, First, _StreamRef}, until = Until}) -> - range_size(First, Until). - -range_size(FirstSeqno, UntilSeqno) -> - %% This function assumes that gaps in the sequence ID occur _only_ when the - %% packet ID wraps. - Size = UntilSeqno - FirstSeqno, - Size + (FirstSeqno bsr 16) - (UntilSeqno bsr 16). - -%%================================================================================ -%% stream scheduler - -%% group streams by the first position in the rank --spec group_streams(list(ds_stream())) -> list(list(ds_stream())). -group_streams(Streams) -> - Groups = maps:groups_from_list( - fun(#ds_stream{rank = {RankX, _}}) -> RankX end, - Streams - ), - shuffle(maps:values(Groups)). - --spec shuffle([A]) -> [A]. -shuffle(L0) -> - L1 = lists:map( - fun(A) -> - %% maybe topic/stream prioritization could be introduced here? - {rand:uniform(), A} - end, - L0 - ), - L2 = lists:sort(L1), - {_, L} = lists:unzip(L2), - L. - -get_the_first_stream([Group | Groups]) -> - case get_next_stream_from_group(Group) of - {Stream, {sorted, []}} -> - {Stream, Groups}; - {Stream, Group2} -> - {Stream, [Group2 | Groups]}; - undefined -> - get_the_first_stream(Groups) - end; -get_the_first_stream([]) -> - %% how this possible ? - throw(#{reason => no_valid_stream}). - -%% the scheduler is simple, try to get messages from the same shard, but it's okay to take turns -get_next_stream_from_group({sorted, [H | T]}) -> - {H, {sorted, T}}; -get_next_stream_from_group({sorted, []}) -> - undefined; -get_next_stream_from_group(Streams) -> - [Stream | T] = lists:sort( - fun(#ds_stream{rank = {_, RankA}}, #ds_stream{rank = {_, RankB}}) -> - RankA < RankB - end, - Streams - ), - {Stream, {sorted, T}}. - -get_next_n_messages_from_stream(Stream, CPs, N) -> - ItBegin = get_last_iterator(Stream, CPs), - case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N) of - {ok, _ItEnd, []} -> - []; - {ok, ItEnd, Messages} -> - {ItBegin, ItEnd, Messages}; - {ok, end_of_stream} -> - %% TODO: how to skip this closed stream or it should be taken over by lower level layer - [] - end. - -%%================================================================================ - --spec flatmapfoldl(fun((X, Acc) -> {Y | [Y], Acc}), Acc, [X]) -> {[Y], Acc}. -flatmapfoldl(_Fun, Acc, []) -> - {[], Acc}; -flatmapfoldl(Fun, Acc, [X | Xs]) -> - {Ys, NAcc} = Fun(X, Acc), - {Zs, FAcc} = flatmapfoldl(Fun, NAcc, Xs), - case is_list(Ys) of - true -> - {Ys ++ Zs, FAcc}; - _ -> - {[Ys | Zs], FAcc} - end. - -ro_transaction(Fun) -> - {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun), - Res. - --ifdef(TEST). - -%% This test only tests boundary conditions (to make sure property-based test didn't skip them): -packet_id_to_seqno_test() -> - %% Packet ID = 1; first epoch: - ?assertEqual(1, packet_id_to_seqno_(1, 1)), - ?assertEqual(1, packet_id_to_seqno_(10, 1)), - ?assertEqual(1, packet_id_to_seqno_(1 bsl 16 - 1, 1)), - ?assertEqual(1, packet_id_to_seqno_(1 bsl 16, 1)), - %% Packet ID = 1; second and 3rd epochs: - ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno_(1 bsl 16 + 1, 1)), - ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno_(2 bsl 16, 1)), - ?assertEqual(2 bsl 16 + 1, packet_id_to_seqno_(2 bsl 16 + 1, 1)), - %% Packet ID = 16#ffff: - PID = 1 bsl 16 - 1, - ?assertEqual(PID, packet_id_to_seqno_(PID, PID)), - ?assertEqual(PID, packet_id_to_seqno_(1 bsl 16, PID)), - ?assertEqual(1 bsl 16 + PID, packet_id_to_seqno_(2 bsl 16, PID)), - ok. - -packet_id_to_seqno_test_() -> - Opts = [{numtests, 1000}, {to_file, user}], - {timeout, 30, fun() -> ?assert(proper:quickcheck(packet_id_to_seqno_prop(), Opts)) end}. - -packet_id_to_seqno_prop() -> - ?FORALL( - NextSeqNo, - next_seqno_gen(), - ?FORALL( - SeqNo, - seqno_gen(NextSeqNo), - begin - PacketId = seqno_to_packet_id(SeqNo), - ?assertEqual(SeqNo, packet_id_to_seqno_(NextSeqNo, PacketId)), - true - end - ) - ). - -next_seqno_gen() -> - ?LET( - {Epoch, Offset}, - {non_neg_integer(), non_neg_integer()}, - Epoch bsl 16 + Offset - ). - -seqno_gen(NextSeqNo) -> - WindowSize = 1 bsl 16 - 1, - Min = max(0, NextSeqNo - WindowSize), - Max = max(0, NextSeqNo - 1), - range(Min, Max). - -range_size_test_() -> - [ - ?_assertEqual(0, range_size(42, 42)), - ?_assertEqual(1, range_size(42, 43)), - ?_assertEqual(1, range_size(16#ffff, 16#10001)), - ?_assertEqual(16#ffff - 456 + 123, range_size(16#1f0000 + 456, 16#200000 + 123)) - ]. - -compute_inflight_range_test_() -> - [ - ?_assertEqual( - {#{ack => 1, comp => 1}, 1}, - compute_inflight_range([]) - ), - ?_assertEqual( - {#{ack => 12, comp => 13}, 42}, - compute_inflight_range([ - #ds_pubrange{id = {<<>>, 1, 0}, until = 2, type = ?T_CHECKPOINT}, - #ds_pubrange{id = {<<>>, 4, 0}, until = 8, type = ?T_CHECKPOINT}, - #ds_pubrange{id = {<<>>, 11, 0}, until = 12, type = ?T_CHECKPOINT}, - #ds_pubrange{ - id = {<<>>, 12, 0}, - until = 13, - type = ?T_INFLIGHT, - tracks = ?TRACK_FLAG(?ACK) - }, - #ds_pubrange{ - id = {<<>>, 13, 0}, - until = 20, - type = ?T_INFLIGHT, - tracks = ?TRACK_FLAG(?COMP) - }, - #ds_pubrange{ - id = {<<>>, 20, 0}, - until = 42, - type = ?T_INFLIGHT, - tracks = ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP) - } - ]) - ), - ?_assertEqual( - {#{ack => 13, comp => 13}, 13}, - compute_inflight_range([ - #ds_pubrange{id = {<<>>, 1, 0}, until = 2, type = ?T_CHECKPOINT}, - #ds_pubrange{id = {<<>>, 4, 0}, until = 8, type = ?T_CHECKPOINT}, - #ds_pubrange{id = {<<>>, 11, 0}, until = 12, type = ?T_CHECKPOINT}, - #ds_pubrange{id = {<<>>, 12, 0}, until = 13, type = ?T_CHECKPOINT} - ]) - ) - ]. - --endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index a8c62fe7a..1ab256d32 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ -behaviour(emqx_session). -include("emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). @@ -78,59 +79,64 @@ -ifdef(TEST). -export([ session_open/2, - list_all_sessions/0, - list_all_subscriptions/0, - list_all_streams/0, - list_all_pubranges/0 + list_all_sessions/0 ]). -endif. -export_type([ id/0, - subscription_id/0, - session/0 + seqno/0, + timestamp/0, + topic_filter/0, + subscription/0, + session/0, + stream_state/0 ]). +-type seqno() :: non_neg_integer(). + %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type id() :: binary(). -type topic_filter() :: emqx_types:topic(). --type topic_filter_words() :: emqx_ds:topic_filter(). --type subscription_id() :: {id(), topic_filter()}. + -type subscription() :: #{ start_time := emqx_ds:time(), props := map(), extra := map() }. +%%%%% Session sequence numbers: +-define(next(QOS), {0, QOS}). +%% Note: we consider the sequence number _committed_ once the full +%% packet MQTT flow is completed for the sequence number. That is, +%% when we receive PUBACK for the QoS1 message, or PUBCOMP, or PUBREC +%% with Reason code > 0x80 for QoS2 message. +-define(committed(QOS), {1, QOS}). +%% For QoS2 messages we also need to store the sequence number of the +%% last PUBREL message: +-define(pubrec, 2). + -define(TIMER_PULL, timer_pull). -define(TIMER_GET_STREAMS, timer_get_streams). -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). -type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. --type subscriptions() :: emqx_topic_gbt:t(nil(), subscription()). - -type session() :: #{ %% Client ID id := id(), - %% When the session was created - created_at := timestamp(), - %% When the client was last considered alive - last_alive_at := timestamp(), - %% Client’s Subscriptions. - subscriptions := subscriptions(), - %% Inflight messages - inflight := emqx_persistent_message_ds_replayer:inflight(), - %% Receive maximum - receive_maximum := pos_integer(), - %% Connection Info - conninfo := emqx_types:conninfo(), - %% Timers - timer() => reference(), - %% - props := map() + %% Configuration: + props := map(), + %% Persistent state: + s := emqx_persistent_session_ds_state:t(), + %% Buffer: + inflight := emqx_persistent_session_ds_inflight:t(), + %% Timers: + timer() => reference() }. +-type stream_state() :: #ifs{}. + -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). -type millisecond() :: non_neg_integer(). -type clientinfo() :: emqx_types:clientinfo(). @@ -141,23 +147,15 @@ subscriptions_cnt, subscriptions_max, inflight_cnt, - inflight_max, - next_pkt_id + inflight_max ]). --define(IS_EXPIRED(NOW_MS, LAST_ALIVE_AT, EI), - (is_number(LAST_ALIVE_AT) andalso - is_number(EI) andalso - (NOW_MS >= LAST_ALIVE_AT + EI)) -). - %% -spec create(clientinfo(), conninfo(), emqx_session:conf()) -> session(). create(#{clientid := ClientID}, ConnInfo, Conf) -> - Session = session_ensure_new(ClientID, ConnInfo), - apply_conf(ConnInfo, Conf, ensure_timers(Session)). + ensure_timers(session_ensure_new(ClientID, ConnInfo, Conf)). -spec open(clientinfo(), conninfo(), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | false. @@ -171,18 +169,12 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> ok = emqx_cm:discard_session(ClientID), case session_open(ClientID, ConnInfo) of Session0 = #{} -> - Session = apply_conf(ConnInfo, Conf, Session0), + Session = Session0#{props => Conf}, {true, ensure_timers(Session), []}; false -> false end. -apply_conf(ConnInfo, Conf, Session) -> - Session#{ - receive_maximum => receive_maximum(ConnInfo), - props => Conf - }. - -spec destroy(session() | clientinfo()) -> ok. destroy(#{id := ClientID}) -> destroy_session(ClientID); @@ -202,14 +194,14 @@ info(id, #{id := ClientID}) -> ClientID; info(clientid, #{id := ClientID}) -> ClientID; -info(created_at, #{created_at := CreatedAt}) -> - CreatedAt; +info(created_at, #{s := S}) -> + emqx_persistent_session_ds_state:get_created_at(S); info(is_persistent, #{}) -> true; -info(subscriptions, #{subscriptions := Subs}) -> - subs_to_map(Subs); -info(subscriptions_cnt, #{subscriptions := Subs}) -> - subs_size(Subs); +info(subscriptions, #{s := S}) -> + subs_to_map(S); +info(subscriptions_cnt, #{s := S}) -> + emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S)); info(subscriptions_max, #{props := Conf}) -> maps:get(max_subscriptions, Conf); info(upgrade_qos, #{props := Conf}) -> @@ -217,9 +209,9 @@ info(upgrade_qos, #{props := Conf}) -> info(inflight, #{inflight := Inflight}) -> Inflight; info(inflight_cnt, #{inflight := Inflight}) -> - emqx_persistent_message_ds_replayer:n_inflight(Inflight); -info(inflight_max, #{receive_maximum := ReceiveMaximum}) -> - ReceiveMaximum; + emqx_persistent_session_ds_inflight:n_inflight(Inflight); +info(inflight_max, #{inflight := Inflight}) -> + emqx_persistent_session_ds_inflight:receive_maximum(Inflight); info(retry_interval, #{props := Conf}) -> maps:get(retry_interval, Conf); % info(mqueue, #sessmem{mqueue = MQueue}) -> @@ -230,9 +222,9 @@ info(retry_interval, #{props := Conf}) -> % emqx_mqueue:max_len(MQueue); % info(mqueue_dropped, #sessmem{mqueue = MQueue}) -> % emqx_mqueue:dropped(MQueue); -info(next_pkt_id, #{inflight := Inflight}) -> - {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(Inflight), - PacketId; +%% info(next_pkt_id, #{s := S}) -> +%% {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S), +%% PacketId; % info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> % AwaitingRel; % info(awaiting_rel_cnt, #sessmem{awaiting_rel = AwaitingRel}) -> @@ -249,22 +241,7 @@ stats(Session) -> %% Debug/troubleshooting -spec print_session(emqx_types:clientid()) -> map() | undefined. print_session(ClientId) -> - catch ro_transaction( - fun() -> - case mnesia:read(?SESSION_TAB, ClientId) of - [Session] -> - #{ - session => Session, - streams => mnesia:read(?SESSION_STREAM_TAB, ClientId), - pubranges => session_read_pubranges(ClientId), - offsets => session_read_offsets(ClientId), - subscriptions => session_read_subscriptions(ClientId) - }; - [] -> - undefined - end - end - ). + emqx_persistent_session_ds_state:print_session(ClientId). %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE @@ -275,39 +252,74 @@ print_session(ClientId) -> subscribe( TopicFilter, SubOpts, - Session = #{id := ID, subscriptions := Subs} + Session = #{id := ID, s := S0} ) -> - case subs_lookup(TopicFilter, Subs) of - Subscription = #{} -> - NSubscription = update_subscription(TopicFilter, Subscription, SubOpts, ID), - NSubs = subs_insert(TopicFilter, NSubscription, Subs), - {ok, Session#{subscriptions := NSubs}}; + case subs_lookup(TopicFilter, S0) of undefined -> - % TODO: max_subscriptions - Subscription = add_subscription(TopicFilter, SubOpts, ID), - NSubs = subs_insert(TopicFilter, Subscription, Subs), - {ok, Session#{subscriptions := NSubs}} - end. + %% N.B.: we chose to update the router before adding the + %% subscription to the session/iterator table. The + %% reasoning for this is as follows: + %% + %% Messages matching this topic filter should start to be + %% persisted as soon as possible to avoid missing + %% messages. If this is the first such persistent session + %% subscription, it's important to do so early on. + %% + %% This could, in turn, lead to some inconsistency: if + %% such a route gets created but the session/iterator data + %% fails to be updated accordingly, we have a dangling + %% route. To remove such dangling routes, we may have a + %% periodic GC process that removes routes that do not + %% have a matching persistent subscription. Also, route + %% operations use dirty mnesia operations, which + %% inherently have room for inconsistencies. + %% + %% In practice, we use the iterator reference table as a + %% source of truth, since it is guarded by a transaction + %% context: we consider a subscription operation to be + %% successful if it ended up changing this table. Both + %% router and iterator information can be reconstructed + %% from this table, if needed. + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID), + Subscription = #{ + start_time => now_ms(), + props => SubOpts + }, + IsNew = true; + Subscription0 = #{} -> + Subscription = Subscription0#{props => SubOpts}, + IsNew = false + end, + S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0), + ?tp(persistent_session_ds_subscription_added, #{ + topic_filter => TopicFilter, sub => Subscription, is_new => IsNew + }), + {ok, Session#{s => S}}. -spec unsubscribe(topic_filter(), session()) -> {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. unsubscribe( TopicFilter, - Session = #{id := ID, subscriptions := Subs} + Session = #{id := ID, s := S0} ) -> - case subs_lookup(TopicFilter, Subs) of - _Subscription = #{props := SubOpts} -> - ok = del_subscription(TopicFilter, ID), - NSubs = subs_delete(TopicFilter, Subs), - {ok, Session#{subscriptions := NSubs}, SubOpts}; + %% TODO: drop streams and messages from the buffer + case subs_lookup(TopicFilter, S0) of + #{props := SubOpts} -> + S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), + ?tp_span( + persistent_session_ds_subscription_route_delete, + #{session_id => ID}, + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, ID) + ), + {ok, Session#{s => S}, SubOpts}; undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED} end. -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. -get_subscription(TopicFilter, #{subscriptions := Subs}) -> - case subs_lookup(TopicFilter, Subs) of +get_subscription(TopicFilter, #{s := S}) -> + case subs_lookup(TopicFilter, S) of _Subscription = #{props := SubOpts} -> SubOpts; undefined -> @@ -333,15 +345,12 @@ publish(_PacketId, Msg, Session) -> -spec puback(clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}. -puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> - case emqx_persistent_message_ds_replayer:commit_offset(Id, ack, PacketId, Inflight0) of - {true, Inflight} -> - %% TODO: we pass a bogus message into the hook: - Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, [], pull_now(Session#{inflight => Inflight})}; - {false, _} -> - %% Invalid Packet Id - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} +puback(_ClientInfo, PacketId, Session0) -> + case commit_seqno(puback, PacketId, Session0) of + {ok, Msg, Session} -> + {ok, Msg, [], inc_send_quota(Session)}; + Error -> + Error end. %%-------------------------------------------------------------------- @@ -351,15 +360,12 @@ puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> -spec pubrec(emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}. -pubrec(PacketId, Session = #{id := Id, inflight := Inflight0}) -> - case emqx_persistent_message_ds_replayer:commit_offset(Id, rec, PacketId, Inflight0) of - {true, Inflight} -> - %% TODO: we pass a bogus message into the hook: - Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, pull_now(Session#{inflight => Inflight})}; - {false, _} -> - %% Invalid Packet Id - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} +pubrec(PacketId, Session0) -> + case commit_seqno(pubrec, PacketId, Session0) of + {ok, Msg, Session} -> + {ok, Msg, Session}; + Error = {error, _} -> + Error end. %%-------------------------------------------------------------------- @@ -379,15 +385,12 @@ pubrel(_PacketId, Session = #{}) -> -spec pubcomp(clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}. -pubcomp(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> - case emqx_persistent_message_ds_replayer:commit_offset(Id, comp, PacketId, Inflight0) of - {true, Inflight} -> - %% TODO - Msg = emqx_message:make(Id, <<>>, <<>>), - {ok, Msg, [], Session#{inflight => Inflight}}; - {false, _} -> - %% Invalid Packet Id - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} +pubcomp(_ClientInfo, PacketId, Session0) -> + case commit_seqno(pubcomp, PacketId, Session0) of + {ok, Msg, Session} -> + {ok, Msg, [], inc_send_quota(Session)}; + Error = {error, _} -> + Error end. %%-------------------------------------------------------------------- @@ -403,215 +406,87 @@ deliver(_ClientInfo, _Delivers, Session) -> handle_timeout( ClientInfo, ?TIMER_PULL, - Session0 = #{ - id := Id, - inflight := Inflight0, - subscriptions := Subs, - props := Conf, - receive_maximum := ReceiveMaximum - } + Session0 ) -> - MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]), - BatchSize = min(ReceiveMaximum, MaxBatchSize), - UpgradeQoS = maps:get(upgrade_qos, Conf), - PreprocFun = make_preproc_fun(ClientInfo, Subs, UpgradeQoS), - {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( - PreprocFun, - Id, - Inflight0, - BatchSize - ), - IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]), + {Publishes, Session1} = drain_buffer(fill_buffer(Session0, ClientInfo)), Timeout = case Publishes of [] -> - IdlePollInterval; + emqx_config:get([session_persistence, idle_poll_interval]); [_ | _] -> 0 end, - Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session0#{inflight := Inflight}), + Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; -handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) -> - renew_streams(Session), +handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> + S = renew_streams(S0), Interval = emqx_config:get([session_persistence, renew_streams_interval]), - {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, Interval, Session)}; -handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> - %% Note: we take a pessimistic approach here and assume that the client will be alive - %% until the next bump timeout. With this, we avoid garbage collecting this session - %% too early in case the session/connection/node crashes earlier without having time - %% to commit the time. - BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), - EstimatedLastAliveAt = now_ms() + BumpInterval, - Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), - {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}; + Session = emqx_session:ensure_timer( + ?TIMER_GET_STREAMS, + Interval, + Session0#{s => S} + ), + {ok, [], Session}; +handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) -> + S = emqx_persistent_session_ds_state:commit(bump_last_alive(S0)), + Session = emqx_session:ensure_timer( + ?TIMER_BUMP_LAST_ALIVE_AT, + bump_interval(), + Session0#{s => S} + ), + {ok, [], Session}; handle_timeout(_ClientInfo, expire_awaiting_rel, Session) -> %% TODO: stub {ok, [], Session}. +bump_last_alive(S0) -> + %% Note: we take a pessimistic approach here and assume that the client will be alive + %% until the next bump timeout. With this, we avoid garbage collecting this session + %% too early in case the session/connection/node crashes earlier without having time + %% to commit the time. + EstimatedLastAliveAt = now_ms() + bump_interval(), + emqx_persistent_session_ds_state:set_last_alive_at(EstimatedLastAliveAt, S0). + -spec replay(clientinfo(), [], session()) -> {ok, replies(), session()}. -replay( - ClientInfo, - [], - Session = #{inflight := Inflight0, subscriptions := Subs, props := Conf} -) -> - UpgradeQoS = maps:get(upgrade_qos, Conf), - PreprocFun = make_preproc_fun(ClientInfo, Subs, UpgradeQoS), - {Replies, Inflight} = emqx_persistent_message_ds_replayer:replay(PreprocFun, Inflight0), - {ok, Replies, Session#{inflight := Inflight}}. - +replay(ClientInfo, [], Session0) -> + Streams = find_replay_streams(Session0), + Session = lists:foldl( + fun({StreamKey, Stream}, SessionAcc) -> + replay_batch(StreamKey, Stream, SessionAcc, ClientInfo) + end, + Session0, + Streams + ), + %% Note: we filled the buffer with the historical messages, and + %% from now on we'll rely on the normal inflight/flow control + %% mechanisms to replay them: + {ok, [], pull_now(Session)}. %%-------------------------------------------------------------------- -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. -disconnect(Session0, ConnInfo) -> - Session = session_set_last_alive_at_trans(Session0, ConnInfo, now_ms()), - {shutdown, Session}. +disconnect(Session = #{s := S0}, _ConnInfo) -> + S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0), + S = emqx_persistent_session_ds_state:commit(S1), + {shutdown, Session#{s => S}}. -spec terminate(Reason :: term(), session()) -> ok. -terminate(_Reason, _Session = #{}) -> +terminate(_Reason, _Session = #{s := S}) -> + emqx_persistent_session_ds_state:commit(S), ok. -%%-------------------------------------------------------------------- - -make_preproc_fun(ClientInfo, Subs, UpgradeQoS) -> - fun(Message = #message{topic = Topic}) -> - emqx_utils:flattermap( - fun(Match) -> - #{props := SubOpts} = subs_get_match(Match, Subs), - emqx_session:enrich_message(ClientInfo, Message, SubOpts, UpgradeQoS) - end, - subs_matches(Topic, Subs) - ) - end. - -%%-------------------------------------------------------------------- - --spec add_subscription(topic_filter(), emqx_types:subopts(), id()) -> - subscription(). -add_subscription(TopicFilter, SubOpts, DSSessionID) -> - %% N.B.: we chose to update the router before adding the subscription to the - %% session/iterator table. The reasoning for this is as follows: - %% - %% Messages matching this topic filter should start to be persisted as soon as - %% possible to avoid missing messages. If this is the first such persistent - %% session subscription, it's important to do so early on. - %% - %% This could, in turn, lead to some inconsistency: if such a route gets - %% created but the session/iterator data fails to be updated accordingly, we - %% have a dangling route. To remove such dangling routes, we may have a - %% periodic GC process that removes routes that do not have a matching - %% persistent subscription. Also, route operations use dirty mnesia - %% operations, which inherently have room for inconsistencies. - %% - %% In practice, we use the iterator reference table as a source of truth, - %% since it is guarded by a transaction context: we consider a subscription - %% operation to be successful if it ended up changing this table. Both router - %% and iterator information can be reconstructed from this table, if needed. - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, DSSessionID), - {ok, DSSubExt, IsNew} = session_add_subscription( - DSSessionID, TopicFilter, SubOpts - ), - ?tp(persistent_session_ds_subscription_added, #{sub => DSSubExt, is_new => IsNew}), - %% we'll list streams and open iterators when implementing message replay. - DSSubExt. - --spec update_subscription(topic_filter(), subscription(), emqx_types:subopts(), id()) -> - subscription(). -update_subscription(TopicFilter, DSSubExt, SubOpts, DSSessionID) -> - {ok, NDSSubExt, false} = session_add_subscription( - DSSessionID, TopicFilter, SubOpts - ), - ok = ?tp(persistent_session_ds_iterator_updated, #{sub => DSSubExt}), - NDSSubExt. - --spec del_subscription(topic_filter(), id()) -> - ok. -del_subscription(TopicFilter, DSSessionId) -> - %% TODO: transaction? - ?tp_span( - persistent_session_ds_subscription_delete, - #{session_id => DSSessionId}, - ok = session_del_subscription(DSSessionId, TopicFilter) - ), - ?tp_span( - persistent_session_ds_subscription_route_delete, - #{session_id => DSSessionId}, - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, DSSessionId) - ). - %%-------------------------------------------------------------------- %% Session tables operations %%-------------------------------------------------------------------- create_tables() -> - ok = mria:create_table( - ?SESSION_TAB, - [ - {rlog_shard, ?DS_MRIA_SHARD}, - {type, set}, - {storage, storage()}, - {record_name, session}, - {attributes, record_info(fields, session)} - ] - ), - ok = mria:create_table( - ?SESSION_SUBSCRIPTIONS_TAB, - [ - {rlog_shard, ?DS_MRIA_SHARD}, - {type, ordered_set}, - {storage, storage()}, - {record_name, ds_sub}, - {attributes, record_info(fields, ds_sub)} - ] - ), - ok = mria:create_table( - ?SESSION_STREAM_TAB, - [ - {rlog_shard, ?DS_MRIA_SHARD}, - {type, bag}, - {storage, storage()}, - {record_name, ds_stream}, - {attributes, record_info(fields, ds_stream)} - ] - ), - ok = mria:create_table( - ?SESSION_PUBRANGE_TAB, - [ - {rlog_shard, ?DS_MRIA_SHARD}, - {type, ordered_set}, - {storage, storage()}, - {record_name, ds_pubrange}, - {attributes, record_info(fields, ds_pubrange)} - ] - ), - ok = mria:create_table( - ?SESSION_COMMITTED_OFFSET_TAB, - [ - {rlog_shard, ?DS_MRIA_SHARD}, - {type, set}, - {storage, storage()}, - {record_name, ds_committed_offset}, - {attributes, record_info(fields, ds_committed_offset)} - ] - ), - ok = mria:wait_for_tables([ - ?SESSION_TAB, - ?SESSION_SUBSCRIPTIONS_TAB, - ?SESSION_STREAM_TAB, - ?SESSION_PUBRANGE_TAB, - ?SESSION_COMMITTED_OFFSET_TAB - ]), - ok. + emqx_persistent_session_ds_state:create_tables(). --dialyzer({nowarn_function, storage/0}). -storage() -> - %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows - case mria:rocksdb_backend_available() of - true -> - rocksdb_copies; - _ -> - disc_copies - end. +-define(IS_EXPIRED(NOW_MS, LAST_ALIVE_AT, EI), + (is_number(LAST_ALIVE_AT) andalso + is_number(EI) andalso + (NOW_MS >= LAST_ALIVE_AT + EI)) +). %% @doc Called when a client connects. This function looks up a %% session or returns `false` if previous one couldn't be found. @@ -622,204 +497,59 @@ storage() -> session() | false. session_open(SessionId, NewConnInfo) -> NowMS = now_ms(), - transaction(fun() -> - case mnesia:read(?SESSION_TAB, SessionId, write) of - [Record0 = #session{last_alive_at = LastAliveAt, conninfo = ConnInfo}] -> - EI = expiry_interval(ConnInfo), - case ?IS_EXPIRED(NowMS, LastAliveAt, EI) of - true -> - session_drop(SessionId), - false; - false -> - %% new connection being established - Record1 = Record0#session{conninfo = NewConnInfo}, - Record = session_set_last_alive_at(Record1, NowMS), - Session = export_session(Record), - DSSubs = session_read_subscriptions(SessionId), - Subscriptions = export_subscriptions(DSSubs), - Inflight = emqx_persistent_message_ds_replayer:open(SessionId), - Session#{ - conninfo => NewConnInfo, - inflight => Inflight, - subscriptions => Subscriptions - } - end; - _ -> - false - end - end). + case emqx_persistent_session_ds_state:open(SessionId) of + {ok, S0} -> + EI = expiry_interval(emqx_persistent_session_ds_state:get_conninfo(S0)), + LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(S0), + case ?IS_EXPIRED(NowMS, LastAliveAt, EI) of + true -> + emqx_persistent_session_ds_state:delete(SessionId), + false; + false -> + %% New connection being established + S1 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S0), + S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1), + S = emqx_persistent_session_ds_state:commit(S2), + Inflight = emqx_persistent_session_ds_inflight:new( + receive_maximum(NewConnInfo) + ), + #{ + id => SessionId, + s => S, + inflight => Inflight, + props => #{} + } + end; + undefined -> + false + end. --spec session_ensure_new(id(), emqx_types:conninfo()) -> +-spec session_ensure_new(id(), emqx_types:conninfo(), emqx_session:conf()) -> session(). -session_ensure_new(SessionId, ConnInfo) -> - transaction(fun() -> - ok = session_drop_records(SessionId), - Session = export_session(session_create(SessionId, ConnInfo)), - Session#{ - subscriptions => subs_new(), - inflight => emqx_persistent_message_ds_replayer:new() - } - end). - -session_create(SessionId, ConnInfo) -> - Session = #session{ - id = SessionId, - created_at = now_ms(), - last_alive_at = now_ms(), - conninfo = ConnInfo - }, - ok = mnesia:write(?SESSION_TAB, Session, write), - Session. - -session_set_last_alive_at_trans(Session, LastAliveAt) -> - #{conninfo := ConnInfo} = Session, - session_set_last_alive_at_trans(Session, ConnInfo, LastAliveAt). - -session_set_last_alive_at_trans(Session, NewConnInfo, LastAliveAt) -> - #{id := SessionId} = Session, - transaction(fun() -> - case mnesia:read(?SESSION_TAB, SessionId, write) of - [#session{} = SessionRecord0] -> - SessionRecord = SessionRecord0#session{conninfo = NewConnInfo}, - _ = session_set_last_alive_at(SessionRecord, LastAliveAt), - ok; - _ -> - %% log and crash? - ok - end - end), - Session#{conninfo := NewConnInfo, last_alive_at := LastAliveAt}. - -session_set_last_alive_at(SessionRecord0, LastAliveAt) -> - SessionRecord = SessionRecord0#session{last_alive_at = LastAliveAt}, - ok = mnesia:write(?SESSION_TAB, SessionRecord, write), - SessionRecord. +session_ensure_new(Id, ConnInfo, Conf) -> + Now = now_ms(), + S0 = emqx_persistent_session_ds_state:create_new(Id), + S1 = emqx_persistent_session_ds_state:set_conninfo(ConnInfo, S0), + S2 = bump_last_alive(S1), + S3 = emqx_persistent_session_ds_state:set_created_at(Now, S2), + S4 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), 0, S3), + S5 = emqx_persistent_session_ds_state:put_seqno(?committed(?QOS_1), 0, S4), + S6 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), 0, S5), + S7 = emqx_persistent_session_ds_state:put_seqno(?committed(?QOS_2), 0, S6), + S8 = emqx_persistent_session_ds_state:put_seqno(?pubrec, 0, S7), + S = emqx_persistent_session_ds_state:commit(S8), + #{ + id => Id, + props => Conf, + s => S, + inflight => emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo)) + }. %% @doc Called when a client reconnects with `clean session=true' or %% during session GC -spec session_drop(id()) -> ok. -session_drop(DSSessionId) -> - transaction(fun() -> - ok = session_drop_records(DSSessionId), - ok = mnesia:delete(?SESSION_TAB, DSSessionId, write) - end). - --spec session_drop_records(id()) -> ok. -session_drop_records(DSSessionId) -> - ok = session_drop_subscriptions(DSSessionId), - ok = session_drop_pubranges(DSSessionId), - ok = session_drop_offsets(DSSessionId), - ok = session_drop_streams(DSSessionId). - --spec session_drop_subscriptions(id()) -> ok. -session_drop_subscriptions(DSSessionId) -> - Subscriptions = session_read_subscriptions(DSSessionId, write), - lists:foreach( - fun(#ds_sub{id = DSSubId} = DSSub) -> - TopicFilter = subscription_id_to_topic_filter(DSSubId), - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, DSSessionId), - ok = session_del_subscription(DSSub) - end, - Subscriptions - ). - -%% @doc Called when a client subscribes to a topic. Idempotent. --spec session_add_subscription(id(), topic_filter(), _Props :: map()) -> - {ok, subscription(), _IsNew :: boolean()}. -session_add_subscription(DSSessionId, TopicFilter, Props) -> - DSSubId = {DSSessionId, TopicFilter}, - transaction(fun() -> - case mnesia:read(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write) of - [] -> - DSSub = session_insert_subscription(DSSessionId, TopicFilter, Props), - DSSubExt = export_subscription(DSSub), - ?tp( - ds_session_subscription_added, - #{sub => DSSubExt, session_id => DSSessionId} - ), - {ok, DSSubExt, _IsNew = true}; - [#ds_sub{} = DSSub] -> - NDSSub = session_update_subscription(DSSub, Props), - NDSSubExt = export_subscription(NDSSub), - ?tp( - ds_session_subscription_present, - #{sub => NDSSubExt, session_id => DSSessionId} - ), - {ok, NDSSubExt, _IsNew = false} - end - end). - --spec session_insert_subscription(id(), topic_filter(), map()) -> ds_sub(). -session_insert_subscription(DSSessionId, TopicFilter, Props) -> - {DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter), - DSSub = #ds_sub{ - id = DSSubId, - start_time = StartMS, - props = Props, - extra = #{} - }, - ok = mnesia:write(?SESSION_SUBSCRIPTIONS_TAB, DSSub, write), - DSSub. - --spec session_update_subscription(ds_sub(), map()) -> ds_sub(). -session_update_subscription(DSSub, Props) -> - NDSSub = DSSub#ds_sub{props = Props}, - ok = mnesia:write(?SESSION_SUBSCRIPTIONS_TAB, NDSSub, write), - NDSSub. - -session_del_subscription(DSSessionId, TopicFilter) -> - DSSubId = {DSSessionId, TopicFilter}, - transaction(fun() -> - mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write) - end). - -session_del_subscription(#ds_sub{id = DSSubId}) -> - mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write). - -session_read_subscriptions(DSSessionID) -> - session_read_subscriptions(DSSessionID, read). - -session_read_subscriptions(DSSessionId, LockKind) -> - MS = ets:fun2ms( - fun(Sub = #ds_sub{id = {Sess, _}}) when Sess =:= DSSessionId -> - Sub - end - ), - mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, LockKind). - -session_read_pubranges(DSSessionID) -> - session_read_pubranges(DSSessionID, read). - -session_read_pubranges(DSSessionId, LockKind) -> - MS = ets:fun2ms( - fun(#ds_pubrange{id = ID}) when element(1, ID) =:= DSSessionId -> - ID - end - ), - mnesia:select(?SESSION_PUBRANGE_TAB, MS, LockKind). - -session_read_offsets(DSSessionID) -> - session_read_offsets(DSSessionID, read). - -session_read_offsets(DSSessionId, LockKind) -> - MS = ets:fun2ms( - fun(#ds_committed_offset{id = {Sess, Type}}) when Sess =:= DSSessionId -> - {DSSessionId, Type} - end - ), - mnesia:select(?SESSION_COMMITTED_OFFSET_TAB, MS, LockKind). - --spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}. -new_subscription_id(DSSessionId, TopicFilter) -> - %% Note: here we use _milliseconds_ to match with the timestamp - %% field of `#message' record. - NowMS = now_ms(), - DSSubId = {DSSessionId, TopicFilter}, - {DSSubId, NowMS}. - --spec subscription_id_to_topic_filter(subscription_id()) -> topic_filter(). -subscription_id_to_topic_filter({_DSSessionId, TopicFilter}) -> - TopicFilter. +session_drop(ID) -> + emqx_persistent_session_ds_state:delete(ID). now_ms() -> erlang:system_time(millisecond). @@ -845,124 +575,341 @@ do_ensure_all_iterators_closed(_DSSessionID) -> ok. %%-------------------------------------------------------------------- -%% Reading batches +%% Buffer filling %%-------------------------------------------------------------------- --spec renew_streams(session()) -> ok. -renew_streams(#{id := SessionId, subscriptions := Subscriptions}) -> - transaction(fun() -> - ExistingStreams = mnesia:read(?SESSION_STREAM_TAB, SessionId, write), - subs_fold( - fun(TopicFilter, #{start_time := StartTime}, Streams) -> - TopicFilterWords = emqx_topic:words(TopicFilter), - renew_topic_streams(SessionId, TopicFilterWords, StartTime, Streams) - end, - ExistingStreams, - Subscriptions - ) - end), - ok. +fill_buffer(Session = #{s := S}, ClientInfo) -> + fill_buffer(shuffle(find_new_streams(S)), Session, ClientInfo). --spec renew_topic_streams(id(), topic_filter_words(), emqx_ds:time(), _Acc :: [ds_stream()]) -> ok. -renew_topic_streams(DSSessionId, TopicFilter, StartTime, ExistingStreams) -> - TopicStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), - lists:foldl( - fun({Rank, Stream}, Streams) -> - case lists:keymember(Stream, #ds_stream.stream, Streams) of - true -> - Streams; - false -> - StreamRef = length(Streams) + 1, - DSStream = session_store_stream( - DSSessionId, - StreamRef, - Stream, - Rank, - TopicFilter, - StartTime +-spec shuffle([A]) -> [A]. +shuffle(L0) -> + L1 = lists:map( + fun(A) -> + %% maybe topic/stream prioritization could be introduced here? + {rand:uniform(), A} + end, + L0 + ), + L2 = lists:sort(L1), + {_, L} = lists:unzip(L2), + L. + +fill_buffer([], Session, _ClientInfo) -> + Session; +fill_buffer( + [{StreamKey, Stream0 = #ifs{it_end = It0}} | Streams], + Session0 = #{s := S0, inflight := Inflight0}, + ClientInfo +) -> + BatchSize = emqx_config:get([session_persistence, max_batch_size]), + MaxBufferSize = BatchSize * 2, + case emqx_persistent_session_ds_inflight:n_buffered(Inflight0) < MaxBufferSize of + true -> + case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It0, BatchSize) of + {ok, It, []} -> + S = emqx_persistent_session_ds_state:put_stream( + StreamKey, Stream0#ifs{it_end = It}, S0 ), - [DSStream | Streams] + fill_buffer(Streams, Session0#{s := S}, ClientInfo); + {ok, It, Messages} -> + Session = new_batch(StreamKey, Stream0, It, Messages, Session0, ClientInfo), + fill_buffer(Streams, Session, ClientInfo); + {ok, end_of_stream} -> + S = emqx_persistent_session_ds_state:put_stream( + StreamKey, Stream0#ifs{it_end = end_of_stream}, S0 + ), + fill_buffer(Streams, Session0#{s := S}, ClientInfo) + end; + false -> + Session0 + end. + +new_batch( + StreamKey, Stream0, Iterator, [{BatchBeginMsgKey, _} | _] = Messages0, Session0, ClientInfo +) -> + #{inflight := Inflight0, s := S0} = Session0, + FirstSeqnoQos1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0), + FirstSeqnoQos2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0), + NBefore = emqx_persistent_session_ds_inflight:n_buffered(Inflight0), + {LastSeqnoQos1, LastSeqnoQos2, Session} = do_process_batch( + false, FirstSeqnoQos1, FirstSeqnoQos2, Messages0, Session0, ClientInfo + ), + NAfter = emqx_persistent_session_ds_inflight:n_buffered(maps:get(inflight, Session)), + Stream = Stream0#ifs{ + batch_size = NAfter - NBefore, + batch_begin_key = BatchBeginMsgKey, + first_seqno_qos1 = FirstSeqnoQos1, + first_seqno_qos2 = FirstSeqnoQos2, + last_seqno_qos1 = LastSeqnoQos1, + last_seqno_qos2 = LastSeqnoQos2, + it_end = Iterator + }, + S1 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), LastSeqnoQos1, S0), + S2 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), LastSeqnoQos2, S1), + S = emqx_persistent_session_ds_state:put_stream(StreamKey, Stream, S2), + Session#{s => S}. + +replay_batch(_StreamKey, Stream, Session0, ClientInfo) -> + #ifs{ + batch_begin_key = BatchBeginMsgKey, + batch_size = BatchSize, + first_seqno_qos1 = FirstSeqnoQos1, + first_seqno_qos2 = FirstSeqnoQos2, + it_end = ItEnd + } = Stream, + {ok, ItBegin} = emqx_ds:update_iterator(?PERSISTENT_MESSAGE_DB, ItEnd, BatchBeginMsgKey), + case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of + {ok, _ItEnd, Messages} -> + {_LastSeqnoQo1, _LastSeqnoQos2, Session} = do_process_batch( + true, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Session0, ClientInfo + ), + %% TODO: check consistency of the sequence numbers + Session + end. + +do_process_batch(_IsReplay, LastSeqnoQos1, LastSeqnoQos2, [], Session, _ClientInfo) -> + {LastSeqnoQos1, LastSeqnoQos2, Session}; +do_process_batch(IsReplay, FirstSeqnoQos1, FirstSeqnoQos2, [KV | Messages], Session, ClientInfo) -> + #{s := S, props := #{upgrade_qos := UpgradeQoS}, inflight := Inflight0} = Session, + {_DsMsgKey, Msg0 = #message{topic = Topic}} = KV, + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), + Msgs = [ + Msg + || SubMatch <- emqx_topic_gbt:matches(Topic, Subs, []), + Msg <- begin + #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), + emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) + end + ], + CommittedQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), + CommittedQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), + {Inflight, LastSeqnoQos1, LastSeqnoQos2} = lists:foldl( + fun(Msg = #message{qos = Qos}, {Inflight1, SeqnoQos10, SeqnoQos20}) -> + case Qos of + ?QOS_0 -> + SeqnoQos1 = SeqnoQos10, + SeqnoQos2 = SeqnoQos20, + PacketId = undefined; + ?QOS_1 -> + SeqnoQos1 = inc_seqno(?QOS_1, SeqnoQos10), + SeqnoQos2 = SeqnoQos20, + PacketId = seqno_to_packet_id(?QOS_1, SeqnoQos1); + ?QOS_2 -> + SeqnoQos1 = SeqnoQos10, + SeqnoQos2 = inc_seqno(?QOS_2, SeqnoQos20), + PacketId = seqno_to_packet_id(?QOS_2, SeqnoQos2) + end, + %% ?SLOG(debug, #{ + %% msg => "out packet", + %% qos => Qos, + %% packet_id => PacketId, + %% enriched => emqx_message:to_map(Msg), + %% original => emqx_message:to_map(Msg0), + %% upgrade_qos => UpgradeQoS + %% }), + + %% Handle various situations where we want to ignore the packet: + Inflight2 = + case IsReplay of + true when Qos =:= ?QOS_0 -> + Inflight1; + true when Qos =:= ?QOS_1, SeqnoQos1 < CommittedQos1 -> + Inflight1; + true when Qos =:= ?QOS_2, SeqnoQos2 < CommittedQos2 -> + Inflight1; + _ -> + emqx_persistent_session_ds_inflight:push({PacketId, Msg}, Inflight1) + end, + { + Inflight2, + SeqnoQos1, + SeqnoQos2 + } + end, + {Inflight0, FirstSeqnoQos1, FirstSeqnoQos2}, + Msgs + ), + do_process_batch( + IsReplay, LastSeqnoQos1, LastSeqnoQos2, Messages, Session#{inflight => Inflight}, ClientInfo + ). + +%%-------------------------------------------------------------------- +%% Buffer drain +%%-------------------------------------------------------------------- + +drain_buffer(Session = #{inflight := Inflight0}) -> + {Messages, Inflight} = emqx_persistent_session_ds_inflight:pop(Inflight0), + {Messages, Session#{inflight => Inflight}}. + +%%-------------------------------------------------------------------- +%% Stream renew +%%-------------------------------------------------------------------- + +%% erlfmt-ignore +-define(fully_replayed(STREAM, COMMITTEDQOS1, COMMITTEDQOS2), + ((STREAM#ifs.last_seqno_qos1 =< COMMITTEDQOS1 orelse STREAM#ifs.last_seqno_qos1 =:= undefined) andalso + (STREAM#ifs.last_seqno_qos2 =< COMMITTEDQOS2 orelse STREAM#ifs.last_seqno_qos2 =:= undefined))). + +-spec find_replay_streams(session()) -> + [{emqx_persistent_session_ds_state:stream_key(), stream_state()}]. +find_replay_streams(#{s := S}) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), + Streams = emqx_persistent_session_ds_state:fold_streams( + fun(Key, Stream, Acc) -> + case Stream of + #ifs{ + first_seqno_qos1 = F1, + first_seqno_qos2 = F2, + last_seqno_qos1 = L1, + last_seqno_qos2 = L2 + } when F1 >= CommQos1, L1 =< CommQos1, F2 >= CommQos2, L2 =< CommQos2 -> + [{Key, Stream} | Acc]; + _ -> + Acc end end, - ExistingStreams, - TopicStreams - ). - -session_store_stream(DSSessionId, StreamRef, Stream, Rank, TopicFilter, StartTime) -> - {ok, ItBegin} = emqx_ds:make_iterator( - ?PERSISTENT_MESSAGE_DB, - Stream, - TopicFilter, - StartTime + [], + S ), - DSStream = #ds_stream{ - session = DSSessionId, - ref = StreamRef, - stream = Stream, - rank = Rank, - beginning = ItBegin - }, - mnesia:write(?SESSION_STREAM_TAB, DSStream, write), - DSStream. - -%% must be called inside a transaction --spec session_drop_streams(id()) -> ok. -session_drop_streams(DSSessionId) -> - mnesia:delete(?SESSION_STREAM_TAB, DSSessionId, write). - -%% must be called inside a transaction --spec session_drop_pubranges(id()) -> ok. -session_drop_pubranges(DSSessionId) -> - RangeIds = session_read_pubranges(DSSessionId, write), - lists:foreach( - fun(RangeId) -> - mnesia:delete(?SESSION_PUBRANGE_TAB, RangeId, write) + lists:sort( + fun( + #ifs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}, + #ifs{first_seqno_qos1 = B1, first_seqno_qos2 = B2} + ) -> + case A1 =:= A2 of + true -> B1 =< B2; + false -> A1 < A2 + end end, - RangeIds + Streams ). -%% must be called inside a transaction --spec session_drop_offsets(id()) -> ok. -session_drop_offsets(DSSessionId) -> - OffsetIds = session_read_offsets(DSSessionId, write), - lists:foreach( - fun(OffsetId) -> - mnesia:delete(?SESSION_COMMITTED_OFFSET_TAB, OffsetId, write) +-spec find_new_streams(emqx_persistent_session_ds_state:t()) -> + [{emqx_persistent_session_ds_state:stream_key(), stream_state()}]. +find_new_streams(S) -> + %% FIXME: this function is currently very sensitive to the + %% consistency of the packet IDs on both broker and client side. + %% + %% If the client fails to properly ack packets due to a bug, or a + %% network issue, or if the state of streams and seqno tables ever + %% become de-synced, then this function will return an empty list, + %% and the replay cannot progress. + %% + %% In other words, this function is not robust, and we should find + %% some way to get the replays un-stuck at the cost of potentially + %% losing messages during replay (or just kill the stuck channel + %% after timeout?) + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), + emqx_persistent_session_ds_state:fold_streams( + fun + (Key, Stream, Acc) when ?fully_replayed(Stream, CommQos1, CommQos2) -> + %% This stream has been full acked by the client. It + %% means we can get more messages from it: + [{Key, Stream} | Acc]; + (_Key, _Stream, Acc) -> + Acc end, - OffsetIds + [], + S + ). + +-spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). +renew_streams(S0) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), + subs_fold( + fun(TopicFilterBin, _Subscription = #{start_time := StartTime}, S1) -> + SubId = [], + TopicFilter = emqx_topic:words(TopicFilterBin), + TopicStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), + TopicStreamGroups = maps:groups_from_list(fun({{X, _}, _}) -> X end, TopicStreams), + %% Iterate over groups of streams with the same rank X, + %% finding the first eligible stream to replay: + maps:fold( + fun(RankX, Streams, S2) -> + Key = {RankX, SubId}, + case emqx_persistent_session_ds_state:get_stream(Key, S2) of + undefined -> + MinRankY = emqx_persistent_session_ds_state:get_rank(RankX, S2), + start_stream_replay( + TopicFilter, StartTime, Key, MinRankY, Streams, S2 + ); + Stream = #ifs{it_end = end_of_stream, rank_y = MinRankY} when + ?fully_replayed(Stream, CommQos1, CommQos2) + -> + %% We have fully replayed the stream with + %% the given rank X, and the client acked + %% all messages: + S3 = emqx_persistent_session_ds_state:del_stream(Key, S2), + S4 = emqx_persistent_session_ds_state:put_rank(RankX, MinRankY, S3), + start_stream_replay(TopicFilter, StartTime, Key, MinRankY, Streams, S4); + #ifs{} -> + %% Stream replay is currently in progress, leave it as is: + S2 + end + end, + S1, + TopicStreamGroups + ) + end, + S0, + S0 + ). + +start_stream_replay(TopicFilter, StartTime, Key, MinRankY, Streams, S0) -> + case find_first_stream(MinRankY, Streams) of + {RankY, Stream} -> + {ok, Iterator} = emqx_ds:make_iterator( + ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime + ), + NewStreamState = #ifs{ + rank_y = RankY, + it_end = Iterator + }, + emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S0); + undefined -> + S0 + end. + +%% @doc Find the first stream with rank Y greater than the one given as the first argument. +-spec find_first_stream(emqx_ds:rank_y() | undefined, [ + {emqx_ds:stream_rank(), emqx_ds:ds_specific_stream()} +]) -> + {emqx_ds:rank_y(), emqx_ds:ds_specific_stream()} | undefined. +find_first_stream(MinRankY, Streams) -> + lists:foldl( + fun + ({{_RankX, RankY}, Stream}, Acc) when RankY > MinRankY; MinRankY =:= undefined -> + case Acc of + {AccY, _} when AccY < RankY -> + Acc; + _ -> + {RankY, Stream} + end; + (_, Acc) -> + Acc + end, + undefined, + Streams ). %%-------------------------------------------------------------------------------- -subs_new() -> - emqx_topic_gbt:new(). - -subs_lookup(TopicFilter, Subs) -> +subs_lookup(TopicFilter, S) -> + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined). -subs_insert(TopicFilter, Subscription, Subs) -> - emqx_topic_gbt:insert(TopicFilter, [], Subscription, Subs). - -subs_delete(TopicFilter, Subs) -> - emqx_topic_gbt:delete(TopicFilter, [], Subs). - -subs_matches(Topic, Subs) -> - emqx_topic_gbt:matches(Topic, Subs, []). - -subs_get_match(M, Subs) -> - emqx_topic_gbt:get_record(M, Subs). - -subs_size(Subs) -> - emqx_topic_gbt:size(Subs). - -subs_to_map(Subs) -> +subs_to_map(S) -> subs_fold( fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, #{}, - Subs + S ). -subs_fold(Fun, AccIn, Subs) -> +subs_fold(Fun, AccIn, S) -> + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), emqx_topic_gbt:fold( fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, AccIn, @@ -971,41 +918,6 @@ subs_fold(Fun, AccIn, Subs) -> %%-------------------------------------------------------------------------------- -transaction(Fun) -> - case mnesia:is_transaction() of - true -> - Fun(); - false -> - {atomic, Res} = mria:transaction(?DS_MRIA_SHARD, Fun), - Res - end. - -ro_transaction(Fun) -> - {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun), - Res. - -%%-------------------------------------------------------------------------------- - -export_subscriptions(DSSubs) -> - lists:foldl( - fun(DSSub = #ds_sub{id = {_DSSessionId, TopicFilter}}, Acc) -> - subs_insert(TopicFilter, export_subscription(DSSub), Acc) - end, - subs_new(), - DSSubs - ). - -export_session(#session{} = Record) -> - export_record(Record, #session.id, [id, created_at, last_alive_at, conninfo, props], #{}). - -export_subscription(#ds_sub{} = Record) -> - export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}). - -export_record(Record, I, [Field | Rest], Acc) -> - export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)}); -export_record(_, _, [], Acc) -> - Acc. - %% TODO: find a more reliable way to perform actions that have side %% effects. Add `CBM:init' callback to the session behavior? -spec ensure_timers(session()) -> session(). @@ -1014,6 +926,11 @@ ensure_timers(Session0) -> Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1), emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2). +-spec inc_send_quota(session()) -> session(). +inc_send_quota(Session = #{inflight := Inflight0}) -> + {_NInflight, Inflight} = emqx_persistent_session_ds_inflight:inc_send_quota(Inflight0), + pull_now(Session#{inflight => Inflight}). + -spec pull_now(session()) -> session(). pull_now(Session) -> emqx_session:reset_timer(?TIMER_PULL, 0, Session). @@ -1029,75 +946,119 @@ receive_maximum(ConnInfo) -> expiry_interval(ConnInfo) -> maps:get(expiry_interval, ConnInfo, 0). +bump_interval() -> + emqx_config:get([session_persistence, last_alive_update_interval]). + +%%-------------------------------------------------------------------- +%% SeqNo tracking +%% -------------------------------------------------------------------- + +-spec commit_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) -> + {ok, emqx_types:message(), session()} | {error, _}. +commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> + SeqNo = packet_id_to_seqno(PacketId, S), + case Track of + puback -> + Old = ?committed(?QOS_1), + Next = ?next(?QOS_1); + pubrec -> + Old = ?pubrec, + Next = ?next(?QOS_2); + pubcomp -> + Old = ?committed(?QOS_2), + Next = ?next(?QOS_2) + end, + NextSeqNo = emqx_persistent_session_ds_state:get_seqno(Next, S), + PrevSeqNo = emqx_persistent_session_ds_state:get_seqno(Old, S), + case PrevSeqNo =< SeqNo andalso SeqNo =< NextSeqNo of + true -> + %% TODO: we pass a bogus message into the hook: + Msg = emqx_message:make(SessionId, <<>>, <<>>), + {ok, Msg, Session#{s => emqx_persistent_session_ds_state:put_seqno(Old, SeqNo, S)}}; + false -> + ?SLOG(warning, #{ + msg => "out-of-order_commit", + track => Track, + packet_id => PacketId, + commit_seqno => SeqNo, + prev => PrevSeqNo, + next => NextSeqNo + }), + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} + end. + +%%-------------------------------------------------------------------- +%% Functions for dealing with the sequence number and packet ID +%% generation +%% -------------------------------------------------------------------- + +%% Epoch size = `16#10000 div 2' since we generate different sets of +%% packet IDs for QoS1 and QoS2: +-define(EPOCH_SIZE, 16#8000). + +%% Reconstruct session counter by adding most significant bits from +%% the current counter to the packet id: +-spec packet_id_to_seqno(emqx_types:packet_id(), emqx_persistent_session_ds_state:t()) -> + seqno(). +packet_id_to_seqno(PacketId, S) -> + NextSeqNo = emqx_persistent_session_ds_state:get_seqno(?next(packet_id_to_qos(PacketId)), S), + packet_id_to_seqno_(PacketId, NextSeqNo). + +packet_id_to_seqno_(PacketId, NextSeqNo) -> + Epoch = NextSeqNo bsr 15, + SeqNo = (Epoch bsl 15) + (PacketId bsr 1), + case SeqNo =< NextSeqNo of + true -> + SeqNo; + false -> + SeqNo - ?EPOCH_SIZE + end. + +-spec inc_seqno(?QOS_1 | ?QOS_2, seqno()) -> emqx_types:packet_id(). +inc_seqno(Qos, SeqNo) -> + NextSeqno = SeqNo + 1, + case seqno_to_packet_id(Qos, NextSeqno) of + 0 -> + %% We skip sequence numbers that lead to PacketId = 0 to + %% simplify math. Note: it leads to occasional gaps in the + %% sequence numbers. + NextSeqno + 1; + _ -> + NextSeqno + end. + +%% Note: we use the least significant bit to store the QoS. Even +%% packet IDs are QoS1, odd packet IDs are QoS2. +seqno_to_packet_id(?QOS_1, SeqNo) -> + (SeqNo bsl 1) band 16#ffff; +seqno_to_packet_id(?QOS_2, SeqNo) -> + ((SeqNo bsl 1) band 16#ffff) bor 1. + +packet_id_to_qos(PacketId) -> + case PacketId band 1 of + 0 -> ?QOS_1; + 1 -> ?QOS_2 + end. + +%%-------------------------------------------------------------------- +%% Tests +%%-------------------------------------------------------------------- + -ifdef(TEST). + +%% Warning: the below functions may return out-of-date results because +%% the sessions commit data to mria asynchronously. + list_all_sessions() -> - DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB), - ConnInfo = #{}, - Sessions = lists:filtermap( - fun(SessionID) -> - Sess = session_open(SessionID, ConnInfo), - case Sess of - false -> - false; - _ -> - {true, {SessionID, Sess}} - end - end, - DSSessionIds - ), - maps:from_list(Sessions). - -list_all_subscriptions() -> - DSSubIds = mnesia:dirty_all_keys(?SESSION_SUBSCRIPTIONS_TAB), - Subscriptions = lists:map( - fun(DSSubId) -> - [DSSub] = mnesia:dirty_read(?SESSION_SUBSCRIPTIONS_TAB, DSSubId), - {DSSubId, export_subscription(DSSub)} - end, - DSSubIds - ), - maps:from_list(Subscriptions). - -list_all_streams() -> - DSStreamIds = mnesia:dirty_all_keys(?SESSION_STREAM_TAB), - DSStreams = lists:map( - fun(DSStreamId) -> - Records = mnesia:dirty_read(?SESSION_STREAM_TAB, DSStreamId), - ExtDSStreams = - lists:map( - fun(Record) -> - export_record( - Record, - #ds_stream.session, - [session, topic_filter, stream, rank], - #{} - ) - end, - Records - ), - {DSStreamId, ExtDSStreams} - end, - DSStreamIds - ), - maps:from_list(DSStreams). - -list_all_pubranges() -> - DSPubranges = mnesia:dirty_match_object(?SESSION_PUBRANGE_TAB, #ds_pubrange{_ = '_'}), - lists:foldl( - fun(Record = #ds_pubrange{id = {SessionId, First, StreamRef}}, Acc) -> - Range = #{ - session => SessionId, - stream => StreamRef, - first => First, - until => Record#ds_pubrange.until, - type => Record#ds_pubrange.type, - iterator => Record#ds_pubrange.iterator - }, - maps:put(SessionId, maps:get(SessionId, Acc, []) ++ [Range], Acc) - end, - #{}, - DSPubranges + maps:from_list( + [ + {Id, emqx_persistent_session_ds_state:print_session(Id)} + || Id <- emqx_persistent_session_ds_state:list_sessions() + ] ). -%% ifdef(TEST) +%%%% Proper generators: + +%%%% Unit tests: + -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 31c9b2faf..936b36841 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -25,66 +25,27 @@ -define(SESSION_COMMITTED_OFFSET_TAB, emqx_ds_committed_offset_tab). -define(DS_MRIA_SHARD, emqx_ds_session_shard). --define(T_INFLIGHT, 1). --define(T_CHECKPOINT, 2). - --record(ds_sub, { - id :: emqx_persistent_session_ds:subscription_id(), - start_time :: emqx_ds:time(), - props = #{} :: map(), - extra = #{} :: map() -}). --type ds_sub() :: #ds_sub{}. - --record(ds_stream, { - session :: emqx_persistent_session_ds:id(), - ref :: _StreamRef, - stream :: emqx_ds:stream(), - rank :: emqx_ds:stream_rank(), - beginning :: emqx_ds:iterator() -}). --type ds_stream() :: #ds_stream{}. - --record(ds_pubrange, { - id :: { - %% What session this range belongs to. - _Session :: emqx_persistent_session_ds:id(), - %% Where this range starts. - _First :: emqx_persistent_message_ds_replayer:seqno(), - %% Which stream this range is over. - _StreamRef - }, - %% Where this range ends: the first seqno that is not included in the range. - until :: emqx_persistent_message_ds_replayer:seqno(), - %% Type of a range: - %% * Inflight range is a range of yet unacked messages from this stream. - %% * Checkpoint range was already acked, its purpose is to keep track of the - %% very last iterator for this stream. - type :: ?T_INFLIGHT | ?T_CHECKPOINT, - %% What commit tracks this range is part of. - tracks = 0 :: non_neg_integer(), - %% Meaning of this depends on the type of the range: - %% * For inflight range, this is the iterator pointing to the first message in - %% the range. - %% * For checkpoint range, this is the iterator pointing right past the last - %% message in the range. - iterator :: emqx_ds:iterator(), - %% Reserved for future use. - misc = #{} :: map() -}). --type ds_pubrange() :: #ds_pubrange{}. - --record(ds_committed_offset, { - id :: { - %% What session this marker belongs to. - _Session :: emqx_persistent_session_ds:id(), - %% Marker name. - _CommitType - }, - %% Where this marker is pointing to: the first seqno that is not marked. - until :: emqx_persistent_message_ds_replayer:seqno() +%% State of the stream: +-record(ifs, { + rank_y :: emqx_ds:rank_y(), + %% Iterator at the end of the last batch: + it_end :: emqx_ds:iterator() | undefined | end_of_stream, + %% Size of the last batch: + batch_size :: pos_integer() | undefined, + %% Key that points at the beginning of the batch: + batch_begin_key :: binary() | undefined, + %% Number of messages collected in the last batch: + batch_n_messages :: pos_integer() | undefined, + %% Session sequence number at the time when the batch was fetched: + first_seqno_qos1 :: emqx_persistent_session_ds:seqno() | undefined, + first_seqno_qos2 :: emqx_persistent_session_ds:seqno() | undefined, + %% Sequence numbers that the client must PUBACK or PUBREL + %% before we can consider the batch to be fully replayed: + last_seqno_qos1 :: emqx_persistent_session_ds:seqno() | undefined, + last_seqno_qos2 :: emqx_persistent_session_ds:seqno() | undefined }). +%% TODO: remove -record(session, { %% same as clientid id :: emqx_persistent_session_ds:id(), diff --git a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl new file mode 100644 index 000000000..75f246ec3 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl @@ -0,0 +1,111 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_persistent_session_ds_inflight). + +%% API: +-export([new/1, push/2, pop/1, n_buffered/1, n_inflight/1, inc_send_quota/1, receive_maximum/1]). + +%% behavior callbacks: +-export([]). + +%% internal exports: +-export([]). + +-export_type([t/0]). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-record(inflight, { + queue :: queue:queue(), + receive_maximum :: pos_integer(), + n_inflight = 0 :: non_neg_integer(), + n_qos0 = 0 :: non_neg_integer(), + n_qos1 = 0 :: non_neg_integer(), + n_qos2 = 0 :: non_neg_integer() +}). + +-type t() :: #inflight{}. + +%%================================================================================ +%% API funcions +%%================================================================================ + +-spec new(non_neg_integer()) -> t(). +new(ReceiveMaximum) when ReceiveMaximum > 0 -> + #inflight{queue = queue:new(), receive_maximum = ReceiveMaximum}. + +-spec receive_maximum(t()) -> pos_integer(). +receive_maximum(#inflight{receive_maximum = ReceiveMaximum}) -> + ReceiveMaximum. + +-spec push({emqx_types:packet_id() | undefined, emqx_types:message()}, t()) -> t(). +push(Val = {_PacketId, Msg}, Rec) -> + #inflight{queue = Q0, n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2} = Rec, + Q = queue:in(Val, Q0), + case Msg#message.qos of + ?QOS_0 -> + Rec#inflight{queue = Q, n_qos0 = NQos0 + 1}; + ?QOS_1 -> + Rec#inflight{queue = Q, n_qos1 = NQos1 + 1}; + ?QOS_2 -> + Rec#inflight{queue = Q, n_qos2 = NQos2 + 1} + end. + +-spec pop(t()) -> {[{emqx_types:packet_id() | undefined, emqx_types:message()}], t()}. +pop(Inflight = #inflight{receive_maximum = ReceiveMaximum}) -> + do_pop(ReceiveMaximum, Inflight, []). + +-spec n_buffered(t()) -> non_neg_integer(). +n_buffered(#inflight{n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2}) -> + NQos0 + NQos1 + NQos2. + +-spec n_inflight(t()) -> non_neg_integer(). +n_inflight(#inflight{n_inflight = NInflight}) -> + NInflight. + +%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Flow_Control +-spec inc_send_quota(t()) -> {non_neg_integer(), t()}. +inc_send_quota(Rec = #inflight{n_inflight = NInflight0}) -> + NInflight = max(NInflight0 - 1, 0), + {NInflight, Rec#inflight{n_inflight = NInflight}}. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +do_pop(ReceiveMaximum, Rec0 = #inflight{n_inflight = NInflight, queue = Q0}, Acc) -> + case NInflight < ReceiveMaximum andalso queue:out(Q0) of + {{value, Val}, Q} -> + #inflight{n_qos0 = NQos0, n_qos1 = NQos1, n_qos2 = NQos2} = Rec0, + {_PacketId, #message{qos = Qos}} = Val, + Rec = + case Qos of + ?QOS_0 -> + Rec0#inflight{queue = Q, n_qos0 = NQos0 - 1}; + ?QOS_1 -> + Rec0#inflight{queue = Q, n_qos1 = NQos1 - 1, n_inflight = NInflight + 1}; + ?QOS_2 -> + Rec0#inflight{queue = Q, n_qos2 = NQos2 - 1, n_inflight = NInflight + 1} + end, + do_pop(ReceiveMaximum, Rec, [Val | Acc]); + _ -> + {lists:reverse(Acc), Rec0} + end. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 5fd2c2ac9..39fd7eeb7 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ -export([create_tables/0]). --export([open/1, create_new/1, delete/1, commit/1, print_session/1]). +-export([open/1, create_new/1, delete/1, commit/1, print_session/1, list_sessions/0]). -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_conninfo/1, set_conninfo/2]). @@ -38,7 +38,7 @@ %% internal exports: -export([]). --export_type([t/0, seqno_type/0]). +-export_type([t/0, subscriptions/0, seqno_type/0, stream_key/0]). -include("emqx_persistent_session_ds.hrl"). @@ -46,12 +46,11 @@ %% Type declarations %%================================================================================ +-type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()). + %% Generic key-value wrapper that is used for exporting arbitrary %% terms to mnesia: --record(kv, { - k :: term(), - v :: map() -}). +-record(kv, {k, v}). %% Persistent map. %% @@ -62,9 +61,9 @@ %% It should be possible to make frequent changes to the pmap without %% stressing Mria. %% -%% It's implemented as two maps: `clean' and `dirty'. Updates are made -%% to the `dirty' area. `pmap_commit' function saves the updated -%% entries to Mnesia and moves them to the `clean' area. +%% It's implemented as three maps: `clean', `dirty' and `tombstones'. +%% Updates are made to the `dirty' area. `pmap_commit' function saves +%% the updated entries to Mnesia and moves them to the `clean' area. -record(pmap, {table, clean, dirty, tombstones}). -type pmap(K, V) :: @@ -87,15 +86,17 @@ ?conninfo => emqx_types:conninfo() }. --type seqno_type() :: next | acked | pubrel. +-type seqno_type() :: term(). + +-type stream_key() :: {emqx_ds:rank_x(), _SubId}. -opaque t() :: #{ id := emqx_persistent_session_ds:id(), dirty := boolean(), metadata := metadata(), - subscriptions := emqx_persistent_session_ds:subscriptions(), + subscriptions := subscriptions(), seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), - streams := pmap(emqx_ds:stream(), emqx_persistent_message_ds_replayer:stream_state()), + streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), ranks := pmap(term(), integer()) }. @@ -104,7 +105,7 @@ -define(stream_tab, emqx_ds_session_streams). -define(seqno_tab, emqx_ds_session_seqnos). -define(rank_tab, emqx_ds_session_ranks). --define(bag_tables, [?stream_tab, ?seqno_tab, ?rank_tab]). +-define(bag_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). %%================================================================================ %% API funcions @@ -125,7 +126,7 @@ create_tables() -> [create_kv_bag_table(Table) || Table <- ?bag_tables], mria:wait_for_tables([?session_tab | ?bag_tables]). --spec open(emqx_persistent_session_ds:session_id()) -> {ok, t()} | undefined. +-spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined. open(SessionId) -> ro_transaction(fun() -> case kv_restore(?session_tab, SessionId) of @@ -150,13 +151,13 @@ print_session(SessionId) -> case open(SessionId) of undefined -> undefined; - #{ + {ok, #{ metadata := Metadata, subscriptions := SubsGBT, streams := Streams, seqnos := Seqnos, ranks := Ranks - } -> + }} -> Subs = emqx_topic_gbt:fold( fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end, #{}, @@ -171,6 +172,10 @@ print_session(SessionId) -> } end. +-spec list_sessions() -> [emqx_persistent_session_ds:id()]. +list_sessions() -> + mnesia:dirty_all_keys(?session_tab). + -spec delete(emqx_persistent_session_ds:id()) -> ok. delete(Id) -> transaction( @@ -187,7 +192,6 @@ commit( Rec = #{ id := SessionId, metadata := Metadata, - subscriptions := Subs, streams := Streams, seqnos := SeqNos, ranks := Ranks @@ -196,10 +200,9 @@ commit( transaction(fun() -> kv_persist(?session_tab, SessionId, Metadata), Rec#{ - subscriptions => pmap_commit(SessionId, Subs), streams => pmap_commit(SessionId, Streams), seqnos => pmap_commit(SessionId, SeqNos), - ranksz => pmap_commit(SessionId, Ranks), + ranks => pmap_commit(SessionId, Ranks), dirty => false } end). @@ -247,18 +250,16 @@ set_conninfo(Val, Rec) -> %% --spec get_stream(emqx_persistent_session_ds:stream(), t()) -> - emqx_persistent_message_ds_replayer:stream_state() | undefined. +-spec get_stream(stream_key(), t()) -> + emqx_persistent_session_ds:stream_state() | undefined. get_stream(Key, Rec) -> gen_get(streams, Key, Rec). --spec put_stream( - emqx_persistent_session_ds:stream(), emqx_persistent_message_ds_replayer:stream_state(), t() -) -> t(). +-spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t(). put_stream(Key, Val, Rec) -> gen_put(streams, Key, Val, Rec). --spec del_stream(emqx_persistent_session_ds:stream(), t()) -> t(). +-spec del_stream(stream_key(), t()) -> t(). del_stream(Key, Rec) -> gen_del(stream, Key, Rec). @@ -296,12 +297,12 @@ fold_ranks(Fun, Acc, Rec) -> %% --spec get_subscriptions(t()) -> emqx_persistent_session_ds:subscriptions(). +-spec get_subscriptions(t()) -> subscriptions(). get_subscriptions(#{subscriptions := Subs}) -> Subs. -spec put_subscription( - emqx_persistent_session_ds:subscription_id(), + emqx_persistent_session_ds:topic_filter(), _SubId, emqx_persistent_session_ds:subscription(), t() @@ -474,7 +475,7 @@ kv_bag_persist(Tab, SessionId, Key, Val0) -> kv_bag_delete(Tab, SessionId, Key), %% Write data to mnesia: Val = encoder(encode, Tab, Val0), - mnesia:write(Tab, #kv{k = SessionId, v = {Key, Val}}). + mnesia:write(Tab, #kv{k = SessionId, v = {Key, Val}}, write). kv_bag_restore(Tab, SessionId) -> [{K, encoder(decode, Tab, V)} || #kv{v = {K, V}} <- mnesia:read(Tab, SessionId)]. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 77340ca87..e26475855 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1810,7 +1810,7 @@ fields("session_persistence") -> sc( pos_integer(), #{ - default => 1000, + default => 100, desc => ?DESC(session_ds_max_batch_size) } )}, diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index c08109fe8..fa7441b11 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -409,12 +409,8 @@ enrich_delivers(ClientInfo, Delivers, Session) -> enrich_delivers(_ClientInfo, [], _UpgradeQoS, _Session) -> []; enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> - case enrich_deliver(ClientInfo, D, UpgradeQoS, Session) of - [] -> - enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session); - Msg -> - [Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)] - end. + enrich_deliver(ClientInfo, D, UpgradeQoS, Session) ++ + enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session). enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> SubOpts = @@ -435,13 +431,15 @@ enrich_message( _ = emqx_session_events:handle_event(ClientInfo, {dropped, Msg, no_local}), []; enrich_message(_ClientInfo, MsgIn, SubOpts = #{}, UpgradeQoS) -> - maps:fold( - fun(SubOpt, V, Msg) -> enrich_subopts(SubOpt, V, Msg, UpgradeQoS) end, - MsgIn, - SubOpts - ); + [ + maps:fold( + fun(SubOpt, V, Msg) -> enrich_subopts(SubOpt, V, Msg, UpgradeQoS) end, + MsgIn, + SubOpts + ) + ]; enrich_message(_ClientInfo, Msg, undefined, _UpgradeQoS) -> - Msg. + [Msg]. enrich_subopts(nl, 1, Msg, _) -> emqx_message:set_flag(nl, Msg); diff --git a/apps/emqx/src/emqx_topic_gbt.erl b/apps/emqx/src/emqx_topic_gbt.erl index 6e9e7d2fc..b399903f4 100644 --- a/apps/emqx/src/emqx_topic_gbt.erl +++ b/apps/emqx/src/emqx_topic_gbt.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -39,11 +39,11 @@ -type match(ID) :: key(ID). -opaque t(ID, Value) :: gb_trees:tree(key(ID), Value). --opaque t() :: t(_ID, _Value). +-type t() :: t(_ID, _Value). %% @doc Create a new gb_tree and store it in the persitent_term with the %% given name. --spec new() -> t(). +-spec new() -> t(_ID, _Value). new() -> gb_trees:empty(). @@ -54,19 +54,19 @@ size(Gbt) -> %% @doc Insert a new entry into the index that associates given topic filter to given %% record ID, and attaches arbitrary record to the entry. This allows users to choose %% between regular and "materialized" indexes, for example. --spec insert(emqx_types:topic() | words(), _ID, _Record, t()) -> t(). +-spec insert(emqx_types:topic() | words(), ID, Record, t(ID, Record)) -> t(ID, Record). insert(Filter, ID, Record, Gbt) -> Key = key(Filter, ID), gb_trees:enter(Key, Record, Gbt). %% @doc Delete an entry from the index that associates given topic filter to given %% record ID. Deleting non-existing entry is not an error. --spec delete(emqx_types:topic() | words(), _ID, t()) -> t(). +-spec delete(emqx_types:topic() | words(), ID, t(ID, Record)) -> t(ID, Record). delete(Filter, ID, Gbt) -> Key = key(Filter, ID), gb_trees:delete_any(Key, Gbt). --spec lookup(emqx_types:topic() | words(), _ID, t(), Default) -> _Record | Default. +-spec lookup(emqx_types:topic() | words(), ID, t(ID, Record), Default) -> Record | Default. lookup(Filter, ID, Gbt, Default) -> Key = key(Filter, ID), case gb_trees:lookup(Key, Gbt) of @@ -76,7 +76,7 @@ lookup(Filter, ID, Gbt, Default) -> Default end. --spec fold(fun((key(_ID), _Record, Acc) -> Acc), Acc, t()) -> Acc. +-spec fold(fun((key(ID), Record, Acc) -> Acc), Acc, t(ID, Record)) -> Acc. fold(Fun, Acc, Gbt) -> Iter = gb_trees:iterator(Gbt), fold_iter(Fun, Acc, Iter). @@ -91,13 +91,13 @@ fold_iter(Fun, Acc, Iter) -> %% @doc Match given topic against the index and return the first match, or `false` if %% no match is found. --spec match(emqx_types:topic(), t()) -> match(_ID) | false. +-spec match(emqx_types:topic(), t(ID, _Record)) -> match(ID) | false. match(Topic, Gbt) -> emqx_trie_search:match(Topic, make_nextf(Gbt)). %% @doc Match given topic against the index and return _all_ matches. %% If `unique` option is given, return only unique matches by record ID. --spec matches(emqx_types:topic(), t(), emqx_trie_search:opts()) -> [match(_ID)]. +-spec matches(emqx_types:topic(), t(ID, _Record), emqx_trie_search:opts()) -> [match(ID)]. matches(Topic, Gbt, Opts) -> emqx_trie_search:matches(Topic, make_nextf(Gbt), Opts). @@ -112,7 +112,7 @@ get_topic(Key) -> emqx_trie_search:get_topic(Key). %% @doc Fetch the record associated with the match. --spec get_record(match(_ID), t()) -> _Record. +-spec get_record(match(ID), t(ID, Record)) -> Record. get_record(Key, Gbt) -> gb_trees:get(Key, Gbt). diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index f2a42332e..64cd9c6a8 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -36,7 +36,7 @@ all() -> % NOTE % Tests are disabled while existing session persistence impl is being % phased out. - {group, persistence_disabled}, + %{group, persistence_disabled}, {group, persistence_enabled} ]. @@ -71,7 +71,11 @@ init_per_group(persistence_disabled, Config) -> ]; init_per_group(persistence_enabled, Config) -> [ - {emqx_config, "session_persistence { enable = true }"}, + {emqx_config, + "session_persistence {\n" + " enable = true\n" + " renew_streams_interval = 100ms\n" + "}"}, {persistence, ds} | Config ]; diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 04f19b95f..23f69a81b 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -594,7 +594,7 @@ fields("node") -> sc( hoconsc:enum([gen_rpc, distr]), #{ - mapping => "mria.shard_transport", + mapping => "mria.shardp_transport", importance => ?IMPORTANCE_HIDDEN, default => distr, desc => ?DESC(db_default_shard_transport) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 1402a19e3..4e408ed80 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -47,6 +47,8 @@ topic_filter/0, topic/0, stream/0, + rank_x/0, + rank_y/0, stream_rank/0, iterator/0, iterator_id/0, @@ -77,7 +79,11 @@ %% Parsed topic filter. -type topic_filter() :: list(binary() | '+' | '#' | ''). --type stream_rank() :: {term(), integer()}. +-type rank_x() :: term(). + +-type rank_y() :: integer(). + +-type stream_rank() :: {rank_x(), rank_y()}. %% TODO: Not implemented -type iterator_id() :: term(). diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index ffe932449..2d53886e3 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.