From 79744af68128e189d5ac91a06b3d10ecc95cf22e Mon Sep 17 00:00:00 2001 From: Gilbert Date: Fri, 16 Aug 2019 18:07:12 +0800 Subject: [PATCH] Wrapper proper test cases into common test cases (#2785) * Wrapper proper test cases into common test cases * Improve test cases for reason code module (#2789) * Split 3 proper tests into 3 ct cases * Improve test cases for client, rpc and request-response * Add psk suites to increase coverage * Add sys test cases --- .travis.yml | 1 - Makefile | 8 +- include/emqx_mqtt.hrl | 14 + rebar.config | 3 +- src/emqx_broker.erl | 2 +- src/emqx_channel.erl | 1 - src/emqx_client.erl | 10 +- src/emqx_plugins.erl | 16 +- src/emqx_protocol.erl | 4 +- src/emqx_reason_codes.erl | 17 +- src/emqx_rpc.erl | 5 +- src/emqx_sys.erl | 44 ++- ...{prop_base62.erl => emqx_base62_SUITE.erl} | 30 +- test/emqx_ctl_SUTIES.erl | 17 + test/emqx_plugins_SUITE.erl | 26 ++ test/emqx_psk_SUITE.erl | 67 ++++ test/emqx_reason_codes_SUITE.erl | 225 +++++------ test/emqx_request_handler.erl | 94 +++++ test/emqx_request_responser_SUITE.erl | 69 ++++ test/emqx_request_sender.erl | 77 ++++ test/emqx_rpc_SUITE.erl | 124 ++++++ test/emqx_session_SUITE.erl | 374 +++++++++++++++--- test/emqx_sys_SUITE.erl | 136 +++++++ test/prop_emqx_session.erl | 327 --------------- 24 files changed, 1137 insertions(+), 554 deletions(-) rename test/{prop_base62.erl => emqx_base62_SUITE.erl} (58%) create mode 100644 test/emqx_ctl_SUTIES.erl create mode 100644 test/emqx_plugins_SUITE.erl create mode 100644 test/emqx_psk_SUITE.erl create mode 100644 test/emqx_request_handler.erl create mode 100644 test/emqx_request_responser_SUITE.erl create mode 100644 test/emqx_request_sender.erl create mode 100644 test/emqx_rpc_SUITE.erl create mode 100644 test/emqx_sys_SUITE.erl delete mode 100644 test/prop_emqx_session.erl diff --git a/.travis.yml b/.travis.yml index c0b7bf5b9..abe6f7a6a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,6 @@ script: - make xref - make eunit - make ct - - make proper - make cover after_success: diff --git a/Makefile b/Makefile index b0a1dce16..707380095 100644 --- a/Makefile +++ b/Makefile @@ -15,11 +15,7 @@ RUN_NODE_NAME = emqxdebug@127.0.0.1 all: compile .PHONY: tests -tests: eunit ct proper - -.PHONY: proper -proper: - @rebar3 proper +tests: eunit ct .PHONY: run run: run_setup unlock @@ -99,7 +95,7 @@ ct: ct_setup ## e.g. make ct-one-suite suite=emqx_bridge .PHONY: $(SUITES:%=ct-%) $(CT_SUITES:%=ct-%): ct_setup - @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(@:ct-%=%)_SUITE + @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(@:ct-%=%)_SUITE --cover .PHONY: app.config app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 854399999..cbffe27ee 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -298,6 +298,20 @@ payload :: binary() | undefined }). +%%-------------------------------------------------------------------- +%% MQTT Message Internal +%%-------------------------------------------------------------------- + +-record(mqtt_msg, { + qos = ?QOS_0, + retain = false, + dup = false, + packet_id, + topic, + props, + payload + }). + %%-------------------------------------------------------------------- %% MQTT Packet Match %%-------------------------------------------------------------------- diff --git a/rebar.config b/rebar.config index 0f49084bc..7ae5f1d87 100644 --- a/rebar.config +++ b/rebar.config @@ -21,8 +21,7 @@ {cover_opts, [verbose]}. {cover_export_enabled, true}. -{plugins, [coveralls, - rebar3_proper]}. +{plugins, [coveralls]}. {erl_first_files, ["src/emqx_logger.erl"]}. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 3992a0b35..359e65ab8 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -206,7 +206,7 @@ publish(Msg) when is_record(Msg, message) -> end. %% Called internally --spec(safe_publish(emqx_types:message()) -> ok). +-spec(safe_publish(emqx_types:message()) -> ok | emqx_types:publish_result()). safe_publish(Msg) when is_record(Msg, message) -> try publish(Msg) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 630c4de6a..c8a598ef1 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -674,4 +674,3 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. - diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 511b36b6c..655218329 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -144,15 +144,7 @@ | {force_ping, boolean()} | {properties, properties()}). --record(mqtt_msg, { - qos = ?QOS_0, - retain = false, - dup = false, - packet_id, - topic, - props, - payload - }). + -opaque(mqtt_msg() :: #mqtt_msg{}). diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 7b7b1b895..e75a70e83 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -81,9 +81,9 @@ load_expand_plugins() -> load_expand_plugin(PluginDir) -> init_expand_plugin_config(PluginDir), - Ebin = PluginDir ++ "/ebin", + Ebin = filename:join([PluginDir, "ebin"]), code:add_patha(Ebin), - Modules = filelib:wildcard(Ebin ++ "/*.beam"), + Modules = filelib:wildcard(filename:join([Ebin ++ "*.beam"])), lists:foreach(fun(Mod) -> Module = list_to_atom(filename:basename(Mod, ".beam")), code:load_file(Module) @@ -308,14 +308,11 @@ read_loaded() -> read_loaded(File) -> file:consult(File). write_loaded(AppNames) -> - File = emqx_config:get_env(plugins_loaded_file), - case file:open(File, [binary, write]) of - {ok, Fd} -> - lists:foreach(fun(Name) -> - file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name]))) - end, AppNames); + FilePath = emqx_config:get_env(plugins_loaded_file), + case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- AppNames]) of + ok -> ok; {error, Error} -> - ?LOG(error, "Open File ~p Error: ~p", [File, Error]), + ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]), {error, Error} end. @@ -324,4 +321,3 @@ plugin_type(protocol) -> protocol; plugin_type(backend) -> backend; plugin_type(bridge) -> bridge; plugin_type(_) -> feature. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 3fa92ac70..6e0ac8336 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -181,7 +181,7 @@ handle_in(?CONNECT_PACKET( handle_out({disconnect, ReasonCode}, NPState) end; -handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) -> +handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState= #protocol{proto_ver = Ver}) -> case pipeline([fun validate_in/2, fun process_alias/2, fun check_publish/2], Packet, PState) of @@ -189,7 +189,7 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) -> process_publish(NPacket, NPState); {error, ReasonCode, NPState} -> ?LOG(warning, "Cannot publish message to ~s due to ~s", - [Topic, emqx_reason_codes:text(ReasonCode)]), + [Topic, emqx_reason_codes:text(ReasonCode, Ver)]), puback(QoS, PacketId, ReasonCode, NPState) end; diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index ef4c0cedd..387208ebe 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -22,6 +22,7 @@ -export([ name/1 , name/2 , text/1 + , text/2 , connack_error/1 , puback/1 ]). @@ -30,7 +31,7 @@ name(I, Ver) when Ver >= ?MQTT_PROTO_V5 -> name(I); -name(0, _Ver) -> connection_acceptd; +name(0, _Ver) -> connection_accepted; name(1, _Ver) -> unacceptable_protocol_version; name(2, _Ver) -> client_identifier_not_valid; name(3, _Ver) -> server_unavaliable; @@ -83,6 +84,16 @@ name(16#A1) -> subscription_identifiers_not_supported; name(16#A2) -> wildcard_subscriptions_not_supported; name(_Code) -> unknown_error. +text(I, Ver) when Ver >= ?MQTT_PROTO_V5 -> + text(I); +text(0, _Ver) -> <<"Connection accepted">>; +text(1, _Ver) -> <<"unacceptable_protocol_version">>; +text(2, _Ver) -> <<"client_identifier_not_valid">>; +text(3, _Ver) -> <<"server_unavaliable">>; +text(4, _Ver) -> <<"malformed_username_or_password">>; +text(5, _Ver) -> <<"unauthorized_client">>; +text(_, _Ver) -> <<"unknown_error">>. + text(16#00) -> <<"Success">>; text(16#01) -> <<"Granted QoS 1">>; text(16#02) -> <<"Granted QoS 2">>; @@ -150,7 +161,8 @@ compat(connack, 16#9F) -> ?CONNACK_SERVER; compat(suback, Code) when Code =< ?QOS_2 -> Code; compat(suback, Code) when Code >= 16#80 -> 16#80; -compat(unsuback, _Code) -> undefined. +compat(unsuback, _Code) -> undefined; +compat(_Other, _Code) -> undefined. connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID; connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; @@ -167,4 +179,3 @@ connack_error(_) -> ?RC_NOT_AUTHORIZED. %%TODO: This function should be removed. puback([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; puback(L) when is_list(L) -> ?RC_SUCCESS. - diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index c66d938e2..6af676802 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -29,6 +29,8 @@ -define(RPC, gen_rpc). +-define(DefaultClientNum, 1). + call(Node, Mod, Fun, Args) -> filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)). @@ -39,7 +41,7 @@ cast(Node, Mod, Fun, Args) -> filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)). rpc_node(Node) -> - {ok, ClientNum} = application:get_env(gen_rpc, tcp_client_num), + ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum), {Node, rand:uniform(ClientNum)}. rpc_nodes(Nodes) -> @@ -55,4 +57,3 @@ filter_result({Error, Reason}) {badrpc, Reason}; filter_result(Delivery) -> Delivery. - diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index f024d2504..41b27729b 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -23,7 +23,9 @@ -logger_header("[SYS]"). --export([start_link/0]). +-export([ start_link/0 + , stop/0 + ]). -export([ version/0 , uptime/0 @@ -41,23 +43,36 @@ , handle_cast/2 , handle_info/2 , terminate/2 - , code_change/3 ]). -import(emqx_topic, [systop/1]). -import(emqx_misc, [start_timer/2]). --record(state, {start_time, heartbeat, ticker, version, sysdescr}). +-type(timeref() :: reference()). + +-type(tickeref() :: reference()). + +-type(version() :: string()). + +-type(sysdescr() :: string()). + +-record(state, + { start_time :: erlang:timestamp() + , heartbeat :: timeref() + , ticker :: tickeref() + , version :: version() + , sysdescr :: sysdescr() + }). -define(APP, emqx). -define(SYS, ?MODULE). --define(INFO_KEYS, [ - version, % Broker version - uptime, % Broker uptime - datetime, % Broker local datetime - sysdescr % Broker description -]). +-define(INFO_KEYS, + [ version % Broker version + , uptime % Broker uptime + , datetime % Broker local datetime + , sysdescr % Broker description + ]). %%------------------------------------------------------------------------------ %% APIs @@ -67,6 +82,9 @@ start_link() -> gen_server:start_link({local, ?SYS}, ?MODULE, [], []). +stop() -> + gen_server:stop(?SYS). + %% @doc Get sys version -spec(version() -> string()). version() -> @@ -93,12 +111,12 @@ datetime() -> %% @doc Get sys interval -spec(sys_interval() -> pos_integer()). sys_interval() -> - application:get_env(?APP, broker_sys_interval, 60000). + emqx_config:get_env(broker_sys_interval, 60000). %% @doc Get sys heatbeat interval -spec(sys_heatbeat_interval() -> pos_integer()). sys_heatbeat_interval() -> - application:get_env(?APP, broker_sys_heartbeat, 30000). + emqx_config:get_env(broker_sys_heartbeat, 30000). %% @doc Get sys info -spec(info() -> list(tuple())). @@ -154,9 +172,6 @@ handle_info(Info, State) -> terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - %%----------------------------------------------------------------------------- %% Internal functions %%----------------------------------------------------------------------------- @@ -207,4 +222,3 @@ safe_publish(Topic, Flags, Payload) -> emqx_message:set_flags( maps:merge(#{sys => true}, Flags), emqx_message:make(?SYS, Topic, iolist_to_binary(Payload)))). - diff --git a/test/prop_base62.erl b/test/emqx_base62_SUITE.erl similarity index 58% rename from test/prop_base62.erl rename to test/emqx_base62_SUITE.erl index a660012db..83d11ae1c 100644 --- a/test/prop_base62.erl +++ b/test/emqx_base62_SUITE.erl @@ -1,5 +1,33 @@ --module(prop_base62). +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_base62_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + -include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +t_proper_base62(_) -> + Opts = [{numtests, 100}, {to_file, user}], + ?assert(proper:quickcheck(prop_symmetric(), Opts)), + ?assert(proper:quickcheck(prop_size(), Opts)). %%%%%%%%%%%%%%%%%% %%% Properties %%% diff --git a/test/emqx_ctl_SUTIES.erl b/test/emqx_ctl_SUTIES.erl new file mode 100644 index 000000000..a3ce8e8b0 --- /dev/null +++ b/test/emqx_ctl_SUTIES.erl @@ -0,0 +1,17 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ctl_SUTIES). diff --git a/test/emqx_plugins_SUITE.erl b/test/emqx_plugins_SUITE.erl new file mode 100644 index 000000000..5c4d43f24 --- /dev/null +++ b/test/emqx_plugins_SUITE.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugins_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +all() -> emqx_ct:all(?MODULE). diff --git a/test/emqx_psk_SUITE.erl b/test/emqx_psk_SUITE.erl new file mode 100644 index 000000000..2cb4b5894 --- /dev/null +++ b/test/emqx_psk_SUITE.erl @@ -0,0 +1,67 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_psk_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> emqx_ct:all(?MODULE). + +t_lookup(_) -> + ok = load(), + ok = emqx_logger:set_log_level(emergency), + Opts = [{to_file, user}, {numtests, 10}], + ?assert(proper:quickcheck(prop_lookup(), Opts)), + ok = unload(), + ok = emqx_logger:set_log_level(error). + +prop_lookup() -> + ?FORALL({ClientPSKID, UserState}, + {client_pskid(), user_state()}, + begin + case emqx_psk:lookup(psk, ClientPSKID, UserState) of + {ok, _Result} -> true; + error -> true; + _Other -> false + end + end). + +%%-------------------------------------------------------------------- +%% Helper +%%-------------------------------------------------------------------- + +load() -> + ok = meck:new(emqx_hooks, [passthrough, no_history]), + ok = meck:expect(emqx_hooks, run_fold, + fun('tls_handshake.psk_lookup', [ClientPSKID], not_found) -> + unicode:characters_to_binary(ClientPSKID) + end). + +unload() -> + ok = meck:unload(emqx_hooks). + +%%-------------------------------------------------------------------- +%% Generator +%%-------------------------------------------------------------------- + +client_pskid() -> oneof([string(), integer(), [1, [-1]]]). + +user_state() -> term(). diff --git a/test/emqx_reason_codes_SUITE.erl b/test/emqx_reason_codes_SUITE.erl index c0f1c0e74..82acb8091 100644 --- a/test/emqx_reason_codes_SUITE.erl +++ b/test/emqx_reason_codes_SUITE.erl @@ -20,121 +20,126 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). --import(lists, [seq/2, zip/2, foreach/2]). - --define(MQTTV4_CODE_NAMES, [connection_acceptd, - unacceptable_protocol_version, - client_identifier_not_valid, - server_unavaliable, - malformed_username_or_password, - unauthorized_client, - unknown_error]). - --define(MQTTV5_CODE_NAMES, [success, granted_qos1, granted_qos2, disconnect_with_will_message, - no_matching_subscribers, no_subscription_existed, continue_authentication, - re_authenticate, unspecified_error, malformed_Packet, protocol_error, - implementation_specific_error, unsupported_protocol_version, - client_identifier_not_valid, bad_username_or_password, not_authorized, - server_unavailable, server_busy, banned,server_shutting_down, - bad_authentication_method, keepalive_timeout, session_taken_over, - topic_filter_invalid, topic_name_invalid, packet_identifier_inuse, - packet_identifier_not_found, receive_maximum_exceeded, topic_alias_invalid, - packet_too_large, message_rate_too_high, quota_exceeded, - administrative_action, payload_format_invalid, retain_not_supported, - qos_not_supported, use_another_server, server_moved, - shared_subscriptions_not_supported, connection_rate_exceeded, - maximum_connect_time, subscription_identifiers_not_supported, - wildcard_subscriptions_not_supported, unknown_error]). - --define(MQTTV5_CODES, [16#00, 16#01, 16#02, 16#04, 16#10, 16#11, 16#18, 16#19, 16#80, 16#81, 16#82, - 16#83, 16#84, 16#85, 16#86, 16#87, 16#88, 16#89, 16#8A, 16#8B, 16#8C, 16#8D, - 16#8E, 16#8F, 16#90, 16#91, 16#92, 16#93, 16#94, 16#95, 16#96, 16#97, 16#98, - 16#99, 16#9A, 16#9B, 16#9C, 16#9D, 16#9E, 16#9F, 16#A0, 16#A1, 16#A2, code]). - --define(MQTTV5_TXT, [<<"Success">>, <<"Granted QoS 1">>, <<"Granted QoS 2">>, - <<"Disconnect with Will Message">>, <<"No matching subscribers">>, - <<"No subscription existed">>, <<"Continue authentication">>, - <<"Re-authenticate">>, <<"Unspecified error">>, <<"Malformed Packet">>, - <<"Protocol Error">>, <<"Implementation specific error">>, - <<"Unsupported Protocol Version">>, <<"Client Identifier not valid">>, - <<"Bad User Name or Password">>, <<"Not authorized">>, - <<"Server unavailable">>, <<"Server busy">>, <<"Banned">>, - <<"Server shutting down">>, <<"Bad authentication method">>, - <<"Keep Alive timeout">>, <<"Session taken over">>, - <<"Topic Filter invalid">>, <<"Topic Name invalid">>, - <<"Packet Identifier in use">>, <<"Packet Identifier not found">>, - <<"Receive Maximum exceeded">>, <<"Topic Alias invalid">>, - <<"Packet too large">>, <<"Message rate too high">>, <<"Quota exceeded">>, - <<"Administrative action">>, <<"Payload format invalid">>, - <<"Retain not supported">>, <<"QoS not supported">>, - <<"Use another server">>, <<"Server moved">>, - <<"Shared Subscriptions not supported">>, <<"Connection rate exceeded">>, - <<"Maximum connect time">>, <<"Subscription Identifiers not supported">>, - <<"Wildcard Subscriptions not supported">>, <<"Unknown error">>]). - --define(COMPAT_CODES_V5, [16#80, 16#81, 16#82, 16#83, 16#84, 16#85, 16#86, 16#87, - 16#88, 16#89, 16#8A, 16#8B, 16#8C, 16#97, 16#9C, 16#9D, - 16#9F]). - --define(COMPAT_CODES_V4, [?CONNACK_PROTO_VER, ?CONNACK_PROTO_VER, ?CONNACK_PROTO_VER, - ?CONNACK_PROTO_VER, ?CONNACK_PROTO_VER, - ?CONNACK_INVALID_ID, - ?CONNACK_CREDENTIALS, - ?CONNACK_AUTH, - ?CONNACK_SERVER, - ?CONNACK_SERVER, - ?CONNACK_AUTH, - ?CONNACK_SERVER, - ?CONNACK_AUTH, - ?CONNACK_SERVER, ?CONNACK_SERVER, ?CONNACK_SERVER, ?CONNACK_SERVER]). - all() -> emqx_ct:all(?MODULE). -t_mqttv4_name(_) -> - (((codes_test(?MQTT_PROTO_V4)) - (seq(0,6))) - (?MQTTV4_CODE_NAMES)) - (fun emqx_reason_codes:name/2). +t_prop_name_text(_) -> + ?assert(proper:quickcheck(prop_name_text(), prop_name_text(opts))). -t_mqttv5_name(_) -> - (((codes_test(?MQTT_PROTO_V5)) - (?MQTTV5_CODES)) - (?MQTTV5_CODE_NAMES)) - (fun emqx_reason_codes:name/2). +t_prop_compat(_) -> + ?assert(proper:quickcheck(prop_compat(), prop_compat(opts))). -t_text(_) -> - (((codes_test(?MQTT_PROTO_V5)) - (?MQTTV5_CODES)) - (?MQTTV5_TXT)) - (fun emqx_reason_codes:text/1). +t_prop_connack_error(_) -> + ?assert(proper:quickcheck(prop_connack_error(), default_opts([]))). -t_compat(_) -> - (((codes_test(connack)) - (?COMPAT_CODES_V5)) - (?COMPAT_CODES_V4)) - (fun emqx_reason_codes:compat/2), - (((codes_test(suback)) - ([0,1,2, 16#80])) - ([0,1,2, 16#80])) - (fun emqx_reason_codes:compat/2), - (((codes_test(unsuback)) - ([0, 1, 2])) - ([undefined, undefined, undefined])) - (fun emqx_reason_codes:compat/2). +prop_name_text(opts) -> + default_opts([{numtests, 1000}]). -codes_test(AsistVar) -> - fun(CODES) -> - fun(NAMES) -> - fun(Procedure) -> - foreach(fun({Code, Result}) -> - ?assertEqual(Result, case erlang:fun_info(Procedure, name) of - {name, text} -> Procedure(Code); - {name, name} -> Procedure(Code, AsistVar); - {name, compat} -> Procedure(AsistVar, Code) - end) - end, zip(CODES, NAMES)) - end - end - end. +prop_name_text() -> + ?FORALL(UnionArgs, union_args(), + is_atom(apply_fun(name, UnionArgs)) andalso + is_binary(apply_fun(text, UnionArgs))). + +prop_compat(opts) -> + default_opts([{numtests, 512}]). + +prop_compat() -> + ?FORALL(CompatArgs, compat_args(), + begin + Result = apply_fun(compat, CompatArgs), + is_number(Result) orelse Result =:= undefined + end). + +prop_connack_error() -> + ?FORALL(CONNACK_ERROR_ARGS, connack_error_args(), + is_integer(apply_fun(connack_error, CONNACK_ERROR_ARGS))). + +%%-------------------------------------------------------------------- +%% Helper +%%-------------------------------------------------------------------- +default_opts() -> + default_opts([]). + +default_opts(AdditionalOpts) -> + [{to_file, user} | AdditionalOpts]. + +apply_fun(Fun, Args) -> + apply(emqx_reason_codes, Fun, Args). + +%%-------------------------------------------------------------------- +%% Generator +%%-------------------------------------------------------------------- + +union_args() -> + frequency([{6, [real_mqttv3_rc(), mqttv3_version()]}, + {43, [real_mqttv5_rc(), mqttv5_version()]}]). + +compat_args() -> + frequency([{18, [connack, compat_rc()]}, + {2, [suback, compat_rc()]}, + {1, [unsuback, compat_rc()]}]). + +connack_error_args() -> + [frequency([{10, connack_error()}, + {1, unexpected_connack_error()}])]. + +connack_error() -> + oneof([client_identifier_not_valid, + bad_username_or_password, + bad_clientid_or_password, + username_or_password_undefined, + password_error, + not_authorized, + server_unavailable, + server_busy, + banned, + bad_authentication_method]). + +unexpected_connack_error() -> + oneof([who_knows]). + + +real_mqttv3_rc() -> + frequency([{6, mqttv3_rc()}, + {1, unexpected_rc()}]). + +real_mqttv5_rc() -> + frequency([{43, mqttv5_rc()}, + {2, unexpected_rc()}]). + +compat_rc() -> + frequency([{95, ?SUCHTHAT(RC , mqttv5_rc(), RC >= 16#80 orelse RC =< 2)}, + {5, unexpected_rc()}]). + +mqttv3_rc() -> + oneof(mqttv3_rcs()). + +mqttv5_rc() -> + oneof(mqttv5_rcs()). + +unexpected_rc() -> + oneof(unexpected_rcs()). + +mqttv3_rcs() -> + [0, 1, 2, 3, 4, 5]. + +mqttv5_rcs() -> + [16#00, 16#01, 16#02, 16#04, 16#10, 16#11, 16#18, 16#19, + 16#80, 16#81, 16#82, 16#83, 16#84, 16#85, 16#86, 16#87, + 16#88, 16#89, 16#8A, 16#8B, 16#8C, 16#8D, 16#8E, 16#8F, + 16#90, 16#91, 16#92, 16#93, 16#94, 16#95, 16#96, 16#97, + 16#98, 16#99, 16#9A, 16#9B, 16#9C, 16#9D, 16#9E, 16#9F, + 16#A0, 16#A1, 16#A2]. + +unexpected_rcs() -> + ReasonCodes = mqttv3_rcs() ++ mqttv5_rcs(), + Unexpected = lists:seq(0, 16#FF) -- ReasonCodes, + lists:sublist(Unexpected, 5). + +mqttv5_version() -> + ?MQTT_PROTO_V5. + +mqttv3_version() -> + oneof([?MQTT_PROTO_V3, ?MQTT_PROTO_V4]). diff --git a/test/emqx_request_handler.erl b/test/emqx_request_handler.erl new file mode 100644 index 000000000..f2f218638 --- /dev/null +++ b/test/emqx_request_handler.erl @@ -0,0 +1,94 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_request_handler). + +-export([start_link/4, stop/1]). + +-include("emqx_mqtt.hrl"). + +-type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos(). +-type topic() :: emqx_topic:topic(). +-type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()). + +-spec start_link(topic(), qos(), handler(), emqx_client:options()) -> + {ok, pid()} | {error, any()}. +start_link(RequestTopic, QoS, RequestHandler, Options0) -> + Parent = self(), + MsgHandler = make_msg_handler(RequestHandler, Parent), + Options = [{msg_handler, MsgHandler} | Options0], + case emqx_client:start_link(Options) of + {ok, Pid} -> + {ok, _} = emqx_client:connect(Pid), + try subscribe(Pid, RequestTopic, QoS) of + ok -> {ok, Pid}; + {error, _} = Error -> Error + catch + C : E : S -> + emqx_client:stop(Pid), + {error, {C, E, S}} + end; + {error, _} = Error -> Error + end. + +stop(Pid) -> + emqx_client:disconnect(Pid). + +make_msg_handler(RequestHandler, Parent) -> + #{publish => fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end, + puback => fun(_Ack) -> ok end, + disconnected => fun(_Reason) -> ok end + }. + +handle_msg(ReqMsg, RequestHandler, Parent) -> + #{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg, + case maps:find('Response-Topic', Props) of + {ok, RspTopic} when RspTopic =/= <<>> -> + CorrData = maps:get('Correlation-Data', Props), + RspProps = maps:without(['Response-Topic'], Props), + RspPayload = RequestHandler(CorrData, ReqPayload), + RspMsg = #mqtt_msg{qos = QoS, + topic = RspTopic, + props = RspProps, + payload = RspPayload + }, + emqx_logger:debug("~p sending response msg to topic ~s with~n" + "corr-data=~p~npayload=~p", + [?MODULE, RspTopic, CorrData, RspPayload]), + ok = send_response(RspMsg); + _ -> + Parent ! {discarded, ReqPayload}, + ok + end. + +send_response(Msg) -> + %% This function is evaluated by emqx_client itself. + %% hence delegate to another temp process for the loopback gen_statem call. + Client = self(), + _ = spawn_link(fun() -> + case emqx_client:publish(Client, Msg) of + ok -> ok; + {ok, _} -> ok; + {error, Reason} -> exit({failed_to_publish_response, Reason}) + end + end), + ok. + +subscribe(Client, Topic, QoS) -> + {ok, _Props, _QoS} = + emqx_client:subscribe(Client, [{Topic, [{rh, 2}, {rap, false}, + {nl, true}, {qos, QoS}]}]), + ok. diff --git a/test/emqx_request_responser_SUITE.erl b/test/emqx_request_responser_SUITE.erl new file mode 100644 index 000000000..4912f72ea --- /dev/null +++ b/test/emqx_request_responser_SUITE.erl @@ -0,0 +1,69 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_request_responser_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +all() -> + [request_response]. + +request_response(_Config) -> + request_response_per_qos(?QOS_0), + request_response_per_qos(?QOS_1), + request_response_per_qos(?QOS_2). + +request_response_per_qos(QoS) -> + ReqTopic = <<"request_topic">>, + RspTopic = <<"response_topic">>, + {ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS, + [{proto_ver, v5}, + {client_id, <<"requester">>}, + {properties, #{ 'Request-Response-Information' => 1}}]), + %% This is a square service + Square = fun(_CorrData, ReqBin) -> + I = b2i(ReqBin), + i2b(I * I) + end, + {ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square, + [{proto_ver, v5}, + {client_id, <<"responser">>} + ]), + ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS), + receive + {response, <<"corr-1">>, <<"4">>} -> + ok; + Other -> + erlang:error({unexpected, Other}) + after + 100 -> + erlang:error(timeout) + end, + ok = emqx_request_sender:stop(Requester), + ok = emqx_request_handler:stop(Responser). + +b2i(B) -> binary_to_integer(B). +i2b(I) -> integer_to_binary(I). diff --git a/test/emqx_request_sender.erl b/test/emqx_request_sender.erl new file mode 100644 index 000000000..27f3a45ac --- /dev/null +++ b/test/emqx_request_sender.erl @@ -0,0 +1,77 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_request_sender). + +-export([start_link/3, stop/1, send/6]). + +-include("emqx_mqtt.hrl"). + +start_link(ResponseTopic, QoS, Options0) -> + Parent = self(), + MsgHandler = make_msg_handler(Parent), + Options = [{msg_handler, MsgHandler} | Options0], + case emqx_client:start_link(Options) of + {ok, Pid} -> + {ok, _} = emqx_client:connect(Pid), + try subscribe(Pid, ResponseTopic, QoS) of + ok -> {ok, Pid}; + {error, _} = Error -> Error + catch + C : E : S -> + emqx_client:stop(Pid), + {error, {C, E, S}} + end; + {error, _} = Error -> Error + end. + +%% @doc Send a message to request topic with correlation-data `CorrData'. +%% Response should be delivered as a `{response, CorrData, Payload}' +send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) -> + Props = #{'Response-Topic' => RspTopic, + 'Correlation-Data' => CorrData + }, + Msg = #mqtt_msg{qos = QoS, + topic = ReqTopic, + props = Props, + payload = Payload + }, + case emqx_client:publish(Client, Msg) of + ok -> ok; %% QoS = 0 + {ok, _} -> ok; + {error, _} = E -> E + end. + +stop(Pid) -> + emqx_client:disconnect(Pid). + +subscribe(Client, Topic, QoS) -> + case emqx_client:subscribe(Client, Topic, QoS) of + {ok, _, _} -> ok; + {error, _} = Error -> Error + end. + +make_msg_handler(Parent) -> + #{publish => fun(Msg) -> handle_msg(Msg, Parent) end, + puback => fun(_Ack) -> ok end, + disconnected => fun(_Reason) -> ok end + }. + +handle_msg(Msg, Parent) -> + #{properties := Props, payload := Payload} = Msg, + CorrData = maps:get('Correlation-Data', Props), + Parent ! {response, CorrData, Payload}, + ok. diff --git a/test/emqx_rpc_SUITE.erl b/test/emqx_rpc_SUITE.erl new file mode 100644 index 000000000..62b597e1e --- /dev/null +++ b/test/emqx_rpc_SUITE.erl @@ -0,0 +1,124 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rpc_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +t_prop_rpc(_) -> + ok = load(), + Opts = [{to_file, user}, {numtests, 10}], + {ok, _Apps} = application:ensure_all_started(gen_rpc), + ok = application:set_env(gen_rpc, call_receive_timeout, 1), + ok = emqx_logger:set_log_level(emergency), + ?assert(proper:quickcheck(prop_node(), Opts)), + ?assert(proper:quickcheck(prop_nodes(), Opts)), + ok = application:stop(gen_rpc), + ok = unload(). + +prop_node() -> + ?FORALL(Node, nodename(), + begin + ?assert(emqx_rpc:cast(Node, erlang, system_time, [])), + case emqx_rpc:call(Node, erlang, system_time, []) of + {badrpc, _Reason} -> true; + Delivery when is_integer(Delivery) -> true; + _Other -> false + end + end). + +prop_nodes() -> + ?FORALL(Nodes, nodesname(), + begin + case emqx_rpc:multicall(Nodes, erlang, system_time, []) of + {badrpc, _Reason} -> true; + {RealResults, RealBadNodes} + when is_list(RealResults); + is_list(RealBadNodes) -> + true; + _Other -> false + end + end). + +%%-------------------------------------------------------------------- +%% helper +%%-------------------------------------------------------------------- + +load() -> + ok = meck:new(gen_rpc, [passthrough, no_history]), + ok = meck:expect(gen_rpc, multicall, + fun(Nodes, Mod, Fun, Args) -> + gen_rpc:multicall(Nodes, Mod, Fun, Args, 1) + end). + +unload() -> + ok = meck:unload(gen_rpc). + +%%-------------------------------------------------------------------- +%% Generator +%%-------------------------------------------------------------------- + +nodename() -> + ?LET({NodePrefix, HostName}, + {node_prefix(), hostname()}, + begin + Node = NodePrefix ++ "@" ++ HostName, + list_to_atom(Node) + end). + +nodesname() -> + oneof([list(nodename()), ["emqxct@127.0.0.1"]]). + +node_prefix() -> + oneof(["emqxct", text_like()]). + +text_like() -> + ?SUCHTHAT(Text, list(range($a, $z)), (length(Text) =< 5 andalso length(Text) > 0)). + +hostname() -> + oneof([ipv4_address(), ipv6_address(), "127.0.0.1", "localhost"]). + +ipv4_address() -> + ?LET({Num1, Num2, Num3, Num4}, + { choose(0, 255) + , choose(0, 255) + , choose(0, 255) + , choose(0, 255)}, + make_ip([Num1, Num2, Num3, Num4], ipv4)). + +ipv6_address() -> + ?LET({Num1, Num2, Num3, Num4, Num5, Num6}, + { choose(0, 65535) + , choose(0, 65535) + , choose(0, 65535) + , choose(0, 65535) + , choose(0, 65535) + , choose(0, 65535)}, + make_ip([Num1, Num2, Num3, Num4, Num5, Num6], ipv6)). + + +make_ip(NumList, ipv4) when is_list(NumList) -> + string:join([integer_to_list(Num) || Num <- NumList], "."); +make_ip(NumList, ipv6) when is_list(NumList) -> + string:join([integer_to_list(Num) || Num <- NumList], ":"); +make_ip(_List, _protocol) -> + "127.0.0.1". diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 8736d69b7..224b8afaf 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -19,87 +19,333 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). +-define(mock_modules, + [ emqx_metrics + , emqx_broker + , emqx_misc + , emqx_message + , emqx_hooks + , emqx_zone + , emqx_pd + ]). + all() -> emqx_ct:all(?MODULE). -init_per_suite(Config) -> - emqx_ct_helpers:start_apps([]), - Config. +t_proper_session(_) -> + Opts = [{numtests, 1000}, {to_file, user}], + ok = emqx_logger:set_log_level(emergency), + ok = before_proper(), + ?assert(proper:quickcheck(prop_session(), Opts)), + ok = after_proper(). -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). +before_proper() -> + load(?mock_modules). -t_info(_) -> - 'TODO'. +after_proper() -> + unload(?mock_modules), + emqx_logger:set_log_level(error). -t_attrs(_) -> - 'TODO'. +prop_session() -> + ?FORALL({Session, OpList}, {session(), session_op_list()}, + begin + try + apply_ops(Session, OpList), + true + after + true + end + end). -t_stats(_) -> - 'TODO'. +%%%%%%%%%%%%%%% +%%% Helpers %%% +%%%%%%%%%%%%%%% -t_subscribe(_) -> - 'TODO'. +apply_ops(Session, []) -> + ?assertEqual(session, element(1, Session)); +apply_ops(Session, [Op | Rest]) -> + NSession = apply_op(Session, Op), + apply_ops(NSession, Rest). -t_unsubscribe(_) -> - 'TODO'. +apply_op(Session, info) -> + Info = emqx_session:info(Session), + ?assert(is_map(Info)), + ?assertEqual(16, maps:size(Info)), + Session; +apply_op(Session, attrs) -> + Attrs = emqx_session:attrs(Session), + ?assert(is_map(Attrs)), + ?assertEqual(3, maps:size(Attrs)), + Session; +apply_op(Session, stats) -> + Stats = emqx_session:stats(Session), + ?assert(is_list(Stats)), + ?assertEqual(9, length(Stats)), + Session; +apply_op(Session, {info, InfoArg}) -> + _Ret = emqx_session:info(InfoArg, Session), + Session; +apply_op(Session, {subscribe, {Client, TopicFilter, SubOpts}}) -> + case emqx_session:subscribe(Client, TopicFilter, SubOpts, Session) of + {ok, NSession} -> + NSession; + {error, ?RC_QUOTA_EXCEEDED} -> + Session + end; +apply_op(Session, {unsubscribe, {Client, TopicFilter}}) -> + case emqx_session:unsubscribe(Client, TopicFilter, Session) of + {ok, NSession} -> + NSession; + {error, ?RC_NO_SUBSCRIPTION_EXISTED} -> + Session + end; +apply_op(Session, {publish, {PacketId, Msg}}) -> + case emqx_session:publish(PacketId, Msg, Session) of + {ok, _Msg} -> + Session; + {ok, _Deliver, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {puback, PacketId}) -> + case emqx_session:puback(PacketId, Session) of + {ok, _Msg} -> + Session; + {ok, _Deliver, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {pubrec, PacketId}) -> + case emqx_session:pubrec(PacketId, Session) of + {ok, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {pubrel, PacketId}) -> + case emqx_session:pubrel(PacketId, Session) of + {ok, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {pubcomp, PacketId}) -> + case emqx_session:pubcomp(PacketId, Session) of + {ok, _Msgs} -> + Session; + {ok, _Msgs, NSession} -> + NSession; + {error, _ErrorCode} -> + Session + end; +apply_op(Session, {deliver, Delivers}) -> + {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session), + NSession; +apply_op(Session, {timeout, {TRef, TimeoutMsg}}) -> + case emqx_session:timeout(TRef, TimeoutMsg, Session) of + {ok, NSession} -> + NSession; + {ok, _Msg, NSession} -> + NSession + end. -t_publish(_) -> - 'TODO'. +%%%%%%%%%%%%%%%%%% +%%% Generators %%% +%%%%%%%%%%%%%%%%%% +session_op_list() -> + Union = [info, + attrs, + stats, + {info, info_args()}, + {subscribe, sub_args()}, + {unsubscribe, unsub_args()}, + {publish, publish_args()}, + {puback, puback_args()}, + {pubrec, pubrec_args()}, + {pubrel, pubrel_args()}, + {pubcomp, pubcomp_args()}, + {deliver, deliver_args()}, + {timeout, timeout_args()} + ], + list(?LAZY(oneof(Union))). -t_puback(_) -> - 'TODO'. +deliver_args() -> + list({deliver, topic(), message()}). -t_pubrec(_) -> - 'TODO'. +timeout_args() -> + {tref(), timeout_msg()}. -t_pubrel(_) -> - 'TODO'. +info_args() -> + oneof([clean_start, + subscriptions, + max_subscriptions, + upgrade_qos, + inflight, + max_inflight, + retry_interval, + mqueue_len, + max_mqueue, + mqueue_dropped, + next_pkt_id, + awaiting_rel, + max_awaiting_rel, + await_rel_timeout, + expiry_interval, + created_at + ]). -t_pubcomp(_) -> - 'TODO'. +sub_args() -> + ?LET({ClientId, TopicFilter, SubOpts}, + {clientid(), topic(), sub_opts()}, + {#{client_id => ClientId}, TopicFilter, SubOpts}). -t_deliver(_) -> - 'TODO'. +unsub_args() -> + ?LET({ClientId, TopicFilter}, + {clientid(), topic()}, + {#{client_id => ClientId}, TopicFilter}). -t_timeout(_) -> - 'TODO'. +publish_args() -> + ?LET({PacketId, Message}, + {packetid(), message()}, + {PacketId, Message}). -ignore_loop(_Config) -> - emqx_zone:set_env(external, ignore_loop_deliver, true), - {ok, Client} = emqx_client:start_link(), - {ok, _} = emqx_client:connect(Client), - TestTopic = <<"Self">>, - {ok, _, [2]} = emqx_client:subscribe(Client, TestTopic, qos2), - ok = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 0), - {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 1), - {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2), - ?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))), - ok = emqx_client:disconnect(Client), - emqx_zone:set_env(external, ignore_loop_deliver, false). +puback_args() -> + packetid(). -session_all(_) -> - emqx_zone:set_env(internal, idle_timeout, 1000), - ClientId = <<"ClientId">>, - {ok, ConnPid} = emqx_mock_client:start_link(ClientId), - {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), - Message1 = emqx_message:make(<<"ClientId">>, 2, <<"topic">>, <<"hello">>), - emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 2}}]), - emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 1}}]), - timer:sleep(200), - [{<<"topic">>, _}] = emqx:subscriptions(SPid), - emqx_session:publish(SPid, 1, Message1), - timer:sleep(200), - [{publish, 1, _}] = emqx_mock_client:get_last_message(ConnPid), - Attrs = emqx_session:attrs(SPid), - Info = emqx_session:info(SPid), - Stats = emqx_session:stats(SPid), - ClientId = proplists:get_value(client_id, Attrs), - ClientId = proplists:get_value(client_id, Info), - 1 = proplists:get_value(subscriptions_count, Stats), - emqx_session:unsubscribe(SPid, [<<"topic">>]), - timer:sleep(200), - [] = emqx:subscriptions(SPid), - emqx_mock_client:close_session(ConnPid). +pubrec_args() -> + packetid(). +pubrel_args() -> + packetid(). + +pubcomp_args() -> + packetid(). + +timeout_msg() -> + oneof([retry_delivery, check_awaiting_rel]). + +tref() -> oneof([tref, undefined]). + +sub_opts() -> + ?LET({RH, RAP, NL, QOS, SHARE, SUBID}, + {rh(), rap(), nl(), qos(), share(), subid()} + , make_subopts(RH, RAP, NL, QOS, SHARE, SUBID)). + +message() -> + ?LET({QoS, Topic, Payload}, + {qos(), topic(), payload()}, + emqx_message:make(proper, QoS, Topic, Payload)). + +subid() -> integer(). + +rh() -> oneof([0, 1, 2]). + +rap() -> oneof([0, 1]). + +nl() -> oneof([0, 1]). + +qos() -> oneof([0, 1, 2]). + +share() -> binary(). + +clientid() -> binary(). + +topic() -> ?LET(No, choose(1, 10), + begin + NoBin = integer_to_binary(No), + <<"topic/", NoBin/binary>> + end). + +payload() -> binary(). + +packetid() -> choose(1, 30). + +zone() -> + ?LET(Zone, [{max_subscriptions, max_subscription()}, + {upgrade_qos, upgrade_qos()}, + {retry_interval, retry_interval()}, + {max_awaiting_rel, max_awaiting_rel()}, + {await_rel_timeout, await_rel_timeout()}] + , maps:from_list(Zone)). + +max_subscription() -> + frequency([{33, 0}, + {33, 1}, + {34, choose(0,10)}]). + +upgrade_qos() -> bool(). + +retry_interval() -> ?LET(Interval, choose(0, 20), Interval*1000). + +max_awaiting_rel() -> choose(0, 10). + +await_rel_timeout() -> ?LET(Interval, choose(0, 150), Interval*1000). + +max_inflight() -> choose(0, 10). + +expiry_interval() -> ?LET(EI, choose(1, 10), EI * 3600). + +option() -> + ?LET(Option, [{max_inflight, max_inflight()}, + {expiry_interval, expiry_interval()}] + , maps:from_list(Option)). + +cleanstart() -> bool(). + +session() -> + ?LET({CleanStart, Zone, Options}, + {cleanstart(), zone(), option()}, + begin + Session = emqx_session:init(CleanStart, #{zone => Zone}, Options), + emqx_session:set_pkt_id(Session, 16#ffff) + end). + +%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Internal functions %%% +%%%%%%%%%%%%%%%%%%%%%%%%%% + +make_subopts(RH, RAP, NL, QOS, SHARE, SubId) -> + #{rh => RH, + rap => RAP, + nl => NL, + qos => QOS, + share => SHARE, + subid => SubId}. + + +load(Modules) -> + [mock(Module) || Module <- Modules], + ok. + +unload(Modules) -> + lists:foreach(fun(Module) -> + ok = meck:unload(Module) + end, Modules). + +mock(Module) -> + ok = meck:new(Module, [passthrough, no_history]), + do_mock(Module). + +do_mock(emqx_metrics) -> + meck:expect(emqx_metrics, inc, fun(_Anything) -> ok end); +do_mock(emqx_broker) -> + meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), + meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end), + meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end), + meck:expect(emqx_broker, publish, fun(_) -> ok end); +do_mock(emqx_misc) -> + meck:expect(emqx_misc, start_timer, fun(_, _) -> tref end); +do_mock(emqx_message) -> + meck:expect(emqx_message, set_header, fun(_Hdr, _Val, Msg) -> Msg end), + meck:expect(emqx_message, is_expired, fun(_Msg) -> (rand:uniform(16) > 8) end); +do_mock(emqx_hooks) -> + meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end); +do_mock(emqx_zone) -> + meck:expect(emqx_zone, get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end); +do_mock(emqx_pd) -> + meck:expect(emqx_pd, update_counter, fun(_stats, _num) -> ok end). diff --git a/test/emqx_sys_SUITE.erl b/test/emqx_sys_SUITE.erl new file mode 100644 index 000000000..26ec26fc2 --- /dev/null +++ b/test/emqx_sys_SUITE.erl @@ -0,0 +1,136 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_sys_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(mock_modules, + [ emqx_metrics + , emqx_stats + , emqx_broker + , ekka_mnesia + ]). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + application:load(emqx), + ok = application:set_env(emqx, broker_sys_interval, 1), + ok = application:set_env(emqx, broker_sys_heartbeat, 1), + ok = emqx_logger:set_log_level(emergency), + Config. + +end_per_suite(_Config) -> + application:unload(emqx), + ok = emqx_logger:set_log_level(error), + ok. + +t_prop_sys(_) -> + Opts = [{numtests, 100}, {to_file, user}], + ok = load(?mock_modules), + ?assert(proper:quickcheck(prop_sys(), Opts)), + ok = unload(?mock_modules). + +prop_sys() -> + ?FORALL(Cmds, commands(?MODULE), + begin + {ok, _Pid} = emqx_sys:start_link(), + {History, State, Result} = run_commands(?MODULE, Cmds), + ok = emqx_sys:stop(), + ?WHENFAIL(io:format("History: ~p\nState: ~p\nResult: ~p\n", + [History,State,Result]), + aggregate(command_names(Cmds), true)) + end). + +load(Modules) -> + [mock(Module) || Module <- Modules], + ok. + +unload(Modules) -> + lists:foreach(fun(Module) -> + ok = meck:unload(Module) + end, Modules). + +mock(Module) -> + ok = meck:new(Module, [passthrough, no_history]), + do_mock(Module). + +do_mock(emqx_broker) -> + meck:expect(emqx_broker, publish, + fun(Msg) -> {node(), <<"test">>, Msg} end), + meck:expect(emqx_broker, safe_publish, + fun(Msg) -> {node(), <<"test">>, Msg} end); +do_mock(emqx_stats) -> + meck:expect(emqx_stats, getstats, fun() -> [0] end); +do_mock(ekka_mnesia) -> + meck:expect(ekka_mnesia, running_nodes, fun() -> [node()] end); +do_mock(emqx_metrics) -> + meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end). + +unmock() -> + meck:unload(emqx_broker). + +%%%%%%%%%%%%% +%%% MODEL %%% +%%%%%%%%%%%%% +%% @doc Initial model value at system start. Should be deterministic. +initial_state() -> + #{}. + +%% @doc List of possible commands to run against the system +command(_State) -> + oneof([{call, emqx_sys, info, []}, + {call, emqx_sys, version, []}, + {call, emqx_sys, uptime, []}, + {call, emqx_sys, datetime, []}, + {call, emqx_sys, sysdescr, []}, + {call, emqx_sys, sys_interval, []}, + {call, emqx_sys, sys_heatbeat_interval, []}, + %------------ unexpected message ----------------------% + {call, emqx_sys, handle_call, [emqx_sys, other, state]}, + {call, emqx_sys, handle_cast, [emqx_sys, other]}, + {call, emqx_sys, handle_info, [info, state]} + ]). + +precondition(_State, {call, _Mod, _Fun, _Args}) -> + timer:sleep(1), + true. + +postcondition(_State, {call, emqx_sys, info, []}, Info) -> + is_list(Info) andalso length(Info) =:= 4; +postcondition(_State, {call, emqx_sys, version, []}, Version) -> + is_list(Version); +postcondition(_State, {call, emqx_sys, uptime, []}, Uptime) -> + is_list(Uptime); +postcondition(_State, {call, emqx_sys, datetime, []}, Datetime) -> + is_list(Datetime); +postcondition(_State, {call, emqx_sys, sysdescr, []}, Sysdescr) -> + is_list(Sysdescr); +postcondition(_State, {call, emqx_sys, sys_interval, []}, SysInterval) -> + is_integer(SysInterval) andalso SysInterval > 0; +postcondition(_State, {call, emqx_sys, sys_heartbeat_interval, []}, SysHeartInterval) -> + is_integer(SysHeartInterval) andalso SysHeartInterval > 0; +postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) -> + true. + +next_state(State, _Res, {call, _Mod, _Fun, _Args}) -> + NewState = State, + NewState. diff --git a/test/prop_emqx_session.erl b/test/prop_emqx_session.erl deleted file mode 100644 index 5e137ee12..000000000 --- a/test/prop_emqx_session.erl +++ /dev/null @@ -1,327 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_emqx_session). - --include("emqx_mqtt.hrl"). --include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). - --define(mock_modules, - [ emqx_metrics - , emqx_broker - , emqx_misc - , emqx_message - , emqx_hooks - , emqx_zone - , emqx_pd - ]). - --compile(export_all). --compile(nowarn_export_all). - -%%%%%%%%%%%%%%%%%% -%%% Properties %%% -%%%%%%%%%%%%%%%%%% -prop_session_pub(opts) -> [{numtests, 1000}]. - -prop_session_pub() -> - emqx_logger:set_log_level(emergency), - - ?SETUP(fun() -> - ok = load(?mock_modules), - fun() -> ok = unload(?mock_modules) end - end, - ?FORALL({Session, OpList}, {session(), session_op_list()}, - begin - try - apply_ops(Session, OpList), - true - after - ok - end - end)). - -%%%%%%%%%%%%%%% -%%% Helpers %%% -%%%%%%%%%%%%%%% - -apply_ops(Session, []) -> - ?assertEqual(session, element(1, Session)); -apply_ops(Session, [Op | Rest]) -> - NSession = apply_op(Session, Op), - apply_ops(NSession, Rest). - -apply_op(Session, info) -> - Info = emqx_session:info(Session), - ?assert(is_map(Info)), - ?assertEqual(16, maps:size(Info)), - Session; -apply_op(Session, attrs) -> - Attrs = emqx_session:attrs(Session), - ?assert(is_map(Attrs)), - ?assertEqual(3, maps:size(Attrs)), - Session; -apply_op(Session, stats) -> - Stats = emqx_session:stats(Session), - ?assert(is_list(Stats)), - ?assertEqual(9, length(Stats)), - Session; -apply_op(Session, {subscribe, {Client, TopicFilter, SubOpts}}) -> - case emqx_session:subscribe(Client, TopicFilter, SubOpts, Session) of - {ok, NSession} -> - NSession; - {error, ?RC_QUOTA_EXCEEDED} -> - Session - end; -apply_op(Session, {unsubscribe, {Client, TopicFilter}}) -> - case emqx_session:unsubscribe(Client, TopicFilter, Session) of - {ok, NSession} -> - NSession; - {error, ?RC_NO_SUBSCRIPTION_EXISTED} -> - Session - end; -apply_op(Session, {publish, {PacketId, Msg}}) -> - case emqx_session:publish(PacketId, Msg, Session) of - {ok, _Msg} -> - Session; - {ok, _Deliver, NSession} -> - NSession; - {error, _ErrorCode} -> - Session - end; -apply_op(Session, {puback, PacketId}) -> - case emqx_session:puback(PacketId, Session) of - {ok, _Msg} -> - Session; - {ok, _Deliver, NSession} -> - NSession; - {error, _ErrorCode} -> - Session - end; -apply_op(Session, {pubrec, PacketId}) -> - case emqx_session:pubrec(PacketId, Session) of - {ok, NSession} -> - NSession; - {error, _ErrorCode} -> - Session - end; -apply_op(Session, {pubrel, PacketId}) -> - case emqx_session:pubrel(PacketId, Session) of - {ok, NSession} -> - NSession; - {error, _ErrorCode} -> - Session - end; -apply_op(Session, {pubcomp, PacketId}) -> - case emqx_session:pubcomp(PacketId, Session) of - {ok, _Msgs} -> - Session; - {ok, _Msgs, NSession} -> - NSession; - {error, _ErrorCode} -> - Session - end; -apply_op(Session, {deliver, Delivers}) -> - {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session), - NSession; -apply_op(Session, {timeout, {TRef, TimeoutMsg}}) -> - case emqx_session:timeout(TRef, TimeoutMsg, Session) of - {ok, NSession} -> - NSession; - {ok, _Msg, NSession} -> - NSession - end. - -%%%%%%%%%%%%%%%%%% -%%% Generators %%% -%%%%%%%%%%%%%%%%%% -session_op_list() -> - Union = [info, - attrs, - stats, - {subscribe, sub_args()}, - {unsubscribe, unsub_args()}, - {publish, publish_args()}, - {puback, puback_args()}, - {pubrec, pubrec_args()}, - {pubrel, pubrel_args()}, - {pubcomp, pubcomp_args()}, - {deliver, deliver_args()}, - {timeout, timeout_args()} - ], - list(?LAZY(oneof(Union))). - -deliver_args() -> - list({deliver, topic(), message()}). - -timeout_args() -> - {tref(), timeout_msg()}. - -sub_args() -> - ?LET({ClientId, TopicFilter, SubOpts}, - {clientid(), topic(), sub_opts()}, - {#{client_id => ClientId}, TopicFilter, SubOpts}). - -unsub_args() -> - ?LET({ClientId, TopicFilter}, - {clientid(), topic()}, - {#{client_id => ClientId}, TopicFilter}). - -publish_args() -> - ?LET({PacketId, Message}, - {packetid(), message()}, - {PacketId, Message}). - -puback_args() -> - packetid(). - -pubrec_args() -> - packetid(). - -pubrel_args() -> - packetid(). - -pubcomp_args() -> - packetid(). - -timeout_msg() -> - oneof([retry_delivery, check_awaiting_rel]). - -tref() -> oneof([tref, undefined]). - -sub_opts() -> - ?LET({RH, RAP, NL, QOS, SHARE, SUBID}, - {rh(), rap(), nl(), qos(), share(), subid()} - , make_subopts(RH, RAP, NL, QOS, SHARE, SUBID)). - -message() -> - ?LET({QoS, Topic, Payload}, - {qos(), topic(), payload()}, - emqx_message:make(proper, QoS, Topic, Payload)). - -subid() -> integer(). - -rh() -> oneof([0, 1, 2]). - -rap() -> oneof([0, 1]). - -nl() -> oneof([0, 1]). - -qos() -> oneof([0, 1, 2]). - -share() -> binary(). - -clientid() -> binary(). - -topic() -> ?LET(No, choose(1, 10), begin - NoBin = integer_to_binary(No), - <<"topic/", NoBin/binary>> - end). - -payload() -> binary(). - -packetid() -> choose(1, 30). - -zone() -> - ?LET(Zone, [{max_subscriptions, max_subscription()}, - {upgrade_qos, upgrade_qos()}, - {retry_interval, retry_interval()}, - {max_awaiting_rel, max_awaiting_rel()}, - {await_rel_timeout, await_rel_timeout()}] - , maps:from_list(Zone)). - -max_subscription() -> frequency([{33, 0}, - {33, 1}, - {34, choose(0,10)}]). - -upgrade_qos() -> bool(). - -retry_interval() -> ?LET(Interval, choose(0, 20), Interval*1000). - -max_awaiting_rel() -> choose(0, 10). - -await_rel_timeout() -> ?LET(Interval, choose(0, 150), Interval*1000). - -max_inflight() -> choose(0, 10). - -expiry_interval() -> ?LET(EI, choose(1, 10), EI * 3600). - -option() -> - ?LET(Option, [{max_inflight, max_inflight()}, - {expiry_interval, expiry_interval()}] - , maps:from_list(Option)). - -cleanstart() -> bool(). - -session() -> - ?LET({CleanStart, Zone, Options}, - {cleanstart(), zone(), option()}, - begin - Session = emqx_session:init(CleanStart, #{zone => Zone}, Options), - emqx_session:set_pkt_id(Session, 16#ffff) - end). - -%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Internal functions %%% -%%%%%%%%%%%%%%%%%%%%%%%%%% - -make_subopts(RH, RAP, NL, QOS, SHARE, SubId) -> - #{rh => RH, - rap => RAP, - nl => NL, - qos => QOS, - share => SHARE, - subid => SubId}. - - -load(Modules) -> - [mock(Module) || Module <- Modules], - ok. - -unload(Modules) -> - lists:foreach(fun(Module) -> - ok = meck:unload(Module) - end, Modules), - ok. - -mock(Module) -> - ok = meck:new(Module, [passthrough, no_history]), - do_mock(Module, expect(Module)). - -do_mock(emqx_metrics, Expect) -> - Expect(inc, fun(_Anything) -> ok end); -do_mock(emqx_broker, Expect) -> - Expect(subscribe, fun(_, _, _) -> ok end), - Expect(set_subopts, fun(_, _) -> ok end), - Expect(unsubscribe, fun(_) -> ok end), - Expect(publish, fun(_) -> ok end); -do_mock(emqx_misc, Expect) -> - Expect(start_timer, fun(_, _) -> tref end); -do_mock(emqx_message, Expect) -> - Expect(set_header, fun(_Hdr, _Val, Msg) -> Msg end), - Expect(is_expired, fun(_Msg) -> (rand:uniform(16) > 8) end); -do_mock(emqx_hooks, Expect) -> - Expect(run, fun(_Hook, _Args) -> ok end); -do_mock(emqx_zone, Expect) -> - Expect(get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end); -do_mock(emqx_pd, Expect) -> - Expect(update_counter, fun(_stats, _num) -> ok end). - -expect(Module) -> - fun(OldFun, NewFun) -> - ok = meck:expect(Module, OldFun, NewFun) - end.