fix(sessds): Delay unsubscribe until full ack of in-flight messages
This commit is contained in:
parent
a9c55f7568
commit
30eb54e86b
|
|
@ -62,7 +62,10 @@
|
||||||
first_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
|
first_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
|
||||||
%% Sequence numbers that have to be committed for the batch:
|
%% Sequence numbers that have to be committed for the batch:
|
||||||
last_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(),
|
last_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(),
|
||||||
last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno()
|
last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
|
||||||
|
%% This stream belongs to an unsubscribed topic-filter, and is
|
||||||
|
%% marked for deletion:
|
||||||
|
unsubscribed = false :: boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% Session metadata keys:
|
%% Session metadata keys:
|
||||||
|
|
|
||||||
|
|
@ -93,7 +93,7 @@ find_new_streams(S) ->
|
||||||
(_Key, #srs{it_end = end_of_stream}, Acc) ->
|
(_Key, #srs{it_end = end_of_stream}, Acc) ->
|
||||||
Acc;
|
Acc;
|
||||||
(Key, Stream, Acc) ->
|
(Key, Stream, Acc) ->
|
||||||
case is_fully_acked(Comm1, Comm2, Stream) of
|
case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of
|
||||||
true ->
|
true ->
|
||||||
[{Key, Stream} | Acc];
|
[{Key, Stream} | Acc];
|
||||||
false ->
|
false ->
|
||||||
|
|
@ -124,25 +124,26 @@ find_new_streams(S) ->
|
||||||
%% This way, messages from the same topic/shard are never reordered.
|
%% This way, messages from the same topic/shard are never reordered.
|
||||||
-spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
|
-spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
|
||||||
renew_streams(S0) ->
|
renew_streams(S0) ->
|
||||||
S1 = remove_fully_replayed_streams(S0),
|
S1 = remove_unsubscribed_streams(S0),
|
||||||
|
S2 = remove_fully_replayed_streams(S1),
|
||||||
emqx_topic_gbt:fold(
|
emqx_topic_gbt:fold(
|
||||||
fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, S2) ->
|
fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, Acc) ->
|
||||||
TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)),
|
TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)),
|
||||||
Streams = select_streams(
|
Streams = select_streams(
|
||||||
SubId,
|
SubId,
|
||||||
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
|
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
|
||||||
S2
|
Acc
|
||||||
),
|
),
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(I, Acc) ->
|
fun(I, Acc1) ->
|
||||||
ensure_iterator(TopicFilter, StartTime, SubId, I, Acc)
|
ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1)
|
||||||
end,
|
end,
|
||||||
S2,
|
Acc,
|
||||||
Streams
|
Streams
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
S1,
|
S2,
|
||||||
emqx_persistent_session_ds_state:get_subscriptions(S1)
|
emqx_persistent_session_ds_state:get_subscriptions(S2)
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec del_subscription(
|
-spec del_subscription(
|
||||||
|
|
@ -150,11 +151,33 @@ renew_streams(S0) ->
|
||||||
) ->
|
) ->
|
||||||
emqx_persistent_session_ds_state:t().
|
emqx_persistent_session_ds_state:t().
|
||||||
del_subscription(SubId, S0) ->
|
del_subscription(SubId, S0) ->
|
||||||
|
%% NOTE: this function only marks the streams for deletion,
|
||||||
|
%% instead of outright deleting them.
|
||||||
|
%%
|
||||||
|
%% It's done for two reasons:
|
||||||
|
%%
|
||||||
|
%% - MQTT standard states that the broker MUST process acks for
|
||||||
|
%% all sent messages, and it MAY keep on sending buffered
|
||||||
|
%% messages:
|
||||||
|
%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901186
|
||||||
|
%%
|
||||||
|
%% - Deleting the streams may lead to gaps in the sequence number
|
||||||
|
%% series, and lead to problems with acknowledgement tracking, we
|
||||||
|
%% avoid that by delaying the deletion.
|
||||||
|
%%
|
||||||
|
%% When the stream is marked for deletion, the session won't fetch
|
||||||
|
%% _new_ batches from it. Actual deletion is done by
|
||||||
|
%% `renew_streams', when it detects that all in-flight messages
|
||||||
|
%% from the stream have been acked by the client.
|
||||||
emqx_persistent_session_ds_state:fold_streams(
|
emqx_persistent_session_ds_state:fold_streams(
|
||||||
fun(Key, _, Acc) ->
|
fun(Key, ReplayState, Acc) ->
|
||||||
case Key of
|
case Key of
|
||||||
{SubId, _Stream} ->
|
{SubId, _Stream} ->
|
||||||
emqx_persistent_session_ds_state:del_stream(Key, Acc);
|
%% This stream belongs to a deleted subscription.
|
||||||
|
%% Mark for deletion:
|
||||||
|
emqx_persistent_session_ds_state:put_stream(
|
||||||
|
Key, ReplayState#srs{unsubscribed = true}, Acc
|
||||||
|
);
|
||||||
_ ->
|
_ ->
|
||||||
Acc
|
Acc
|
||||||
end
|
end
|
||||||
|
|
@ -184,6 +207,10 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
|
||||||
it_end = Iterator
|
it_end = Iterator
|
||||||
},
|
},
|
||||||
emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
|
emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
|
||||||
|
SRS = #srs{unsubscribed = true} ->
|
||||||
|
%% The session resubscribed to the stream after
|
||||||
|
%% unsubscribing. Spare the stream:
|
||||||
|
emqx_persistent_session_ds_state:put_stream(Key, SRS#srs{unsubscribed = false}, S);
|
||||||
#srs{} ->
|
#srs{} ->
|
||||||
S
|
S
|
||||||
end.
|
end.
|
||||||
|
|
@ -222,6 +249,27 @@ select_streams(SubId, RankX, Streams0, S) ->
|
||||||
lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams)
|
lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Remove fully acked streams for the deleted subscriptions.
|
||||||
|
-spec remove_unsubscribed_streams(emqx_persistent_session_ds_state:t()) ->
|
||||||
|
emqx_persistent_session_ds_state:t().
|
||||||
|
remove_unsubscribed_streams(S0) ->
|
||||||
|
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0),
|
||||||
|
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0),
|
||||||
|
emqx_persistent_session_ds_state:fold_streams(
|
||||||
|
fun(Key, ReplayState, S1) ->
|
||||||
|
case
|
||||||
|
ReplayState#srs.unsubscribed andalso is_fully_acked(CommQos1, CommQos2, ReplayState)
|
||||||
|
of
|
||||||
|
true ->
|
||||||
|
emqx_persistent_session_ds_state:del_stream(Key, S1);
|
||||||
|
false ->
|
||||||
|
S1
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
S0,
|
||||||
|
S0
|
||||||
|
).
|
||||||
|
|
||||||
%% @doc Advance RankY for each RankX that doesn't have any unreplayed
|
%% @doc Advance RankY for each RankX that doesn't have any unreplayed
|
||||||
%% streams.
|
%% streams.
|
||||||
%%
|
%%
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue