diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index c2ee5ab95..d549e2ccb 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -59,15 +59,32 @@ %% structured logging -define(SLOG(Level, Data), - %% check 'allow' here, only evaluate Data when necessary - case logger:allow(Level, ?MODULE) of - true -> - logger:log(Level, (Data), #{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} - , line => ?LINE - }); - false -> - ok - end). + ?SLOG(Level, Data, #{})). + +%% structured logging, meta is for handler's filter. +-define(SLOG(Level, Data, Meta), +%% check 'allow' here, only evaluate Data and Meta when necessary + case logger:allow(Level, ?MODULE) of + true -> + logger:log(Level, (Data), (Meta#{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} + , line => ?LINE + })); + false -> + ok + end). + +-define(TRACE_FILTER, emqx_trace_filter). + +%% Only evaluate when necessary +-define(TRACE(Event, Msg, Meta), + begin + case persistent_term:get(?TRACE_FILTER, undefined) of + undefined -> ok; + [] -> ok; + List -> + emqx_trace:log(List, Event, Msg, Meta) + end + end). %% print to 'user' group leader -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)). diff --git a/apps/emqx/src/emqx_authentication_config.erl b/apps/emqx/src/emqx_authentication_config.erl index 795dd060e..9767a2265 100644 --- a/apps/emqx/src/emqx_authentication_config.erl +++ b/apps/emqx/src/emqx_authentication_config.erl @@ -187,7 +187,7 @@ convert_certs(CertsDir, Config) -> {ok, SSL} -> new_ssl_config(Config, SSL); {error, Reason} -> - ?SLOG(error, Reason#{msg => bad_ssl_config}), + ?SLOG(error, Reason#{msg => "bad_ssl_config"}), throw({bad_ssl_config, Reason}) end. @@ -199,7 +199,7 @@ convert_certs(CertsDir, NewConfig, OldConfig) -> ok = emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL1, OldSSL), new_ssl_config(NewConfig, NewSSL1); {error, Reason} -> - ?SLOG(error, Reason#{msg => bad_ssl_config}), + ?SLOG(error, Reason#{msg => "bad_ssl_config"}), throw({bad_ssl_config, Reason}) end. diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index a82ab9b45..9dbfb0b43 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -204,9 +204,9 @@ publish(Msg) when is_record(Msg, message) -> _ = emqx_trace:publish(Msg), emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of - #message{headers = #{allow_publish := false}} -> - ?SLOG(debug, #{msg => "message_not_published", - payload => emqx_message:to_log_map(Msg)}), + #message{headers = #{allow_publish := false}, topic = Topic} -> + ?TRACE("MQTT", "msg_publish_not_allowed", #{message => emqx_message:to_log_map(Msg), + topic => Topic}), []; Msg1 = #message{topic = Topic} -> emqx_persistent_session:persist_message(Msg1), @@ -226,7 +226,9 @@ safe_publish(Msg) when is_record(Msg, message) -> reason => Reason, payload => emqx_message:to_log_map(Msg), stacktrace => Stk - }), + }, + #{topic => Msg#message.topic} + ), [] end. @@ -280,7 +282,7 @@ forward(Node, To, Delivery, async) -> msg => "async_forward_msg_to_node_failed", node => Node, reason => Reason - }), + }, #{topic => To}), {error, badrpc} end; @@ -291,7 +293,7 @@ forward(Node, To, Delivery, sync) -> msg => "sync_forward_msg_to_node_failed", node => Node, reason => Reason - }), + }, #{topic => To}), {error, badrpc}; Result -> emqx_metrics:inc('messages.forward'), Result diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index eb71aca58..290299aee 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -292,7 +292,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) -> fun check_banned/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), + ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}), NChannel1 = NChannel#channel{ will_msg = emqx_packet:will_msg(NConnPkt), alias_maximum = init_alias_maximum(NConnPkt, ClientInfo) @@ -550,9 +550,8 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> ?SLOG(warning, #{ msg => "cannot_publish_to_topic", - topic => Topic, reason => emqx_reason_codes:name(Rc) - }), + }, #{topic => Topic}), case emqx:get_config([authorization, deny_action], ignore) of ignore -> case QoS of @@ -568,9 +567,8 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} -> ?SLOG(warning, #{ msg => "cannot_publish_to_topic", - topic => Topic, reason => emqx_reason_codes:name(Rc) - }), + }, #{topic => Topic}), case QoS of ?QOS_0 -> ok = emqx_metrics:inc('packets.publish.dropped'), @@ -585,7 +583,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> msg => "cannot_publish_to_topic", topic => Topic, reason => emqx_reason_codes:name(Rc) - }), + }, #{topic => Topic}), handle_out(disconnect, Rc, NChannel) end. @@ -635,7 +633,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, msg => "dropped_qos2_packet", reason => emqx_reason_codes:name(RC), packet_id => PacketId - }), + }, #{topic => Msg#message.topic}), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(pubrec, {PacketId, RC}, Channel) end. @@ -687,7 +685,7 @@ process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Ac ?SLOG(warning, #{ msg => "cannot_subscribe_topic_filter", reason => emqx_reason_codes:name(ReasonCode) - }), + }, #{topic => TopicFilter}), process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc]) end. @@ -703,7 +701,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = ?SLOG(warning, #{ msg => "cannot_subscribe_topic_filter", reason => emqx_reason_codes:text(RC) - }), + }, #{topic => NTopicFilter}), {RC, Channel} end. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 162cff2e0..eae8dd43d 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -375,7 +375,7 @@ discard_session(ClientId) when is_binary(ClientId) -> -spec kick_or_kill(kick | discard, module(), pid()) -> ok. kick_or_kill(Action, ConnMod, Pid) -> try - %% this is essentailly a gen_server:call implemented in emqx_connection + %% this is essentially a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) @@ -390,19 +390,12 @@ kick_or_kill(Action, ConnMod, Pid) -> ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); _ : {timeout, {gen_server, call, _}} -> ?tp(warning, "session_kick_timeout", - #{pid => Pid, - action => Action, - stale_channel => stale_channel_info(Pid) - }), + #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}), ok = force_kill(Pid); _ : Error : St -> ?tp(error, "session_kick_exception", - #{pid => Pid, - action => Action, - reason => Error, - stacktrace => St, - stale_channel => stale_channel_info(Pid) - }), + #{pid => Pid, action => Action, reason => Error, stacktrace => St, + stale_channel => stale_channel_info(Pid)}), ok = force_kill(Pid) end. @@ -448,20 +441,22 @@ kick_session(Action, ClientId, ChanPid) -> , action => Action , error => Error , reason => Reason - }) + }, + #{clientid => ClientId}) end. kick_session(ClientId) -> case lookup_channels(ClientId) of [] -> - ?SLOG(warning, #{msg => "kicked_an_unknown_session", - clientid => ClientId}), + ?SLOG(warning, #{msg => "kicked_an_unknown_session"}, + #{clientid => ClientId}), ok; ChanPids -> case length(ChanPids) > 1 of true -> ?SLOG(warning, #{msg => "more_than_one_channel_found", - chan_pids => ChanPids}); + chan_pids => ChanPids}, + #{clientid => ClientId}); false -> ok end, lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids) @@ -478,12 +473,12 @@ with_channel(ClientId, Fun) -> Pids -> Fun(lists:last(Pids)) end. -%% @doc Get all registed channel pids. Debugg/test interface +%% @doc Get all registered channel pids. Debug/test interface all_channels() -> Pat = [{{'_', '$1'}, [], ['$1']}], ets:select(?CHAN_TAB, Pat). -%% @doc Get all registed clientIDs. Debugg/test interface +%% @doc Get all registered clientIDs. Debug/test interface all_client_ids() -> Pat = [{{'$1', '_'}, [], ['$1']}], ets:select(?CHAN_TAB, Pat). @@ -511,7 +506,7 @@ lookup_channels(local, ClientId) -> rpc_call(Node, Fun, Args, Timeout) -> case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of {badrpc, Reason} -> - %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler + %% since emqx app 4.3.10, the 'kick' and 'discard' calls handler %% should catch all exceptions and always return 'ok'. %% This leaves 'badrpc' only possible when there is problem %% calling the remote node. diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index e0e6968f3..7e9e985b8 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -262,7 +262,7 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> {ok, RawRichConf} -> init_load(SchemaMod, RawRichConf); {error, Reason} -> - ?SLOG(error, #{msg => failed_to_load_hocon_conf, + ?SLOG(error, #{msg => "failed_to_load_hocon_conf", reason => Reason, pwd => file:get_cwd(), include_dirs => IncDir @@ -397,7 +397,7 @@ save_to_override_conf(RawConf, Opts) -> case file:write_file(FileName, hocon_pp:do(RawConf, #{})) of ok -> ok; {error, Reason} -> - ?SLOG(error, #{msg => failed_to_write_override_file, + ?SLOG(error, #{msg => "failed_to_write_override_file", filename => FileName, reason => Reason}), {error, Reason} diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 6919c6ff8..d334ac23e 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -449,14 +449,12 @@ handle_msg({'$gen_cast', Req}, State) -> {ok, NewState}; handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> - ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => Inet}), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), when_bytes_in(Oct, Data, State); handle_msg({quic, Data, _Sock, _, _, _}, State) -> - ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => quic}), Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), @@ -528,7 +526,7 @@ handle_msg({connack, ConnAck}, State) -> handle_outgoing(ConnAck, State); handle_msg({close, Reason}, State) -> - ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}), + ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); handle_msg({event, connected}, State = #state{channel = Channel}) -> @@ -566,7 +564,8 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport, Channel1 = emqx_channel:set_conn_state(disconnected, Channel), emqx_congestion:cancel_alarms(Socket, Transport, Channel1), emqx_channel:terminate(Reason, Channel1), - close_socket_ok(State) + close_socket_ok(State), + ?TRACE("SOCKET", "tcp_socket_terminated", #{reason => Reason}) catch E : C : S -> ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) @@ -716,7 +715,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) -> ok = inc_incoming_stats(Packet), - ?SLOG(debug, #{msg => "RECV_packet", packet => emqx_packet:format(Packet)}), + ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}), with_channel(handle_in, [Packet], State); handle_incoming(FrameError, State) -> @@ -755,15 +754,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> <<>> -> ?SLOG(warning, #{ msg => "packet_is_discarded", reason => "frame_is_too_large", - packet => emqx_packet:format(Packet) + packet => emqx_packet:format(Packet, hidden) }), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; - Data -> ?SLOG(debug, #{ - msg => "SEND_packet", - packet => emqx_packet:format(Packet) - }), + Data -> + ?TRACE("MQTT", "mqtt_packet_sent", #{packet => Packet}), ok = inc_outgoing_stats(Packet), Data catch @@ -875,7 +872,7 @@ check_limiter(Needs, {ok, Limiter2} -> WhenOk(Data, Msgs, State#state{limiter = Limiter2}); {pause, Time, Limiter2} -> - ?SLOG(warning, #{msg => "pause time dueto rate limit", + ?SLOG(warning, #{msg => "pause_time_dueto_rate_limit", needs => Needs, time_in_ms => Time}), @@ -915,7 +912,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> , limiter_timer = undefined }); {pause, Time, Limiter2} -> - ?SLOG(warning, #{msg => "pause time dueto rate limit", + ?SLOG(warning, #{msg => "pause_time_dueto_rate_limit", types => Types, time_in_ms => Time}), diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 600144adc..b34819e53 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -118,11 +118,10 @@ handle_cast({detected, #flapping{clientid = ClientId, true -> %% Flapping happened:( ?SLOG(warning, #{ msg => "flapping_detected", - client_id => ClientId, peer_host => fmt_host(PeerHost), detect_cnt => DetectCnt, wind_time_in_ms => WindTime - }), + }, #{clientid => ClientId}), Now = erlang:system_time(second), Banned = #banned{who = {clientid, ClientId}, by = <<"flapping detector">>, @@ -134,11 +133,10 @@ handle_cast({detected, #flapping{clientid = ClientId, false -> ?SLOG(warning, #{ msg => "client_disconnected", - client_id => ClientId, peer_host => fmt_host(PeerHost), detect_cnt => DetectCnt, interval => Interval - }) + }, #{clientid => ClientId}) end, {noreply, State}; diff --git a/apps/emqx/src/emqx_logger.erl b/apps/emqx/src/emqx_logger.erl index 79ac5e6b8..66274a711 100644 --- a/apps/emqx/src/emqx_logger.erl +++ b/apps/emqx/src/emqx_logger.erl @@ -197,15 +197,7 @@ critical(Metadata, Format, Args) when is_map(Metadata) -> set_metadata_clientid(<<>>) -> ok; set_metadata_clientid(ClientId) -> - try - %% try put string format client-id metadata so - %% so the log is not like <<"...">> - Id = unicode:characters_to_list(ClientId, utf8), - set_proc_metadata(#{clientid => Id}) - catch - _: _-> - ok - end. + set_proc_metadata(#{clientid => ClientId}). -spec(set_metadata_peername(peername_str()) -> ok). set_metadata_peername(Peername) -> diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index 986c0fd8a..4e7dbcf14 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -18,22 +18,77 @@ -export([format/2]). -export([check_config/1]). +-export([try_format_unicode/1]). check_config(X) -> logger_formatter:check_config(X). -format(#{msg := {report, Report}, meta := Meta} = Event, Config) when is_map(Report) -> - logger_formatter:format(Event#{msg := {report, enrich(Report, Meta)}}, Config); -format(#{msg := Msg, meta := Meta} = Event, Config) -> - NewMsg = enrich_fmt(Msg, Meta), - logger_formatter:format(Event#{msg := NewMsg}, Config). +format(#{msg := {report, Report0}, meta := Meta} = Event, Config) when is_map(Report0) -> + Report1 = enrich_report_mfa(Report0, Meta), + Report2 = enrich_report_clientid(Report1, Meta), + Report3 = enrich_report_peername(Report2, Meta), + Report4 = enrich_report_topic(Report3, Meta), + logger_formatter:format(Event#{msg := {report, Report4}}, Config); +format(#{msg := {string, String}} = Event, Config) -> + format(Event#{msg => {"~ts ", String}}, Config); +format(#{msg := Msg0, meta := Meta} = Event, Config) -> + Msg1 = enrich_client_info(Msg0, Meta), + Msg2 = enrich_mfa(Msg1, Meta), + Msg3 = enrich_topic(Msg2, Meta), + logger_formatter:format(Event#{msg := Msg3}, Config). -enrich(Report, #{mfa := Mfa, line := Line}) -> +try_format_unicode(Char) -> + List = + try + case unicode:characters_to_list(Char) of + {error, _, _} -> error; + {incomplete, _, _} -> error; + Binary -> Binary + end + catch _:_ -> + error + end, + case List of + error -> io_lib:format("~0p", [Char]); + _ -> List + end. + +enrich_report_mfa(Report, #{mfa := Mfa, line := Line}) -> Report#{mfa => mfa(Mfa), line => Line}; -enrich(Report, _) -> Report. +enrich_report_mfa(Report, _) -> Report. -enrich_fmt({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) -> +enrich_report_clientid(Report, #{clientid := ClientId}) -> + Report#{clientid => try_format_unicode(ClientId)}; +enrich_report_clientid(Report, _) -> Report. + +enrich_report_peername(Report, #{peername := Peername}) -> + Report#{peername => Peername}; +enrich_report_peername(Report, _) -> Report. + +%% clientid and peername always in emqx_conn's process metadata. +%% topic can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2 +enrich_report_topic(Report, #{topic := Topic}) -> + Report#{topic => try_format_unicode(Topic)}; +enrich_report_topic(Report = #{topic := Topic}, _) -> + Report#{topic => try_format_unicode(Topic)}; +enrich_report_topic(Report, _) -> Report. + +enrich_mfa({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) -> {Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]}; -enrich_fmt(Msg, _) -> +enrich_mfa(Msg, _) -> + Msg. + +enrich_client_info({Fmt, Args}, #{clientid := ClientId, peername := Peer}) when is_list(Fmt) -> + {" ~ts@~ts " ++ Fmt, [ClientId, Peer | Args] }; +enrich_client_info({Fmt, Args}, #{clientid := ClientId}) when is_list(Fmt) -> + {" ~ts " ++ Fmt, [ClientId | Args]}; +enrich_client_info({Fmt, Args}, #{peername := Peer}) when is_list(Fmt) -> + {" ~ts " ++ Fmt, [Peer | Args]}; +enrich_client_info(Msg, _) -> + Msg. + +enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) -> + {" topic: ~ts" ++ Fmt, [Topic | Args]}; +enrich_topic(Msg, _) -> Msg. mfa({M, F, A}) -> atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A). diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 60835d4ab..23b8390e5 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -44,7 +44,11 @@ , will_msg/1 ]). --export([format/1]). +-export([ format/1 + , format/2 + ]). + +-export([encode_hex/1]). -define(TYPE_NAMES, { 'CONNECT' @@ -435,25 +439,28 @@ will_msg(#mqtt_packet_connect{clientid = ClientId, %% @doc Format packet -spec(format(emqx_types:packet()) -> iolist()). -format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) -> - format_header(Header, format_variable(Variable, Payload)). +format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()). + +%% @doc Format packet +-spec(format(emqx_types:packet(), hex | text | hidden) -> iolist()). +format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) -> + HeaderIO = format_header(Header), + case format_variable(Variable, Payload, PayloadEncode) of + "" -> HeaderIO; + VarIO -> [HeaderIO,",", VarIO] + end. format_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, - retain = Retain}, S) -> - S1 = case S == undefined of - true -> <<>>; - false -> [", ", S] - end, - io_lib:format("~ts(Q~p, R~p, D~p~ts)", [type_name(Type), QoS, i(Retain), i(Dup), S1]). + retain = Retain}) -> + io_lib:format("~ts(Q~p, R~p, D~p)", [type_name(Type), QoS, i(Retain), i(Dup)]). -format_variable(undefined, _) -> - undefined; -format_variable(Variable, undefined) -> - format_variable(Variable); -format_variable(Variable, Payload) -> - io_lib:format("~ts, Payload=~0p", [format_variable(Variable), Payload]). +format_variable(undefined, _, _) -> ""; +format_variable(Variable, undefined, PayloadEncode) -> + format_variable(Variable, PayloadEncode); +format_variable(Variable, Payload, PayloadEncode) -> + [format_variable(Variable, PayloadEncode), format_payload(Payload, PayloadEncode)]. format_variable(#mqtt_packet_connect{ proto_ver = ProtoVer, @@ -467,57 +474,140 @@ format_variable(#mqtt_packet_connect{ will_topic = WillTopic, will_payload = WillPayload, username = Username, - password = Password}) -> - Format = "ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts", - Args = [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)], - {Format1, Args1} = if - WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~ts, Payload=~0p)", - Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]}; - true -> {Format, Args} - end, - io_lib:format(Format1, Args1); + password = Password}, + PayloadEncode) -> + Base = io_lib:format( + "ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts", + [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)]), + case WillFlag of + true -> + [Base, io_lib:format(", Will(Q~p, R~p, Topic=~ts ", + [WillQoS, i(WillRetain), WillTopic]), + format_payload(WillPayload, PayloadEncode), ")"]; + false -> + Base + end; format_variable(#mqtt_packet_disconnect - {reason_code = ReasonCode}) -> + {reason_code = ReasonCode}, _) -> io_lib:format("ReasonCode=~p", [ReasonCode]); format_variable(#mqtt_packet_connack{ack_flags = AckFlags, - reason_code = ReasonCode}) -> + reason_code = ReasonCode}, _) -> io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]); format_variable(#mqtt_packet_publish{topic_name = TopicName, - packet_id = PacketId}) -> + packet_id = PacketId}, _) -> io_lib:format("Topic=~ts, PacketId=~p", [TopicName, PacketId]); format_variable(#mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode}) -> + reason_code = ReasonCode}, _) -> io_lib:format("PacketId=~p, ReasonCode=~p", [PacketId, ReasonCode]); format_variable(#mqtt_packet_subscribe{packet_id = PacketId, - topic_filters = TopicFilters}) -> - io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]); + topic_filters = TopicFilters}, _) -> + [io_lib:format("PacketId=~p ", [PacketId]), "TopicFilters=", + format_topic_filters(TopicFilters)]; format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, - topic_filters = Topics}) -> - io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]); + topic_filters = Topics}, _) -> + [io_lib:format("PacketId=~p ", [PacketId]), "TopicFilters=", + format_topic_filters(Topics)]; format_variable(#mqtt_packet_suback{packet_id = PacketId, - reason_codes = ReasonCodes}) -> + reason_codes = ReasonCodes}, _) -> io_lib:format("PacketId=~p, ReasonCodes=~p", [PacketId, ReasonCodes]); -format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) -> +format_variable(#mqtt_packet_unsuback{packet_id = PacketId}, _) -> io_lib:format("PacketId=~p", [PacketId]); -format_variable(#mqtt_packet_auth{reason_code = ReasonCode}) -> +format_variable(#mqtt_packet_auth{reason_code = ReasonCode}, _) -> io_lib:format("ReasonCode=~p", [ReasonCode]); -format_variable(PacketId) when is_integer(PacketId) -> +format_variable(PacketId, _) when is_integer(PacketId) -> io_lib:format("PacketId=~p", [PacketId]). -format_password(undefined) -> undefined; -format_password(_Password) -> '******'. +format_password(undefined) -> "undefined"; +format_password(_Password) -> "******". + +format_payload(Payload, text) -> ["Payload=", io_lib:format("~ts", [Payload])]; +format_payload(Payload, hex) -> ["Payload(hex)=", encode_hex(Payload)]; +format_payload(_, hidden) -> "Payload=******". i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. +format_topic_filters(Filters) -> + ["[", + lists:join(",", + lists:map( + fun({TopicFilter, SubOpts}) -> + io_lib:format("~ts(~p)", [TopicFilter, SubOpts]); + (TopicFilter) -> + io_lib:format("~ts", [TopicFilter]) + end, Filters)), + "]"]. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Hex encoding functions +%% Copy from binary:encode_hex/1 (was only introduced in OTP24). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-define(HEX(X), (hex(X)):16). +-compile({inline,[hex/1]}). +-spec encode_hex(Bin) -> Bin2 when + Bin :: binary(), + Bin2 :: <<_:_*16>>. +encode_hex(Data) when byte_size(Data) rem 8 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 7 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 6 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 5 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 4 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 3 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when byte_size(Data) rem 2 =:= 0 -> + << <> || <> <= Data >>; +encode_hex(Data) when is_binary(Data) -> + << <> || <> <= Data >>; +encode_hex(Bin) -> + erlang:error(badarg, [Bin]). + +hex(X) -> + element( + X+1, {16#3030, 16#3031, 16#3032, 16#3033, 16#3034, 16#3035, 16#3036, 16#3037, 16#3038, 16#3039, 16#3041, + 16#3042, 16#3043, 16#3044, 16#3045, 16#3046, + 16#3130, 16#3131, 16#3132, 16#3133, 16#3134, 16#3135, 16#3136, 16#3137, 16#3138, 16#3139, 16#3141, + 16#3142, 16#3143, 16#3144, 16#3145, 16#3146, + 16#3230, 16#3231, 16#3232, 16#3233, 16#3234, 16#3235, 16#3236, 16#3237, 16#3238, 16#3239, 16#3241, + 16#3242, 16#3243, 16#3244, 16#3245, 16#3246, + 16#3330, 16#3331, 16#3332, 16#3333, 16#3334, 16#3335, 16#3336, 16#3337, 16#3338, 16#3339, 16#3341, + 16#3342, 16#3343, 16#3344, 16#3345, 16#3346, + 16#3430, 16#3431, 16#3432, 16#3433, 16#3434, 16#3435, 16#3436, 16#3437, 16#3438, 16#3439, 16#3441, + 16#3442, 16#3443, 16#3444, 16#3445, 16#3446, + 16#3530, 16#3531, 16#3532, 16#3533, 16#3534, 16#3535, 16#3536, 16#3537, 16#3538, 16#3539, 16#3541, + 16#3542, 16#3543, 16#3544, 16#3545, 16#3546, + 16#3630, 16#3631, 16#3632, 16#3633, 16#3634, 16#3635, 16#3636, 16#3637, 16#3638, 16#3639, 16#3641, + 16#3642, 16#3643, 16#3644, 16#3645, 16#3646, + 16#3730, 16#3731, 16#3732, 16#3733, 16#3734, 16#3735, 16#3736, 16#3737, 16#3738, 16#3739, 16#3741, + 16#3742, 16#3743, 16#3744, 16#3745, 16#3746, + 16#3830, 16#3831, 16#3832, 16#3833, 16#3834, 16#3835, 16#3836, 16#3837, 16#3838, 16#3839, 16#3841, + 16#3842, 16#3843, 16#3844, 16#3845, 16#3846, + 16#3930, 16#3931, 16#3932, 16#3933, 16#3934, 16#3935, 16#3936, 16#3937, 16#3938, 16#3939, 16#3941, + 16#3942, 16#3943, 16#3944, 16#3945, 16#3946, + 16#4130, 16#4131, 16#4132, 16#4133, 16#4134, 16#4135, 16#4136, 16#4137, 16#4138, 16#4139, 16#4141, + 16#4142, 16#4143, 16#4144, 16#4145, 16#4146, + 16#4230, 16#4231, 16#4232, 16#4233, 16#4234, 16#4235, 16#4236, 16#4237, 16#4238, 16#4239, 16#4241, + 16#4242, 16#4243, 16#4244, 16#4245, 16#4246, + 16#4330, 16#4331, 16#4332, 16#4333, 16#4334, 16#4335, 16#4336, 16#4337, 16#4338, 16#4339, 16#4341, + 16#4342, 16#4343, 16#4344, 16#4345, 16#4346, + 16#4430, 16#4431, 16#4432, 16#4433, 16#4434, 16#4435, 16#4436, 16#4437, 16#4438, 16#4439, 16#4441, + 16#4442, 16#4443, 16#4444, 16#4445, 16#4446, + 16#4530, 16#4531, 16#4532, 16#4533, 16#4534, 16#4535, 16#4536, 16#4537, 16#4538, 16#4539, 16#4541, + 16#4542, 16#4543, 16#4544, 16#4545, 16#4546, + 16#4630, 16#4631, 16#4632, 16#4633, 16#4634, 16#4635, 16#4636, 16#4637, 16#4638, 16#4639, 16#4641, + 16#4642, 16#4643, 16#4644, 16#4645, 16#4646}). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index a0bdb5150..fc90ba1bb 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -180,6 +180,12 @@ roots(low) -> , {"latency_stats", sc(ref("latency_stats"), #{})} + , {"trace", + sc(ref("trace"), + #{desc => """ +Real-time filtering logs for the ClientID or Topic or IP for debugging. +""" + })} ]. fields("persistent_session_store") -> @@ -977,6 +983,17 @@ when deactivated, but after the retention time. fields("latency_stats") -> [ {"samples", sc(integer(), #{default => 10, desc => "the number of smaples for calculate the average latency of delivery"})} + ]; +fields("trace") -> + [ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{ + default => text, + desc => """ +Determine the format of the payload format in the trace file.
+`text`: Text-based protocol or plain text protocol. It is recommended when payload is json encode.
+`hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.
+`hidden`: payload is obfuscated as `******` + """ + })} ]. mqtt_listener() -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index bf79085af..1695ed6ce 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -535,16 +535,20 @@ enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> (Dropped =/= undefined) andalso log_dropped(Dropped, Session), Session#session{mqueue = NewQ}. -log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> - case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of +log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) -> + Payload = emqx_message:to_log_map(Msg), + #{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q), + case (QoS == ?QOS_0) andalso (not StoreQos0) of true -> ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), ?SLOG(warning, #{msg => "dropped_qos0_msg", - payload => emqx_message:to_log_map(Msg)}); + queue => QueueInfo, + payload => Payload}, #{topic => Topic}); false -> ok = emqx_metrics:inc('delivery.dropped.queue_full'), ?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full", - payload => emqx_message:to_log_map(Msg)}) + queue => QueueInfo, + payload => Payload}, #{topic => Topic}) end. enrich_fun(Session = #session{subscriptions = Subs}) -> diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index aaaedcb12..3d3722c32 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -260,7 +260,7 @@ code_change(_OldVsn, State, _Extra) -> init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) -> case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of {error, What} -> - ?SLOG(error, #{msg => "Could not start resume worker", reason => What}), + ?SLOG(error, #{msg => "failed_to_start_resume_worker", reason => What}), error; {ok, Pid} -> Pmon1 = emqx_pmon:monitor(Pid, Pmon), diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 42e4d0baf..5af0d156e 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -26,6 +26,7 @@ -export([ publish/1 , subscribe/3 , unsubscribe/2 + , log/4 ]). -export([ start_link/0 @@ -36,6 +37,7 @@ , delete/1 , clear/0 , update/2 + , check/0 ]). -export([ format/1 @@ -50,6 +52,7 @@ -define(TRACE, ?MODULE). -define(MAX_SIZE, 30). +-define(OWN_KEYS, [level, filters, filter_default, handlers]). -ifdef(TEST). -export([ log_file/2 @@ -80,27 +83,53 @@ mnesia(boot) -> publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when is_binary(From); is_atom(From) -> - emqx_logger:info( - #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, - "PUBLISH to ~s: ~0p", - [Topic, Payload] - ). + ?TRACE("PUBLISH", "publish_to", #{topic => Topic, payload => Payload}). subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore; subscribe(Topic, SubId, SubOpts) -> - emqx_logger:info( - #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, - "~ts SUBSCRIBE ~ts: Options: ~0p", - [SubId, Topic, SubOpts] - ). + ?TRACE("SUBSCRIBE", "subscribe", #{topic => Topic, sub_opts => SubOpts, sub_id => SubId}). unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore; unsubscribe(Topic, SubOpts) -> - emqx_logger:info( - #{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, - "~ts UNSUBSCRIBE ~ts: Options: ~0p", - [maps:get(subid, SubOpts, ""), Topic, SubOpts] - ). + ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). + +log(List, Event, Msg, Meta0) -> + Meta = + case logger:get_process_metadata() of + undefined -> Meta0; + ProcMeta -> maps:merge(ProcMeta, Meta0) + end, + Log = #{level => trace, event => Event, meta => Meta, msg => Msg}, + log_filter(List, Log). + +log_filter([], _Log) -> ok; +log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) -> + case FilterFun(Log0, {Filter, Name}) of + stop -> stop; + ignore -> ignore; + Log -> + case logger_config:get(ets:whereis(logger), Id) of + {ok, #{module := Module} = HandlerConfig0} -> + HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0), + try Module:log(Log, HandlerConfig) + catch C:R:S -> + case logger:remove_handler(Id) of + ok -> + logger:internal_log(error, {removed_failing_handler, Id, C, R, S}); + {error,{not_found,_}} -> + %% Probably already removed by other client + %% Don't report again + ok; + {error,Reason} -> + logger:internal_log(error, + {removed_handler_failed, Id, Reason, C, R, S}) + end + end; + {error, {not_found, Id}} -> ok; + {error, Reason} -> logger:internal_log(error, {find_handle_id_failed, Id, Reason}) + end + end, + log_filter(Rest, Log0). -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> @@ -161,6 +190,9 @@ update(Name, Enable) -> end, transaction(Tran). +check() -> + gen_server:call(?MODULE, check). + -spec get_trace_filename(Name :: binary()) -> {ok, FileName :: string()} | {error, not_found}. get_trace_filename(Name) -> @@ -196,15 +228,17 @@ format(Traces) -> init([]) -> ok = mria:wait_for_tables([?TRACE]), erlang:process_flag(trap_exit, true), - OriginLogLevel = emqx_logger:get_primary_log_level(), ok = filelib:ensure_dir(trace_dir()), ok = filelib:ensure_dir(zip_dir()), {ok, _} = mnesia:subscribe({table, ?TRACE, simple}), Traces = get_enable_trace(), - ok = update_log_primary_level(Traces, OriginLogLevel), TRef = update_trace(Traces), - {ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}. + update_trace_handler(), + {ok, #{timer => TRef, monitors => #{}}}. +handle_call(check, _From, State) -> + {_, NewState} = handle_info({mnesia_table_event, check}, State), + {reply, ok, NewState}; handle_call(Req, _From, State) -> ?SLOG(error, #{unexpected_call => Req}), {reply, ok, State}. @@ -223,11 +257,10 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor lists:foreach(fun file:delete/1, Files), {noreply, State#{monitors => NewMonitors}} end; -handle_info({timeout, TRef, update_trace}, - #{timer := TRef, primary_log_level := OriginLogLevel} = State) -> +handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) -> Traces = get_enable_trace(), - ok = update_log_primary_level(Traces, OriginLogLevel), NextTRef = update_trace(Traces), + update_trace_handler(), {noreply, State#{timer => NextTRef}}; handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> @@ -238,11 +271,11 @@ handle_info(Info, State) -> ?SLOG(error, #{unexpected_info => Info}), {noreply, State}. -terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) -> - ok = set_log_primary_level(OriginLogLevel), +terminate(_Reason, #{timer := TRef}) -> _ = mnesia:unsubscribe({table, ?TRACE, simple}), emqx_misc:cancel_timer(TRef), stop_all_trace_handler(), + update_trace_handler(), _ = file:del_dir_r(zip_dir()), ok. @@ -270,7 +303,7 @@ update_trace(Traces) -> disable_finished(Finished), Started = emqx_trace_handler:running(), {NeedRunning, AllStarted} = start_trace(Running, Started), - NeedStop = AllStarted -- NeedRunning, + NeedStop = filter_cli_handler(AllStarted) -- NeedRunning, ok = stop_trace(NeedStop, Started), clean_stale_trace_files(), NextTime = find_closest_time(Traces, Now), @@ -308,10 +341,10 @@ disable_finished(Traces) -> start_trace(Traces, Started0) -> Started = lists:map(fun(#{name := Name}) -> Name end, Started0), - lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) -> + lists:foldl(fun(#?TRACE{name = Name} = Trace, + {Running, StartedAcc}) -> case lists:member(Name, StartedAcc) of - true -> - {[Name | Running], StartedAcc}; + true -> {[Name | Running], StartedAcc}; false -> case start_trace(Trace) of ok -> {[Name | Running], [Name | StartedAcc]}; @@ -330,9 +363,11 @@ start_trace(Trace) -> emqx_trace_handler:install(Who, debug, log_file(Name, Start)). stop_trace(Finished, Started) -> - lists:foreach(fun(#{name := Name, type := Type}) -> + lists:foreach(fun(#{name := Name, type := Type, filter := Filter}) -> case lists:member(Name, Finished) of - true -> emqx_trace_handler:uninstall(Type, Name); + true -> + ?TRACE("API", "trace_stopping", #{Type => Filter}), + emqx_trace_handler:uninstall(Type, Name); false -> ok end end, Started). @@ -419,7 +454,7 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) -> case validate_ip_address(Filter) of ok -> Trace0 = maps:without([type, ip_address], Trace), - to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = Filter}); + to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = binary_to_list(Filter)}); Error -> Error end; to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])}; @@ -481,11 +516,20 @@ transaction(Tran) -> {aborted, Reason} -> {error, Reason} end. -update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel); -update_log_primary_level(_, _) -> set_log_primary_level(debug). - -set_log_primary_level(NewLevel) -> - case NewLevel =/= emqx_logger:get_primary_log_level() of - true -> emqx_logger:set_primary_log_level(NewLevel); - false -> ok +update_trace_handler() -> + case emqx_trace_handler:running() of + [] -> persistent_term:erase(?TRACE_FILTER); + Running -> + List = lists:map(fun(#{id := Id, filter_fun := FilterFun, + filter := Filter, name := Name}) -> + {Id, FilterFun, Filter, Name} end, Running), + case List =/= persistent_term:get(?TRACE_FILTER, undefined) of + true -> persistent_term:put(?TRACE_FILTER, List); + false -> ok + end end. + +filter_cli_handler(Names) -> + lists:filter(fun(Name) -> + nomatch =:= re:run(Name, "^CLI-+.", []) + end, Names). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl new file mode 100644 index 000000000..2ef142d38 --- /dev/null +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -0,0 +1,62 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_trace_formatter). + +-export([format/2]). + +%%%----------------------------------------------------------------- +%%% API +-spec format(LogEvent, Config) -> unicode:chardata() when + LogEvent :: logger:log_event(), + Config :: logger:config(). +format(#{level := trace, event := Event, meta := Meta, msg := Msg}, + #{payload_encode := PEncode}) -> + Time = calendar:system_time_to_rfc3339(erlang:system_time(second)), + ClientId = to_iolist(maps:get(clientid, Meta, "")), + Peername = maps:get(peername, Meta, ""), + MetaBin = format_meta(Meta, PEncode), + [Time, " [", Event, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"]; + +format(Event, Config) -> + emqx_logger_textfmt:format(Event, Config). + +format_meta(Meta0, Encode) -> + Packet = format_packet(maps:get(packet, Meta0, undefined), Encode), + Payload = format_payload(maps:get(payload, Meta0, undefined), Encode), + Meta1 = maps:without([msg, clientid, peername, packet, payload], Meta0), + case Meta1 =:= #{} of + true -> [Packet, Payload]; + false -> [Packet, ", ", map_to_iolist(Meta1), Payload] + end. + +format_packet(undefined, _) -> ""; +format_packet(Packet, Encode) -> [", packet: ", emqx_packet:format(Packet, Encode)]. + +format_payload(undefined, _) -> ""; +format_payload(Payload, text) -> [", payload: ", io_lib:format("~ts", [Payload])]; +format_payload(Payload, hex) -> [", payload(hex): ", emqx_packet:encode_hex(Payload)]; +format_payload(_, hidden) -> ", payload=******". + +to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom); +to_iolist(Int) when is_integer(Int) -> integer_to_list(Int); +to_iolist(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 2}]); +to_iolist(SubMap) when is_map(SubMap) -> ["[", map_to_iolist(SubMap), "]"]; +to_iolist(Char) -> emqx_logger_textfmt:try_format_unicode(Char). + +map_to_iolist(Map) -> + lists:join(",", + lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end, + maps:to_list(Map))). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index c76bf1aa9..320421309 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -25,6 +25,7 @@ -export([ running/0 , install/3 , install/4 + , install/5 , uninstall/1 , uninstall/2 ]). @@ -36,6 +37,7 @@ ]). -export([handler_id/2]). +-export([payload_encode/0]). -type tracer() :: #{ name := binary(), @@ -77,22 +79,18 @@ install(Type, Filter, Level, LogFile) -> -spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}. install(Who, all, LogFile) -> install(Who, debug, LogFile); -install(Who, Level, LogFile) -> - PrimaryLevel = emqx_logger:get_primary_log_level(), - try logger:compare_levels(Level, PrimaryLevel) of - lt -> - {error, - io_lib:format( - "Cannot trace at a log level (~s) " - "lower than the primary log level (~s)", - [Level, PrimaryLevel] - )}; - _GtOrEq -> - install_handler(Who, Level, LogFile) - catch - error:badarg -> - {error, {invalid_log_level, Level}} - end. +install(Who = #{name := Name, type := Type}, Level, LogFile) -> + HandlerId = handler_id(Name, Type), + Config = #{ + level => Level, + formatter => formatter(Who), + filter_default => stop, + filters => filters(Who), + config => ?CONFIG(LogFile) + }, + Res = logger:add_handler(HandlerId, logger_disk_log_h, Config), + show_prompts(Res, Who, "start_trace"), + Res. -spec uninstall(Type :: clientid | topic | ip_address, Name :: binary() | list()) -> ok | {error, term()}. @@ -121,83 +119,59 @@ uninstall(HandlerId) -> running() -> lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)). --spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore. +-spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log; -filter_clientid(_Log, _ExpectId) -> ignore. +filter_clientid(_Log, _ExpectId) -> stop. --spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore. +-spec filter_topic(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) -> case emqx_topic:match(Topic, TopicFilter) of true -> Log; - false -> ignore + false -> stop end; -filter_topic(_Log, _ExpectId) -> ignore. +filter_topic(_Log, _ExpectId) -> stop. --spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore. +-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop. filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) -> case lists:prefix(IP, Peername) of true -> Log; - false -> ignore + false -> stop end; -filter_ip_address(_Log, _ExpectId) -> ignore. - -install_handler(Who = #{name := Name, type := Type}, Level, LogFile) -> - HandlerId = handler_id(Name, Type), - Config = #{ - level => Level, - formatter => formatter(Who), - filter_default => stop, - filters => filters(Who), - config => ?CONFIG(LogFile) - }, - Res = logger:add_handler(HandlerId, logger_disk_log_h, Config), - show_prompts(Res, Who, "Start trace"), - Res. +filter_ip_address(_Log, _ExpectId) -> stop. filters(#{type := clientid, filter := Filter, name := Name}) -> - [{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}]; + [{clientid, {fun ?MODULE:filter_clientid/2, {Filter, Name}}}]; filters(#{type := topic, filter := Filter, name := Name}) -> [{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}]; filters(#{type := ip_address, filter := Filter, name := Name}) -> [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]. -formatter(#{type := Type}) -> - {logger_formatter, +formatter(#{type := _Type}) -> + {emqx_trace_formatter, #{ - template => template(Type), - single_line => false, + %% template is for ?SLOG message not ?TRACE. + template => [time," [",level,"] ", msg,"\n"], + single_line => true, max_size => unlimited, - depth => unlimited + depth => unlimited, + payload_encode => payload_encode() } }. -%% Don't log clientid since clientid only supports exact match, all client ids are the same. -%% if clientid is not latin characters. the logger_formatter restricts the output must be `~tp` -%% (actually should use `~ts`), the utf8 characters clientid will become very difficult to read. -template(clientid) -> - [time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"]; -%% TODO better format when clientid is utf8. -template(_) -> - [time, " [", level, "] ", - {clientid, - [{peername, [clientid, "@", peername, " "], [clientid, " "]}], - [{peername, [peername, " "], []}] - }, - msg, "\n" - ]. - filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) -> Init = #{id => Id, level => Level, dst => Dst}, case Filters of - [{Type, {_FilterFun, {Filter, Name}}}] when + [{Type, {FilterFun, {Filter, Name}}}] when Type =:= topic orelse Type =:= clientid orelse Type =:= ip_address -> - [Init#{type => Type, filter => Filter, name => Name} | Acc]; + [Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc]; _ -> Acc end. +payload_encode() -> emqx_config:get([trace, payload_encode], text). + handler_id(Name, Type) -> try do_handler_id(Name, Type) diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 375b1ae2f..e2bdf6c72 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -347,7 +347,6 @@ websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); websocket_handle({binary, Data}, State) -> - ?SLOG(debug, #{msg => "RECV_data", data => Data, transport => websocket}), State2 = ensure_stats_timer(State), {Packets, State3} = parse_incoming(Data, [], State2), LenMsg = erlang:length(Packets), @@ -432,11 +431,11 @@ websocket_info(Info, State) -> websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) -> websocket_close(ReasonCode, State); websocket_close(Reason, State) -> - ?SLOG(debug, #{msg => "websocket_closed", reason => Reason}), + ?TRACE("SOCKET", "websocket_closed", #{reason => Reason}), handle_info({sock_closed, Reason}, State). terminate(Reason, _Req, #state{channel = Channel}) -> - ?SLOG(debug, #{msg => "terminated", reason => Reason}), + ?TRACE("SOCKET", "websocket_terminated", #{reason => Reason}), emqx_channel:terminate(Reason, Channel); terminate(_Reason, _Req, _UnExpectedState) -> @@ -480,7 +479,7 @@ handle_info({connack, ConnAck}, State) -> return(enqueue(ConnAck, State)); handle_info({close, Reason}, State) -> - ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}), + ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}), return(enqueue({close, Reason}, State)); handle_info({event, connected}, State = #state{channel = Channel}) -> @@ -550,7 +549,7 @@ check_limiter(Needs, {ok, Limiter2} -> WhenOk(Data, Msgs, State#state{limiter = Limiter2}); {pause, Time, Limiter2} -> - ?SLOG(warning, #{msg => "pause time dueto rate limit", + ?SLOG(warning, #{msg => "pause_time_due_to_rate_limit", needs => Needs, time_in_ms => Time}), @@ -586,7 +585,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> , limiter_timer = undefined }); {pause, Time, Limiter2} -> - ?SLOG(warning, #{msg => "pause time dueto rate limit", + ?SLOG(warning, #{msg => "pause_time_due_to_rate_limit", types => Types, time_in_ms => Time}), @@ -663,7 +662,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when is_record(Packet, mqtt_packet) -> - ?SLOG(debug, #{msg => "RECV", packet => emqx_packet:format(Packet)}), + ?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}), ok = inc_incoming_stats(Packet), NState = case emqx_pd:get_counter(incoming_pubs) > get_active_n(Type, Listener) of @@ -727,7 +726,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; - Data -> ?SLOG(debug, #{msg => "SEND", packet => Packet}), + Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}), ok = inc_outgoing_stats(Packet), Data catch diff --git a/apps/emqx/test/emqx_trace_handler_SUITE.erl b/apps/emqx/test/emqx_trace_handler_SUITE.erl index abe233b58..1224fdac9 100644 --- a/apps/emqx/test/emqx_trace_handler_SUITE.erl +++ b/apps/emqx/test/emqx_trace_handler_SUITE.erl @@ -39,32 +39,29 @@ end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]). init_per_testcase(t_trace_clientid, Config) -> + init(), Config; init_per_testcase(_Case, Config) -> - ok = emqx_logger:set_log_level(debug), _ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()], + init(), Config. end_per_testcase(_Case, _Config) -> - ok = emqx_logger:set_log_level(warning), + terminate(), ok. t_trace_clientid(_Config) -> %% Start tracing - emqx_logger:set_log_level(error), - {error, _} = emqx_trace_handler:install(clientid, <<"client">>, debug, "tmp/client.log"), - emqx_logger:set_log_level(debug), %% add list clientid - ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"), - ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"), - ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"), - {error, {invalid_log_level, bad_level}} = - emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"), + ok = emqx_trace_handler:install("CLI-client1", clientid, "client", debug, "tmp/client.log"), + ok = emqx_trace_handler:install("CLI-client2", clientid, <<"client2">>, all, "tmp/client2.log"), + ok = emqx_trace_handler:install("CLI-client3", clientid, <<"client3">>, all, "tmp/client3.log"), {error, {handler_not_added, {file_error, ".", eisdir}}} = emqx_trace_handler:install(clientid, <<"client5">>, debug, "."), - ok = filesync(<<"client">>, clientid), - ok = filesync(<<"client2">>, clientid), - ok = filesync(<<"client3">>, clientid), + emqx_trace:check(), + ok = filesync(<<"CLI-client1">>, clientid), + ok = filesync(<<"CLI-client2">>, clientid), + ok = filesync(<<"CLI-client3">>, clientid), %% Verify the tracing file exits ?assert(filelib:is_regular("tmp/client.log")), @@ -72,11 +69,11 @@ t_trace_clientid(_Config) -> ?assert(filelib:is_regular("tmp/client3.log")), %% Get current traces - ?assertMatch([#{type := clientid, filter := "client", name := <<"client">>, + ?assertMatch([#{type := clientid, filter := <<"client">>, name := <<"CLI-client1">>, level := debug, dst := "tmp/client.log"}, - #{type := clientid, filter := "client2", name := <<"client2">> + #{type := clientid, filter := <<"client2">>, name := <<"CLI-client2">> , level := debug, dst := "tmp/client2.log"}, - #{type := clientid, filter := "client3", name := <<"client3">>, + #{type := clientid, filter := <<"client3">>, name := <<"CLI-client3">>, level := debug, dst := "tmp/client3.log"} ], emqx_trace_handler:running()), @@ -85,9 +82,9 @@ t_trace_clientid(_Config) -> emqtt:connect(T), emqtt:publish(T, <<"a/b/c">>, <<"hi">>), emqtt:ping(T), - ok = filesync(<<"client">>, clientid), - ok = filesync(<<"client2">>, clientid), - ok = filesync(<<"client3">>, clientid), + ok = filesync(<<"CLI-client1">>, clientid), + ok = filesync(<<"CLI-client2">>, clientid), + ok = filesync(<<"CLI-client3">>, clientid), %% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log". {ok, Bin} = file:read_file("tmp/client.log"), @@ -98,25 +95,24 @@ t_trace_clientid(_Config) -> ?assert(filelib:file_size("tmp/client2.log") == 0), %% Stop tracing - ok = emqx_trace_handler:uninstall(clientid, <<"client">>), - ok = emqx_trace_handler:uninstall(clientid, <<"client2">>), - ok = emqx_trace_handler:uninstall(clientid, <<"client3">>), + ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client1">>), + ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client2">>), + ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client3">>), emqtt:disconnect(T), ?assertEqual([], emqx_trace_handler:running()). t_trace_clientid_utf8(_) -> - emqx_logger:set_log_level(debug), - Utf8Id = <<"client 漢字編碼"/utf8>>, - ok = emqx_trace_handler:install(clientid, Utf8Id, debug, "tmp/client-utf8.log"), + ok = emqx_trace_handler:install("CLI-UTF8", clientid, Utf8Id, debug, "tmp/client-utf8.log"), + emqx_trace:check(), {ok, T} = emqtt:start_link([{clientid, Utf8Id}]), emqtt:connect(T), [begin emqtt:publish(T, <<"a/b/c">>, <<"hi">>) end|| _ <- lists:seq(1, 10)], emqtt:ping(T), - ok = filesync(Utf8Id, clientid), - ok = emqx_trace_handler:uninstall(clientid, Utf8Id), + ok = filesync("CLI-UTF8", clientid), + ok = emqx_trace_handler:uninstall(clientid, "CLI-UTF8"), emqtt:disconnect(T), ?assertEqual([], emqx_trace_handler:running()), ok. @@ -126,11 +122,11 @@ t_trace_topic(_Config) -> emqtt:connect(T), %% Start tracing - emqx_logger:set_log_level(debug), - ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"), - ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"), - ok = filesync(<<"x/#">>, topic), - ok = filesync(<<"y/#">>, topic), + ok = emqx_trace_handler:install("CLI-TOPIC-1", topic, <<"x/#">>, all, "tmp/topic_trace_x.log"), + ok = emqx_trace_handler:install("CLI-TOPIC-2", topic, <<"y/#">>, all, "tmp/topic_trace_y.log"), + emqx_trace:check(), + ok = filesync("CLI-TOPIC-1", topic), + ok = filesync("CLI-TOPIC-2", topic), %% Verify the tracing file exits ?assert(filelib:is_regular("tmp/topic_trace_x.log")), @@ -138,9 +134,9 @@ t_trace_topic(_Config) -> %% Get current traces ?assertMatch([#{type := topic, filter := <<"x/#">>, - level := debug, dst := "tmp/topic_trace_x.log", name := <<"x/#">>}, + level := debug, dst := "tmp/topic_trace_x.log", name := <<"CLI-TOPIC-1">>}, #{type := topic, filter := <<"y/#">>, - name := <<"y/#">>, level := debug, dst := "tmp/topic_trace_y.log"} + name := <<"CLI-TOPIC-2">>, level := debug, dst := "tmp/topic_trace_y.log"} ], emqx_trace_handler:running()), @@ -149,8 +145,8 @@ t_trace_topic(_Config) -> emqtt:publish(T, <<"x/y/z">>, <<"hi2">>), emqtt:subscribe(T, <<"x/y/z">>), emqtt:unsubscribe(T, <<"x/y/z">>), - ok = filesync(<<"x/#">>, topic), - ok = filesync(<<"y/#">>, topic), + ok = filesync("CLI-TOPIC-1", topic), + ok = filesync("CLI-TOPIC-2", topic), {ok, Bin} = file:read_file("tmp/topic_trace_x.log"), ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])), @@ -161,8 +157,8 @@ t_trace_topic(_Config) -> ?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0), %% Stop tracing - ok = emqx_trace_handler:uninstall(topic, <<"x/#">>), - ok = emqx_trace_handler:uninstall(topic, <<"y/#">>), + ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-1">>), + ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-2">>), {error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>), ?assertEqual([], emqx_trace_handler:running()), emqtt:disconnect(T). @@ -172,10 +168,12 @@ t_trace_ip_address(_Config) -> emqtt:connect(T), %% Start tracing - ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"), - ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"), - ok = filesync(<<"127.0.0.1">>, ip_address), - ok = filesync(<<"192.168.1.1">>, ip_address), + ok = emqx_trace_handler:install("CLI-IP-1", ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"), + ok = emqx_trace_handler:install("CLI-IP-2", ip_address, + "192.168.1.1", all, "tmp/ip_trace_y.log"), + emqx_trace:check(), + ok = filesync(<<"CLI-IP-1">>, ip_address), + ok = filesync(<<"CLI-IP-2">>, ip_address), %% Verify the tracing file exits ?assert(filelib:is_regular("tmp/ip_trace_x.log")), @@ -183,10 +181,10 @@ t_trace_ip_address(_Config) -> %% Get current traces ?assertMatch([#{type := ip_address, filter := "127.0.0.1", - name := <<"127.0.0.1">>, + name := <<"CLI-IP-1">>, level := debug, dst := "tmp/ip_trace_x.log"}, #{type := ip_address, filter := "192.168.1.1", - name := <<"192.168.1.1">>, + name := <<"CLI-IP-2">>, level := debug, dst := "tmp/ip_trace_y.log"} ], emqx_trace_handler:running()), @@ -196,8 +194,8 @@ t_trace_ip_address(_Config) -> emqtt:publish(T, <<"x/y/z">>, <<"hi2">>), emqtt:subscribe(T, <<"x/y/z">>), emqtt:unsubscribe(T, <<"x/y/z">>), - ok = filesync(<<"127.0.0.1">>, ip_address), - ok = filesync(<<"192.168.1.1">>, ip_address), + ok = filesync(<<"CLI-IP-1">>, ip_address), + ok = filesync(<<"CLI-IP-2">>, ip_address), {ok, Bin} = file:read_file("tmp/ip_trace_x.log"), ?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])), @@ -208,8 +206,8 @@ t_trace_ip_address(_Config) -> ?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0), %% Stop tracing - ok = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.1">>), - ok = emqx_trace_handler:uninstall(ip_address, <<"192.168.1.1">>), + ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-1">>), + ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-2">>), {error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>), emqtt:disconnect(T), ?assertEqual([], emqx_trace_handler:running()). @@ -221,7 +219,12 @@ filesync(Name, Type) -> %% sometime the handler process is not started yet. filesync(_Name, _Type, 0) -> ok; -filesync(Name, Type, Retry) -> +filesync(Name0, Type, Retry) -> + Name = + case is_binary(Name0) of + true -> Name0; + false -> list_to_binary(Name0) + end, try Handler = binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>), @@ -231,3 +234,9 @@ filesync(Name, Type, Retry) -> ct:sleep(100), filesync(Name, Type, Retry - 1) end. + +init() -> + emqx_trace:start_link(). + +terminate() -> + catch ok = gen_server:stop(emqx_trace, normal, 5000). diff --git a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl index 885811fec..31e5e52e1 100644 --- a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl @@ -67,7 +67,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_authn, emqx_dashboard]), + emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authn]), ok. set_special_configs(emqx_dashboard) -> diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d4fc3df2d..a6681d3f1 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -214,7 +214,7 @@ update(Type, Name, {OldConf, Conf}) -> case recreate(Type, Name, Conf) of {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, not_found} -> - ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one" + ?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one" , type => Type, name => Name, config => Conf}), create(Type, Name, Conf); {error, Reason} -> {update_bridge_failed, Reason} @@ -242,7 +242,7 @@ create_dry_run(Type, Conf) -> end. remove(Type, Name, _Conf) -> - ?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}), + ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of ok -> ok; {error, not_found} -> ok; diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 7ebe7645b..514b9156a 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -236,7 +236,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> false -> RetryMs end; {aborted, Reason} -> - ?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}), + ?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}), RetryMs end. @@ -248,7 +248,7 @@ read_next_mfa(Node) -> TnxId = max(LatestId - 1, 0), commit(Node, TnxId), ?SLOG(notice, #{ - msg => "New node first catch up and start commit.", + msg => "new_node_first_catch_up_and_start_commit.", node => Node, tnx_id => TnxId}), TnxId; [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 @@ -277,7 +277,7 @@ do_catch_up(ToTnxId, Node) -> io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", [Node, LastAppliedId, ToTnxId])), ?SLOG(error, #{ - msg => "catch up failed!", + msg => "catch_up_failed!", last_applied_id => LastAppliedId, to_tnx_id => ToTnxId }), diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index dec07f35c..b8a8c211d 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -144,7 +144,7 @@ multicall(M, F, Args) -> {retry, TnxId, Res, Nodes} -> %% The init MFA return ok, but other nodes failed. %% We return ok and alert an alarm. - ?SLOG(error, #{msg => "failed to update config in cluster", nodes => Nodes, + ?SLOG(error, #{msg => "failed_to_update_config_in_cluster", nodes => Nodes, tnx_id => TnxId, mfa => {M, F, Args}}), Res; {error, Error} -> %% all MFA return not ok or {ok, term()}. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 1ea368826..385953b87 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -792,16 +792,7 @@ do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth) -> }}; do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) -> {emqx_logger_textfmt, - #{template => - [time," [",level,"] ", - {clientid, - [{peername, - [clientid,"@",peername," "], - [clientid, " "]}], - [{peername, - [peername," "], - []}]}, - msg,"\n"], + #{template => [time," [",level,"] ", msg,"\n"], chars_limit => CharsLimit, single_line => SingleLine, time_offset => TimeOffSet, diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index ad74faf99..66b05c95a 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -74,9 +74,19 @@ t_base_test(_Config) -> ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)), {atomic, Status} = emqx_cluster_rpc:status(), - ?assertEqual(3, length(Status)), - ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)), + case length(Status) =:= 3 of + true -> ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status)); + false -> + %% wait for mnesia to write in. + ct:sleep(42), + {atomic, Status1} = emqx_cluster_rpc:status(), + ct:pal("status: ~p", Status), + ct:pal("status1: ~p", Status1), + ?assertEqual(3, length(Status1)), + ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status)) + end, ok. t_commit_fail_test(_Config) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 21c06284d..ac0847a91 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -143,7 +143,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme, retry_interval := RetryInterval, pool_type := PoolType, pool_size := PoolSize} = Config) -> - ?SLOG(info, #{msg => "starting http connector", + ?SLOG(info, #{msg => "starting_http_connector", connector => InstId, config => Config}), {Transport, TransportOpts} = case Scheme of http -> @@ -181,13 +181,13 @@ on_start(InstId, #{base_url := #{scheme := Scheme, end. on_stop(InstId, #{pool_name := PoolName}) -> - ?SLOG(info, #{msg => "stopping http connector", + ?SLOG(info, #{msg => "stopping_http_connector", connector => InstId}), ehttpc_sup:stop_pool(PoolName). on_query(InstId, {send_message, Msg}, AfterQuery, State) -> case maps:get(request, State, undefined) of - undefined -> ?SLOG(error, #{msg => "request not found", connector => InstId}); + undefined -> ?SLOG(error, #{msg => "request_not_found", connector => InstId}); Request -> #{method := Method, path := Path, body := Body, headers := Headers, request_timeout := Timeout} = process_request(Request, Msg), @@ -199,16 +199,15 @@ on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State); on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, #{pool_name := PoolName, base_path := BasePath} = State) -> - ?SLOG(debug, #{msg => "http connector received request", - request => Request, connector => InstId, - state => State}), + ?TRACE("QUERY", "http_connector_received", + #{request => Request, connector => InstId, state => State}), NRequest = update_path(BasePath, Request), case Result = ehttpc:request(case KeyOrNum of undefined -> PoolName; _ -> {PoolName, KeyOrNum} end, Method, NRequest, Timeout) of {error, Reason} -> - ?SLOG(error, #{msg => "http connector do reqeust failed", + ?SLOG(error, #{msg => "http_connector_do_reqeust_failed", request => NRequest, reason => Reason, connector => InstId}), emqx_resource:query_failed(AfterQuery); diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 8af516b82..c188837ba 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -55,7 +55,7 @@ on_start(InstId, #{servers := Servers0, pool_size := PoolSize, auto_reconnect := AutoReconn, ssl := SSL} = Config) -> - ?SLOG(info, #{msg => "starting ldap connector", + ?SLOG(info, #{msg => "starting_ldap_connector", connector => InstId, config => Config}), Servers = [begin proplists:get_value(host, S) end || S <- Servers0], SslOpts = case maps:get(enable, SSL) of @@ -81,23 +81,21 @@ on_start(InstId, #{servers := Servers0, {ok, #{poolname => PoolName}}. on_stop(InstId, #{poolname := PoolName}) -> - ?SLOG(info, #{msg => "stopping ldap connector", + ?SLOG(info, #{msg => "stopping_ldap_connector", connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) -> Request = {Base, Filter, Attributes}, - ?SLOG(debug, #{msg => "ldap connector received request", - request => Request, connector => InstId, - state => State}), + ?TRACE("QUERY", "ldap_connector_received", + #{request => Request, connector => InstId, state => State}), case Result = ecpool:pick_and_do( PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of {error, Reason} -> - ?SLOG(error, #{msg => "ldap connector do request failed", - request => Request, connector => InstId, - reason => Reason}), + ?SLOG(error, #{msg => "ldap_connector_do_request_failed", + request => Request, connector => InstId, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 6a1b15e57..5321ea459 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -128,7 +128,7 @@ on_start(InstId, Config = #{mongo_type := Type, {ok, #{poolname => PoolName, type => Type}}. on_stop(InstId, #{poolname := PoolName}) -> - ?SLOG(info, #{msg => "stopping mongodb connector", + ?SLOG(info, #{msg => "stopping_mongodb_connector", connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). @@ -137,14 +137,13 @@ on_query(InstId, AfterQuery, #{poolname := PoolName} = State) -> Request = {Action, Collection, Selector, Docs}, - ?SLOG(debug, #{msg => "mongodb connector received request", - request => Request, connector => InstId, - state => State}), + ?TRACE("QUERY", "mongodb_connector_received", + #{request => Request, connector => InstId, state => State}), case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of {error, Reason} -> - ?SLOG(error, #{msg => "mongodb connector do query failed", + ?SLOG(error, #{msg => "mongodb_connector_do_query_failed", request => Request, reason => Reason, connector => InstId}), emqx_resource:query_failed(AfterQuery), diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index bcf558117..a9647b2c1 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -118,7 +118,7 @@ on_message_received(Msg, HookPoint) -> %% =================================================================== on_start(InstId, Conf) -> InstanceId = binary_to_atom(InstId, utf8), - ?SLOG(info, #{msg => "starting mqtt connector", + ?SLOG(info, #{msg => "starting_mqtt_connector", connector => InstanceId, config => Conf}), BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ @@ -139,19 +139,18 @@ on_start(InstId, Conf) -> end. on_stop(_InstId, #{name := InstanceId}) -> - ?SLOG(info, #{msg => "stopping mqtt connector", + ?SLOG(info, #{msg => "stopping_mqtt_connector", connector => InstanceId}), case ?MODULE:drop_bridge(InstanceId) of ok -> ok; {error, not_found} -> ok; {error, Reason} -> - ?SLOG(error, #{msg => "stop mqtt connector", + ?SLOG(error, #{msg => "stop_mqtt_connector", connector => InstanceId, reason => Reason}) end. on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> - ?SLOG(debug, #{msg => "send msg to remote node", message => Msg, - connector => InstanceId}), + ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_resource:query_success(AfterQuery). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index c93a1e350..265a6a01e 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -56,7 +56,7 @@ on_start(InstId, #{server := {Host, Port}, auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL } = Config) -> - ?SLOG(info, #{msg => "starting mysql connector", + ?SLOG(info, #{msg => "starting_mysql_connector", connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of true -> @@ -76,7 +76,7 @@ on_start(InstId, #{server := {Host, Port}, {ok, #{poolname => PoolName}}. on_stop(InstId, #{poolname := PoolName}) -> - ?SLOG(info, #{msg => "stopping mysql connector", + ?SLOG(info, #{msg => "stopping_mysql_connector", connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). @@ -85,14 +85,13 @@ on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State); on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> - ?SLOG(debug, #{msg => "mysql connector received sql query", - connector => InstId, sql => SQL, state => State}), + ?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}), case Result = ecpool:pick_and_do( PoolName, {mysql, query, [SQL, Params, Timeout]}, no_handover) of {error, Reason} -> - ?SLOG(error, #{msg => "mysql connector do sql query failed", + ?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed", connector => InstId, sql => SQL, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 2f09ae59a..81435a1c5 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -58,7 +58,7 @@ on_start(InstId, #{server := {Host, Port}, auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL } = Config) -> - ?SLOG(info, #{msg => "starting postgresql connector", + ?SLOG(info, #{msg => "starting_postgresql_connector", connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of true -> @@ -88,12 +88,12 @@ on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) -> {prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]}; {prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]} end, - ?SLOG(debug, #{msg => "postgresql connector received sql query", - connector => InstId, command => Command, args => Args, state => State}), + ?TRACE("QUERY", "postgresql_connector_received", + #{connector => InstId, command => Command, args => Args, state => State}), case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of {error, Reason} -> ?SLOG(error, #{ - msg => "postgresql connector do sql query failed", + msg => "postgresql_connector_do_sql_query_failed", connector => InstId, sql => SQL, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 2b71ab808..4ae19e85c 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -94,7 +94,7 @@ on_start(InstId, #{redis_type := Type, pool_size := PoolSize, auto_reconnect := AutoReconn, ssl := SSL } = Config) -> - ?SLOG(info, #{msg => "starting redis connector", + ?SLOG(info, #{msg => "starting_redis_connector", connector => InstId, config => Config}), Servers = case Type of single -> [{servers, [maps:get(server, Config)]}]; @@ -127,20 +127,20 @@ on_start(InstId, #{redis_type := Type, {ok, #{poolname => PoolName, type => Type}}. on_stop(InstId, #{poolname := PoolName}) -> - ?SLOG(info, #{msg => "stopping redis connector", + ?SLOG(info, #{msg => "stopping_redis_connector", connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> - ?SLOG(debug, #{msg => "redis connector received cmd query", - connector => InstId, sql => Command, state => State}), + ?TRACE("QUERY", "redis_connector_received", + #{connector => InstId, sql => Command, state => State}), Result = case Type of cluster -> eredis_cluster:q(PoolName, Command); _ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) end, case Result of {error, Reason} -> - ?SLOG(error, #{msg => "redis connector do cmd query failed", + ?SLOG(error, #{msg => "redis_connector_do_cmd_query_failed", connector => InstId, sql => Command, reason => Reason}), emqx_resource:query_failed(AfterCommand); _ -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 7d5bb1283..3ab410391 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -158,15 +158,15 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) RC =:= ?RC_NO_MATCHING_SUBSCRIBERS -> Parent ! {batch_ack, PktId}, ok; handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> - ?SLOG(warning, #{msg => "publish to remote node falied", + ?SLOG(warning, #{msg => "publish_to_remote_node_falied", packet_id => PktId, reason_code => RC}). handle_publish(Msg, undefined) -> - ?SLOG(error, #{msg => "cannot publish to local broker as" - " 'ingress' is not configured", + ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" + "_'ingress'_is_not_configured", message => Msg}); handle_publish(Msg, Vars) -> - ?SLOG(debug, #{msg => "publish to local broker", + ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), case Vars of diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 5f6f4b69f..e0d5a2d77 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -188,7 +188,7 @@ callback_mode() -> [state_functions]. %% @doc Config should be a map(). init(#{name := Name} = ConnectOpts) -> - ?SLOG(debug, #{msg => "starting bridge worker", + ?SLOG(debug, #{msg => "starting_bridge_worker", name => Name}), erlang:process_flag(trap_exit, true), Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), @@ -335,7 +335,7 @@ common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) -> NewQ = replayq:append(Q, [Msg]), {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(StateName, Type, Content, #{name := Name} = State) -> - ?SLOG(notice, #{msg => "Bridge discarded event", + ?SLOG(notice, #{msg => "bridge_discarded_event", name => Name, type => Type, state_name => StateName, content => Content}), {keep_state, State}. @@ -349,7 +349,7 @@ do_connect(#{connect_opts := ConnectOpts, {ok, State#{connection => Conn}}; {error, Reason} -> ConnectOpts1 = obfuscate(ConnectOpts), - ?SLOG(error, #{msg => "Failed to connect", + ?SLOG(error, #{msg => "failed_to_connect", config => ConnectOpts1, reason => Reason}), {error, Reason, State} end. @@ -386,8 +386,8 @@ pop_and_send_loop(#{replayq := Q} = State, N) -> end. do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) -> - ?SLOG(error, #{msg => "cannot forward messages to remote broker" - " as 'egress' is not configured", + ?SLOG(error, #{msg => "cannot_forward_messages_to_remote_broker" + "_as_'egress'_is_not_configured", messages => Msg}); do_send(#{inflight := Inflight, connection := Connection, @@ -398,7 +398,7 @@ do_send(#{inflight := Inflight, emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'), emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) end, - ?SLOG(debug, #{msg => "publish to remote broker", + ?SLOG(debug, #{msg => "publish_to_remote_broker", message => Msg, vars => Vars}), case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of {ok, Refs} -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 36f50e081..be8b4d074 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -312,6 +312,9 @@ responses(Responses, Module) -> response(Status, Bin, {Acc, RefsAcc, Module}) when is_binary(Bin) -> {Acc#{integer_to_binary(Status) => #{description => Bin}}, RefsAcc, Module}; +%% Support swagger raw object(file download). +response(Status, #{content := _} = Content, {Acc, RefsAcc, Module}) -> + {Acc#{integer_to_binary(Status) => Content}, RefsAcc, Module}; response(Status, ?REF(StructName), {Acc, RefsAcc, Module}) -> response(Status, ?R_REF(Module, StructName), {Acc, RefsAcc, Module}); response(Status, ?R_REF(_Mod, _Name) = RRef, {Acc, RefsAcc, Module}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_app.erl b/apps/emqx_management/src/emqx_mgmt_api_app.erl index dfce3cf30..489d679be 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_app.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_app.erl @@ -149,7 +149,7 @@ api_key(post, #{body := App}) -> Desc = unicode:characters_to_binary(Desc0, unicode), case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of {ok, NewApp} -> {200, format(NewApp)}; - {error, Reason} -> {400, Reason} + {error, Reason} -> {400, io_lib:format("~p", [Reason])} end. api_key_by_name(get, #{bindings := #{name := Name}}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index d6902d123..296cecea2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -107,9 +107,14 @@ schema("/trace/:name/download") -> get => #{ description => "Download trace log by name", parameters => [hoconsc:ref(name)], - %% todo zip file octet-stream responses => #{ - 200 => <<"TODO octet-stream">> + 200 => + #{description => "A trace zip file", + content => #{ + 'application/octet-stream' => + #{schema => #{type => "string", format => "binary"}} + } + } } } }; @@ -124,9 +129,12 @@ schema("/trace/:name/log") -> hoconsc:ref(position), hoconsc:ref(node) ], - %% todo response data responses => #{ - 200 => <<"TODO">> + 200 => + [ + {items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})} + | fields(bytes) ++ fields(position) + ] } } }. @@ -209,6 +217,7 @@ fields(position) -> default => 0 })}]. + -define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$"). validate_name(Name) -> @@ -296,7 +305,12 @@ download_trace_log(get, #{bindings := #{name := Name}}) -> ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip", {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), emqx_trace:delete_files_after_send(ZipFileName, Zips), - {200, ZipFile}; + Headers = #{ + <<"content-type">> => <<"application/x-zip">>, + <<"content-disposition">> => + iolist_to_binary("attachment; filename=" ++ filename:basename(ZipFile)) + }, + {200, Headers, {file, ZipFile}}; {error, not_found} -> ?NOT_FOUND(Name) end. @@ -324,11 +338,10 @@ cluster_call(Mod, Fun, Args, Timeout) -> BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]), GoodRes. -stream_log_file(get, #{bindings := #{name := Name}, query_string := Query} = T) -> +stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> Node0 = maps:get(<<"node">>, Query, atom_to_binary(node())), Position = maps:get(<<"position">>, Query, 0), Bytes = maps:get(<<"bytes">>, Query, 1000), - logger:error("~p", [T]), case to_node(Node0) of {ok, Node} -> case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 5885a2b17..3a723f33c 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -18,6 +18,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). -include("emqx_mgmt.hrl"). @@ -386,18 +387,20 @@ trace(["list"]) -> emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst]) end, emqx_trace_handler:running()); -trace(["stop", Operation, ClientId]) -> - case trace_type(Operation) of - {ok, Type} -> trace_off(Type, ClientId); +trace(["stop", Operation, Filter0]) -> + case trace_type(Operation, Filter0) of + {ok, Type, Filter} -> trace_off(Type, Filter); error -> trace([]) end; trace(["start", Operation, ClientId, LogFile]) -> trace(["start", Operation, ClientId, LogFile, "all"]); -trace(["start", Operation, ClientId, LogFile, Level]) -> - case trace_type(Operation) of - {ok, Type} -> trace_on(Type, ClientId, list_to_existing_atom(Level), LogFile); +trace(["start", Operation, Filter0, LogFile, Level]) -> + case trace_type(Operation, Filter0) of + {ok, Type, Filter} -> + trace_on(name(Filter0), Type, Filter, + list_to_existing_atom(Level), LogFile); error -> trace([]) end; @@ -417,20 +420,23 @@ trace(_) -> "Stop tracing for a client ip on local node"} ]). -trace_on(Who, Name, Level, LogFile) -> - case emqx_trace_handler:install(Who, Name, Level, LogFile) of +trace_on(Name, Type, Filter, Level, LogFile) -> + case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of ok -> - emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]); + emqx_trace:check(), + emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]); {error, Error} -> - emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Who, Name, Error]) + emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Filter, Name, Error]) end. -trace_off(Who, Name) -> - case emqx_trace_handler:uninstall(Who, Name) of +trace_off(Type, Filter) -> + ?TRACE("CLI", "trace_stopping", #{Type => Filter}), + case emqx_trace_handler:uninstall(Type, name(Filter)) of ok -> - emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]); + emqx_trace:check(), + emqx_ctl:print("stop tracing ~s ~s successfully~n", [Type, Filter]); {error, Error} -> - emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Name, Error]) + emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Type, Filter, Error]) end. %%-------------------------------------------------------------------- @@ -459,9 +465,9 @@ traces(["delete", Name]) -> traces(["start", Name, Operation, Filter]) -> traces(["start", Name, Operation, Filter, "900"]); -traces(["start", Name, Operation, Filter, DurationS]) -> - case trace_type(Operation) of - {ok, Type} -> trace_cluster_on(Name, Type, Filter, DurationS); +traces(["start", Name, Operation, Filter0, DurationS]) -> + case trace_type(Operation, Filter0) of + {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS); error -> traces([]) end; @@ -503,10 +509,10 @@ trace_cluster_off(Name) -> {error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error]) end. -trace_type("client") -> {ok, clientid}; -trace_type("topic") -> {ok, topic}; -trace_type("ip_address") -> {ok, ip_address}; -trace_type(_) -> error. +trace_type("client", ClientId) -> {ok, clientid, list_to_binary(ClientId)}; +trace_type("topic", Topic) -> {ok, topic, list_to_binary(Topic)}; +trace_type("ip_address", IP) -> {ok, ip_address, IP}; +trace_type(_, _) -> error. %%-------------------------------------------------------------------- %% @doc Listeners Command @@ -716,3 +722,6 @@ format_listen_on({Addr, Port}) when is_list(Addr) -> io_lib:format("~ts:~w", [Addr, Port]); format_listen_on({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]). + +name(Filter) -> + iolist_to_binary(["CLI-", Filter]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl index 61a520e81..d02f62d70 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -85,7 +85,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}}, Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), - ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}), + ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}), safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload); %% in case this is a "$events/" event @@ -99,7 +99,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}}, Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), - ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}), + ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}), safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload). %%-------------------------------------------------------------------- diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 4225c6f72..60a7cbaad 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -248,7 +248,7 @@ handle_output(OutId, Selected, Envs) -> end. do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> - ?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}), + ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}), emqx_bridge:send_message(BridgeId, Selected); do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> Mod:Func(Selected, Envs, Args). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 74ec1bb1c..cd4d0ce6b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -77,7 +77,7 @@ flatten([D1 | L]) when is_list(D1) -> D1 ++ flatten(L). echo_action(Data, Envs) -> - ?SLOG(debug, #{msg => "testing_rule_sql_ok", data => Data, envs => Envs}), + ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}), Data. fill_default_values(Event, Context) -> diff --git a/rebar.config b/rebar.config index 017a73095..7b08d38a0 100644 --- a/rebar.config +++ b/rebar.config @@ -55,7 +55,7 @@ , {mria, {git, "https://github.com/emqx/mria", {tag, "0.1.5"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.7"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {replayq, "0.3.3"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}