diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index bc1d66ca2..664ec5803 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -23,6 +23,7 @@ -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(CM_SHARD, emqx_cm_shard). -define(ROUTE_SHARD, route_shard). +-define(PS_ROUTER_SHARD, persistent_session_router_shard). %% Banner %%-------------------------------------------------------------------- diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl index d721df5ed..264cbde14 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -14,6 +14,8 @@ -define(DS_SHARD, <<"local">>). -define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). +-import(emqx_common_test_helpers, [on_exit/1]). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -56,9 +58,11 @@ end_per_testcase(TestCase, Config) when TestCase =:= t_session_unsubscription_idempotency -> Nodes = ?config(nodes, Config), + emqx_common_test_helpers:call_janitor(60_000), ok = emqx_cth_cluster:stop(Nodes), ok; end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(60_000), ok. %%------------------------------------------------------------------------------ @@ -87,9 +91,6 @@ app_specs() -> [ emqx_durable_storage, {emqx, #{ - before_start => fun() -> - emqx_app:set_config_loader(?MODULE) - end, config => #{persistent_session_store => #{ds => true}}, override_env => [{boot_modules, [broker, listeners]}] }} @@ -124,10 +125,56 @@ wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) -> false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port) ). +start_client(Opts0 = #{}) -> + Defaults = #{ + proto_ver => v5, + properties => #{'Session-Expiry-Interval' => 300} + }, + Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), + {ok, Client} = emqtt:start_link(Opts), + on_exit(fun() -> catch emqtt:stop(Client) end), + Client. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ +t_non_persistent_session_subscription(_Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + SubTopicFilter = <<"t/#">>, + ?check_trace( + begin + ?tp(notice, "starting", #{}), + Client = start_client(#{ + clientid => ClientId, + properties => #{'Session-Expiry-Interval' => 0} + }), + {ok, _} = emqtt:connect(Client), + ?tp(notice, "subscribing", #{}), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), + IteratorRefs = get_all_iterator_refs(node()), + IteratorIds = get_all_iterator_ids(node()), + + ok = emqtt:stop(Client), + + #{ + iterator_refs => IteratorRefs, + iterator_ids => IteratorIds + } + end, + fun(Res, Trace) -> + ct:pal("trace:\n ~p", [Trace]), + #{ + iterator_refs := IteratorRefs, + iterator_ids := IteratorIds + } = Res, + ?assertEqual([], IteratorRefs), + ?assertEqual({ok, []}, IteratorIds), + ok + end + ), + ok. + t_session_subscription_idempotency(Config) -> [Node1Spec | _] = ?config(node_specs, Config), [Node1] = ?config(nodes, Config), @@ -151,7 +198,7 @@ t_session_subscription_idempotency(Config) -> spawn_link(fun() -> ?tp(will_restart_node, #{}), - ct:pal("restarting node ~p", [Node1]), + ?tp(notice, "restarting node", #{node => Node1}), true = monitor_node(Node1, true), ok = erpc:call(Node1, init, restart, []), receive @@ -160,10 +207,10 @@ t_session_subscription_idempotency(Config) -> after 10_000 -> ct:fail("node ~p didn't stop", [Node1]) end, - ct:pal("waiting for nodeup ~p", [Node1]), + ?tp(notice, "waiting for nodeup", #{node => Node1}), wait_nodeup(Node1), wait_gen_rpc_down(Node1Spec), - ct:pal("restarting apps on ~p", [Node1]), + ?tp(notice, "restarting apps", #{node => Node1}), Apps = maps:get(apps, Node1Spec), ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), @@ -171,19 +218,15 @@ t_session_subscription_idempotency(Config) -> %% end.... ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), ok = snabbkaffe:forward_trace(Node1), - ct:pal("node ~p restarted", [Node1]), + ?tp(notice, "node restarted", #{node => Node1}), ?tp(restarted_node, #{}), ok end), - ct:pal("starting 1"), - {ok, Client0} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), + ?tp(notice, "starting 1", #{}), + Client0 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client0), - ct:pal("subscribing 1"), + ?tp(notice, "subscribing 1", #{}), process_flag(trap_exit, true), catch emqtt:subscribe(Client0, SubTopicFilter, qos2), receive @@ -194,14 +237,10 @@ t_session_subscription_idempotency(Config) -> process_flag(trap_exit, false), {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), - ct:pal("starting 2"), - {ok, Client1} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), + ?tp(notice, "starting 2", #{}), + Client1 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client1), - ct:pal("subscribing 2"), + ?tp(notice, "subscribing 2", #{}), {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), ok = emqtt:stop(Client1), @@ -247,7 +286,7 @@ t_session_unsubscription_idempotency(Config) -> spawn_link(fun() -> ?tp(will_restart_node, #{}), - ct:pal("restarting node ~p", [Node1]), + ?tp(notice, "restarting node", #{node => Node1}), true = monitor_node(Node1, true), ok = erpc:call(Node1, init, restart, []), receive @@ -256,10 +295,10 @@ t_session_unsubscription_idempotency(Config) -> after 10_000 -> ct:fail("node ~p didn't stop", [Node1]) end, - ct:pal("waiting for nodeup ~p", [Node1]), + ?tp(notice, "waiting for nodeup", #{node => Node1}), wait_nodeup(Node1), wait_gen_rpc_down(Node1Spec), - ct:pal("restarting apps on ~p", [Node1]), + ?tp(notice, "restarting apps", #{node => Node1}), Apps = maps:get(apps, Node1Spec), ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), @@ -267,21 +306,17 @@ t_session_unsubscription_idempotency(Config) -> %% end.... ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), ok = snabbkaffe:forward_trace(Node1), - ct:pal("node ~p restarted", [Node1]), + ?tp(notice, "node restarted", #{node => Node1}), ?tp(restarted_node, #{}), ok end), - ct:pal("starting 1"), - {ok, Client0} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), + ?tp(notice, "starting 1", #{}), + Client0 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client0), - ct:pal("subscribing 1"), + ?tp(notice, "subscribing 1", #{}), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2), - ct:pal("unsubscribing 1"), + ?tp(notice, "unsubscribing 1", #{}), process_flag(trap_exit, true), catch emqtt:unsubscribe(Client0, SubTopicFilter), receive @@ -292,16 +327,12 @@ t_session_unsubscription_idempotency(Config) -> process_flag(trap_exit, false), {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), - ct:pal("starting 2"), - {ok, Client1} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), + ?tp(notice, "starting 2", #{}), + Client1 = start_client(#{port => Port, clientid => ClientId}), {ok, _} = emqtt:connect(Client1), - ct:pal("subscribing 2"), + ?tp(notice, "subscribing 2", #{}), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), - ct:pal("unsubscribing 2"), + ?tp(notice, "unsubscribing 2", #{}), {{ok, _, [?RC_SUCCESS]}, {ok, _}} = ?wait_async_action( emqtt:unsubscribe(Client1, SubTopicFilter), diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 4aa9175cd..e2984e30b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -16,6 +16,7 @@ -module(emqx_persistent_session_ds). +-include("emqx.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([init/0]). @@ -56,11 +57,13 @@ %% init() -> - ?WHEN_ENABLED( + ?WHEN_ENABLED(begin ok = emqx_ds:ensure_shard(?DS_SHARD, #{ dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD]) - }) - ). + }), + ok = emqx_persistent_session_ds_router:init_tables(), + ok + end). %% @@ -71,8 +74,8 @@ persist_message(Msg) -> case needs_persistence(Msg) andalso find_subscribers(Msg) of [_ | _] -> store_message(Msg); - % [] -> - % {skipped, no_subscribers}; + [] -> + {skipped, no_subscribers}; false -> {skipped, needs_no_persistence} end @@ -87,8 +90,8 @@ store_message(Msg) -> Topic = emqx_topic:words(emqx_message:topic(Msg)), emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)). -find_subscribers(_Msg) -> - [node()]. +find_subscribers(#message{topic = Topic}) -> + emqx_persistent_session_ds_router:match_routes(Topic). open_session(ClientID) -> ?WHEN_ENABLED(emqx_ds:session_open(ClientID)). @@ -98,6 +101,7 @@ open_session(ClientID) -> add_subscription(TopicFilterBin, DSSessionID) -> ?WHEN_ENABLED( begin + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID), TopicFilter = emqx_topic:words(TopicFilterBin), {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( DSSessionID, TopicFilter @@ -160,7 +164,9 @@ del_subscription(TopicFilterBin, DSSessionID) -> persistent_session_ds_iterator_delete, #{}, emqx_ds:session_del_iterator(DSSessionID, TopicFilter) - ) + ), + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID), + ok end ). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl new file mode 100644 index 000000000..7815e25f5 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl @@ -0,0 +1,31 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_PS_DS_HRL). +-define(EMQX_PS_DS_HRL, true). + +-define(PS_ROUTER_TAB, emqx_ds_ps_router). +-define(PS_FILTERS_TAB, emqx_ds_ps_filters). + +-record(ps_route, { + topic :: binary(), + dest :: emqx_ds:session_id() +}). +-record(ps_routeidx, { + entry :: emqx_topic_index:key(emqx_persistent_session_ds_router:dest()), + unused = [] :: nil() +}). + +-endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds_router.erl new file mode 100644 index 000000000..4400b23ff --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds_router.erl @@ -0,0 +1,230 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_router). + +-include("emqx.hrl"). +-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). + +-export([init_tables/0]). + +%% Route APIs +-export([ + do_add_route/2, + do_delete_route/2, + match_routes/1, + lookup_routes/1, + foldr_routes/2, + foldl_routes/2 +]). + +-export([cleanup_routes/1]). +-export([print_routes/1]). +-export([topics/0]). + +-ifdef(TEST). +-export([has_route/2]). +-endif. + +-type dest() :: emqx_ds:session_id(). + +-export_type([dest/0]). + +%%-------------------------------------------------------------------- +%% Table Initialization +%%-------------------------------------------------------------------- + +init_tables() -> + mria_config:set_dirty_shard(?PS_ROUTER_SHARD, true), + ok = mria:create_table(?PS_ROUTER_TAB, [ + {type, bag}, + {rlog_shard, ?PS_ROUTER_SHARD}, + {storage, ram_copies}, + {record_name, ps_route}, + {attributes, record_info(fields, ps_route)}, + {storage_properties, [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ]} + ]), + ok = mria:create_table(?PS_FILTERS_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PS_ROUTER_SHARD}, + {storage, ram_copies}, + {record_name, ps_routeidx}, + {attributes, record_info(fields, ps_routeidx)}, + {storage_properties, [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, auto} + ]} + ]} + ]), + ok = mria:wait_for_tables([?PS_ROUTER_TAB, ?PS_FILTERS_TAB]), + ok. + +%%-------------------------------------------------------------------- +%% Route APIs +%%-------------------------------------------------------------------- + +-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. +do_add_route(Topic, Dest) when is_binary(Topic) -> + case has_route(Topic, Dest) of + true -> + ok; + false -> + mria_insert_route(Topic, Dest) + end. + +-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. +do_delete_route(Topic, Dest) -> + case emqx_trie_search:filter(Topic) of + Words when is_list(Words) -> + K = emqx_topic_index:make_key(Words, Dest), + mria:dirty_delete(?PS_FILTERS_TAB, K); + false -> + mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest}) + end. + +%% @doc Take a real topic (not filter) as input, return the matching topics and topic +%% filters associated with route destination. +-spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. +match_routes(Topic) when is_binary(Topic) -> + lookup_route_tab(Topic) ++ + [match_to_route(M) || M <- match_filters(Topic)]. + +%% @doc Take a topic or filter as input, and return the existing routes with exactly +%% this topic or filter. +-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. +lookup_routes(Topic) -> + case emqx_topic:wildcard(Topic) of + true -> + Pat = #ps_routeidx{entry = emqx_topic_index:make_key(Topic, '$1')}, + [Dest || [Dest] <- ets:match(?PS_FILTERS_TAB, Pat)]; + false -> + lookup_route_tab(Topic) + end. + +-spec has_route(emqx_types:topic(), dest()) -> boolean(). +has_route(Topic, Dest) -> + case emqx_topic:wildcard(Topic) of + true -> + ets:member(?PS_FILTERS_TAB, emqx_topic_index:make_key(Topic, Dest)); + false -> + has_route_tab_entry(Topic, Dest) + end. + +-spec topics() -> list(emqx_types:topic()). +topics() -> + Pat = #ps_routeidx{entry = '$1'}, + Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?PS_FILTERS_TAB, Pat)], + list_route_tab_topics() ++ Filters. + +%% @doc Print routes to a topic +-spec print_routes(emqx_types:topic()) -> ok. +print_routes(Topic) -> + lists:foreach( + fun(#ps_route{topic = To, dest = Dest}) -> + io:format("~ts -> ~ts~n", [To, Dest]) + end, + match_routes(Topic) + ). + +-spec cleanup_routes(emqx_ds:session_id()) -> ok. +cleanup_routes(DSSessionId) -> + %% NOTE + %% No point in transaction here because all the operations on filters table are dirty. + ok = ets:foldl( + fun(#ps_routeidx{entry = K}, ok) -> + case get_dest_session_id(emqx_topic_index:get_id(K)) of + DSSessionId -> + mria:dirty_delete(?PS_FILTERS_TAB, K); + _ -> + ok + end + end, + ok, + ?PS_FILTERS_TAB + ), + ok = ets:foldl( + fun(#ps_route{dest = Dest} = Route, ok) -> + case get_dest_session_id(Dest) of + DSSessionId -> + mria:dirty_delete_object(?PS_ROUTER_TAB, Route); + _ -> + ok + end + end, + ok, + ?PS_ROUTER_TAB + ). + +-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. +foldl_routes(FoldFun, AccIn) -> + fold_routes(foldl, FoldFun, AccIn). + +-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. +foldr_routes(FoldFun, AccIn) -> + fold_routes(foldr, FoldFun, AccIn). + +%%-------------------------------------------------------------------- +%% Internal fns +%%-------------------------------------------------------------------- + +mria_insert_route(Topic, Dest) -> + case emqx_trie_search:filter(Topic) of + Words when is_list(Words) -> + K = emqx_topic_index:make_key(Words, Dest), + mria:dirty_write(?PS_FILTERS_TAB, #ps_routeidx{entry = K}); + false -> + mria_route_tab_insert(#ps_route{topic = Topic, dest = Dest}) + end. + +fold_routes(FunName, FoldFun, AccIn) -> + FilterFoldFun = mk_filtertab_fold_fun(FoldFun), + Acc = ets:FunName(FoldFun, AccIn, ?PS_ROUTER_TAB), + ets:FunName(FilterFoldFun, Acc, ?PS_FILTERS_TAB). + +mk_filtertab_fold_fun(FoldFun) -> + fun(#ps_routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end. + +match_filters(Topic) -> + emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []). + +get_dest_session_id({_, DSSessionId}) -> + DSSessionId; +get_dest_session_id(DSSessionId) -> + DSSessionId. + +match_to_route(M) -> + #ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. + +mria_route_tab_insert(Route) -> + mria:dirty_write(?PS_ROUTER_TAB, Route). + +lookup_route_tab(Topic) -> + ets:lookup(?PS_ROUTER_TAB, Topic). + +has_route_tab_entry(Topic, Dest) -> + [] =/= ets:match(?PS_ROUTER_TAB, #ps_route{topic = Topic, dest = Dest}). + +list_route_tab_topics() -> + mnesia:dirty_all_keys(?PS_ROUTER_TAB). + +mria_route_tab_delete(Route) -> + mria:dirty_delete_object(?PS_ROUTER_TAB, Route). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index ebc1a00a3..ce71ade91 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -317,6 +317,8 @@ is_subscriptions_full(#session{ -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> session(). +add_persistent_subscription(_TopicFilterBin, _ClientId, Session = #session{is_persistent = false}) -> + Session; add_persistent_subscription(TopicFilterBin, ClientId, Session) -> _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId), Session. diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index c4f7ef73b..a04a0d4de 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -19,10 +19,13 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -compile(export_all). -compile(nowarn_export_all). +-import(emqx_common_test_helpers, [on_exit/1]). + -define(DS_SHARD, <<"local">>). all() -> @@ -33,29 +36,31 @@ init_per_suite(Config) -> %% TODO: remove after other suites start to use `emx_cth_suite' application:stop(emqx), application:stop(emqx_durable_storage), - TCApps = emqx_cth_suite:start( - app_specs(), - #{work_dir => emqx_cth_suite:work_dir(Config)} - ), - [{tc_apps, TCApps} | Config]. + Config. -end_per_suite(Config) -> - TCApps = ?config(tc_apps, Config), - emqx_cth_suite:stop(TCApps), +end_per_suite(_Config) -> ok. -init_per_testcase(t_session_subscription_iterators, Config) -> +init_per_testcase(t_session_subscription_iterators = TestCase, Config) -> Cluster = cluster(), - Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}), [{nodes, Nodes} | Config]; -init_per_testcase(_TestCase, Config) -> - Config. +init_per_testcase(TestCase, Config) -> + Apps = emqx_cth_suite:start( + app_specs(), + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), + [{apps, Apps} | Config]. end_per_testcase(t_session_subscription_iterators, Config) -> Nodes = ?config(nodes, Config), + emqx_common_test_helpers:call_janitor(60_000), ok = emqx_cth_cluster:stop(Nodes), ok; -end_per_testcase(_TestCase, _Config) -> +end_per_testcase(_TestCase, Config) -> + Apps = ?config(apps, Config), + emqx_common_test_helpers:call_janitor(60_000), + emqx_cth_suite:stop(Apps), ok. t_messages_persisted(_Config) -> @@ -75,12 +80,12 @@ t_messages_persisted(_Config) -> Messages = [ M1 = {<<"client/1/topic">>, <<"1">>}, M2 = {<<"client/2/topic">>, <<"2">>}, - M3 = {<<"client/3/topic/sub">>, <<"3">>}, - M4 = {<<"client/4">>, <<"4">>}, + _M3 = {<<"client/3/topic/sub">>, <<"3">>}, + _M4 = {<<"client/4">>, <<"4">>}, M5 = {<<"random/5">>, <<"5">>}, - M6 = {<<"random/6/topic">>, <<"6">>}, + _M6 = {<<"random/6/topic">>, <<"6">>}, M7 = {<<"client/7/topic">>, <<"7">>}, - M8 = {<<"client/8/topic/sub">>, <<"8">>}, + _M8 = {<<"client/8/topic/sub">>, <<"8">>}, M9 = {<<"random/9">>, <<"9">>}, M10 = {<<"random/10">>, <<"10">>} ], @@ -94,8 +99,53 @@ t_messages_persisted(_Config) -> ct:pal("Persisted = ~p", [Persisted]), ?assertEqual( - % [M1, M2, M5, M7, M9, M10], - [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10], + [M1, M2, M5, M7, M9, M10], + [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted] + ), + + ok. + +t_messages_persisted_2(_Config) -> + Prefix = atom_to_binary(?FUNCTION_NAME), + C1 = connect(<>, _CleanStart0 = true, _EI0 = 30), + CP = connect(<>, _CleanStart1 = true, _EI1 = undefined), + T = fun(T0) -> <> end, + + %% won't be persisted + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"random/topic">>), <<"0">>, 1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"client/1/topic">>), <<"1">>, 1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"client/2/topic">>), <<"2">>, 1), + + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(C1, T(<<"client/+/topic">>), qos1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"random/topic">>), <<"3">>, 1), + %% will be persisted + {ok, #{reason_code := ?RC_SUCCESS}} = + emqtt:publish(CP, T(<<"client/1/topic">>), <<"4">>, 1), + {ok, #{reason_code := ?RC_SUCCESS}} = + emqtt:publish(CP, T(<<"client/2/topic">>), <<"5">>, 1), + + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C1, T(<<"client/+/topic">>)), + %% won't be persisted + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"random/topic">>), <<"6">>, 1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"client/1/topic">>), <<"7">>, 1), + {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = + emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1), + + Persisted = consume(?DS_SHARD, {['#'], 0}), + + ct:pal("Persisted = ~p", [Persisted]), + + ?assertEqual( + [ + {T(<<"client/1/topic">>), <<"4">>}, + {T(<<"client/2/topic">>), <<"5">>} + ], [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted] ), @@ -121,12 +171,11 @@ t_session_subscription_iterators(Config) -> lists:seq(1, 4) ), ct:pal("starting"), - {ok, Client} = emqtt:start_link([ - {port, Port}, - {clientid, ClientId}, - {proto_ver, v5} - ]), - {ok, _} = emqtt:connect(Client), + Client = connect(#{ + clientid => ClientId, + port => Port, + properties => #{'Session-Expiry-Interval' => 300} + }), ct:pal("publishing 1"), Message1 = emqx_message:make(Topic, Payload1), publish(Node1, Message1), @@ -195,15 +244,19 @@ t_session_subscription_iterators(Config) -> %% connect(ClientId, CleanStart, EI) -> - {ok, Client} = emqtt:start_link([ - {clientid, ClientId}, - {proto_ver, v5}, - {clean_start, CleanStart}, - {properties, - maps:from_list( - [{'Session-Expiry-Interval', EI} || is_integer(EI)] - )} - ]), + connect(#{ + clientid => ClientId, + clean_start => CleanStart, + properties => maps:from_list( + [{'Session-Expiry-Interval', EI} || is_integer(EI)] + ) + }). + +connect(Opts0 = #{}) -> + Defaults = #{proto_ver => v5}, + Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), + {ok, Client} = emqtt:start_link(Opts), + on_exit(fun() -> catch emqtt:stop(Client) end), {ok, _} = emqtt:connect(Client), Client. @@ -222,6 +275,18 @@ consume(It) -> [] end. +delete_all_messages() -> + Persisted = consume(?DS_SHARD, {['#'], 0}), + lists:foreach( + fun(Msg) -> + GUID = emqx_message:id(Msg), + Topic = emqx_topic:words(emqx_message:topic(Msg)), + Timestamp = emqx_guid:timestamp(GUID), + ok = emqx_ds_storage_layer:delete(?DS_SHARD, GUID, Timestamp, Topic) + end, + Persisted + ). + receive_messages(Count) -> receive_messages(Count, []). diff --git a/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl new file mode 100644 index 000000000..742dc5b41 --- /dev/null +++ b/apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl @@ -0,0 +1,178 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_router_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(R, emqx_persistent_session_ds_router). +-define(DEF_DS_SESSION_ID, <<"some-client-id">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = filename:join([?config(priv_dir, Config), ?MODULE]), + AppSpecs = [ + emqx_durable_storage, + {emqx, #{ + config => #{persistent_session_store => #{ds => true}}, + override_env => [{boot_modules, [broker]}] + }} + ], + Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)), + ok. + +init_per_testcase(_TestCase, Config) -> + clear_tables(), + Config. + +end_per_testcase(_TestCase, _Config) -> + clear_tables(). + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +clear_tables() -> + lists:foreach( + fun mnesia:clear_table/1, + [?PS_ROUTER_TAB, ?PS_FILTERS_TAB] + ). + +add_route(TopicFilter) -> + ?R:do_add_route(TopicFilter, ?DEF_DS_SESSION_ID). + +delete_route(TopicFilter) -> + ?R:do_delete_route(TopicFilter, ?DEF_DS_SESSION_ID). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +% t_lookup_routes(_) -> +% error('TODO'). + +t_add_delete(_) -> + add_route(<<"a/b/c">>), + add_route(<<"a/b/c">>), + add_route(<<"a/+/b">>), + ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), + delete_route(<<"a/b/c">>), + delete_route(<<"a/+/b">>), + ?assertEqual([], ?R:topics()). + +t_add_delete_incremental(_) -> + add_route(<<"a/b/c">>), + add_route(<<"a/+/c">>), + add_route(<<"a/+/+">>), + add_route(<<"a/b/#">>), + add_route(<<"#">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/+/+">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/+/c">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/+/c">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/+/+">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/+/+">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/b/#">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/b/c">>), + ?assertEqual( + [#ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ). + +t_do_add_delete(_) -> + add_route(<<"a/b/c">>), + add_route(<<"a/b/c">>), + add_route(<<"a/+/b">>), + ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), + + delete_route(<<"a/b/c">>), + delete_route(<<"a/+/b">>), + ?assertEqual([], ?R:topics()). + +t_match_routes(_) -> + add_route(<<"a/b/c">>), + add_route(<<"a/+/c">>), + add_route(<<"a/b/#">>), + add_route(<<"#">>), + ?assertEqual( + [ + #ps_route{topic = <<"#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/+/c">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/#">>, dest = ?DEF_DS_SESSION_ID}, + #ps_route{topic = <<"a/b/c">>, dest = ?DEF_DS_SESSION_ID} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + delete_route(<<"a/b/c">>), + delete_route(<<"a/+/c">>), + delete_route(<<"a/b/#">>), + delete_route(<<"#">>), + ?assertEqual([], lists:sort(?R:match_routes(<<"a/b/c">>))). + +t_print_routes(_) -> + add_route(<<"+/#">>), + add_route(<<"+/+">>), + ?R:print_routes(<<"a/b">>). + +t_has_route(_) -> + add_route(<<"devices/+/messages">>), + ?assert(?R:has_route(<<"devices/+/messages">>, ?DEF_DS_SESSION_ID)), + delete_route(<<"devices/+/messages">>). diff --git a/apps/emqx/test/emqx_sys_mon_SUITE.erl b/apps/emqx/test/emqx_sys_mon_SUITE.erl index 805a06328..f7e1790f4 100644 --- a/apps/emqx/test/emqx_sys_mon_SUITE.erl +++ b/apps/emqx/test/emqx_sys_mon_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -define(SYSMON, emqx_sys_mon). @@ -64,60 +65,66 @@ all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(t_sys_mon, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps( - [], - fun - (emqx) -> - application:set_env(emqx, sysmon, [ - {busy_dist_port, true}, - {busy_port, false}, - {large_heap, 8388608}, - {long_schedule, 240}, - {long_gc, 0} - ]), - ok; - (_) -> - ok - end +init_per_testcase(t_sys_mon = TestCase, Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, #{ + override_env => [ + {sys_mon, [ + {busy_dist_port, true}, + {busy_port, false}, + {large_heap, 8388608}, + {long_schedule, 240}, + {long_gc, 0} + ]} + ] + }} + ], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} ), - Config; -init_per_testcase(t_sys_mon2, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps( - [], - fun - (emqx) -> - application:set_env(emqx, sysmon, [ - {busy_dist_port, false}, - {busy_port, true}, - {large_heap, 8388608}, - {long_schedule, 0}, - {long_gc, 200}, - {nothing, 0} - ]), - ok; - (_) -> - ok - end + [{apps, Apps} | Config]; +init_per_testcase(t_sys_mon2 = TestCase, Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, #{ + override_env => [ + {sys_mon, [ + {busy_dist_port, false}, + {busy_port, true}, + {large_heap, 8388608}, + {long_schedule, 0}, + {long_gc, 200}, + {nothing, 0} + ]} + ] + }} + ], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), + [{apps, Apps} | Config]; +init_per_testcase(t_procinfo = TestCase, Config) -> + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} ), - Config; -init_per_testcase(t_procinfo, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), ok = meck:new(emqx_vm, [passthrough, no_history]), - Config; -init_per_testcase(_, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), - Config. + [{apps, Apps} | Config]; +init_per_testcase(TestCase, Config) -> + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), + [{apps, Apps} | Config]. -end_per_testcase(t_procinfo, _Config) -> +end_per_testcase(t_procinfo, Config) -> + Apps = ?config(apps, Config), ok = meck:unload(emqx_vm), - emqx_common_test_helpers:stop_apps([]); -end_per_testcase(_, _Config) -> - emqx_common_test_helpers:stop_apps([]). + ok = emqx_cth_suite:stop(Apps), + ok; +end_per_testcase(_, Config) -> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), + ok. t_procinfo(_) -> ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end), diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 0b0ab7121..4ba04c758 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -34,22 +34,15 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:boot_modules(all), - ?check_trace( - ?wait_async_action( - emqx_common_test_helpers:start_apps([]), - #{?snk_kind := listener_started, bind := 1883}, - timer:seconds(10) - ), - fun(Trace) -> - %% more than one listener - ?assertMatch([_ | _], ?of_kind(listener_started, Trace)) - end + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(Config)} ), - Config. + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]), +end_per_suite(Config) -> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), ok. %%-------------------------------------------------------------------- %% Testcases diff --git a/apps/emqx/test/emqx_tls_certfile_gc_SUITE.erl b/apps/emqx/test/emqx_tls_certfile_gc_SUITE.erl index 4d53a9413..d0cfbd26f 100644 --- a/apps/emqx/test/emqx_tls_certfile_gc_SUITE.erl +++ b/apps/emqx/test/emqx_tls_certfile_gc_SUITE.erl @@ -32,13 +32,10 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - _ = application:load(emqx), - ok = application:set_env(emqx, data_dir, ?config(priv_dir, Config)), - ok = emqx_config:save_schema_mod_and_names(?MODULE), Config. end_per_suite(_Config) -> - emqx_config:erase_all(). + ok. init_per_testcase(TC, Config) -> TCAbsDir = filename:join(?config(priv_dir, Config), TC), @@ -46,9 +43,10 @@ init_per_testcase(TC, Config) -> ok = snabbkaffe:start_trace(), [{tc_name, atom_to_list(TC)}, {tc_absdir, TCAbsDir} | Config]. -end_per_testcase(_TC, Config) -> +end_per_testcase(_TC, _Config) -> ok = snabbkaffe:stop(), - ok = application:set_env(emqx, data_dir, ?config(priv_dir, Config)), + _ = emqx_schema_hooks:erase_injections(), + _ = emqx_config:erase_all(), ok. t_no_orphans(Config) -> @@ -371,16 +369,18 @@ t_gc_spares_symlinked_datadir(Config) -> ok = proc_lib:stop(Pid). -t_gc_active(_Config) -> - ok = emqx_common_test_helpers:boot_modules([]), - ok = emqx_common_test_helpers:start_apps([]), +t_gc_active(Config) -> + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), try ?assertEqual( {ok, []}, emqx_tls_certfile_gc:run() ) after - emqx_common_test_helpers:stop_apps([]) + emqx_cth_suite:stop(Apps) end. orphans() -> diff --git a/apps/emqx/test/emqx_trace_SUITE.erl b/apps/emqx/test/emqx_trace_SUITE.erl index 7e932a1d0..02bac9d6c 100644 --- a/apps/emqx/test/emqx_trace_SUITE.erl +++ b/apps/emqx/test/emqx_trace_SUITE.erl @@ -34,17 +34,22 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([]), + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), Listeners = emqx_listeners:list(), ct:pal("emqx_listeners:list() = ~p~n", [Listeners]), ?assertMatch( [_ | _], [ID || {ID, #{running := true}} <- Listeners] ), - Config. + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]). +end_per_suite(Config) -> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), + ok. init_per_testcase(_, Config) -> reload(), diff --git a/apps/emqx/test/emqx_trace_handler_SUITE.erl b/apps/emqx/test/emqx_trace_handler_SUITE.erl index 089575a1d..9ec4441c5 100644 --- a/apps/emqx/test/emqx_trace_handler_SUITE.erl +++ b/apps/emqx/test/emqx_trace_handler_SUITE.erl @@ -32,12 +32,16 @@ all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8]. init_per_suite(Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), - Config. + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]). +end_per_suite(Config) -> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), + ok. init_per_testcase(t_trace_clientid, Config) -> init(), diff --git a/apps/emqx/test/emqx_vm_mon_SUITE.erl b/apps/emqx/test/emqx_vm_mon_SUITE.erl index ceeffafb5..d8f134129 100644 --- a/apps/emqx/test/emqx_vm_mon_SUITE.erl +++ b/apps/emqx/test/emqx_vm_mon_SUITE.erl @@ -20,12 +20,15 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(t_too_many_processes_alarm, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), +init_per_testcase(t_too_many_processes_alarm = TestCase, Config) -> + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), emqx_config:put([sysmon, vm], #{ process_high_watermark => 0, process_low_watermark => 0, @@ -34,14 +37,18 @@ init_per_testcase(t_too_many_processes_alarm, Config) -> }), ok = supervisor:terminate_child(emqx_sys_sup, emqx_vm_mon), {ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_vm_mon), - Config; -init_per_testcase(_, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), - Config. + [{apps, Apps} | Config]; +init_per_testcase(TestCase, Config) -> + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), + [{apps, Apps} | Config]. -end_per_testcase(_, _Config) -> - emqx_common_test_helpers:stop_apps([]). +end_per_testcase(_, Config) -> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), + ok. t_too_many_processes_alarm(_) -> timer:sleep(500), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 3a26afec6..b31b39ce1 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -49,6 +49,10 @@ init_per_testcase(TestCase, Config) when TestCase =/= t_ws_non_check_origin -> add_bucket(), + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end), @@ -80,37 +84,32 @@ init_per_testcase(TestCase, Config) when ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end), - PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]), - [ - {prev_config, PrevConfig} - | Config - ]; -init_per_testcase(t_ws_non_check_origin, Config) -> + [{apps, Apps} | Config]; +init_per_testcase(t_ws_non_check_origin = TestCase, Config) -> add_bucket(), - ok = emqx_common_test_helpers:start_apps([]), - PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]), + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), emqx_config:put_listener_conf(ws, default, [websocket, check_origin_enable], false), emqx_config:put_listener_conf(ws, default, [websocket, check_origins], []), - [ - {prev_config, PrevConfig} - | Config - ]; -init_per_testcase(_, Config) -> + [{apps, Apps} | Config]; +init_per_testcase(TestCase, Config) -> add_bucket(), - PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]), - ok = emqx_common_test_helpers:start_apps([]), - [ - {prev_config, PrevConfig} - | Config - ]. + Apps = emqx_cth_suite:start( + [emqx], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), + [{apps, Apps} | Config]. -end_per_testcase(TestCase, _Config) when +end_per_testcase(TestCase, Config) when TestCase =/= t_ws_sub_protocols_mqtt_equivalents, TestCase =/= t_ws_sub_protocols_mqtt, TestCase =/= t_ws_check_origin, TestCase =/= t_ws_non_check_origin, TestCase =/= t_ws_pingreq_before_connected -> + Apps = ?config(apps, Config), del_bucket(), lists:foreach( fun meck:unload/1, @@ -122,32 +121,20 @@ end_per_testcase(TestCase, _Config) when emqx_hooks, emqx_metrics ] - ); + ), + ok = emqx_cth_suite:stop(Apps), + ok; end_per_testcase(t_ws_non_check_origin, Config) -> + Apps = ?config(apps, Config), del_bucket(), - PrevConfig = ?config(prev_config, Config), - emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig), - stop_apps(), + ok = emqx_cth_suite:stop(Apps), ok; end_per_testcase(_, Config) -> + Apps = ?config(apps, Config), del_bucket(), - PrevConfig = ?config(prev_config, Config), - emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig), - stop_apps(), + ok = emqx_cth_suite:stop(Apps), Config. -init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([]), - Config. - -end_per_suite(_) -> - emqx_common_test_helpers:stop_apps([]), - ok. - -%% FIXME: this is a temp fix to tests share configs. -stop_apps() -> - emqx_common_test_helpers:stop_apps([], #{erase_all_configs => false}). - %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index cbcdb0b8c..7b36bd7bd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -15,7 +15,6 @@ start(_Type, _Args) -> emqx_ds_sup:start_link(). init_mnesia() -> - %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows ok = mria:create_table( ?SESSION_TAB, [ @@ -39,6 +38,7 @@ init_mnesia() -> ok. storage() -> + %% FIXME: This is a temporary workaround to avoid crashes when starting on Windows case mria:rocksdb_backend_available() of true -> rocksdb_copies; diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 57608e5cb..a97b89580 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -77,11 +77,14 @@ %% %%================================================================================ +-behaviour(emqx_ds_storage_layer). + %% API: -export([create_new/3, open/5]). -export([make_keymapper/1]). -export([store/5]). +-export([delete/4]). -export([make_iterator/2]). -export([make_iterator/3]). -export([next/1]). @@ -270,13 +273,19 @@ make_keymapper(#{ epoch = 1 bsl TimestampLSBs }. --spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> +-spec store(db(), emqx_guid:guid(), emqx_ds:time(), topic(), binary()) -> ok | {error, _TODO}. store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) -> Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). +-spec delete(db(), emqx_guid:guid(), emqx_ds:time(), topic()) -> + ok | {error, _TODO}. +delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic) -> + Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), + rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options). + -spec make_iterator(db(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 47c29e170..a16c9b476 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -10,6 +10,7 @@ -export([create_generation/3]). -export([store/5]). +-export([delete/4]). -export([make_iterator/2, next/1]). @@ -109,7 +110,16 @@ -callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> term(). --callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) -> +-callback store( + _Schema, + _MessageID :: binary(), + emqx_ds:time(), + emqx_ds:topic(), + _Payload :: binary() +) -> + ok | {error, _}. + +-callback delete(_Schema, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) -> ok | {error, _}. -callback make_iterator(_Schema, emqx_ds:replay()) -> @@ -117,7 +127,7 @@ -callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}. --callback preserve_iterator(_Schema, _It) -> term(). +-callback preserve_iterator(_It) -> term(). -callback next(It) -> {value, binary(), It} | none | {error, closed}. @@ -140,6 +150,12 @@ store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), Mod:store(Data, GUID, Time, Topic, Msg). +-spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) -> + ok | {error, _}. +delete(Shard, GUID, Time, Topic) -> + {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), + Mod:delete(Data, GUID, Time, Topic). + -spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(Shard, Replay = {_, StartTime}) ->