diff --git a/.gitignore b/.gitignore index 16369a576..aaec950d4 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ xrefr erlang.mk *.coverdata etc/emqx.conf.rendered +Mnesia.*/ diff --git a/Makefile b/Makefile index 14790e0e9..bf1da5bc5 100644 --- a/Makefile +++ b/Makefile @@ -3,16 +3,43 @@ REBAR_GIT_CLONE_OPTIONS += --depth 1 export REBAR_GIT_CLONE_OPTIONS - SUITES_FILES := $(shell find test -name '*_SUITE.erl') CT_SUITES := $(foreach value,$(SUITES_FILES),$(shell val=$$(basename $(value) .erl); echo $${val%_*})) CT_NODE_NAME = emqxct@127.0.0.1 -.PHONY: cover -run: - @echo $(CT_TEST_SUITES) +RUN_NODE_NAME = emqxdebug@127.0.0.1 + +.PHONY: run +run: run_setup + @rebar3 as test get-deps + @rebar3 as test auto --name $(RUN_NODE_NAME) --script test/run_emqx.escript + +.PHONY: run_setup +run_setup: + @erl -noshell -eval \ + "{ok, [[HOME]]} = init:get_argument(home), \ + FilePath = HOME ++ \"/.config/rebar3/rebar.config\", \ + case file:consult(FilePath) of \ + {ok, Term} -> \ + NewTerm = case lists:keyfind(plugins, 1, Term) of \ + false -> [{plugins, [rebar3_auto]} | Term]; \ + {plugins, OldPlugins} -> \ + NewPlugins0 = OldPlugins -- [rebar3_auto], \ + NewPlugins = [rebar3_auto | NewPlugins0], \ + lists:keyreplace(plugins, 1, Term, {plugins, NewPlugins}) \ + end, \ + ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]); \ + _ -> \ + NewTerm=[{plugins, [rebar3_auto]}], \ + ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]) \ + end, \ + halt(0)." + +.PHONY: shell +shell: + @rebar3 as test auto compile: @rebar3 compile @@ -89,5 +116,6 @@ gen-clean: .PHONY: distclean distclean: gen-clean + @rm -rf Mnesia.* @rm -rf _build cover deps logs log data @rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump diff --git a/include/emqx.hrl b/include/emqx.hrl index fba079b91..e206ad51d 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -132,7 +132,8 @@ descr :: string(), vendor :: string(), active = false :: boolean(), - info :: map() + info :: map(), + type :: atom() }). %%-------------------------------------------------------------------- diff --git a/rebar.config b/rebar.config index bd017de97..22fbccaf9 100644 --- a/rebar.config +++ b/rebar.config @@ -1,12 +1,12 @@ {deps, - [ {jsx, "2.9.0"} % hex - , {cowboy, "2.6.1"} % hex - , {gproc, "0.8.0"} % hex - , {ekka, "0.5.6"} % hex - , {replayq, "0.1.1"} %hex - , {esockd, "5.5.0"} %hex - , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}} - , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} + [{jsx, "2.9.0"}, % hex + {cowboy, "2.6.1"}, % hex + {gproc, "0.8.0"}, % hex + {ekka, "0.5.6"}, % hex + {replayq, "0.1.1"}, %hex + {esockd, "5.5.0"}, %hex + {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}, + {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. {edoc_opts, [{preprocess, true}]}. @@ -29,9 +29,9 @@ {profiles, [{test, [{deps, - [ {meck, "0.8.13"} % hex - , {bbmustache, "1.7.0"} % hex - , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}} + [{meck, "0.8.13"}, % hex + {bbmustache, "1.7.0"}, % hex + {emqx_ct_helpers, "1.1.3"} % hex ]} ]} ]}. diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index d6aab26d4..7d849f493 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -29,7 +29,10 @@ -> {ok, emqx_types:credentials()} | {error, term()}). authenticate(Credentials) -> case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of - #{auth_result := success} = NewCredentials -> + #{auth_result := success, anonymous := true} = NewCredentials -> + emqx_metrics:inc('auth.mqtt.anonymous'), + {ok, NewCredentials}; + #{auth_result := success} = NewCredentials -> {ok, NewCredentials}; NewCredentials -> {error, maps:get(auth_result, NewCredentials, unknown_error)} diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index 51e112779..a7654e32c 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -98,7 +98,7 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> ?LOG(warning, "~p set", [Alarm]), case encode_alarm(Alarm) of {ok, Json} -> - emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json)); + emqx_broker:safe_publish(alarm_msg(topic(alert), Json)); {error, Reason} -> ?LOG(error, "Failed to encode alarm: ~p", [Reason]) end, @@ -106,7 +106,12 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> {ok, State}; handle_event({clear_alarm, AlarmId}, State) -> ?LOG(notice, "~p clear", [AlarmId]), - emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)), + case encode_alarm({AlarmId, undefined}) of + {ok, Json} -> + emqx_broker:safe_publish(alarm_msg(topic(clear), Json)); + {error, Reason} -> + ?LOG(error, "Failed to encode alarm: ~p", [Reason]) + end, clear_alarm_(AlarmId), {ok, State}; handle_event(_, State) -> @@ -142,19 +147,21 @@ encode_alarm({AlarmId, #alarm{severity = Severity, {title, iolist_to_binary(Title)}, {summary, iolist_to_binary(Summary)}, {ts, emqx_time:now_secs(Ts)}]}]); +encode_alarm({AlarmId, undefined}) -> + emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]); encode_alarm({AlarmId, AlarmDesc}) -> - emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, - {desc, maybe_to_binary(AlarmDesc)}]). + emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, + {description, maybe_to_binary(AlarmDesc)}]). alarm_msg(Topic, Payload) -> Msg = emqx_message:make(?MODULE, Topic, Payload), emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, emqx_message:set_flag(sys, Msg)). -topic(alert, AlarmId) -> - emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); -topic(clear, AlarmId) -> - emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>). +topic(alert) -> + emqx_topic:systop(<<"alarms/alert">>); +topic(clear) -> + emqx_topic:systop(<<"alarms/clear">>). maybe_to_binary(Data) when is_binary(Data) -> Data; diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index fdf66a493..fc04ab597 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -132,6 +132,10 @@ {counter, 'messages.forward'} % Messages forward ]). +-define(MQTT_METRICS, [ + {counter, 'auth.mqtt.anonymous'} +]). + -record(state, {next_idx = 1}). -record(metric, {name, type, idx}). @@ -355,7 +359,7 @@ init([]) -> Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)}, true = ets:insert(?TAB, Metric), ok = counters:put(CRef, Idx, 0) - end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), + end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS), {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}. handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) -> @@ -446,4 +450,5 @@ reserved_idx('messages.retained') -> 48; reserved_idx('messages.dropped') -> 49; reserved_idx('messages.expired') -> 50; reserved_idx('messages.forward') -> 51; +reserved_idx('auth.mqtt.anonymous') -> 52; reserved_idx(_) -> undefined. diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index b88fce472..cedb5c5b0 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -151,20 +151,20 @@ stop_plugins(Names) -> -spec(list() -> [emqx_types:plugin()]). list() -> StartedApps = names(started_app), - lists:map(fun({Name, _, _}) -> - Plugin = plugin(Name), + lists:map(fun({Name, _, [Type| _]}) -> + Plugin = plugin(Name, Type), case lists:member(Name, StartedApps) of true -> Plugin#plugin{active = true}; false -> Plugin end end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))). -plugin(AppName) -> +plugin(AppName, Type) -> case application:get_all_key(AppName) of {ok, Attrs} -> Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), - #plugin{name = AppName, version = Ver, descr = Descr}; + #plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)}; undefined -> error({plugin_not_found, AppName}) end. @@ -316,3 +316,10 @@ write_loaded(AppNames) -> ?LOG(error, "Open File ~p Error: ~p", [File, Error]), {error, Error} end. + +plugin_type(auth) -> auth; +plugin_type(protocol) -> protocol; +plugin_type(backend) -> backend; +plugin_type(bridge) -> bridge; +plugin_type(_) -> feature. + diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index a97cff326..771883be2 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -221,7 +221,11 @@ websocket_handle(Frame, State) {ok, ensure_stats_timer(State)}; websocket_handle({FrameType, _}, State) when FrameType =:= ping; FrameType =:= pong -> - {ok, ensure_stats_timer(State)}. + {ok, ensure_stats_timer(State)}; +%% According to mqtt spec[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901285] +websocket_handle({_OtherFrameType, _}, State) -> + ?LOG(error, "Frame error: Other type of data frame"), + shutdown(other_frame_type, State). websocket_info({call, From, info}, State) -> gen_server:reply(From, info(State)), diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index 25a8303e6..1e7223320 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -62,8 +62,8 @@ t_alarm_handler(_) -> {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), - Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>), - Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>), + Topic1 = emqx_topic:systop(<<"alarms/alert">>), + Topic2 = emqx_topic:systop(<<"alarms/clear">>), SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}, emqx_client_sock:send(Sock, raw_send_serialize( diff --git a/test/emqx_ws_channel_SUITE.erl b/test/emqx_ws_channel_SUITE.erl index 29f02e653..ada11a95a 100644 --- a/test/emqx_ws_channel_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -31,6 +31,7 @@ all() -> [ t_ws_connect_api , t_ws_auth_failure + , t_ws_other_type_frame ]. init_per_suite(Config) -> @@ -71,6 +72,18 @@ t_ws_connect_api(_Config) -> {close, _} = rfc6455_client:close(WS), ok. +t_ws_other_type_frame(_Config) -> + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + ok = rfc6455_client:send_binary(WS, raw_send_serialize(?CLIENT)), + {binary, Bin} = rfc6455_client:recv(WS), + Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), + {ok, Connack, <<>>, _} = raw_recv_pase(Bin), + rfc6455_client:send(WS, <<"testdata">>), + timer:sleep(1000), + ?assertEqual(undefined, erlang:process_info(WS)), + ok. + raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). diff --git a/test/run_emqx.escript b/test/run_emqx.escript new file mode 100644 index 000000000..e8d1e2aae --- /dev/null +++ b/test/run_emqx.escript @@ -0,0 +1,13 @@ +#!/usr/bin/env escript + +main(_) -> + start(). + +start() -> + SpecEmqxConfig = fun(_) -> ok end, + start(SpecEmqxConfig). + +start(SpecEmqxConfig) -> + SchemaPath = filename:join(["priv", "emqx.schema"]), + ConfPath = filename:join(["etc", "emqx.conf"]), + emqx_ct_helpers:start_app(emqx, SchemaPath, ConfPath, SpecEmqxConfig).