diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index a9647b2c1..c216e905c 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -29,7 +29,7 @@ , bridges/0 ]). --export([on_message_received/2]). +-export([on_message_received/3]). %% callbacks of behaviour emqx_resource -export([ on_start/2 @@ -105,14 +105,17 @@ drop_bridge(Name) -> case supervisor:terminate_child(?MODULE, Name) of ok -> supervisor:delete_child(?MODULE, Name); + {error, not_found} -> + ok; {error, Error} -> {error, Error} end. %% =================================================================== -%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called +%% When use this bridge as a data source, ?MODULE:on_message_received will be called %% if the bridge received msgs from the remote broker. -on_message_received(Msg, HookPoint) -> +on_message_received(Msg, HookPoint, InstId) -> + _ = emqx_resource:query(InstId, {message_received, Msg}), emqx:run_hook(HookPoint, [Msg]). %% =================================================================== @@ -123,8 +126,8 @@ on_start(InstId, Conf) -> BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ name => InstanceId, - clientid => clientid(maps:get(clientid, Conf, InstId)), - subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)), + clientid => clientid(InstId), + subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), InstId), forwards => make_forward_confs(maps:get(egress, Conf, undefined)) }, case ?MODULE:create_bridge(BridgeConf) of @@ -149,6 +152,9 @@ on_stop(_InstId, #{name := InstanceId}) -> connector => InstanceId, reason => Reason}) end. +on_query(_InstId, {message_received, _Msg}, AfterQuery, _State) -> + emqx_resource:query_success(AfterQuery); + on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), @@ -166,15 +172,15 @@ ensure_mqtt_worker_started(InstanceId) -> {error, Reason} -> {error, Reason} end. -make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 -> +make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 -> undefined; -make_sub_confs(undefined) -> +make_sub_confs(undefined, _) -> undefined; -make_sub_confs(SubRemoteConf) -> +make_sub_confs(SubRemoteConf, InstId) -> case maps:take(hookpoint, SubRemoteConf) of error -> SubRemoteConf; {HookPoint, SubConf} -> - MFA = {?MODULE, on_message_received, [HookPoint]}, + MFA = {?MODULE, on_message_received, [HookPoint, InstId]}, SubConf#{on_message_received => MFA} end. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 3ab410391..d7abcda84 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -168,7 +168,6 @@ handle_publish(Msg, undefined) -> handle_publish(Msg, Vars) -> ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), - emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), case Vars of #{on_message_received := {Mod, Func, Args}} -> _ = erlang:apply(Mod, Func, [Msg | Args]); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 1357037ee..a0dd9eec1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -61,7 +61,7 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) -> -> exp_msg(). to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> Retain0 = maps:get(retain, Flags0, false), - MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), + MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)), to_remote_msg(MapMsg, Vars); to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> @@ -78,9 +78,10 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection -to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, +to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> + MapMsg = format_msg_received(MapMsg0), Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), @@ -89,6 +90,33 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). +format_msg_received(#{dup := Dup, payload := Payload, properties := Props, + qos := QoS, retain := Retain, topic := Topic}) -> + #{event => '$bridges/mqtt', + id => emqx_guid:to_hexstr(emqx_guid:gen()), + payload => Payload, + topic => Topic, + qos => QoS, + flags => #{dup => Dup, retain => Retain}, + pub_props => printable_maps(Props), + timestamp => erlang:system_time(millisecond), + node => node() + }. + +printable_maps(undefined) -> #{}; +printable_maps(Headers) -> + maps:fold( + fun ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; + (K, V0, AccIn) -> AccIn#{K => V0} + end, #{}, Headers). + process_payload([], Msg) -> emqx_json:encode(Msg); process_payload(Tks, Msg) ->