From 978a3bfef37cd1e9b3438bff13e8836db2bb3643 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 6 Jan 2024 04:33:10 +0100 Subject: [PATCH] refactor(sessds): Simplify representation of QoS tracks --- apps/emqx/src/emqx_persistent_session_ds.erl | 12 +-- apps/emqx/src/emqx_persistent_session_ds.hrl | 18 +++-- .../src/emqx_persistent_session_ds_state.erl | 78 +++++++++---------- ...persistent_session_ds_stream_scheduler.erl | 24 +++++- .../test/emqx_persistent_session_SUITE.erl | 13 +++- 5 files changed, 87 insertions(+), 58 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 15f214e03..145d6ccbf 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -290,7 +290,7 @@ subscribe( %% router and iterator information can be reconstructed %% from this table, if needed. ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID), - {SubId, S1} = emqx_persistent_session_ds_state:new_subid(S0), + {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), Subscription = #{ start_time => now_ms(), props => SubOpts, @@ -314,10 +314,10 @@ unsubscribe( TopicFilter, Session = #{id := ID, s := S0} ) -> - %% TODO: drop streams and messages from the buffer case subs_lookup(TopicFilter, S0) of - #{props := SubOpts, id := _SubId} -> - S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), + #{props := SubOpts, id := SubId} -> + S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), + S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1), ?tp_span( persistent_session_ds_subscription_route_delete, #{session_id => ID}, @@ -662,11 +662,13 @@ enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, Cli case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It0, BatchSize) of {ok, It, []} -> %% No new messages; just update the end iterator: + logger:warning(#{msg => "batch_empty"}), {Ifs0#ifs{it_end = It}, Inflight0}; {ok, end_of_stream} -> %% No new messages; just update the end iterator: {Ifs0#ifs{it_end = end_of_stream}, Inflight0}; - {ok, It, Messages} -> + {ok, It, [{K, _} | _] = Messages} -> + logger:warning(#{msg => "batch", it => K, msgs => length(Messages)}), {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch( IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0 ), diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index d8556c8c9..43e8b1cf8 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -34,15 +34,19 @@ %% Seqno becomes committed after receiving PUBACK for QoS1 or PUBCOMP %% for QoS2. --define(committed(QOS), {0, QOS}). +-define(committed(QOS), QOS). %% Seqno becomes dup: %% -%% 1. After broker sends QoS1 message to the client -%% 2. After it receives PUBREC from the client for the QoS2 message --define(dup(QOS), {1, QOS}). -%% Last seqno assigned to some message (that may reside in the -%% mqueue): --define(next(QOS), {2, QOS}). +%% 1. After broker sends QoS1 message to the client. Upon session +%% reconnect, QoS1 messages with seqno in the committed..dup range are +%% retransmitted with DUP flag. +%% +%% 2. After it receives PUBREC from the client for the QoS2 message. +%% Upon session reconnect, PUBREL for QoS2 messages with seqno in +%% committed..dup are retransmitted. +-define(dup(QOS), (10 + QOS)). +%% Last seqno assigned to a message. +-define(next(QOS), (20 + QOS)). %%%%% State of the stream: -record(ifs, { diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 8f7cb5ca0..a1147aec5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -30,7 +30,7 @@ -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_conninfo/1, set_conninfo/2]). --export([new_subid/1]). +-export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). @@ -43,6 +43,7 @@ -include("emqx_mqtt.hrl"). -include("emqx_persistent_session_ds.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). %%================================================================================ %% Type declarations @@ -79,14 +80,15 @@ -define(created_at, created_at). -define(last_alive_at, last_alive_at). -define(conninfo, conninfo). --define(last_subid, last_subid). +%% Unique integer used to create unique identities +-define(last_id, last_id). -type metadata() :: #{ ?created_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(), ?conninfo => emqx_types:conninfo(), - ?last_subid => integer() + ?last_id => integer() }. -type seqno_type() :: @@ -112,7 +114,7 @@ -define(stream_tab, emqx_ds_session_streams). -define(seqno_tab, emqx_ds_session_seqnos). -define(rank_tab, emqx_ds_session_ranks). --define(bag_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). +-define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). %%================================================================================ %% API funcions @@ -130,8 +132,8 @@ create_tables() -> {attributes, record_info(fields, kv)} ] ), - [create_kv_bag_table(Table) || Table <- ?bag_tables], - mria:wait_for_tables([?session_tab | ?bag_tables]). + [create_kv_pmap_table(Table) || Table <- ?pmap_tables], + mria:wait_for_tables([?session_tab | ?pmap_tables]). -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined. open(SessionId) -> @@ -191,7 +193,7 @@ list_sessions() -> delete(Id) -> transaction( fun() -> - [kv_delete(Table, Id) || Table <- ?bag_tables], + [kv_pmap_delete(Table, Id) || Table <- ?pmap_tables], mnesia:delete(?session_tab, Id, write) end ). @@ -259,14 +261,14 @@ get_conninfo(Rec) -> set_conninfo(Val, Rec) -> set_meta(?conninfo, Val, Rec). --spec new_subid(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. -new_subid(Rec) -> - LastSubId = - case get_meta(?last_subid, Rec) of +-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. +new_id(Rec) -> + LastId = + case get_meta(?last_id, Rec) of undefined -> 0; N when is_integer(N) -> N end, - {LastSubId, set_meta(?last_subid, LastSubId + 1, Rec)}. + {LastId, set_meta(?last_id, LastId + 1, Rec)}. %% @@ -283,7 +285,7 @@ get_subscriptions(#{subscriptions := Subs}) -> put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) -> %% Note: currently changes to the subscriptions are persisted immediately. Key = {TopicFilter, SubId}, - transaction(fun() -> kv_bag_persist(?subscription_tab, Id, Key, Subscription) end), + transaction(fun() -> kv_pmap_persist(?subscription_tab, Id, Key, Subscription) end), Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0), Rec#{subscriptions => Subs}. @@ -291,13 +293,13 @@ put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptio del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) -> %% Note: currently the subscriptions are persisted immediately. Key = {TopicFilter, SubId}, - transaction(fun() -> kv_bag_delete(?subscription_tab, Id, Key) end), + transaction(fun() -> kv_pmap_delete(?subscription_tab, Id, Key) end), Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0), Rec#{subscriptions => Subs}. %% --type stream_key() :: {emqx_persistent_session_ds:subscription_id(), emqx_ds:stream()}. +-type stream_key() :: {emqx_persistent_session_ds:subscription_id(), binary()}. -spec get_stream(stream_key(), t()) -> emqx_persistent_session_ds:stream_state() | undefined. @@ -390,7 +392,7 @@ gen_del(Field, Key, Rec) -> %% read_subscriptions(SessionId) -> - Records = kv_bag_restore(?subscription_tab, SessionId), + Records = kv_pmap_restore(?subscription_tab, SessionId), lists:foldl( fun({{TopicFilter, SubId}, Subscription}, Acc) -> emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc) @@ -405,7 +407,7 @@ read_subscriptions(SessionId) -> %% This functtion should be ran in a transaction. -spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V). pmap_open(Table, SessionId) -> - Clean = maps:from_list(kv_bag_restore(Table, SessionId)), + Clean = maps:from_list(kv_pmap_restore(Table, SessionId)), #pmap{ table = Table, cache = Clean, @@ -444,10 +446,10 @@ pmap_commit( maps:foreach( fun (K, del) -> - kv_bag_delete(Tab, SessionId, K); + kv_pmap_delete(Tab, SessionId, K); (K, dirty) -> V = maps:get(K, Cache), - kv_bag_persist(Tab, SessionId, K, V) + kv_pmap_persist(Tab, SessionId, K, V) end, Dirty ), @@ -465,47 +467,43 @@ kv_persist(Tab, SessionId, Val0) -> Val = encoder(encode, Tab, Val0), mnesia:write(Tab, #kv{k = SessionId, v = Val}, write). -kv_delete(Table, Namespace) -> - mnesia:delete({Table, Namespace}). - kv_restore(Tab, SessionId) -> [encoder(decode, Tab, V) || #kv{v = V} <- mnesia:read(Tab, SessionId)]. %% Functions dealing with bags: %% @doc Create a mnesia table for the PMAP: --spec create_kv_bag_table(atom()) -> ok. -create_kv_bag_table(Table) -> +-spec create_kv_pmap_table(atom()) -> ok. +create_kv_pmap_table(Table) -> mria:create_table(Table, [ - {type, bag}, + {type, ordered_set}, {rlog_shard, ?DS_MRIA_SHARD}, {storage, rocksdb_copies}, {record_name, kv}, {attributes, record_info(fields, kv)} ]). -kv_bag_persist(Tab, SessionId, Key, Val0) -> - %% Remove the previous entry corresponding to the key: - kv_bag_delete(Tab, SessionId, Key), +kv_pmap_persist(Tab, SessionId, Key, Val0) -> %% Write data to mnesia: Val = encoder(encode, Tab, Val0), - mnesia:write(Tab, #kv{k = SessionId, v = {Key, Val}}, write). + mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write). -kv_bag_restore(Tab, SessionId) -> - [{K, encoder(decode, Tab, V)} || #kv{v = {K, V}} <- mnesia:read(Tab, SessionId)]. +kv_pmap_restore(Table, SessionId) -> + MS = [{#kv{k = {SessionId, '_'}, _ = '_'}, [], ['$_']}], + Objs = mnesia:select(Table, MS, read), + [{K, encoder(decode, Table, V)} || #kv{k = {_, K}, v = V} <- Objs]. -kv_bag_delete(Table, SessionId, Key) -> +kv_pmap_delete(Table, SessionId) -> + MS = [{#kv{k = {SessionId, '$1'}, _ = '_'}, [], ['$1']}], + Keys = mnesia:select(Table, MS, read), + [mnesia:delete(Table, {SessionId, K}, write) || K <- Keys], + ok. + +kv_pmap_delete(Table, SessionId, Key) -> %% Note: this match spec uses a fixed primary key, so it doesn't %% require a table scan, and the transaction doesn't grab the %% whole table lock: - MS = [{#kv{k = SessionId, v = {Key, '_'}}, [], ['$_']}], - Objs = mnesia:select(Table, MS, write), - lists:foreach( - fun(Obj) -> - mnesia:delete_object(Table, Obj, write) - end, - Objs - ). + mnesia:delete(Table, {SessionId, Key}, write). %% diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index d572609e1..091b815d4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -17,7 +17,7 @@ %% API: -export([find_new_streams/1, find_replay_streams/1]). --export([renew_streams/1]). +-export([renew_streams/1, del_subscription/2]). %% behavior callbacks: -export([]). @@ -113,12 +113,31 @@ renew_streams(S0) -> emqx_persistent_session_ds_state:get_subscriptions(S1) ). +-spec del_subscription( + emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() +) -> + emqx_persistent_session_ds_state:t(). +del_subscription(SubId, S0) -> + emqx_persistent_session_ds_state:fold_streams( + fun(Key, _, Acc) -> + case Key of + {SubId, _Stream} -> + emqx_persistent_session_ds_state:del_stream(Key, Acc); + _ -> + Acc + end + end, + S0, + S0 + ). + %%================================================================================ %% Internal functions %%================================================================================ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> - Key = {SubId, Stream}, + %% TODO: use next_id to enumerate streams + Key = {SubId, term_to_binary(Stream)}, case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> {ok, Iterator} = emqx_ds:make_iterator( @@ -127,6 +146,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> NewStreamState = #ifs{ rank_x = RankX, rank_y = RankY, + it_begin = Iterator, it_end = Iterator }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 03f513684..3b9cb33cb 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -54,7 +54,8 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), TCsNonGeneric = [t_choose_impl], - TCGroups = [{group, tcp}, {group, quic}, {group, ws}], + % {group, quic}, {group, ws}], + TCGroups = [{group, tcp}], [ %% {persistence_disabled, TCGroups}, {persistence_enabled, TCGroups}, @@ -694,6 +695,9 @@ t_publish_many_while_client_is_gone_qos1(Config) -> ok = publish_many(Pubs2), NPubs2 = length(Pubs2), + _ = receive_messages(NPubs1, 2000), + [] = receive_messages(NPubs1, 2000), + debug_info(ClientId), {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, {clientid, ClientId}, @@ -702,12 +706,14 @@ t_publish_many_while_client_is_gone_qos1(Config) -> {auto_ack, false} | Config ]), + {ok, _} = emqtt:ConnFun(Client2), %% Try to receive _at most_ `NPubs` messages. %% There shouldn't be that much unacked messages in the replay anyway, %% but it's an easy number to pick. NPubs = NPubs1 + NPubs2, + Msgs2 = receive_messages(NPubs, _Timeout = 2000), NMsgs2 = length(Msgs2), @@ -1086,7 +1092,6 @@ skip_ds_tc(Config) -> Config end. -throw_with_debug_info(Error, ClientId) -> +debug_info(ClientId) -> Info = emqx_persistent_session_ds:print_session(ClientId), - ct:pal("!!! Assertion failed: ~p~nState:~n~p", [Error, Info]), - exit(Error). + ct:pal("*** State:~n~p", [Info]).