diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index d1e70b184..27dd8b6c8 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -38,4 +38,10 @@ -define(SHARD, ?COMMON_SHARD). -define(MAX_SIZE, 30). +-define(EMQX_TRACE_STOP_ACTION(REASON), + {unrecoverable_error, {action_stopped_after_template_rendering, REASON}} +). + +-define(EMQX_TRACE_STOP_ACTION_MATCH, ?EMQX_TRACE_STOP_ACTION(_)). + -endif. diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 7bbe59b2b..91de65b39 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -29,7 +29,9 @@ unsubscribe/2, log/3, log/4, - rendered_action_template/2 + rendered_action_template/2, + make_rendered_action_template_trace_context/1, + rendered_action_template_with_ctx/2 ]). -export([ @@ -70,6 +72,12 @@ -export_type([ruleid/0]). -type ruleid() :: binary(). +-export_type([rendered_action_template_ctx/0]). +-opaque rendered_action_template_ctx() :: #{ + trace_ctx := map(), + action_id := any() +}. + publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when @@ -87,7 +95,7 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). -rendered_action_template(ActionID, RenderResult) -> +rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) -> TraceResult = ?TRACE( "QUERY_RENDER", "action_template_rendered", @@ -107,11 +115,55 @@ rendered_action_template(ActionID, RenderResult) -> ) ), MsgBin = unicode:characters_to_binary(StopMsg), - error({unrecoverable_error, {action_stopped_after_template_rendering, MsgBin}}); + error(?EMQX_TRACE_STOP_ACTION(MsgBin)); _ -> ok end, - TraceResult. + TraceResult; +rendered_action_template(_ActionID, _RenderResult) -> + %% We do nothing if we don't get a valid Action ID. This can happen when + %% called from connectors that are used for actions as well as authz and + %% authn. + ok. + +%% The following two functions are used for connectors that don't do the +%% rendering in the main process (the one that called on_*query). In this case +%% we need to pass the trace context to the sub process that do the rendering +%% so that the result of the rendering can be traced correctly. It is also +%% important to ensure that the error that can be thrown from +%% rendered_action_template_with_ctx is handled in the appropriate way in the +%% sub process. +-spec make_rendered_action_template_trace_context(any()) -> rendered_action_template_ctx(). +make_rendered_action_template_trace_context(ActionID) -> + MetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + #{trace_ctx => MetaData, action_id => ActionID}. + +-spec rendered_action_template_with_ctx(rendered_action_template_ctx(), Result :: term()) -> term(). +rendered_action_template_with_ctx( + #{ + trace_ctx := LogMetaData, + action_id := ActionID + }, + RenderResult +) -> + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. log(List, Msg, Meta) -> log(debug, List, Msg, Meta). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index 35b09b9b0..8f748ed9f 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -48,6 +48,21 @@ prepare_log_map(LogMap, PEncode) -> NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)], maps:from_list(NewKeyValuePairs). +prepare_key_value(K, {Formatter, V}, PEncode) when is_function(Formatter, 1) -> + %% A cusom formatter is provided with the value + try + NewV = Formatter(V), + prepare_key_value(K, NewV, PEncode) + catch + _:_ -> + {K, V} + end; +prepare_key_value(K, {ok, Status, Headers, Body}, PEncode) when + is_integer(Status), is_list(Headers), is_binary(Body) +-> + %% This is unlikely anything else then info about a HTTP request so we make + %% it more structured + prepare_key_value(K, #{status => Status, headers => Headers, body => Body}, PEncode); prepare_key_value(payload = K, V, PEncode) -> NewV = try diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index eb12cbaae..ef79f78fe 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -223,6 +223,11 @@ do_single_query(InstId, Request, Async, #{pool_name := PoolName} = State) -> } ), {PreparedKeyOrCQL1, Data} = proc_cql_params(Type, PreparedKeyOrCQL, Params, State), + emqx_trace:rendered_action_template(PreparedKeyOrCQL, #{ + type => Type, + key_or_cql => PreparedKeyOrCQL1, + data => Data + }), Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrCQL1, Data), handle_result(Res). @@ -261,6 +266,14 @@ do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) -> state => State } ), + ChannelID = + case Requests of + [{CID, _} | _] -> CID; + _ -> none + end, + emqx_trace:rendered_action_template(ChannelID, #{ + cqls => CQLs + }), Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs), handle_result(Res). diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 942f7590b..2c824aa95 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -386,7 +386,7 @@ on_query( SimplifiedRequestType = query_type(RequestType), Templates = get_templates(RequestType, State), SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL), - ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL), + ClickhouseResult = execute_sql_in_clickhouse_server(RequestType, PoolName, SQL), transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL). get_templates(ChannId, State) -> @@ -398,7 +398,7 @@ get_templates(ChannId, State) -> end. get_sql(channel_message, #{send_message_template := PreparedSQL}, Data) -> - emqx_placeholder:proc_tmpl(PreparedSQL, Data); + emqx_placeholder:proc_tmpl(PreparedSQL, Data, #{return => full_binary}); get_sql(_, _, SQL) -> SQL. @@ -425,7 +425,7 @@ on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) -> %% Create batch insert SQL statement SQL = objects_to_sql(ObjectsToInsert, Templates), %% Do the actual query in the database - ResultFromClickhouse = execute_sql_in_clickhouse_server(PoolName, SQL), + ResultFromClickhouse = execute_sql_in_clickhouse_server(ChannId, PoolName, SQL), %% Transform the result to a better format transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL). @@ -464,7 +464,8 @@ objects_to_sql(_, _) -> %% This function is used by on_query/3 and on_batch_query/3 to send a query to %% the database server and receive a result -execute_sql_in_clickhouse_server(PoolName, SQL) -> +execute_sql_in_clickhouse_server(Id, PoolName, SQL) -> + emqx_trace:rendered_action_template(Id, #{rendered_sql => SQL}), ecpool:pick_and_do( PoolName, {?MODULE, execute_sql_in_clickhouse_server_using_connection, [SQL]}, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 372472dda..598b3342d 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -9,6 +9,7 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -246,12 +247,16 @@ do_query( table := Table, templates := Templates } = ChannelState, + TraceRenderedCTX = + emqx_trace:make_rendered_action_template_trace_context(ChannelId), Result = case ensuare_dynamo_keys(Query, ChannelState) of true -> ecpool:pick_and_do( PoolName, - {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]}, + {emqx_bridge_dynamo_connector_client, query, [ + Table, QueryTuple, Templates, TraceRenderedCTX + ]}, no_handover ); _ -> @@ -259,6 +264,8 @@ do_query( end, case Result of + {error, ?EMQX_TRACE_STOP_ACTION(_)} = Error -> + Error; {error, Reason} -> ?tp( dynamo_connector_query_return, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index 4f924ef67..f257ae389 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -10,7 +10,7 @@ -export([ start_link/1, is_connected/2, - query/4 + query/5 ]). %% gen_server callbacks @@ -40,8 +40,8 @@ is_connected(Pid, Timeout) -> {false, Error} end. -query(Pid, Table, Query, Templates) -> - gen_server:call(Pid, {query, Table, Query, Templates}, infinity). +query(Pid, Table, Query, Templates, TraceRenderedCTX) -> + gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedCTX}, infinity). %%-------------------------------------------------------------------- %% @doc @@ -77,14 +77,14 @@ handle_call(is_connected, _From, State) -> {false, Error} end, {reply, IsConnected, State}; -handle_call({query, Table, Query, Templates}, _From, State) -> - Result = do_query(Table, Query, Templates), +handle_call({query, Table, Query, Templates, TraceRenderedCTX}, _From, State) -> + Result = do_query(Table, Query, Templates, TraceRenderedCTX), {reply, Result, State}; handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) -> - Result = do_query(Table, Query, Templates), + Result = do_query(Table, Query, Templates, {fun(_, _) -> ok end, none}), ReplyFun(Context, Result), {noreply, State}; handle_cast(_Request, State) -> @@ -102,15 +102,29 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -do_query(Table, Query0, Templates) -> +do_query(Table, Query0, Templates, TraceRenderedCTX) -> try Query = apply_template(Query0, Templates), + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + table => Table, + query => {fun trace_format_query/1, Query} + }), execute(Query, Table) catch + error:{unrecoverable_error, Reason} -> + {error, {unrecoverable_error, Reason}}; _Type:Reason -> {error, {unrecoverable_error, {invalid_request, Reason}}} end. +trace_format_query({Type, Data}) -> + #{type => Type, data => Data}; +trace_format_query([_ | _] = Batch) -> + BatchData = [trace_format_query(Q) || Q <- Batch], + #{type => batch, data => BatchData}; +trace_format_query(Query) -> + Query. + %% some simple query commands for authn/authz or test execute({insert_item, Msg}, Table) -> Item = convert_to_item(Msg), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 13040dccf..12d5d1f2f 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -284,6 +284,13 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) -> Method = post, ReqOpts = #{request_ttl => RequestTTL}, Request = {prepared_request, {Method, Path, Body}, ReqOpts}, + emqx_trace:rendered_action_template(MessageTag, #{ + method => Method, + path => Path, + body => Body, + options => ReqOpts, + is_async => false + }), Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client), QueryMode = sync, handle_result(Result, Request, QueryMode, InstanceId). @@ -312,6 +319,13 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) -> ReqOpts = #{request_ttl => RequestTTL}, Request = {prepared_request, {Method, Path, Body}, ReqOpts}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]}, + emqx_trace:rendered_action_template(MessageTag, #{ + method => Method, + path => Path, + body => Body, + options => ReqOpts, + is_async => true + }), emqx_bridge_gcp_pubsub_client:query_async( Request, ReplyFunAndArgs, Client ). diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 6bdd8a4cd..97eedf3f6 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -128,7 +128,7 @@ on_query(InstId, {Channel, Message}, State) -> greptimedb_connector_send_query, #{points => Points, batch => false, mode => sync} ), - do_query(InstId, Client, Points); + do_query(InstId, Channel, Client, Points); {error, ErrorPoints} -> ?tp( greptimedb_connector_send_query_error, @@ -152,7 +152,7 @@ on_batch_query(InstId, [{Channel, _} | _] = BatchData, State) -> greptimedb_connector_send_query, #{points => Points, batch => true, mode => sync} ), - do_query(InstId, Client, Points); + do_query(InstId, Channel, Client, Points); {error, Reason} -> ?tp( greptimedb_connector_send_query_error, @@ -173,7 +173,7 @@ on_query_async(InstId, {Channel, Message}, {ReplyFun, Args}, State) -> greptimedb_connector_send_query, #{points => Points, batch => false, mode => async} ), - do_async_query(InstId, Client, Points, {ReplyFun, Args}); + do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args}); {error, ErrorPoints} = Err -> ?tp( greptimedb_connector_send_query_error, @@ -195,7 +195,7 @@ on_batch_query_async(InstId, [{Channel, _} | _] = BatchData, {ReplyFun, Args}, S greptimedb_connector_send_query, #{points => Points, batch => true, mode => async} ), - do_async_query(InstId, Client, Points, {ReplyFun, Args}); + do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args}); {error, Reason} -> ?tp( greptimedb_connector_send_query_error, @@ -420,7 +420,8 @@ is_auth_key(_) -> %% ------------------------------------------------------------------------------------------------- %% Query -do_query(InstId, Client, Points) -> +do_query(InstId, Channel, Client, Points) -> + emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}), case greptimedb:write_batch(Client, Points) of {ok, #{response := {affected_rows, #{value := Rows}}}} -> ?SLOG(debug, #{ @@ -452,12 +453,13 @@ do_query(InstId, Client, Points) -> end end. -do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> +do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) -> ?SLOG(info, #{ msg => "greptimedb_write_point_async", connector => InstId, points => Points }), + emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}), WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]}, ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs). diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index cf195fc9f..cf53291b2 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -134,8 +134,11 @@ on_query( #{ producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate } = maps:get(ChannelID, Channels), - try to_record(PartitionKey, HRecordTemplate, Data) of - Record -> append_record(InstId, Producer, Record, false) + try + KeyAndRawRecord = to_key_and_raw_record(PartitionKey, HRecordTemplate, Data), + emqx_trace:rendered_action_template(ChannelID, #{record => KeyAndRawRecord}), + Record = to_record(KeyAndRawRecord), + append_record(InstId, Producer, Record, false) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -148,8 +151,13 @@ on_batch_query( #{ producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate } = maps:get(ChannelID, Channels), - try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of - Records -> append_record(InstId, Producer, Records, true) + try + KeyAndRawRecordList = to_multi_part_key_and_partition_key( + PartitionKey, HRecordTemplate, BatchList + ), + emqx_trace:rendered_action_template(ChannelID, #{records => KeyAndRawRecordList}), + Records = [to_record(Item) || Item <- KeyAndRawRecordList], + append_record(InstId, Producer, Records, true) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -348,20 +356,20 @@ ensure_start_producer(ProducerName, ProducerOptions) -> produce_name(ActionId) -> list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)). -to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> +to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data) -> PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data), - to_record(PartitionKey, RawRecord). + #{partition_key => PartitionKey, raw_record => RawRecord}. -to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) -> - to_record(binary_to_list(PartitionKey), RawRecord); -to_record(PartitionKey, RawRecord) -> +to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) when is_binary(PartitionKey) -> + to_record(#{partition_key => binary_to_list(PartitionKey), raw_record => RawRecord}); +to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) -> hstreamdb:to_record(PartitionKey, raw, RawRecord). -to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> +to_multi_part_key_and_partition_key(PartitionKeyTmpl, HRecordTmpl, BatchList) -> lists:map( fun({_, Data}) -> - to_record(PartitionKeyTmpl, HRecordTmpl, Data) + to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data) end, BatchList ). diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 9be7457e1..88e4d9c36 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -359,7 +359,7 @@ on_query(InstId, {Method, Request, Timeout}, State) -> on_query( InstId, {ActionId, KeyOrNum, Method, Request, Timeout, Retry}, - #{base_path := BasePath} = State + #{base_path := BasePath, host := Host} = State ) -> ?TRACE( "QUERY", @@ -373,7 +373,7 @@ on_query( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(ActionId, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( Worker, @@ -469,7 +469,7 @@ on_query_async( InstId, {ActionId, KeyOrNum, Method, Request, Timeout}, ReplyFunAndArgs, - #{base_path := BasePath} = State + #{base_path := BasePath, host := Host} = State ) -> Worker = resolve_pool_worker(State, KeyOrNum), ?TRACE( @@ -483,7 +483,7 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(ActionId, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ attempt => 1, @@ -503,15 +503,16 @@ on_query_async( ), {ok, Worker}. -trace_rendered_action_template(ActionId, Method, NRequest, Timeout) -> +trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) -> case NRequest of {Path, Headers} -> emqx_trace:rendered_action_template( ActionId, #{ + host => Host, path => Path, method => Method, - headers => emqx_utils_redact:redact_headers(Headers), + headers => {fun emqx_utils_redact:redact_headers/1, Headers}, timeout => Timeout } ); @@ -519,15 +520,19 @@ trace_rendered_action_template(ActionId, Method, NRequest, Timeout) -> emqx_trace:rendered_action_template( ActionId, #{ + host => Host, path => Path, method => Method, - headers => emqx_utils_redact:redact_headers(Headers), + headers => {fun emqx_utils_redact:redact_headers/1, Headers}, timeout => Timeout, - body => Body + body => {fun log_format_body/1, Body} } ) end. +log_format_body(Body) -> + unicode:characters_to_binary(Body). + resolve_pool_worker(State, undefined) -> resolve_pool_worker(State, self()); resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 94419c7d9..f239d3735 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -130,7 +130,7 @@ on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) -> influxdb_connector_send_query, #{points => Points, batch => false, mode => sync} ), - do_query(InstId, Client, Points); + do_query(InstId, Channel, Client, Points); {error, ErrorPoints} -> ?tp( influxdb_connector_send_query_error, @@ -152,7 +152,7 @@ on_batch_query(InstId, BatchData, #{channels := ChannelConf}) -> influxdb_connector_send_query, #{points => Points, batch => true, mode => sync} ), - do_query(InstId, Client, Points); + do_query(InstId, Channel, Client, Points); {error, Reason} -> ?tp( influxdb_connector_send_query_error, @@ -175,7 +175,7 @@ on_query_async( influxdb_connector_send_query, #{points => Points, batch => false, mode => async} ), - do_async_query(InstId, Client, Points, {ReplyFun, Args}); + do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args}); {error, ErrorPoints} = Err -> ?tp( influxdb_connector_send_query_error, @@ -200,7 +200,7 @@ on_batch_query_async( influxdb_connector_send_query, #{points => Points, batch => true, mode => async} ), - do_async_query(InstId, Client, Points, {ReplyFun, Args}); + do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args}); {error, Reason} -> ?tp( influxdb_connector_send_query_error, @@ -496,7 +496,8 @@ is_auth_key(_) -> %% ------------------------------------------------------------------------------------------------- %% Query -do_query(InstId, Client, Points) -> +do_query(InstId, Channel, Client, Points) -> + emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}), case influxdb:write(Client, Points) of ok -> ?SLOG(debug, #{ @@ -527,12 +528,13 @@ do_query(InstId, Client, Points) -> end end. -do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> +do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) -> ?SLOG(info, #{ msg => "influxdb_write_point_async", connector => InstId, points => Points }), + emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}), WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]}, {ok, _WorkerPid} = influxdb:write_async(Client, Points, WrappedReplyFunAndArgs). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6bb1690ff..16bca153a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -319,6 +319,9 @@ on_query( emqx_bridge_kafka_impl_producer_sync_query, #{headers_config => KafkaHeaders, instance_id => InstId} ), + emqx_trace:rendered_action_template(MessageTag, #{ + message => KafkaMessage, send_type => sync + }), do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) catch throw:{bad_kafka_header, _} = Error -> @@ -376,6 +379,9 @@ on_query_async( emqx_bridge_kafka_impl_producer_async_query, #{headers_config => KafkaHeaders, instance_id => InstId} ), + emqx_trace:rendered_action_template(MessageTag, #{ + message => KafkaMessage, send_type => async + }), do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) catch error:{invalid_partition_count, _Count, _Partitioner} -> diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index c8a522e01..8744dfd71 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -261,6 +261,11 @@ do_send_requests_sync( stream_name := StreamName } = maps:get(ChannelId, InstalledChannels), Records = render_records(Requests, Templates), + StructuredRecords = [ + #{data => Data, partition_key => PartitionKey} + || {Data, PartitionKey} <- Records + ], + emqx_trace:rendered_action_template(ChannelId, StructuredRecords), Result = ecpool:pick_and_do( PoolName, {emqx_bridge_kinesis_connector_client, query, [Records, StreamName]}, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index a0d53d454..69c2242e4 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -66,10 +66,15 @@ on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_stat payload_template := PayloadTemplate, collection_template := CollectionTemplate } = ChannelState0 = maps:get(Channel, Channels), + Collection = emqx_placeholder:proc_tmpl(CollectionTemplate, Message0), ChannelState = ChannelState0#{ - collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0) + collection => Collection }, Message = render_message(PayloadTemplate, Message0), + emqx_trace:rendered_action_template(Channel, #{ + collection => Collection, + data => Message + }), Res = emqx_mongodb:on_query( InstanceId, {Channel, Message}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 900f6143f..f133bf334 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -264,7 +264,7 @@ on_query( ), Channels = maps:get(installed_channels, State), ChannelConfig = maps:get(ChannelId, Channels), - handle_send_result(with_egress_client(PoolName, send, [Msg, ChannelConfig])); + handle_send_result(with_egress_client(ChannelId, PoolName, send, [Msg, ChannelConfig])); on_query(ResourceId, {_ChannelId, Msg}, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", @@ -283,7 +283,7 @@ on_query_async( Callback = {fun on_async_result/2, [CallbackIn]}, Channels = maps:get(installed_channels, State), ChannelConfig = maps:get(ChannelId, Channels), - Result = with_egress_client(PoolName, send_async, [Msg, Callback, ChannelConfig]), + Result = with_egress_client(ChannelId, PoolName, send_async, [Msg, Callback, ChannelConfig]), case Result of ok -> ok; @@ -300,8 +300,11 @@ on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> reason => "Egress is not configured" }). -with_egress_client(ResourceId, Fun, Args) -> - ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover). +with_egress_client(ActionID, ResourceId, Fun, Args) -> + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ActionID), + ecpool:pick_and_do( + ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedCTX | Args]}, no_handover + ). on_async_result(Callback, Result) -> apply_callback_function(Callback, handle_send_result(Result)). @@ -337,6 +340,8 @@ classify_error({shutdown, _} = Reason) -> {recoverable_error, Reason}; classify_error(shutdown = Reason) -> {recoverable_error, Reason}; +classify_error({unrecoverable_error, _Reason} = Error) -> + Error; classify_error(Reason) -> {unrecoverable_error, Reason}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index d23899ef1..a4a0b0d37 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -22,8 +22,8 @@ -export([ config/1, - send/3, - send_async/4 + send/4, + send_async/5 ]). -type message() :: emqx_types:message() | map(). @@ -42,25 +42,40 @@ config(#{remote := RC = #{}} = Conf) -> Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}. --spec send(pid(), message(), egress()) -> ok. -send(Pid, MsgIn, Egress) -> - emqtt:publish(Pid, export_msg(MsgIn, Egress)). +-spec send(pid(), emqx_trace:rendered_action_template_ctx(), message(), egress()) -> + ok | {error, {unrecoverable_error, term()}}. +send(Pid, TraceRenderedCTX, MsgIn, Egress) -> + try + emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedCTX)) + catch + error:{unrecoverable_error, Reason} -> + {error, {unrecoverable_error, Reason}} + end. --spec send_async(pid(), message(), callback(), egress()) -> - ok | {ok, pid()}. -send_async(Pid, MsgIn, Callback, Egress) -> - ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback), - {ok, Pid}. +-spec send_async(pid(), emqx_trace:rendered_action_template_ctx(), message(), callback(), egress()) -> + {ok, pid()} | {error, {unrecoverable_error, term()}}. +send_async(Pid, TraceRenderedCTX, MsgIn, Callback, Egress) -> + try + ok = emqtt:publish_async( + Pid, export_msg(MsgIn, Egress, TraceRenderedCTX), _Timeout = infinity, Callback + ), + {ok, Pid} + catch + error:{unrecoverable_error, Reason} -> + {error, {unrecoverable_error, Reason}} + end. -export_msg(Msg, #{remote := Remote}) -> - to_remote_msg(Msg, Remote). +export_msg(Msg, #{remote := Remote}, TraceRenderedCTX) -> + to_remote_msg(Msg, Remote, TraceRenderedCTX). --spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) -> +-spec to_remote_msg( + message(), emqx_bridge_mqtt_msg:msgvars(), emqx_trace:rendered_action_template_ctx() +) -> remote_message(). -to_remote_msg(#message{flags = Flags} = Msg, Vars) -> +to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedCTX) -> {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), - to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars); -to_remote_msg(Msg = #{}, Remote) -> + to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedCTX); +to_remote_msg(Msg = #{}, Remote, TraceRenderedCTX) -> #{ topic := Topic, payload := Payload, @@ -68,6 +83,13 @@ to_remote_msg(Msg = #{}, Remote) -> retain := Retain } = emqx_bridge_mqtt_msg:render(Msg, Remote), PubProps = maps:get(pub_props, Msg, #{}), + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + qos => QoS, + retain => Retain, + topic => Topic, + props => PubProps, + payload => Payload + }), #mqtt_msg{ qos = QoS, retain = Retain, diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl index 6720e1fb7..da9377814 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl @@ -104,10 +104,12 @@ on_query( #{channels := Channels, connector_state := ConnectorState} ) when is_binary(Channel) -> ChannelConfig = maps:get(Channel, Channels), + MergedState0 = maps:merge(ConnectorState, ChannelConfig), + MergedState1 = MergedState0#{channel_id => Channel}, Result = emqx_mysql:on_query( InstanceId, Request, - maps:merge(ConnectorState, ChannelConfig) + MergedState1 ), ?tp(mysql_connector_on_query_return, #{instance_id => InstanceId, result => Result}), Result; @@ -121,10 +123,12 @@ on_batch_query( ) when is_binary(element(1, Req)) -> Channel = element(1, Req), ChannelConfig = maps:get(Channel, Channels), + MergedState0 = maps:merge(ConnectorState, ChannelConfig), + MergedState1 = MergedState0#{channel_id => Channel}, Result = emqx_mysql:on_batch_query( InstanceId, BatchRequest, - maps:merge(ConnectorState, ChannelConfig) + MergedState1 ), ?tp(mysql_connector_on_batch_query_return, #{instance_id => InstanceId, result => Result}), Result; diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index abef958ff..509d53284 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -167,9 +167,10 @@ on_batch_query( BatchReq, #{channels := Channels} = State ) -> + [{ChannelId, _} | _] = BatchReq, case try_render_messages(BatchReq, Channels) of {ok, Datas} -> - do_query(InstanceId, Datas, State); + do_query(InstanceId, ChannelId, Datas, State); Error -> Error end. @@ -222,12 +223,13 @@ on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) -> %% Helper fns %%======================================================================================== -do_query(InstanceId, Query, #{pool_name := PoolName} = State) -> +do_query(InstanceId, ChannelID, Query, #{pool_name := PoolName} = State) -> ?TRACE( "QUERY", "opents_connector_received", #{connector => InstanceId, query => Query, state => State} ), + emqx_trace:rendered_action_template(ChannelID, #{query => Query}), ?tp(opents_bridge_on_query, #{instance_id => InstanceId}), diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 8c39c3671..0cddfab66 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -196,6 +196,11 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> {error, channel_not_found}; {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} -> PulsarMessage = render_message(Message, MessageTmpl), + emqx_trace:rendered_action_template(ChannelId, #{ + message => PulsarMessage, + sync_timeout => SyncTimeout, + is_async => false + }), try pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) catch @@ -217,12 +222,16 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> ?tp_span( pulsar_producer_on_query_async, #{instance_id => _InstanceId, message => Message}, - on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) + on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ) end. -on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) -> +on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) -> PulsarMessage = render_message(Message, MessageTmpl), + emqx_trace:rendered_action_template(ChannelId, #{ + message => PulsarMessage, + is_async => true + }), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). %%------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 1ef1c6617..dacb47a57 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -9,6 +9,7 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -41,7 +42,7 @@ -export([connect/1]). %% Internal callbacks --export([publish_messages/4]). +-export([publish_messages/5]). namespace() -> "rabbitmq". @@ -214,9 +215,10 @@ on_query(ResourceID, {ChannelId, Data} = MsgReq, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq]]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedCTX]}, no_handover ), handle_result(Res); @@ -234,9 +236,10 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedCTX]}, no_handover ), handle_result(Res); @@ -255,7 +258,8 @@ publish_messages( wait_for_publish_confirmations := WaitForPublishConfirmations, publish_confirmation_timeout := PublishConfirmationTimeout }, - Messages + Messages, + TraceRenderedCTX ) -> try publish_messages( @@ -267,15 +271,18 @@ publish_messages( PayloadTmpl, Messages, WaitForPublishConfirmations, - PublishConfirmationTimeout + PublishConfirmationTimeout, + TraceRenderedCTX ) catch + error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason -> + {error, Reason}; %% if send a message to a non-existent exchange, RabbitMQ client will crash %% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>} %% so we catch and return {recoverable_error, Reason} to increase metrics _Type:Reason -> Msg = iolist_to_binary(io_lib:format("RabbitMQ: publish_failed: ~p", [Reason])), - erlang:error({recoverable_error, Msg}) + {error, {recoverable_error, Msg}} end. publish_messages( @@ -287,7 +294,8 @@ publish_messages( PayloadTmpl, Messages, WaitForPublishConfirmations, - PublishConfirmationTimeout + PublishConfirmationTimeout, + TraceRenderedCTX ) -> case maps:find(Conn, RabbitMQ) of {ok, Channel} -> @@ -299,18 +307,33 @@ publish_messages( exchange = Exchange, routing_key = RoutingKey }, + FormattedMsgs = [ + format_data(PayloadTmpl, M) + || {_, M} <- Messages + ], + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + messages => FormattedMsgs, + properties => #{ + headers => [], + delivery_mode => DeliveryMode + }, + method => #{ + exchange => Exchange, + routing_key => RoutingKey + } + }), lists:foreach( - fun({_, MsgRaw}) -> + fun(Msg) -> amqp_channel:cast( Channel, Method, #amqp_msg{ - payload = format_data(PayloadTmpl, MsgRaw), + payload = Msg, props = MessageProperties } ) end, - Messages + FormattedMsgs ), case WaitForPublishConfirmations of true -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index b7477b385..3f7c4897c 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -107,7 +107,7 @@ on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) -> Result; on_query( InstId, - {_MessageTag, _Data} = Msg, + {MessageTag, _Data} = Msg, #{channels := Channels, conn_st := RedisConnSt} ) -> case try_render_message([Msg], Channels) of @@ -116,6 +116,10 @@ on_query( redis_bridge_connector_cmd, #{cmd => Cmd, batch => false, mode => sync} ), + emqx_trace:rendered_action_template( + MessageTag, + #{command => Cmd, batch => false, mode => sync} + ), Result = query(InstId, {cmd, Cmd}, RedisConnSt), ?tp( redis_bridge_connector_send_done, @@ -135,6 +139,11 @@ on_batch_query( redis_bridge_connector_send, #{batch_data => BatchData, batch => true, mode => sync} ), + [{ChannelID, _} | _] = BatchData, + emqx_trace:rendered_action_template( + ChannelID, + #{commands => Cmds, batch => ture, mode => sync} + ), Result = query(InstId, {cmds, Cmds}, RedisConnSt), ?tp( redis_bridge_connector_send_done, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index f9b4ec5d4..314afb350 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -264,7 +264,11 @@ do_query( TopicKey = get_topic_key(Query, TopicTks), Data = apply_template(Query, Templates, DispatchStrategy), - + emqx_trace:rendered_action_template(ChannelId, #{ + topic_key => TopicKey, + data => Data, + request_timeout => RequestTimeout + }), Result = safe_do_produce( ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout ), diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 4407222d5..5d3ed19f8 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -168,13 +168,14 @@ init_channel_state(#{parameters := Parameters}) -> on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) -> case maps:get(Tag, Channels, undefined) of ChannelState = #{} -> - run_simple_upload(InstId, Data, ChannelState, Config); + run_simple_upload(InstId, Tag, Data, ChannelState, Config); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. run_simple_upload( InstId, + ChannelID, Data, #{ bucket := BucketTemplate, @@ -188,6 +189,11 @@ run_simple_upload( Client = emqx_s3_client:create(Bucket, Config), Key = render_key(KeyTemplate, Data), Content = render_content(ContentTemplate, Data), + emqx_trace:rendered_action_template(ChannelID, #{ + bucket => Bucket, + key => Key, + content => Content + }), case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of ok -> ?tp(s3_bridge_connector_upload_ok, #{ diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 1eb9746dc..683551316 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -413,6 +413,9 @@ do_query( %% only insert sql statement for single query and batch query case apply_template(QueryTuple, Templates) of {?ACTION_SEND_MESSAGE, SQL} -> + emqx_trace:rendered_action_template(ChannelId, #{ + sql => SQL + }), Result = ecpool:pick_and_do( PoolName, {?MODULE, worker_do_insert, [SQL, State]}, diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 045af348f..a6d47229c 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -273,6 +273,8 @@ do_query( Result = case try_render_message(Query, Channels) of {ok, Msg} -> + [{ChannelID, _} | _] = Query, + emqx_trace:rendered_action_template(ChannelID, #{message => Msg}), ecpool:pick_and_do( PoolName, {emqx_bridge_syskeeper_client, forward, [Msg, AckTimeout + ?EXTRA_CALL_TIMEOUT]}, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 7bb342ed1..67b0e77bc 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -10,6 +10,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -32,7 +33,7 @@ -export([connector_examples/1]). --export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]). +-export([connect/1, do_get_status/1, execute/3, do_batch_insert/5]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -186,6 +187,7 @@ on_query(InstanceId, {ChannelId, Data}, #{channels := Channels} = State) -> case maps:find(ChannelId, Channels) of {ok, #{insert := Tokens, opts := Opts}} -> Query = emqx_placeholder:proc_tmpl(Tokens, Data), + emqx_trace:rendered_action_template(ChannelId, #{query => Query}), do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State); _ -> {error, {unrecoverable_error, {invalid_channel_id, InstanceId}}} @@ -199,9 +201,10 @@ on_batch_query( ) -> case maps:find(ChannelId, Channels) of {ok, #{batch := Tokens, opts := Opts}} -> + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), do_query_job( InstanceId, - {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]}, + {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedCTX]}, State ); _ -> @@ -338,9 +341,18 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> execute(Conn, Query, Opts) -> tdengine:insert(Conn, Query, Opts). -do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> +do_batch_insert(Conn, Tokens, BatchReqs, Opts, TraceRenderedCTX) -> SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>), - execute(Conn, SQL, Opts). + try + emqx_trace:rendered_action_template_with_ctx( + TraceRenderedCTX, + #{query => SQL} + ), + execute(Conn, SQL, Opts) + catch + error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason -> + {error, Reason} + end. aggregate_query(BatchTks, BatchReqs, Acc) -> lists:foldl( diff --git a/apps/emqx_mysql/src/emqx_mysql.app.src b/apps/emqx_mysql/src/emqx_mysql.app.src index f23d7b092..9637cc473 100644 --- a/apps/emqx_mysql/src/emqx_mysql.app.src +++ b/apps/emqx_mysql/src/emqx_mysql.app.src @@ -1,6 +1,6 @@ {application, emqx_mysql, [ {description, "EMQX MySQL Database Connector"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index e77965d67..ff851558a 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -498,6 +498,8 @@ on_sql_query( ) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), + ChannelID = maps:get(channel_id, State, no_channel), + emqx_trace:rendered_action_template(ChannelID, #{sql => SQLOrKey}), Worker = ecpool:get_client(PoolName), case ecpool_worker:client(Worker) of {ok, Conn} -> diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 4e88a3d96..e90665cc4 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -210,7 +210,7 @@ on_query( }), Type = query, {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), - Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data), + Res = on_sql_query(InstId, TypeOrKey, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data), handle_result(Res). on_batch_query( @@ -244,7 +244,9 @@ on_batch_query( Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), case - on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2) + on_sql_query( + InstId, BinKey, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2 + ) of {ok, Results} -> handle_batch_result(Results, 0); @@ -281,7 +283,13 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{ end end. -on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) -> +on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) -> + emqx_trace:rendered_action_template(ChannelID, #{ + type => Type, + apply_mode => ApplyMode, + name_or_sql => NameOrSQL, + data => Data + }), case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of {error, Reason} = Result -> ?tp( diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index f27ec8615..761c9c0f6 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -304,7 +304,7 @@ on_query( }), Type = pgsql_query_type(TypeOrKey), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), - Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data), + Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data), ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}), handle_result(Res). @@ -337,7 +337,7 @@ on_batch_query( {_Statement, RowTemplate} -> PrepStatement = get_prepared_statement(BinKey, State), Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], - case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of + case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of {error, _Error} = Result -> handle_result(Result); {_Column, Results} -> @@ -386,7 +386,15 @@ get_prepared_statement(Key, #{prepares := PrepStatements}) -> BinKey = to_bin(Key), maps:get(BinKey, PrepStatements). -on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> +on_sql_query(Key, InstId, PoolName, Type, NameOrSQL, Data) -> + emqx_trace:rendered_action_template( + Key, + #{ + statement_type => Type, + statement_or_name => NameOrSQL, + data => Data + } + ), try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of {error, Reason} -> ?tp( diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f99341a9b..5ec4bdc6e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -18,6 +18,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("emqx_resource/include/emqx_resource_errors.hrl"). -export([ @@ -141,22 +142,24 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) -> logger:update_process_metadata(#{ - clientid => ClientID - }), - set_process_trace_metadata(RuleID, maps:remove(clientid, Columns)); + clientid => ClientID, + rule_id => RuleID, + rule_trigger_time => rule_trigger_time(Columns) + }); set_process_trace_metadata(RuleID, Columns) -> - EventTimestamp = - case Columns of - #{timestamp := Timestamp} -> - Timestamp; - _ -> - erlang:system_time(millisecond) - end, logger:update_process_metadata(#{ rule_id => RuleID, - rule_trigger_time => EventTimestamp + rule_trigger_time => rule_trigger_time(Columns) }). +rule_trigger_time(Columns) -> + case Columns of + #{timestamp := Timestamp} -> + Timestamp; + _ -> + erlang:system_time(millisecond) + end. + reset_process_trace_metadata(#{clientid := _ClientID}) -> Meta = logger:get_process_metadata(), Meta1 = maps:remove(clientid, Meta), @@ -722,7 +725,7 @@ inc_action_metrics(TraceCtx, Result) -> do_inc_action_metrics( #{rule_id := RuleId, action_id := ActId} = TraceContext, - {error, {unrecoverable_error, {action_stopped_after_template_rendering, Explanation}} = _Reason} + {error, ?EMQX_TRACE_STOP_ACTION(Explanation) = _Reason} ) -> TraceContext1 = maps:remove(action_id, TraceContext), trace_action(