diff --git a/.ci/build_packages/tests.sh b/.ci/build_packages/tests.sh index 1f9ca2651..bbc107d37 100755 --- a/.ci/build_packages/tests.sh +++ b/.ci/build_packages/tests.sh @@ -31,7 +31,7 @@ emqx_test(){ echo "running ${packagename} start" "${PACKAGE_PATH}"/emqx/bin/emqx start || ( tail "${PACKAGE_PATH}"/emqx/log/emqx.log.1 && exit 1 ) IDLE_TIME=0 - while [ -z "$("${PACKAGE_PATH}"/emqx/bin/emqx_ctl status |grep 'is running'|awk '{print $1}')" ] + while ! "${PACKAGE_PATH}"/emqx/bin/emqx_ctl status | grep -qE 'Node\s.*@.*\sis\sstarted' do if [ $IDLE_TIME -gt 10 ] then @@ -103,7 +103,7 @@ running_test(){ emqx start || ( tail /var/log/emqx/emqx.log.1 && exit 1 ) IDLE_TIME=0 - while [ -z "$(emqx_ctl status |grep 'is running'|awk '{print $1}')" ] + while ! emqx_ctl status | grep -qE 'Node\s.*@.*\sis\sstarted' do if [ $IDLE_TIME -gt 10 ] then @@ -121,7 +121,7 @@ running_test(){ || [ "$(sed -n '/^ID=/p' /etc/os-release | sed -r 's/ID=(.*)/\1/g' | sed 's/"//g')" = debian ] ;then service emqx start || ( tail /var/log/emqx/emqx.log.1 && exit 1 ) IDLE_TIME=0 - while [ -z "$(emqx_ctl status |grep 'is running'|awk '{print $1}')" ] + while ! emqx_ctl status | grep -E 'Node\s.*@.*\sis\sstarted' do if [ $IDLE_TIME -gt 10 ] then diff --git a/.ci/fvt_tests/relup.lux b/.ci/fvt_tests/relup.lux index b75e0fa94..cbecb9e14 100644 --- a/.ci/fvt_tests/relup.lux +++ b/.ci/fvt_tests/relup.lux @@ -1,3 +1,4 @@ +[config var=PROFILE] [config var=PACKAGE_PATH] [config var=BENCH_PATH] [config var=ONE_MORE_EMQX_PATH] @@ -21,7 +22,7 @@ [shell emqx] !cd $PACKAGE_PATH - !unzip -q -o emqx-ubuntu20.04-$(echo $old_vsn | sed -r 's/[v|e]//g')-amd64.zip + !unzip -q -o $PROFILE-ubuntu20.04-$(echo $old_vsn | sed -r 's/[v|e]//g')-amd64.zip ?SH-PROMPT !cd emqx @@ -33,8 +34,8 @@ [shell emqx2] !cd $PACKAGE_PATH - !cp -f $ONE_MORE_EMQX_PATH/one_more_emqx.sh . - !./one_more_emqx.sh emqx2 + !cp -f $ONE_MORE_EMQX_PATH/one_more_$(echo $PROFILE | sed 's/-/_/g').sh . + !./one_more_$(echo $PROFILE | sed 's/-/_/g').sh emqx2 ?SH-PROMPT !cd emqx2 @@ -75,7 +76,7 @@ ???sent [shell emqx] - !cp -f ../emqx-ubuntu20.04-$VSN-amd64.zip releases/ + !cp -f ../$PROFILE-ubuntu20.04-$VSN-amd64.zip releases/ !./bin/emqx install $VSN ?SH-PROMPT !./bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]" @@ -90,7 +91,7 @@ ?SH-PROMPT [shell emqx2] - !cp -f ../emqx-ubuntu20.04-$VSN-amd64.zip releases/ + !cp -f ../$PROFILE-ubuntu20.04-$VSN-amd64.zip releases/ !./bin/emqx install $VSN ?SH-PROMPT !./bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]" @@ -111,13 +112,6 @@ ???{"data":600,"code":0} ?SH-PROMPT -[shell http_server] - !http_server:stop(). - ?ok - ?> - !halt(3). - ?SH-PROMPT: - [shell emqx2] !cat log/emqx.log.1 |grep -v 691c29ba |tail -n 100 -error @@ -142,6 +136,13 @@ !rm -rf $PACKAGE_PATH/emqx ?SH-PROMPT: +[shell http_server] + !http_server:stop(). + ?ok + ?> + !halt(3). + ?SH-PROMPT: + [endloop] [cleanup] diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml index eea7acde5..12f9da95c 100644 --- a/.github/workflows/run_fvt_tests.yaml +++ b/.github/workflows/run_fvt_tests.yaml @@ -16,6 +16,10 @@ jobs: steps: - uses: actions/checkout@v1 + - uses: gleam-lang/setup-erlang@v1.1.2 + id: install_erlang + with: + otp-version: 23.2 - name: prepare run: | if make emqx-ee --dry-run > /dev/null 2>&1; then @@ -52,7 +56,7 @@ jobs: output=$(docker exec -i node1.emqx.io bash -c "cat data/loaded_plugins" | tail -n1) if [ "$expected" != "$output" ]; then exit 1 - fi + fi - name: make paho tests run: | if ! docker exec -i python /scripts/pytest.sh; then @@ -66,6 +70,10 @@ jobs: steps: - uses: actions/checkout@v1 + - uses: gleam-lang/setup-erlang@v1.1.2 + id: install_erlang + with: + otp-version: 23.2 - name: prepare run: | if make emqx-ee --dry-run > /dev/null 2>&1; then @@ -252,10 +260,11 @@ jobs: set -e -x -u if [ -n "$OLD_VSNS" ]; then mkdir -p packages - cp emqx/_packages/emqx/*.zip packages + cp emqx/_packages/${PROFILE}/*.zip packages cp emqx/_upgrade_base/*.zip packages lux -v \ --timeout 600000 \ + --var PROFILE=$PROFILE \ --var PACKAGE_PATH=$(pwd)/packages \ --var BENCH_PATH=$(pwd)/emqtt-bench \ --var ONE_MORE_EMQX_PATH=$(pwd)/one_more_emqx \ diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 430e73594..07afdcde2 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -128,6 +128,9 @@ jobs: printenv > .env docker exec -i erlang bash -c "make cover" docker exec --env-file .env -i erlang bash -c "make coveralls" + - name: cat rebar.crashdump + if: failure() + run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump' fi - uses: actions/upload-artifact@v1 if: failure() with: diff --git a/.tool-versions b/.tool-versions index e7e734a7b..b87853803 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1 @@ -erlang 23.2.7.2-emqx-1 +erlang 24.0.1-emqx-1 diff --git a/apps/emqx_auth_http/etc/emqx_auth_http.conf b/apps/emqx_auth_http/etc/emqx_auth_http.conf index d4e62ce9c..56e2055c0 100644 --- a/apps/emqx_auth_http/etc/emqx_auth_http.conf +++ b/apps/emqx_auth_http/etc/emqx_auth_http.conf @@ -97,6 +97,7 @@ auth.http.acl_req.headers.content-type = "application/x-www-form-urlencoded" ## When the request method is POST, the final format is determined by content-type ## ## Available Variables: +## - %A: access (1 - subscribe, 2 - publish) ## - %u: username ## - %c: clientid ## - %a: ipaddress @@ -105,6 +106,7 @@ auth.http.acl_req.headers.content-type = "application/x-www-form-urlencoded" ## - %p: sockport of server accepted ## - %C: common name of client TLS cert ## - %d: subject of client TLS cert +## - %t: topic ## ## Value: =,=,... auth.http.acl_req.params = "access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m" diff --git a/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl b/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl index f123d1037..d0a4a34a0 100644 --- a/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl +++ b/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl @@ -37,11 +37,11 @@ groups() -> ]. init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx, emqx_auth_jwt], fun set_special_configs/1), + emqx_ct_helpers:start_apps([emqx_auth_jwt], fun set_special_configs/1), Config. end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx_auth_jwt, emqx]). + emqx_ct_helpers:stop_apps([emqx_auth_jwt]). set_special_configs(emqx) -> application:set_env(emqx, allow_anonymous, false), @@ -97,6 +97,8 @@ t_check_auth(_) -> t_check_claims(_) -> application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]), + application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt), + Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external}, Jwt = sign([{client_id, <<"client1">>}, {username, <<"plain">>}, @@ -113,8 +115,9 @@ t_check_claims(_) -> t_check_claims_clientid(_) -> application:set_env(emqx_auth_jwt, verify_claims, [{clientid, <<"%c">>}]), + application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt), Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external}, - Jwt = sign([{client_id, <<"client23">>}, + Jwt = sign([{clientid, <<"client23">>}, {username, <<"plain">>}, {exp, os:system_time(seconds) + 3}], <<"HS256">>, <<"emqxsecret">>), Result0 = emqx_access_control:authenticate(Plain#{password => Jwt}), @@ -128,6 +131,8 @@ t_check_claims_clientid(_) -> t_check_claims_username(_) -> application:set_env(emqx_auth_jwt, verify_claims, [{username, <<"%u">>}]), + application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt), + Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external}, Jwt = sign([{client_id, <<"client23">>}, {username, <<"plain">>}, diff --git a/apps/emqx_coap/test/emqx_coap_SUITE.erl b/apps/emqx_coap/test/emqx_coap_SUITE.erl index ed59b309c..444bcc064 100644 --- a/apps/emqx_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_coap/test/emqx_coap_SUITE.erl @@ -28,16 +28,16 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_coap], fun set_sepecial_cfg/1), + emqx_ct_helpers:start_apps([emqx_coap], fun set_special_cfg/1), Config. -set_sepecial_cfg(emqx_coap) -> +set_special_cfg(emqx_coap) -> Opts = application:get_env(emqx_coap, dtls_opts,[]), Opts2 = [{keyfile, emqx_ct_helpers:deps_path(emqx, "etc/certs/key.pem")}, {certfile, emqx_ct_helpers:deps_path(emqx, "etc/certs/cert.pem")}], application:set_env(emqx_coap, dtls_opts, emqx_misc:merge_opts(Opts, Opts2)), application:set_env(emqx_coap, enable_stats, true); -set_sepecial_cfg(_) -> +set_special_cfg(_) -> ok. end_per_suite(Config) -> diff --git a/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl b/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl index 7f49ece7b..1aaf6cb69 100644 --- a/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl +++ b/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl @@ -28,12 +28,12 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_coap], fun set_sepecial_cfg/1), + emqx_ct_helpers:start_apps([emqx_coap], fun set_special_cfg/1), Config. -set_sepecial_cfg(emqx_coap) -> +set_special_cfg(emqx_coap) -> application:set_env(emqx_coap, enable_stats, true); -set_sepecial_cfg(_) -> +set_special_cfg(_) -> ok. end_per_suite(Config) -> diff --git a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl index f1fdfa9f8..70484c30e 100644 --- a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl @@ -55,7 +55,7 @@ metrics() -> init_per_group(GrpName, Cfg) -> put(grpname, GrpName), Svrs = emqx_exproto_echo_svr:start(), - emqx_ct_helpers:start_apps([emqx_exproto], fun set_sepecial_cfg/1), + emqx_ct_helpers:start_apps([emqx_exproto], fun set_special_cfg/1), emqx_logger:set_log_level(debug), [{servers, Svrs}, {listener_type, GrpName} | Cfg]. @@ -63,7 +63,7 @@ end_per_group(_, Cfg) -> emqx_ct_helpers:stop_apps([emqx_exproto]), emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). -set_sepecial_cfg(emqx_exproto) -> +set_special_cfg(emqx_exproto) -> LisType = get(grpname), Listeners = application:get_env(emqx_exproto, listeners, []), SockOpts = socketopts(LisType), @@ -77,7 +77,7 @@ set_sepecial_cfg(emqx_exproto) -> NListeners = [{Proto, LisType, LisOn, UpgradeOpts(Opts)} || {Proto, _Type, LisOn, Opts} <- Listeners], application:set_env(emqx_exproto, listeners, NListeners); -set_sepecial_cfg(emqx) -> +set_special_cfg(emqx) -> application:set_env(emqx, allow_anonymous, true), application:set_env(emqx, enable_acl_cache, false), ok. diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index fe65052cf..3604d3505 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -1,6 +1,6 @@ {application, emqx_management, [{description, "EMQ X Management API and CLI"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_management_sup]}, {applications, [kernel,stdlib,minirest]}, diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index 5048e4f0f..3206ce31b 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,12 +1,14 @@ %% -*-: erlang -*- -{"4.3.1", - [ {"4.3.0", +{"4.3.2", + [ {<<"4.3.[0-1]">>, [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} + , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []} ]} ], [ - {"4.3.0", + {<<"4.3.[0-1]">>, [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []} + , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []} ]} ] -}. \ No newline at end of file +}. diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 252ac3857..77fe96182 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -116,13 +116,7 @@ mgmt(_) -> status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), - emqx_ctl:print("Node ~p ~s is ~p~n", [node(), emqx_app:get_release(), InternalStatus]), - case lists:keysearch(?APP, 1, application:which_applications()) of - false -> - emqx_ctl:print("Application ~s is not running~n", [?APP]); - {value, {?APP, _Desc, Vsn}} -> - emqx_ctl:print("Application ~s ~s is running~n", [?APP, Vsn]) - end; + emqx_ctl:print("Node ~p ~s is ~p~n", [node(), emqx_app:get_release(), InternalStatus]); status(_) -> emqx_ctl:usage("status", "Show broker status"). diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 1eb6f8245..71a92686e 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -503,6 +503,7 @@ do_import_acl_mnesia(Acls) -> end. -ifdef(EMQX_ENTERPRISE). +-dialyzer({nowarn_function, [import_modules/1]}). import_modules(Modules) -> case ets:info(emqx_modules) of undefined -> diff --git a/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl b/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl index aca60a687..5667cb73f 100644 --- a/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl +++ b/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl @@ -28,7 +28,9 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Cfg) -> - emqx_ct_helpers:start_apps([emqx_bridge_mqtt, emqx_rule_engine]), + application:load(emqx_modules), + application:load(emqx_bridge_mqtt), + emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]), Cfg. end_per_suite(Cfg) -> @@ -179,4 +181,4 @@ remove_resources() -> lists:foreach(fun(#resource{id = Id}) -> emqx_rule_engine:delete_resource(Id) end, emqx_rule_registry:get_resources()), - timer:sleep(500). \ No newline at end of file + timer:sleep(500). diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index 42578baf7..6bac9b4c7 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -133,7 +133,8 @@ t_mgmt_cmd(_) -> t_status_cmd(_) -> % ct:pal("start testing status command"), mock_print(), - ?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "is running")), + %% init internal status seem to be always 'starting' when running ct tests + ?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "Node\s.*@.*\sis\sstart(ed|ing)")), meck:unload(). t_broker_cmd(_) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index 86b65939c..bf3727ff5 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -34,55 +34,22 @@ -define(BASE_PATH, "api"). all() -> - [{group, rest_api}]. - -groups() -> - [{rest_api, - [sequence], - [ alarms - , apps - , banned - , brokers - , clients - , listeners - , metrics - , nodes - , plugins - , acl_cache - , pubsub - , routes_and_subscriptions - , stats - , data - ] - }]. + emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_management, emqx_auth_mnesia, emqx_modules]), - ekka_mnesia:start(), - emqx_mgmt_auth:mnesia(boot), + application:load(emqx_modules), + emqx_ct_helpers:start_apps([emqx_management]), Config. -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx_auth_mnesia, emqx_management, emqx_modules]), - ekka_mnesia:ensure_stopped(). - -init_per_testcase(data, Config) -> - ok = emqx_dashboard_admin:mnesia(boot), - application:ensure_all_started(emqx_dashboard), - ok = emqx_rule_registry:mnesia(boot), - application:ensure_all_started(emqx_rule_engine), - Config; +end_per_suite(Config) -> + emqx_ct_helpers:stop_apps([emqx_management]), + Config. init_per_testcase(_, Config) -> Config. -end_per_testcase(data, _Config) -> - application:stop(emqx_dahboard), - application:stop(emqx_rule_engine), - ok; - -end_per_testcase(_, _Config) -> - ok. +end_per_testcase(_, Config) -> + Config. get(Key, ResponseBody) -> maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])). @@ -101,7 +68,7 @@ is_existing(Name, [_Alarm | More]) -> is_existing(_Name, []) -> false. -alarms(_) -> +t_alarms(_) -> emqx_alarm:activate(alarm1), emqx_alarm:activate(alarm2), @@ -134,7 +101,7 @@ alarms(_) -> ?assertNot(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return5))))), ?assertNot(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return5))))). -apps(_) -> +t_apps(_) -> AppId = <<"123456">>, meck:new(emqx_mgmt_auth, [passthrough, no_history]), meck:expect(emqx_mgmt_auth, add_app, 6, fun(_, _, _, _, _, _) -> {error, undefined} end), @@ -172,7 +139,7 @@ apps(_) -> [App] = get(<<"data">>, Result), ?assertEqual(<<"admin">>, maps:get(<<"app_id">>, App)). -banned(_) -> +t_banned(_) -> Who = <<"myclient">>, {ok, _} = request_api(post, api_path(["banned"]), [], auth_header_(), #{<<"who">> => Who, @@ -190,7 +157,7 @@ banned(_) -> {ok, Result2} = request_api(get, api_path(["banned"]), auth_header_()), ?assertEqual([], get(<<"data">>, Result2)). -brokers(_) -> +t_brokers(_) -> {ok, _} = request_api(get, api_path(["brokers"]), auth_header_()), {ok, _} = request_api(get, api_path(["brokers", atom_to_list(node())]), auth_header_()), meck:new(emqx_mgmt, [passthrough, no_history]), @@ -199,7 +166,7 @@ brokers(_) -> ?assertEqual(<<"undefined">>, get(<<"message">>, Error)), meck:unload(emqx_mgmt). -clients(_) -> +t_clients(_) -> process_flag(trap_exit, true), Username1 = <<"user1">>, Username2 = <<"user2">>, @@ -288,7 +255,7 @@ receive_exit(Count) -> ct:log("timeout") end. -listeners(_) -> +t_listeners(_) -> {ok, _} = request_api(get, api_path(["listeners"]), auth_header_()), {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "listeners"]), auth_header_()), meck:new(emqx_mgmt, [passthrough, no_history]), @@ -299,7 +266,7 @@ listeners(_) -> maps:get(<<"error">>, maps:get(<<"listeners">>, Error))), meck:unload(emqx_mgmt). -metrics(_) -> +t_metrics(_) -> {ok, _} = request_api(get, api_path(["metrics"]), auth_header_()), {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "metrics"]), auth_header_()), meck:new(emqx_mgmt, [passthrough, no_history]), @@ -307,7 +274,7 @@ metrics(_) -> {ok, "{\"message\":\"undefined\"}"} = request_api(get, api_path(["nodes", atom_to_list(node()), "metrics"]), auth_header_()), meck:unload(emqx_mgmt). -nodes(_) -> +t_nodes(_) -> {ok, _} = request_api(get, api_path(["nodes"]), auth_header_()), {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node())]), auth_header_()), meck:new(emqx_mgmt, [passthrough, no_history]), @@ -317,7 +284,8 @@ nodes(_) -> ?assertEqual(<<"undefined">>, maps:get(<<"error">>, Error)), meck:unload(emqx_mgmt). -plugins(_) -> +t_plugins(_) -> + application:ensure_all_started(emqx_auth_mnesia), {ok, Plugins1} = request_api(get, api_path(["plugins"]), auth_header_()), [Plugins11] = filter(get(<<"data">>, Plugins1), <<"node">>, atom_to_binary(node(), utf8)), [Plugin1] = filter(maps:get(<<"plugins">>, Plugins11), <<"name">>, <<"emqx_auth_mnesia">>), @@ -354,7 +322,7 @@ plugins(_) -> auth_header_()), [Plugin3] = filter(get(<<"data">>, Plugins3), <<"name">>, <<"emqx_auth_mnesia">>), ?assertEqual(<<"emqx_auth_mnesia">>, maps:get(<<"name">>, Plugin3)), - ?assertEqual(false, maps:get(<<"active">>, Plugin3)), + ?assertEqual(true, maps:get(<<"active">>, Plugin3)), {ok, _} = request_api(put, api_path(["nodes", @@ -370,9 +338,10 @@ plugins(_) -> atom_to_list(emqx_auth_mnesia), "unload"]), auth_header_()), - ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)). + ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)), + application:stop(emqx_auth_mnesia). -acl_cache(_) -> +t_acl_cache(_) -> ClientId = <<"client1">>, Topic = <<"mytopic">>, {ok, C1} = emqtt:start_link(#{clientid => ClientId}), @@ -395,7 +364,7 @@ acl_cache(_) -> ?assertEqual(0, length(Caches3)), ok = emqtt:disconnect(C1). -pubsub(_) -> +t_pubsub(_) -> Qos1Received = emqx_metrics:val('messages.qos1.received'), Qos2Received = emqx_metrics:val('messages.qos2.received'), Received = emqx_metrics:val('messages.received'), @@ -514,7 +483,7 @@ loop(Data) -> ?assertEqual(0, maps:get(<<"code">>, H)), loop(T). -routes_and_subscriptions(_) -> +t_routes_and_subscriptions(_) -> ClientId = <<"myclient">>, Topic = <<"mytopic">>, {ok, NonRoute} = request_api(get, api_path(["routes"]), auth_header_()), @@ -559,7 +528,7 @@ routes_and_subscriptions(_) -> ok = emqtt:disconnect(C1). -stats(_) -> +t_stats(_) -> {ok, _} = request_api(get, api_path(["stats"]), auth_header_()), {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "stats"]), auth_header_()), meck:new(emqx_mgmt, [passthrough, no_history]), @@ -568,7 +537,11 @@ stats(_) -> ?assertEqual(<<"undefined">>, get(<<"message">>, Return)), meck:unload(emqx_mgmt). -data(_) -> +t_data(_) -> + ok = emqx_rule_registry:mnesia(boot), + ok = emqx_dashboard_admin:mnesia(boot), + application:ensure_all_started(emqx_rule_engine), + application:ensure_all_started(emqx_dashboard), {ok, Data} = request_api(post, api_path(["data","export"]), [], auth_header_(), [#{}]), #{<<"filename">> := Filename, <<"node">> := Node} = emqx_ct_http:get_http_data(Data), {ok, DataList} = request_api(get, api_path(["data","export"]), auth_header_()), @@ -576,7 +549,8 @@ data(_) -> ?assertMatch({ok, _}, request_api(post, api_path(["data","import"]), [], auth_header_(), #{<<"filename">> => Filename, <<"node">> => Node})), ?assertMatch({ok, _}, request_api(post, api_path(["data","import"]), [], auth_header_(), #{<<"filename">> => Filename})), - + application:stop(emqx_rule_engine), + application:stop(emqx_dahboard), ok. request_api(Method, Url, Auth) -> diff --git a/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl b/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl index 03d348d2a..2965b7ad0 100644 --- a/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl +++ b/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl @@ -28,24 +28,15 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Cfg) -> - emqx_ct_helpers:start_apps([emqx_web_hook, - emqx_bridge_mqtt, - emqx_rule_engine, - emqx_modules, - emqx_management, - emqx_dashboard]), - ok = ekka_mnesia:start(), + application:load(emqx_modules), + application:load(emqx_web_hook), + emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]), ok = emqx_rule_registry:mnesia(boot), ok = emqx_rule_engine:load_providers(), Cfg. end_per_suite(Cfg) -> - emqx_ct_helpers:stop_apps([emqx_dashboard, - emqx_management, - emqx_modules, - emqx_rule_engine, - emqx_bridge_mqtt, - emqx_web_hook]), + emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]), Cfg. get_data_path() -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 86846ccde..83b2d7632 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 17cecac68..446e082b7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,13 +1,21 @@ %% -*-: erlang -*- -{"4.3.1", +{"4.3.2", [ {"4.3.0", - [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []} + [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}, + {load_module, emqx_rule_engine, brutal_purge, soft_purge, []} + ]}, + {"4.3.1", + [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], [ {"4.3.0", - [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []} + [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}, + {load_module, emqx_rule_engine, brutal_purge, soft_purge, []} + ]}, + {"4.3.1", + [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 07827ed1f..c2ccf2c29 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -408,7 +408,7 @@ refresh_resource_status() -> fun(#resource{id = ResId, type = ResType}) -> case emqx_rule_registry:find_resource_type(ResType) of {ok, #resource_type{on_status = {Mod, OnStatus}}} -> - fetch_resource_status(Mod, OnStatus, ResId); + _ = fetch_resource_status(Mod, OnStatus, ResId); _ -> ok end end, emqx_rule_registry:get_resources()). @@ -588,27 +588,26 @@ clear_action(Module, Destroy, ActionInstId) -> fetch_resource_status(Module, OnStatus, ResId) -> case emqx_rule_registry:find_resource_params(ResId) of {ok, ResParams = #resource_params{params = Params, status = #{is_alive := LastIsAlive}}} -> - try - NewStatus = - case Module:OnStatus(ResId, Params) of - #{is_alive := LastIsAlive} = Status -> Status; - #{is_alive := true} = Status -> - {ok, Type} = find_type(ResId), - Name = alarm_name_of_resource_down(Type, ResId), - emqx_alarm:deactivate(Name), - Status; - #{is_alive := false} = Status -> - {ok, Type} = find_type(ResId), - Name = alarm_name_of_resource_down(Type, ResId), - emqx_alarm:activate(Name, #{id => ResId, type => Type}), - Status - end, - emqx_rule_registry:add_resource_params(ResParams#resource_params{status = NewStatus}), - NewStatus + NewStatus = try + case Module:OnStatus(ResId, Params) of + #{is_alive := LastIsAlive} = Status -> Status; + #{is_alive := true} = Status -> + {ok, Type} = find_type(ResId), + Name = alarm_name_of_resource_down(Type, ResId), + emqx_alarm:deactivate(Name), + Status; + #{is_alive := false} = Status -> + {ok, Type} = find_type(ResId), + Name = alarm_name_of_resource_down(Type, ResId), + emqx_alarm:activate(Name, #{id => ResId, type => Type}), + Status + end catch _Error:Reason:STrace -> ?LOG(error, "get resource status for ~p failed: ~0p", [ResId, {Reason, STrace}]), #{is_alive => false} - end; + end, + emqx_rule_registry:add_resource_params(ResParams#resource_params{status = NewStatus}), + NewStatus; not_found -> #{is_alive => false} end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 4c2b661ea..f9e210ab3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -31,6 +31,8 @@ , range_get/3 ]). +-compile({no_auto_import,[alias/1]}). + -type(input() :: map()). -type(alias() :: atom()). -type(collection() :: {alias(), [term()]}). diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index b0453c774..3d7db1b02 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src new file mode 100644 index 000000000..499664fe1 --- /dev/null +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -0,0 +1,17 @@ +%% -*-: erlang -*- +{VSN, + [ + {"4.3.0", [ + {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []}, + {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]} + ]}, + {<<".*">>, []} + ], + [ + {"4.3.0", [ + {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []}, + {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]} + ]}, + {<<".*">>, []} + ] +}. diff --git a/apps/emqx_sn/src/emqx_sn_asleep_timer.erl b/apps/emqx_sn/src/emqx_sn_asleep_timer.erl index 56a63ee2f..37ea67689 100644 --- a/apps/emqx_sn/src/emqx_sn_asleep_timer.erl +++ b/apps/emqx_sn/src/emqx_sn_asleep_timer.erl @@ -18,6 +18,7 @@ -export([ init/0 , ensure/2 + , cancel/1 ]). -record(asleep_state, { @@ -42,8 +43,8 @@ init() -> -spec(ensure(undefined | integer(), asleep_state()) -> asleep_state()). ensure(undefined, State = #asleep_state{duration = Duration}) -> ensure(Duration, State); -ensure(Duration, State = #asleep_state{tref = TRef}) -> - _ = cancel(TRef), +ensure(Duration, State) -> + cancel(State), State#asleep_state{duration = Duration, tref = start(Duration)}. %%-------------------------------------------------------------------- @@ -55,6 +56,10 @@ ensure(Duration, State = #asleep_state{tref = TRef}) -> start(Duration) -> erlang:send_after(timer:seconds(Duration), self(), asleep_timeout). -cancel(undefined) -> ok; -cancel(TRef) when is_reference(TRef) -> - erlang:cancel_timer(TRef). +cancel(#asleep_state{tref = Timer}) when is_reference(Timer) -> + case erlang:cancel_timer(Timer) of + false -> + receive {timeout, Timer, _} -> ok after 0 -> ok end; + _ -> ok + end; +cancel(_) -> ok. \ No newline at end of file diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index bfb2f28df..96f849974 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -439,12 +439,11 @@ asleep(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State) % 4) emq-sn regard this CONNECT as a signal to connected state, not a bootup CONNECT. For this reason, will procedure is lost % this should be a bug in mqtt-sn channel. asleep(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, - State = #state{keepalive_interval = _Interval}) -> - % device wakeup and goto connected state - % keepalive timer may timeout in asleep state and delete itself, need to restart keepalive - % TODO: Fixme later. - %% self() ! {keepalive, start, Interval}, - {next_state, connected, send_connack(State)}; + State = #state{channel = Channel, asleep_timer = Timer}) -> + NChannel = emqx_channel:ensure_keepalive(#{}, Channel), + emqx_sn_asleep_timer:cancel(Timer), + {next_state, connected, send_connack(State#state{channel = NChannel, + asleep_timer = emqx_sn_asleep_timer:init()})}; asleep(EventType, EventContent, State) -> handle_event(EventType, EventContent, asleep, State). @@ -771,10 +770,13 @@ send_message(Msg = #mqtt_sn_message{type = Type}, goto_asleep_state(State) -> goto_asleep_state(undefined, State). -goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) -> +goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer, + channel = Channel}) -> ?LOG(debug, "goto_asleep_state Duration=~p", [Duration]), NewTimer = emqx_sn_asleep_timer:ensure(Duration, AsleepTimer), - {next_state, asleep, State#state{asleep_timer = NewTimer}, hibernate}. + NChannel = emqx_channel:clear_keepalive(Channel), + {next_state, asleep, State#state{asleep_timer = NewTimer, + channel = NChannel}, hibernate}. %%-------------------------------------------------------------------- %% Helper funcs diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index ad0c5f032..2972571be 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -856,7 +856,7 @@ t_will_test2(_) -> send_pingreq_msg(Socket, undefined), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), - timer:sleep(10000), + timer:sleep(4000), receive_response(Socket), % ignore PUBACK receive_response(Socket), % ignore PUBCOMP @@ -878,7 +878,7 @@ t_will_test3(_) -> send_pingreq_msg(Socket, undefined), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), - timer:sleep(10000), + timer:sleep(4000), ?assertEqual(udp_receive_timeout, receive_response(Socket)), @@ -906,7 +906,7 @@ t_will_test4(_) -> send_willmsgupd_msg(Socket, <<"1A2B3C">>), ?assertEqual(<<3, ?SN_WILLMSGRESP, ?SN_RC_ACCEPTED>>, receive_response(Socket)), - timer:sleep(10000), + timer:sleep(4000), receive_response(Socket), % ignore PUBACK @@ -1359,7 +1359,7 @@ t_asleep_test07_to_connected(_) -> timer:sleep(1500), % asleep timer should get timeout, without any effect - timer:sleep(9000), + timer:sleep(4000), % keepalive timer should get timeout gen_udp:close(Socket). @@ -1517,7 +1517,7 @@ t_awake_test01_to_connected(_) -> timer:sleep(1500), % asleep timer should get timeout - timer:sleep(9000), + timer:sleep(4000), % keepalive timer should get timeout gen_udp:close(Socket). diff --git a/bin/emqx.cmd b/bin/emqx.cmd index bef7da355..8b9686462 100644 --- a/bin/emqx.cmd +++ b/bin/emqx.cmd @@ -16,6 +16,7 @@ :: Set variables that describe the release @set rel_name=emqx @set rel_vsn={{ release_version }} +@set REL_VSN=%rel_vsn% @set erts_vsn={{ erts_vsn }} @set erl_opts={{ erl_opts }} @@ -30,6 +31,7 @@ set rel_root_dir=%%~fA ) @set rel_dir=%rel_root_dir%\releases\%rel_vsn% +@set RUNNER_ROOT_DIR=%rel_root_dir% @set etc_dir=%rel_root_dir%\etc @set lib_dir=%rel_root_dir%\lib diff --git a/bin/nodetool b/bin/nodetool index 431121148..0e89ac278 100755 --- a/bin/nodetool +++ b/bin/nodetool @@ -292,25 +292,11 @@ join([H|T], Sep) -> add_libs_dir() -> [_ | _] = RootDir = os:getenv("RUNNER_ROOT_DIR"), - RelFile = filename:join([RootDir, "releases", - os:getenv("REL_VSN"), - "emqx.rel" - ]), - case file:consult(RelFile) of - {ok, [{release, {_, _RelVsn}, {erts, _ErtsVsn}, Libs}]} -> - lists:foreach( - fun({Name, Vsn}) -> add_lib_dir(RootDir, Name, Vsn); - ({Name, Vsn, _}) -> add_lib_dir(RootDir, Name, Vsn) - end, Libs); - {error, enoent} -> - %% rel file is deleted by release handler - add_libs_dir2(RootDir) - end. - -add_libs_dir2(RootDir) -> + CurrentVsn = os:getenv("REL_VSN"), RelFile = filename:join([RootDir, "releases", "RELEASES"]), case file:consult(RelFile) of - {ok, [[Release]]} -> + {ok, [Releases]} -> + Release = lists:keyfind(CurrentVsn, 3, Releases), {release, _Name, _AppVsn, _ErtsVsn, Libs, _State} = Release, lists:foreach( fun({Name, Vsn, _}) -> diff --git a/lib-ce/emqx_modules/test/emqx_modules_SUITE.erl b/lib-ce/emqx_modules/test/emqx_modules_SUITE.erl index 48bd0bd5b..dc76e8eb7 100644 --- a/lib-ce/emqx_modules/test/emqx_modules_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_modules_SUITE.erl @@ -32,11 +32,11 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_management, emqx_modules], fun set_sepecial_cfg/1), + emqx_ct_helpers:start_apps([emqx_management, emqx_modules], fun set_special_cfg/1), emqx_ct_http:create_default_app(), Config. -set_sepecial_cfg(_) -> +set_special_cfg(_) -> application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")), ok. diff --git a/priv/emqx.schema b/priv/emqx.schema index e7a96abcb..f107798a2 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2307,7 +2307,9 @@ end}. [random, %% randomly pick a subscriber round_robin, %% round robin alive subscribers one message after another sticky, %% pick a random subscriber and stick to it - hash %% hash client ID to a group member + hash, %% hash client ID to a group member + hash_clientid, + hash_topic ]}} ]}. diff --git a/rebar.config b/rebar.config index 84603617a..2059854dd 100644 --- a/rebar.config +++ b/rebar.config @@ -35,7 +35,7 @@ {erl_first_files, ["src/emqx_logger.erl", "src/emqx_rule_actions_trans.erl"]}. {deps, - [ {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.3"}}} + [ {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.5"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} diff --git a/rebar.config.erl b/rebar.config.erl index c4c0419b9..26526e1e1 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -194,7 +194,7 @@ overlay_vars_pkg(bin) -> , {platform_etc_dir, "etc"} , {platform_lib_dir, "lib"} , {platform_log_dir, "log"} - , {platform_plugins_dir, "plugins"} + , {platform_plugins_dir, "etc/plugins"} , {runner_root_dir, "$(cd $(dirname $(readlink $0 || echo $0))/..; pwd -P)"} , {runner_bin_dir, "$RUNNER_ROOT_DIR/bin"} , {runner_etc_dir, "$RUNNER_ROOT_DIR/etc"} diff --git a/scripts/apps-version-check.sh b/scripts/apps-version-check.sh index 49350b007..098653444 100755 --- a/scripts/apps-version-check.sh +++ b/scripts/apps-version-check.sh @@ -16,7 +16,6 @@ while read -r app; do now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"') if [ "$old_app_version" = "$now_app_version" ]; then changed="$(git diff --name-only "$latest_release"...HEAD \ - -- "$app_path/etc" \ -- "$app_path/src" \ -- "$app_path/priv" \ -- "$app_path/c_src" | wc -l)" diff --git a/src/emqx.app.src b/src/emqx.app.src index 449ffd311..b195d7a1b 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,7 +1,7 @@ {application, emqx, [{id, "emqx"}, {description, "EMQ X"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]}, diff --git a/src/emqx.appup.src b/src/emqx.appup.src index b10c14a9c..3bf40272c 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,19 +1,57 @@ %% -*-: erlang -*- {VSN, [ + {"4.3.1", [ + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_congestion, brutal_purge, soft_purge, []}, + {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} + ]}, {"4.3.0", [ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, + {load_module, emqx_congestion, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_trie, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []}, + %% {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} ]}, {<<".*">>, []} ], [ + {"4.3.1", [ + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_congestion, brutal_purge, soft_purge, []}, + {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []} + ]}, {"4.3.0", [ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_congestion, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_trie, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_app, brutal_purge, soft_purge, []}, + {load_module, emqx_plugins, brutal_purge, soft_purge, []}, %% Just load the module. We don't need to change the 'messages.retained' %% and 'messages.retained' counter type. - {load_module, emqx_metrics, brutal_purge, soft_purge, []} + {load_module, emqx_metrics, brutal_purge, soft_purge, []}, + {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} ]}, {<<".*">>, []} ] diff --git a/src/emqx_broker_bench.erl b/src/emqx_broker_bench.erl index 45ef0eab6..5aad43cc9 100644 --- a/src/emqx_broker_bench.erl +++ b/src/emqx_broker_bench.erl @@ -18,32 +18,82 @@ -ifdef(EMQX_BENCHMARK). --export([start/1, run1/0, run1/2]). +-export([run/1, run1/0, run1/4]). -run1() -> run1(4, 1000). +-define(T(Expr), timer:tc(fun() -> Expr end)). -run1(Factor, Limit) -> - start(#{factor => Factor, - limit => Limit, - sub_ptn => <<"device/{{id}}/+/{{num}}/#">>, - pub_ptn => <<"device/{{id}}/xays/{{num}}/foo/bar/baz">>}). +run1() -> run1(80, 1000, 80, 10000). + +run1(Subs, SubOps, Pubs, PubOps) -> + run(#{subscribers => Subs, + publishers => Pubs, + sub_ops => SubOps, + pub_ops => PubOps, + sub_ptn => <<"device/{{id}}/+/{{num}}/#">>, + pub_ptn => <<"device/{{id}}/foo/{{num}}/bar/1/2/3/4/5">> + }). %% setting fields: -%% - factor: spawn broker-pool-size * factor number of callers -%% - limit: limit the total number of topics for each caller +%% - subscribers: spawn this number of subscriber workers +%% - publishers: spawn this number of publisher workers +%% - sub_ops: the number of subscribes (route insert) each subscriber runs +%% - pub_ops: the number of publish (route lookups) each publisher runs %% - sub_ptn: subscribe topic pattern like a/+/b/+/c/# %% or a/+/{{id}}/{{num}}/# to generate pattern with {{id}} %% replaced by worker id and {{num}} replaced by topic number. %% - pub_ptn: topic pattern used to benchmark publish (match) performance %% e.g. a/x/{{id}}/{{num}}/foo/bar -start(#{factor := Factor} = Settings) -> - BrokerPoolSize = emqx_vm:schedulers() * 2, - Pids = start_callers(BrokerPoolSize * Factor, Settings), - R = collect_results(Pids, #{subscribe => 0, match => 0}), +run(#{subscribers := Subs, + publishers := Pubs, + sub_ops := SubOps, + pub_ops := PubOps + } = Settings) -> + SubsPids = start_callers(Subs, fun start_subscriber/1, Settings), + PubsPids = start_callers(Pubs, fun start_publisher/1, Settings), + _ = collect_results(SubsPids, subscriber_ready), + io:format(user, "subscribe ...~n", []), + {T1, SubsTime} = + ?T(begin + lists:foreach(fun(Pid) -> Pid ! start_subscribe end, SubsPids), + collect_results(SubsPids, subscribe_time) + end), + io:format(user, "InsertTotalTime: ~s~n", [ns(T1)]), + io:format(user, "InsertTimeAverage: ~s~n", [ns(SubsTime / Subs)]), + io:format(user, "InsertRps: ~p~n", [rps(Subs * SubOps, T1)]), + + io:format(user, "lookup ...~n", []), + {T2, PubsTime} = + ?T(begin + lists:foreach(fun(Pid) -> Pid ! start_lookup end, PubsPids), + collect_results(PubsPids, lookup_time) + end), + io:format(user, "LookupTotalTime: ~s~n", [ns(T2)]), + io:format(user, "LookupTimeAverage: ~s~n", [ns(PubsTime / Pubs)]), + io:format(user, "LookupRps: ~p~n", [rps(Pubs * PubOps, T2)]), + io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]), - io:format(user, "~p~n", [erlang:memory()]), - io:format(user, "~p~n", [R]), - lists:foreach(fun(Pid) -> Pid ! stop end, Pids). + + io:format(user, "unsubscribe ...~n", []), + {T3, ok} = + ?T(begin + lists:foreach(fun(Pid) -> Pid ! stop end, SubsPids), + wait_until_empty() + end), + io:format(user, "TimeToUnsubscribeAll: ~s~n", [ns(T3)]). + +wait_until_empty() -> + case emqx_trie:empty() of + true -> ok; + false -> + timer:sleep(5), + wait_until_empty() + end. + +rps(N, NanoSec) -> N * 1_000_000 / NanoSec. + +ns(T) when T > 1_000_000 -> io_lib:format("~p(s)", [T / 1_000_000]); +ns(T) when T > 1_000 -> io_lib:format("~p(ms)", [T / 1_000]); +ns(T) -> io_lib:format("~p(ns)", [T]). ram_bytes() -> Wordsize = erlang:system_info(wordsize), @@ -56,48 +106,69 @@ ram_bytes() -> 0 end. -start_callers(0, _) -> []; -start_callers(N, Settings) -> - [start_caller(Settings#{id => N}) | start_callers(N - 1, Settings)]. +start_callers(N, F, Settings) -> + start_callers(N, F, Settings, []). -collect_results([], R) -> R; -collect_results([Pid | Pids], Acc = #{subscribe := Sr, match := Mr}) -> +start_callers(0, _F, _Settings, Acc) -> + lists:reverse(Acc); +start_callers(N, F, Settings, Acc) -> + start_callers(N - 1, F, Settings, [F(Settings#{id => N}) | Acc]). + +collect_results(Pids, Tag) -> + collect_results(Pids, Tag, 0). + +collect_results([], _Tag, R) -> R; +collect_results([Pid | Pids], Tag, R) -> receive - {Pid, #{subscribe := Srd, match := Mrd}} -> - collect_results(Pids, Acc#{subscribe := Sr + Srd, match := Mr + Mrd}) + {Pid, Tag, N} -> + collect_results(Pids, Tag, N + R) end. -%% ops per second -rps(T, N) -> round(N / (T / 1000000)). - -start_caller(#{id := Id, limit := N, sub_ptn := SubPtn, pub_ptn := PubPtn}) -> +start_subscriber(#{id := Id, sub_ops := N, sub_ptn := SubPtn}) -> Parent = self(), proc_lib:spawn_link( fun() -> SubTopics = make_topics(SubPtn, Id, N), - {Ts, _} = timer:tc(fun() -> subscribe(SubTopics) end), - PubTopics = make_topics(PubPtn, Id, N), - {Tm, _} = timer:tc(fun() -> match(PubTopics) end), - _ = erlang:send(Parent, {self(), #{subscribe => rps(Ts, N), match => rps(Tm, N)}}), + Parent ! {self(), subscriber_ready, 0}, + receive + start_subscribe -> + ok + end, + {Ts, _} = ?T(subscribe(SubTopics)), + _ = erlang:send(Parent, {self(), subscribe_time, Ts/ N}), + %% subscribers should not exit before publish test is done receive stop -> ok end end). -match([]) -> ok; -match([Topic | Topics]) -> - _ = emqx_router:lookup_routes(Topic), - match(Topics). +start_publisher(#{id := Id, pub_ops := N, pub_ptn := PubPtn, subscribers := Subs}) -> + Parent = self(), + proc_lib:spawn_link( + fun() -> + L = lists:seq(1, N), + [Topic] = make_topics(PubPtn, (Id rem Subs) + 1, 1), + receive + start_lookup -> + ok + end, + {Tm, ok} = ?T(lists:foreach(fun(_) -> match(Topic) end, L)), + _ = erlang:send(Parent, {self(), lookup_time, Tm / N}), + ok + end). + +match(Topic) -> + [_] = emqx_router:match_routes(Topic). subscribe([]) -> ok; subscribe([Topic | Rest]) -> ok = emqx_broker:subscribe(Topic), subscribe(Rest). -make_topics(SubPtn0, Id, Limit) -> - SubPtn = emqx_topic:words(SubPtn0), - F = fun(N) -> render(Id, N, SubPtn) end, +make_topics(Ptn0, Id, Limit) -> + Ptn = emqx_topic:words(Ptn0), + F = fun(N) -> render(Id, N, Ptn) end, lists:map(F, lists:seq(1, Limit)). render(ID, N, Ptn) -> diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c8acccce9..3a818b9a7 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -49,7 +49,10 @@ ]). %% Export for emqx_sn --export([do_deliver/2]). +-export([ do_deliver/2 + , ensure_keepalive/2 + , clear_keepalive/1 + ]). %% Exports for CT -export([set_field/3]). @@ -1562,6 +1565,14 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone} Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). +clear_keepalive(Channel = #channel{timers = Timers}) -> + case maps:get(alive_timer, Timers, undefined) of + undefined -> + Channel; + TRef -> + emqx_misc:cancel_timer(TRef), + Channel#channel{timers = maps:without([alive_timer], Timers)} + end. %%-------------------------------------------------------------------- %% Maybe Resume Session diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 7f324e322..b15a2ff79 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -22,6 +22,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[CM]"). @@ -110,6 +111,7 @@ start_link() -> insert_channel_info(ClientId, Info, Stats) -> Chan = {ClientId, self()}, true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), + ?tp(debug, insert_channel_info, #{client_id => ClientId}), ok. %% @private @@ -279,18 +281,25 @@ takeover_session(ClientId, ChanPid) -> discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; - ChanPids -> - lists:foreach( - fun(ChanPid) -> - try - discard_session(ClientId, ChanPid) - catch - _:{noproc,_}:_Stk -> ok; - _:{{shutdown,_},_}:_Stk -> ok; - _:Error:_Stk -> - ?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error]) - end - end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids) + end. + +do_discard_session(ClientId, Pid) -> + try + discard_session(ClientId, Pid) + catch + _ : noproc -> % emqx_ws_connection: call + ?tp(debug, "session_already_gone", #{pid => Pid}), + ok; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ?tp(debug, "session_already_gone", #{pid => Pid}), + ok; + _ : {{shutdown, _}, _} -> + ?tp(debug, "session_already_shutdown", #{pid => Pid}), + ok; + _ : Error : St -> + ?tp(error, "failed_to_discard_session", + #{pid => Pid, reason => Error, stacktrace=>St}) end. discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> diff --git a/src/emqx_congestion.erl b/src/emqx_congestion.erl index ea99a63b1..4ec20034d 100644 --- a/src/emqx_congestion.erl +++ b/src/emqx_congestion.erl @@ -48,7 +48,10 @@ maybe_alarm_conn_congestion(Socket, Transport, Channel) -> cancel_alarms(Socket, Transport, Channel) -> lists:foreach(fun(Reason) -> - do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) + case has_alarm_sent(Reason) of + true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason); + false -> ok + end end, ?ALL_ALARM_REASONS). is_alarm_enabled(Channel) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 69ffddf06..814fd9007 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -41,8 +41,13 @@ , stats/1 ]). +-export([ async_set_keepalive/4 + , async_set_socket_options/2 + ]). + -export([ call/2 , call/3 + , cast/2 ]). %% Callback @@ -56,7 +61,7 @@ ]). %% Internal callback --export([wakeup_from_hib/2, recvloop/2]). +-export([wakeup_from_hib/2, recvloop/2, get_state/1]). %% Export for CT -export([set_field/3]). @@ -184,6 +189,35 @@ stats(#state{transport = Transport, ProcStats = emqx_misc:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). +%% @doc Set TCP keepalive socket options to override system defaults. +%% Idle: The number of seconds a connection needs to be idle before +%% TCP begins sending out keep-alive probes (Linux default 7200). +%% Interval: The number of seconds between TCP keep-alive probes +%% (Linux default 75). +%% Probes: The maximum number of TCP keep-alive probes to send before +%% giving up and killing the connection if no response is +%% obtained from the other end (Linux default 9). +%% +%% NOTE: This API sets TCP socket options, which has nothing to do with +%% the MQTT layer's keepalive (PINGREQ and PINGRESP). +async_set_keepalive(Pid, Idle, Interval, Probes) -> + Options = [ {keepalive, true} + , {raw, 6, 4, <>} + , {raw, 6, 5, <>} + , {raw, 6, 6, <>} + ], + async_set_socket_options(Pid, Options). + +%% @doc Set custom socket options. +%% This API is made async because the call might be originated from +%% a hookpoint callback (otherwise deadlock). +%% If failed to set, the error message is logged. +async_set_socket_options(Pid, Options) -> + cast(Pid, {async_set_socket_options, Options}). + +cast(Pid, Req) -> + gen_server:cast(Pid, Req). + call(Pid, Req) -> call(Pid, Req, infinity). call(Pid, Req, Timeout) -> @@ -366,6 +400,9 @@ handle_msg({'$gen_call', From, Req}, State) -> gen_server:reply(From, Reply), stop(Reason, NState) end; +handle_msg({'$gen_cast', Req}, State) -> + NewState = handle_cast(Req, State), + {ok, NewState}; handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> ?LOG(debug, "RECV ~0p", [Data]), @@ -475,7 +512,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport, E : C : S -> ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) end, - ?tp(debug, terminate, #{}), + ?tp(info, terminate, #{reason => Reason}), maybe_raise_excption(Reason). %% close socket, discard new state, always return ok. @@ -683,12 +720,31 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) -> end; handle_info({sock_error, Reason}, State) -> - Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]), + case Reason =/= closed andalso Reason =/= einval of + true -> ?LOG(warning, "socket_error: ~p", [Reason]); + false -> ok + end, handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State) -> with_channel(handle_info, [Info], State). +%%-------------------------------------------------------------------- +%% Handle Info + +handle_cast({async_set_socket_options, Opts}, + State = #state{transport = Transport, + socket = Socket + }) -> + case Transport:setopts(Socket, Opts) of + ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts}); + Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err}) + end, + State; +handle_cast(Req, State) -> + ?tp(error, "received_unknown_cast", #{cast => Req}), + State. + %%-------------------------------------------------------------------- %% Ensure rate limit @@ -817,3 +873,7 @@ set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). +get_state(Pid) -> + State = sys:get_state(Pid), + maps:from_list(lists:zip(record_info(fields, state), + tl(tuple_to_list(State)))). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 7e5aae788..c4a2f6ac0 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -40,6 +40,8 @@ , serialize_opts/0 ]). +-define(Q(BYTES, Q), {BYTES, Q}). + -type(options() :: #{strict_mode => boolean(), max_size => 1..?MAX_PACKET_SIZE, version => emqx_types:version() @@ -50,12 +52,12 @@ -type(parse_result() :: {more, parse_state()} | {ok, emqx_types:packet(), binary(), parse_state()}). --type(cont_state() :: {Stage :: len | body, - State :: #{hdr := #mqtt_packet_header{}, - len := {pos_integer(), non_neg_integer()} | non_neg_integer(), - rest => binary() - } - }). +-type(cont_state() :: + {Stage :: len | body, + State :: #{hdr := #mqtt_packet_header{}, + len := {pos_integer(), non_neg_integer()} | non_neg_integer(), + rest => binary() | ?Q(non_neg_integer(), queue:queue(binary())) + }}). -type(serialize_opts() :: options()). @@ -117,9 +119,19 @@ parse(Bin, {{len, #{hdr := Header, parse_remaining_len(Bin, Header, Multiplier, Length, Options); parse(Bin, {{body, #{hdr := Header, len := Length, - rest := Rest} + rest := Body} }, Options}) when is_binary(Bin) -> - parse_frame(<>, Header, Length, Options). + BodyBytes = body_bytes(Body), + {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin), + NewBody = append_body(Body, NewBodyPart), + parse_frame(NewBody, Tail, Header, Length, Options). + +%% split given binary with the first N bytes +split(N, Bin) when N =< 0 -> + {Bin, <<>>}; +split(N, Bin) when N =< size(Bin) -> + <> = Bin, + {H, T}. parse_remaining_len(<<>>, Header, Options) -> {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; @@ -132,7 +144,8 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; %% Match DISCONNECT without payload -parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> +parse_remaining_len(<<0:8, Rest/binary>>, + Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), {ok, Packet, Rest, ?none(Options)}; %% Match PINGREQ. @@ -149,16 +162,35 @@ parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Opti parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options = #{max_size := MaxSize}) -> FrameLen = Value + Len * Multiplier, - if - FrameLen > MaxSize -> error(frame_too_large); - true -> parse_frame(Rest, Header, FrameLen, Options) + case FrameLen > MaxSize of + true -> error(frame_too_large); + false -> parse_frame(Rest, Header, FrameLen, Options) end. -parse_frame(Bin, Header, 0, Options) -> - {ok, packet(Header), Bin, ?none(Options)}; -parse_frame(Bin, Header, Length, Options) -> - case Bin of - <> -> +body_bytes(B) when is_binary(B) -> size(B); +body_bytes(?Q(Bytes, _)) -> Bytes. + +append_body(H, T) when is_binary(H) andalso size(H) < 1024 -> + <>; +append_body(H, T) when is_binary(H) -> + Bytes = size(H) + size(T), + ?Q(Bytes, queue:from_list([H, T])); +append_body(?Q(Bytes, Q), T) -> + ?Q(Bytes + iolist_size(T), queue:in(T, Q)). + +flatten_body(Body, Tail) when is_binary(Body) -> <>; +flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]). + +parse_frame(Body, Header, Length, Options) -> + %% already appended + parse_frame(Body, _SplitTail = <<>>, Header, Length, Options). + +parse_frame(Body, Tail, Header, 0, Options) -> + {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)}; +parse_frame(Body, Tail, Header, Length, Options) -> + case body_bytes(Body) >= Length of + true -> + <> = flatten_body(Body, Tail), case parse_packet(Header, FrameBin, Options) of {Variable, Payload} -> {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; @@ -167,8 +199,11 @@ parse_frame(Bin, Header, Length, Options) -> Variable -> {ok, packet(Header, Variable), Rest, ?none(Options)} end; - TooShortBin -> - {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} + false -> + {more, {{body, #{hdr => Header, + len => Length, + rest => append_body(Body, Tail) + }}, Options}} end. -compile({inline, [packet/1, packet/2, packet/3]}). diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 3b96bfbad..c3ce14d83 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -201,16 +201,8 @@ stop() -> gen_server:stop(?SERVER). %% BACKW: v4.3.0 upgrade_retained_delayed_counter_type() -> - case ets:info(?TAB, name) of - ?TAB -> - [M1] = ets:lookup(?TAB, 'messages.retained'), - [M2] = ets:lookup(?TAB, 'messages.delayed'), - true = ets:insert(?TAB, M1#metric{type = counter}), - true = ets:insert(?TAB, M2#metric{type = counter}), - ok; - _ -> - ok - end. + Ks = ['messages.retained', 'messages.delayed'], + gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity). %%-------------------------------------------------------------------- %% Metrics API @@ -467,6 +459,13 @@ handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) -> {reply, {ok, NextIdx}, State#state{next_idx = NextIdx + 1}} end; +handle_call({set_type_to_counter, Keys}, _From, State) -> + lists:foreach( + fun(K) -> + ets:update_element(?TAB, K, {#metric.type, counter}) + end, Keys), + {reply, ok, State}; + handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. diff --git a/src/emqx_node_dump.erl b/src/emqx_node_dump.erl index 7134684e1..18189bb57 100644 --- a/src/emqx_node_dump.erl +++ b/src/emqx_node_dump.erl @@ -45,16 +45,28 @@ censor(Path, M) when is_map(M) -> maps:map(Fun, M); censor(Path, L = [Fst|_]) when is_tuple(Fst) -> [censor(Path, I) || I <- L]; -censor(Path, Val) -> - case Path of - [password|_] -> - obfuscate_value(Val); - [secret|_] -> - obfuscate_value(Val); - _ -> - Val +censor([Key | _], Val) -> + case is_sensitive(Key) of + true -> obfuscate_value(Val); + false -> Val end. +is_sensitive(Key) when is_atom(Key) -> + is_sensitive(atom_to_binary(Key)); +is_sensitive(Key) when is_list(Key) -> + try iolist_to_binary(Key) of + Bin -> + is_sensitive(Bin) + catch + _ : _ -> + false + end; +is_sensitive(Key) when is_binary(Key) -> + lists:any(fun(Pattern) -> re:run(Key, Pattern) =/= nomatch end, + ["passwd", "password", "secret"]); +is_sensitive(Key) when is_tuple(Key) -> + false. + obfuscate_value(Val) when is_binary(Val) -> <<"********">>; obfuscate_value(_Val) -> diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index bf8de4a59..f05da760e 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -61,7 +61,7 @@ init() -> %% @doc Load all plugins when the broker started. -spec(load() -> ok | ignore | {error, term()}). load() -> - load_expand_plugins(), + ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)), case emqx:get_env(plugins_loaded_file) of undefined -> ignore; %% No plugins available File -> @@ -148,46 +148,61 @@ init_config(CfgFile) -> [application:set_env(App, Par, Val) || {Par, Val} <- Envs] end, AppsEnv). -load_expand_plugins() -> - case emqx:get_env(expand_plugins_dir) of - undefined -> ok; - ExpandPluginsDir -> - Plugins = filelib:wildcard("*", ExpandPluginsDir), - lists:foreach(fun(Plugin) -> - PluginDir = filename:join(ExpandPluginsDir, Plugin), +%% load external plugins which are placed in etc/plugins dir +load_ext_plugins(undefined) -> ok; +load_ext_plugins(Dir) -> + lists:foreach( + fun(Plugin) -> + PluginDir = filename:join(Dir, Plugin), case filelib:is_dir(PluginDir) of - true -> load_expand_plugin(PluginDir); + true -> load_ext_plugin(PluginDir); false -> ok end - end, Plugins) - end. + end, filelib:wildcard("*", Dir)). -load_expand_plugin(PluginDir) -> - init_expand_plugin_config(PluginDir), +load_ext_plugin(PluginDir) -> + ?LOG(debug, "loading_extra_plugin: ~s", [PluginDir]), Ebin = filename:join([PluginDir, "ebin"]), + AppFile = filename:join([Ebin, "*.app"]), + AppName = case filelib:wildcard(AppFile) of + [App] -> + list_to_atom(filename:basename(App, ".app")); + [] -> + ?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]), + error({plugin_app_file_not_found, AppFile}) + end, + ok = load_plugin_app(AppName, Ebin), + ok = load_plugin_conf(AppName, PluginDir). + +load_plugin_app(AppName, Ebin) -> _ = code:add_patha(Ebin), Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])), - lists:foreach(fun(Mod) -> - Module = list_to_atom(filename:basename(Mod, ".beam")), - code:load_file(Module) - end, Modules), - case filelib:wildcard(Ebin ++ "/*.app") of - [App|_] -> application:load(list_to_atom(filename:basename(App, ".app"))); - _ -> ?LOG(alert, "Plugin not found."), - {error, load_app_fail} + lists:foreach( + fun(BeamFile) -> + Module = list_to_atom(filename:basename(BeamFile, ".beam")), + case code:ensure_loaded(Module) of + {module, Module} -> ok; + {error, Reason} -> error({failed_to_load_plugin_beam, BeamFile, Reason}) + end + end, Modules), + case application:load(AppName) of + ok -> ok; + {error, {already_loaded, _}} -> ok end. -init_expand_plugin_config(PluginDir) -> - Priv = PluginDir ++ "/priv", - Etc = PluginDir ++ "/etc", - Schema = filelib:wildcard(Priv ++ "/*.schema"), - Conf = case filelib:wildcard(Etc ++ "/*.conf") of - [] -> []; - [Conf1] -> cuttlefish_conf:file(Conf1) - end, +load_plugin_conf(AppName, PluginDir) -> + Priv = filename:join([PluginDir, "priv"]), + Etc = filename:join([PluginDir, "etc"]), + Schema = filelib:wildcard(filename:join([Priv, "*.schema"])), + ConfFile = filename:join([Etc, atom_to_list(AppName) ++ ".conf"]), + Conf = case filelib:is_file(ConfFile) of + true -> cuttlefish_conf:file(ConfFile); + false -> error({conf_file_not_found, ConfFile}) + end, + ?LOG(debug, "loading_extra_plugin_config conf=~s, schema=~s", [ConfFile, Schema]), AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf), - lists:foreach(fun({AppName, Envs}) -> - [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs] + lists:foreach(fun({AppName1, Envs}) -> + [application:set_env(AppName1, Par, Val) || {Par, Val} <- Envs] end, AppsEnv). ensure_file(File) -> @@ -210,10 +225,11 @@ filter_plugins(Names) -> end, Names). load_plugins(Names, Persistent) -> - Plugins = list(), NotFound = Names -- names(Plugins), + Plugins = list(), + NotFound = Names -- names(Plugins), case NotFound of [] -> ok; - NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound]) + NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound]) end, NeedToLoad = Names -- NotFound -- names(started_app), lists:foreach(fun(Name) -> @@ -223,19 +239,31 @@ load_plugins(Names, Persistent) -> generate_configs(App) -> ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config", - ConfFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf", - SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema", - case {filelib:is_file(ConfigFile), filelib:is_file(ConfFile) andalso filelib:is_file(SchemaFile)} of - {true, _} -> + case filelib:is_file(ConfigFile) of + true -> {ok, [Configs]} = file:consult(ConfigFile), Configs; - {_, true} -> + false -> + do_generate_configs(App) + end. + +do_generate_configs(App) -> + Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf", + Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf", + ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of + {true, _} -> Name1; + {false, true} -> Name2; + {false, false} -> error({config_not_found, [Name1, Name2]}) + end, + SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema", + case filelib:is_file(SchemaFile) of + true -> Schema = cuttlefish_schema:files([SchemaFile]), Conf = cuttlefish_conf:file(ConfFile), LogFun = fun(Key, Value) -> ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]) end, cuttlefish_generator:map(Schema, Conf, undefined, LogFun); - {false, false} -> - error({config_not_found, {ConfigFile, ConfFile, SchemaFile}}) + false -> + error({schema_not_found, SchemaFile}) end. apply_configs([]) -> diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 8ece333b0..7146feb74 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -194,6 +194,11 @@ delete_key(Key) -> ok end. +%% micro-optimization: no need to lookup when topic is not wildcard +%% because we only insert wildcards to emqx_trie +lookup_topic(_Topic, false) -> []; +lookup_topic(Topic, true) -> lookup_topic(Topic). + lookup_topic(Topic) when is_binary(Topic) -> case ets:lookup(?TRIE, ?TOPIC(Topic)) of [#?TRIE{count = C}] -> [Topic || C > 0]; @@ -219,15 +224,22 @@ do_match(Words) -> do_match(Words, empty). do_match(Words, Prefix) -> - match(is_compact(), Words, Prefix, []). + case is_compact() of + true -> match_compact(Words, Prefix, false, []); + false -> match_no_compact(Words, Prefix, false, []) + end. -match(_IsCompact, [], Topic, Acc) -> - 'match_#'(Topic) ++ %% try match foo/bar/# - lookup_topic(Topic) ++ %% try match foo/bar +match_no_compact([], Topic, IsWildcard, Acc) -> + 'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/# + lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+ Acc; -match(IsCompact, [Word | Words], Prefix, Acc0) -> - case {has_prefix(Prefix), IsCompact} of - {false, false} -> +match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) -> + case has_prefix(Prefix) of + true -> + Acc1 = 'match_#'(Prefix) ++ Acc0, + Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1), + match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc); + false -> %% non-compact paths in database %% if there is no prefix matches the current topic prefix %% we can simpliy return from here @@ -240,21 +252,24 @@ match(IsCompact, [Word | Words], Prefix, Acc0) -> %% then at the second level, we lookup prefix a/x, %% no such prefix to be found, meaning there is no point %% searching for 'a/x/y', 'a/x/+' or 'a/x/#' - Acc0; - _ -> - %% compact paths in database - %% we have to enumerate all possible prefixes - %% e.g. a/+/b/# results with below entries in database - %% - a/+ - %% - a/+/b/# - %% when matching a/x/y, we need to enumerate - %% - a - %% - a/x - %% - a/x/y - %% *with '+', '#' replaced at each level - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match(IsCompact, Words, join(Prefix, '+'), Acc1), - match(IsCompact, Words, join(Prefix, Word), Acc) + Acc0 + end. + +match_compact([], Topic, IsWildcard, Acc) -> + 'match_#'(Topic) ++ %% try match foo/bar/# + lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar + Acc; +match_compact([Word | Words], Prefix, IsWildcard, Acc0) -> + Acc1 = 'match_#'(Prefix) ++ Acc0, + Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1), + WildcardPrefix = join(Prefix, '+'), + %% go deeper to match current_prefix/+ only when: + %% 1. current word is the last + %% OR + %% 2. there is a prefix = 'current_prefix/+' + case Words =:= [] orelse has_prefix(WildcardPrefix) of + true -> match_compact(Words, WildcardPrefix, true, Acc); + false -> Acc end. 'match_#'(Prefix) -> diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index b8e0f9066..0cfba4737 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(CM, emqx_cm). -define(ChanInfo,#{conninfo => @@ -179,6 +180,18 @@ t_discard_session(_) -> ok = emqx_cm:unregister_channel(<<"clientid">>), ok = meck:unload(emqx_connection). +t_discard_session_race(_) -> + ok = snabbkaffe:start_trace(), + #{conninfo := ConnInfo0} = ?ChanInfo, + ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection}, + {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end), + ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo), + Pid ! stop, + receive {'DOWN', Ref, process, Pid, normal} -> ok end, + ok = emqx_cm:discard_session(<<"clientid">>), + {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000), + snabbkaffe:stop(). + t_takeover_session(_) -> #{conninfo := ConnInfo} = ?ChanInfo, {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), diff --git a/test/emqx_mqtt_SUITE.erl b/test/emqx_mqtt_SUITE.erl index cb6174712..c86d6334a 100644 --- a/test/emqx_mqtt_SUITE.erl +++ b/test/emqx_mqtt_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg, recv_oct, recv_cnt, send_oct, send_cnt, @@ -38,6 +39,19 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +init_per_testcase(TestCase, Config) -> + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase(init, Config); + false -> Config + end. + +end_per_testcase(TestCase, Config) -> + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase('end', Config); + false -> ok + end, + Config. + t_conn_stats(_) -> with_client(fun(CPid) -> Stats = emqx_connection:stats(CPid), @@ -134,3 +148,41 @@ with_client(TestFun, _Options) -> emqtt:stop(C) end. +t_async_set_keepalive(init, Config) -> + ok = snabbkaffe:start_trace(), + Config; +t_async_set_keepalive('end', _Config) -> + snabbkaffe:stop(), + ok. + +t_async_set_keepalive(_) -> + ClientID = <<"client-tcp-keepalive">>, + {ok, Client} = emqtt:start_link([{host, "localhost"}, + {proto_ver,v5}, + {clientid, ClientID}, + {clean_start, false}]), + {ok, _} = emqtt:connect(Client), + {ok, _} = ?block_until(#{?snk_kind := insert_channel_info, + client_id := ClientID}, 2000, 100), + [Pid] = emqx_cm:lookup_channels(ClientID), + State = emqx_connection:get_state(Pid), + Transport = maps:get(transport, State), + Socket = maps:get(socket, State), + ?assert(is_port(Socket)), + Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}], + {ok, [ {raw, 6, 4, <>} + , {raw, 6, 5, <>} + , {raw, 6, 6, <>} + ]} = Transport:getopts(Socket, Opts), + ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]), + emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1), + {ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000), + {ok, [ {raw, 6, 4, <>} + , {raw, 6, 5, <>} + , {raw, 6, 6, <>} + ]} = Transport:getopts(Socket, Opts), + ?assertEqual(NewIdle, Idle + 1), + ?assertEqual(NewInterval, Interval + 1), + ?assertEqual(NewProbes, Probes + 1), + emqtt:stop(Client), + ok. diff --git a/test/emqx_plugins_SUITE.erl b/test/emqx_plugins_SUITE.erl index 401cf87dd..6d8847f43 100644 --- a/test/emqx_plugins_SUITE.erl +++ b/test/emqx_plugins_SUITE.erl @@ -30,24 +30,20 @@ init_per_suite(Config) -> DataPath = proplists:get_value(data_dir, Config), AppPath = filename:join([DataPath, "emqx_mini_plugin"]), - Cmd = lists:flatten(io_lib:format("cd ~s && make && cp -r etc _build/default/lib/emqx_mini_plugin/", [AppPath])), + Cmd = lists:flatten(io_lib:format("cd ~s && make", [AppPath])), ct:pal("Executing ~s~n", [Cmd]), ct:pal("~n ~s~n", [os:cmd(Cmd)]), - code:add_path(filename:join([AppPath, "_build", "default", "lib", "emqx_mini_plugin", "ebin"])), - put(loaded_file, filename:join([DataPath, "loaded_plugins"])), emqx_ct_helpers:boot_modules([]), - emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1), + emqx_ct_helpers:start_apps([], fun(_) -> set_special_cfg(DataPath) end), Config. - -set_sepecial_cfg(_) -> - ExpandPath = filename:dirname(code:lib_dir(emqx_mini_plugin)), +set_special_cfg(PluginsDir) -> application:set_env(emqx, plugins_loaded_file, get(loaded_file)), - application:set_env(emqx, expand_plugins_dir, ExpandPath), + application:set_env(emqx, expand_plugins_dir, PluginsDir), ok. end_per_suite(_Config) -> @@ -58,7 +54,6 @@ t_load(_) -> ?assertEqual(ok, emqx_plugins:unload()), ?assertEqual({error, not_found}, emqx_plugins:load(not_existed_plugin)), - ?assertEqual({error, parse_config_file_failed}, emqx_plugins:load(emqx_mini_plugin)), ?assertEqual({error, not_started}, emqx_plugins:unload(emqx_mini_plugin)), application:set_env(emqx, expand_plugins_dir, undefined), @@ -75,8 +70,9 @@ t_init_config(_) -> file:delete(ConfFile), ?assertEqual({ok,test}, application:get_env(emqx_mini_plugin, mininame)). -t_load_expand_plugin(_) -> - ?assertEqual({error, load_app_fail}, emqx_plugins:load_expand_plugin("./not_existed_path/")). +t_load_ext_plugin(_) -> + ?assertError({plugin_app_file_not_found, _}, + emqx_plugins:load_ext_plugin("./not_existed_path/")). t_list(_) -> ?assertMatch([{plugin, _, _, _, _, _, _, _} | _ ], emqx_plugins:list()). diff --git a/test/emqx_plugins_SUITE_data/emqx_mini_plugin/Makefile b/test/emqx_plugins_SUITE_data/emqx_mini_plugin/Makefile index ad02951a3..fd38ff640 100644 --- a/test/emqx_plugins_SUITE_data/emqx_mini_plugin/Makefile +++ b/test/emqx_plugins_SUITE_data/emqx_mini_plugin/Makefile @@ -8,6 +8,7 @@ all: compile compile: $(REBAR) compile + cp -r _build/default/lib/emqx_mini_plugin/ebin ./ clean: distclean @@ -22,14 +23,4 @@ xref: distclean: @rm -rf _build - @rm -f data/app.*.config data/vm.*.args rebar.lock - -CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish - -$(CUTTLEFISH_SCRIPT): - @${REBAR} get-deps - @if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi - -app.config: $(CUTTLEFISH_SCRIPT) etc/emqx_mini_plugin.conf - $(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/emqx_mini_plugin.conf -i priv/emqx_mini_plugin.schema -d data - + @rm -f ebin/ data/app.*.config data/vm.*.args rebar.lock diff --git a/test/emqx_plugins_SUITE_data/emqx_mini_plugin/rebar.config b/test/emqx_plugins_SUITE_data/emqx_mini_plugin/rebar.config index 0bc2e3d93..4c49da1dc 100644 --- a/test/emqx_plugins_SUITE_data/emqx_mini_plugin/rebar.config +++ b/test/emqx_plugins_SUITE_data/emqx_mini_plugin/rebar.config @@ -1,5 +1,4 @@ -{deps, - []}. +{deps, []}. {edoc_opts, [{preprocess, true}]}. {erl_opts, [warn_unused_vars, @@ -15,3 +14,10 @@ {cover_enabled, true}. {cover_opts, [verbose]}. {cover_export_enabled, true}. + +{profiles, + [{test, [ + {deps, [ {emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.4"}}} + ]} + ]} +]}. diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 7516b58a0..7ae23b4c6 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -102,10 +102,13 @@ t_match2(_) -> ?assertEqual([], ?TRIE:match(<<"$SYS/broker/zenmq">>)). t_match3(_) -> - Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], + Topics = [<<"d/#">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end), Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]), - ?assertEqual(4, length(Matched)), + case length(Matched) of + 3 -> ok; + _ -> error({unexpected, Matched}) + end, SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>), ?assertEqual([<<"$SYS/#">>], SysMatched). @@ -114,6 +117,26 @@ t_match4(_) -> trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end), ?assertEqual([<<"/#">>, <<"/+/a/b/c">>], lists:sort(emqx_trie:match(<<"/0/a/b/c">>))). +t_match5(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + Topics = [<<"#">>, <>, <>], + trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end), + ?assertEqual([<<"#">>, <>], lists:sort(emqx_trie:match(T))), + ?assertEqual([<<"#">>, <>, <>], + lists:sort(emqx_trie:match(<>))). + +t_match6(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>, + trans(fun() -> emqx_trie:insert(W) end), + ?assertEqual([W], emqx_trie:match(T)). + +t_match7(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>, + trans(fun() -> emqx_trie:insert(W) end), + ?assertEqual([W], emqx_trie:match(T)). + t_empty(_) -> ?assert(?TRIE:empty()), trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]),