From b9e8bde3b0d9b4be56a0c011d47152dc82b1bad7 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Sun, 10 Feb 2019 23:57:51 +0100 Subject: [PATCH] Add first CT test for emqx_portal based on rpc --- src/portal/emqx_portal_msg.erl | 10 +++-- test/emqx_ct_helpers.erl | 49 ++++++++++++++++++++++- test/emqx_portal_SUITE.erl | 73 ++++++++++++++++++++++++++++++++++ test/emqx_shared_sub_SUITE.erl | 48 +--------------------- 4 files changed, 129 insertions(+), 51 deletions(-) create mode 100644 test/emqx_portal_SUITE.erl diff --git a/src/portal/emqx_portal_msg.erl b/src/portal/emqx_portal_msg.erl index 252ea72d0..12f5926a3 100644 --- a/src/portal/emqx_portal_msg.erl +++ b/src/portal/emqx_portal_msg.erl @@ -32,8 +32,8 @@ %% 1. Mount topic to a prefix %% 2. fix QoS to 1 -spec to_export(msg(), undefined | binary()) -> msg(). -to_export(#{topic := Topic} = Msg, Mountpoint) -> - Msg#{topic := topic(Mountpoint, Topic), qos => 1}. +to_export(#message{topic = Topic} = Msg, Mountpoint) -> + Msg#message{topic = topic(Mountpoint, Topic), qos = 1}. %% @doc Make `binary()' in order to make iodata to be persisted on disk. -spec to_binary(msg()) -> binary(). @@ -46,15 +46,19 @@ from_binary(Bin) -> binary_to_term(Bin). %% @doc Estimate the size of a message. %% Count only the topic length + payload size -spec estimate_size(msg()) -> integer(). -estimate_size(#{topic := Topic, payload := Payload}) -> +estimate_size(#message{topic = Topic, payload = Payload}) -> size(Topic) + size(Payload). %% @doc By message/batch receiver, transform received batch into %% messages to dispatch to local brokers. to_broker_msgs(Batch) -> lists:map(fun to_broker_msg/1, Batch). +to_broker_msg(#message{} = Msg) -> + %% internal format from another EMQX node via rpc + Msg; to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic, properties := Props, payload := Payload}) -> + %% published from remote node over a MQTT connection emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(portal, QoS, Topic, Payload))). diff --git a/test/emqx_ct_helpers.erl b/test/emqx_ct_helpers.erl index eae22d6ab..361a6b4a9 100644 --- a/test/emqx_ct_helpers.erl +++ b/test/emqx_ct_helpers.erl @@ -14,9 +14,56 @@ -module(emqx_ct_helpers). --export([ensure_mnesia_stopped/0]). +-export([ensure_mnesia_stopped/0, wait_for/4]). ensure_mnesia_stopped() -> ekka_mnesia:ensure_stopped(), ekka_mnesia:delete_schema(). +%% Help function to wait for Fun to yeild 'true'. +wait_for(Fn, Ln, F, Timeout) -> + {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), + wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). + +wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> + receive + {'DOWN', Mref, process, Pid, normal} -> + ok; + {'DOWN', Mref, process, Pid, {unexpected, Result}} -> + erlang:error({unexpected, Fn, Ln, Result}); + {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} -> + erlang:raise(C, {Fn, Ln, E}, S) + after + Timeout -> + case Kill of + true -> + erlang:demonitor(Mref, [flush]), + erlang:exit(Pid, kill), + erlang:error({Fn, Ln, timeout}); + false -> + Pid ! stop, + wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) + end + end. + +wait_loop(_F, ok) -> exit(normal); +wait_loop(F, LastRes) -> + receive + stop -> erlang:exit(LastRes) + after + 100 -> + Res = catch_call(F), + wait_loop(F, Res) + end. + +catch_call(F) -> + try + case F() of + true -> ok; + Other -> {unexpected, Other} + end + catch + C : E : S -> + {crashed, {C, E, S}} + end. + diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_portal_SUITE.erl new file mode 100644 index 000000000..21effdb08 --- /dev/null +++ b/test/emqx_portal_SUITE.erl @@ -0,0 +1,73 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_portal_SUITE). + +-export([all/0, init_per_suite/1, end_per_suite/1]). +-export([t_rpc/1, + t_mqtt/1 + ]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include("emqx_mqtt.hrl"). +-include("emqx.hrl"). + +-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). + +all() -> [t_rpc, + t_mqtt + ]. + +init_per_suite(Config) -> + case node() of + nonode@nohost -> + net_kernel:start(['emqx@127.0.0.1', longnames]); + _ -> + ok + end, + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_rpc(Config) when is_list(Config) -> + Cfg = #{address => node(), + forwards => [<<"t_rpc/#">>], + connect_module => emqx_portal_rpc, + mountpoint => <<"forwarded">> + }, + {ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg), + ClientId = <<"ClientId">>, + try + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + %% message from a different client, to avoid getting terminated by no-local + Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>), + ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]), + PacketId = 1, + emqx_session:publish(SPid, PacketId, Msg1), + ?wait(case emqx_mock_client:get_last_message(ConnPid) of + {publish, PacketId, #message{topic = <<"forwarded/t_rpc/one">>}} -> true; + Other -> Other + end, 4000), + emqx_mock_client:close_session(ConnPid) + after + ok = emqx_portal:stop(Pid) + end. + +t_mqtt(Config) when is_list(Config) -> ok. + + diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 1ee059812..41537e58a 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -29,7 +29,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). +-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). all() -> [t_random_basic, t_random, @@ -259,49 +259,3 @@ ensure_config(Strategy, AckEnabled) -> subscribed(Group, Topic, Pid) -> lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)). -wait_for(Fn, Ln, F, Timeout) -> - {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), - wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). - -wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> - receive - {'DOWN', Mref, process, Pid, normal} -> - ok; - {'DOWN', Mref, process, Pid, {unexpected, Result}} -> - erlang:error({unexpected, Fn, Ln, Result}); - {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} -> - erlang:raise(C, {Fn, Ln, E}, S) - after - Timeout -> - case Kill of - true -> - erlang:demonitor(Mref, [flush]), - erlang:exit(Pid, kill), - erlang:error({Fn, Ln, timeout}); - false -> - Pid ! stop, - wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) - end - end. - -wait_loop(_F, ok) -> exit(normal); -wait_loop(F, LastRes) -> - receive - stop -> erlang:exit(LastRes) - after - 100 -> - Res = catch_call(F), - wait_loop(F, Res) - end. - -catch_call(F) -> - try - case F() of - true -> ok; - Other -> {unexpected, Other} - end - catch - C : E : S -> - {crashed, {C, E, S}} - end. -