692 lines
27 KiB
Erlang
692 lines
27 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_events).
|
|
|
|
-include("rule_engine.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-logger_header("[RuleEvents]").
|
|
|
|
-export([ load/1
|
|
, unload/0
|
|
, unload/1
|
|
, event_name/1
|
|
, eventmsg_publish/1
|
|
]).
|
|
|
|
-export([ on_client_connected/3
|
|
, on_client_disconnected/4
|
|
, on_session_subscribed/4
|
|
, on_session_unsubscribed/4
|
|
, on_message_publish/2
|
|
, on_message_dropped/4
|
|
, on_message_delivered/3
|
|
, on_message_acked/3
|
|
, on_delivery_dropped/4
|
|
]).
|
|
|
|
-export([ event_info/0
|
|
, columns/1
|
|
, columns_with_exam/1
|
|
]).
|
|
|
|
%% Hook points this module knows how to attach to. unload/0 iterates over
%% this list to detach every callback.
-define(SUPPORTED_HOOK, [
    'client.connected',
    'client.disconnected',
    'session.subscribed',
    'session.unsubscribed',
    'message.publish',
    'message.delivered',
    'message.acked',
    'message.dropped',
    'delivery.dropped'
]).
|
|
|
|
-ifdef(TEST).
|
|
-export([ reason/1
|
|
, hook_fun/1
|
|
, printable_maps/1
|
|
]).
|
|
-endif.
|
|
|
|
%% @doc Attach this module's callback to the hook point that corresponds to
%% Topic (see event_name/1). The callback gets one extra argument: the
%% configuration map built by hook_conf/2 from the application environment.
load(Topic) ->
    HookPoint = event_name(Topic),
    Callback = hook_fun(HookPoint),
    Conf = hook_conf(HookPoint, env()),
    emqx_hooks:put(HookPoint, {?MODULE, Callback, [Conf]}).
|
|
|
|
%% @doc Detach this module's callback from every hook point listed in
%% ?SUPPORTED_HOOK.
unload() ->
    Detach =
        fun(HookPoint) ->
            emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
        end,
    lists:foreach(Detach, ?SUPPORTED_HOOK).
|
|
|
|
%% @doc Detach this module's callback from the single hook point that
%% corresponds to Topic (see event_name/1).
unload(Topic) ->
    HookPoint = event_name(Topic),
    Callback = {?MODULE, hook_fun(HookPoint)},
    emqx_hooks:del(HookPoint, Callback).
|
|
|
|
%% @doc Return the full application environment of ?APP as a proplist.
env() ->
    AppEnv = application:get_all_env(?APP),
    AppEnv.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Hook callback for 'message.publish'.
%%
%% Messages flagged `event' (the ones built by make_msg/3) are passed through
%% untouched, as are `sys' messages when the hook configuration says to
%% ignore them. For anything else, the rules registered for the message topic
%% (if any) are applied to the publish event. Always returns {ok, Message}.
on_message_publish(#message{flags = #{event := true}} = Message, _Env) ->
    {ok, Message};
on_message_publish(#message{flags = #{sys := true}} = Message,
                   #{ignore_sys_message := true}) ->
    {ok, Message};
on_message_publish(#message{topic = Topic} = Message, _Env) ->
    MatchedRules = emqx_rule_registry:get_rules_for(Topic),
    case MatchedRules of
        [] ->
            ok;
        _ ->
            emqx_rule_runtime:apply_rules(MatchedRules, eventmsg_publish(Message))
    end,
    {ok, Message}.
|
|
|
|
%% @doc Hook callback for 'client.connected'. The event message is built
%% lazily so that it is only constructed when may_publish_and_apply/3 needs it.
on_client_connected(ClientInfo, ConnInfo, Env) ->
    GenEventMsg = fun() -> eventmsg_connected(ClientInfo, ConnInfo) end,
    may_publish_and_apply('client.connected', GenEventMsg, Env).
|
|
|
|
%% @doc Hook callback for 'client.disconnected'. The event message is built
%% lazily so that it is only constructed when may_publish_and_apply/3 needs it.
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
    GenEventMsg = fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end,
    may_publish_and_apply('client.disconnected', GenEventMsg, Env).
|
|
|
|
%% @doc Hook callback for 'session.subscribed'. The event message is built
%% lazily so that it is only constructed when may_publish_and_apply/3 needs it.
on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
    Event = 'session.subscribed',
    GenEventMsg = fun() -> eventmsg_sub_or_unsub(Event, ClientInfo, Topic, SubOpts) end,
    may_publish_and_apply(Event, GenEventMsg, Env).
|
|
|
|
%% @doc Hook callback for 'session.unsubscribed'. The event message is built
%% lazily so that it is only constructed when may_publish_and_apply/3 needs it.
on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) ->
    Event = 'session.unsubscribed',
    GenEventMsg = fun() -> eventmsg_sub_or_unsub(Event, ClientInfo, Topic, SubOpts) end,
    may_publish_and_apply(Event, GenEventMsg, Env).
|
|
|
|
%% @doc Hook callback for 'message.dropped'.
%%
%% `sys' messages are skipped when the hook configuration sets
%% ignore_sys_message; otherwise the dropped-message event is published
%% and/or fed to the matching rules. The second argument is not used here.
%% Always returns {ok, Message}.
on_message_dropped(#message{flags = #{sys := true}} = Message,
                   _, _, #{ignore_sys_message := true}) ->
    {ok, Message};
on_message_dropped(Message, _, Reason, Env) ->
    GenEventMsg = fun() -> eventmsg_dropped(Message, Reason) end,
    may_publish_and_apply('message.dropped', GenEventMsg, Env),
    {ok, Message}.
|
|
|
|
%% @doc Hook callback for 'message.delivered'.
%%
%% `sys' messages are skipped when the hook configuration sets
%% ignore_sys_message; otherwise the delivered event is published and/or fed
%% to the matching rules. Always returns {ok, Message}.
on_message_delivered(_ClientInfo,
                     #message{flags = #{sys := true}} = Message,
                     #{ignore_sys_message := true}) ->
    {ok, Message};
on_message_delivered(ClientInfo, Message, Env) ->
    GenEventMsg = fun() -> eventmsg_delivered(ClientInfo, Message) end,
    may_publish_and_apply('message.delivered', GenEventMsg, Env),
    {ok, Message}.
|
|
|
|
%% @doc Hook callback for 'message.acked'.
%%
%% `sys' messages are skipped when the hook configuration sets
%% ignore_sys_message; otherwise the acked event is published and/or fed to
%% the matching rules. Always returns {ok, Message}.
on_message_acked(_ClientInfo,
                 #message{flags = #{sys := true}} = Message,
                 #{ignore_sys_message := true}) ->
    {ok, Message};
on_message_acked(ClientInfo, Message, Env) ->
    GenEventMsg = fun() -> eventmsg_acked(ClientInfo, Message) end,
    may_publish_and_apply('message.acked', GenEventMsg, Env),
    {ok, Message}.
|
|
|
|
%% @doc Hook callback for 'delivery.dropped'.
%%
%% `sys' messages are skipped when the hook configuration sets
%% ignore_sys_message; otherwise the delivery-dropped event is published
%% and/or fed to the matching rules. Always returns {ok, Message}.
on_delivery_dropped(_ClientInfo,
                    #message{flags = #{sys := true}} = Message,
                    _Reason,
                    #{ignore_sys_message := true}) ->
    {ok, Message};
on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
    GenEventMsg = fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end,
    may_publish_and_apply('delivery.dropped', GenEventMsg, Env),
    {ok, Message}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Event Messages
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Build the column map for a 'message.publish' event from a #message{}
%% record. `username', `peerhost' and the publish properties are read from
%% the message headers, with `undefined' / `#{}' as fallbacks.
eventmsg_publish(#message{id = Id, from = ClientId, qos = QoS, flags = Flags,
                          topic = Topic, headers = Headers, payload = Payload,
                          timestamp = Timestamp} = Message) ->
    MsgId = emqx_guid:to_hexstr(Id),
    Username = emqx_message:get_header(username, Message, undefined),
    PeerHost = ntoa(emqx_message:get_header(peerhost, Message, undefined)),
    PubProps = printable_maps(emqx_message:get_header(properties, Message, #{})),
    Columns = #{
        id => MsgId,
        clientid => ClientId,
        username => Username,
        payload => Payload,
        peerhost => PeerHost,
        topic => Topic,
        qos => QoS,
        flags => Flags,
        pub_props => PubProps,
        %% the column 'headers' will be removed in the next major release
        headers => printable_maps(Headers),
        publish_received_at => Timestamp
    },
    with_basic_columns('message.publish', Columns).
|
|
|
|
%% @doc Build the column map for a 'client.connected' event.
%%
%% Every key matched in the two map patterns is mandatory; a ClientInfo or
%% ConnInfo map that lacks one of them fails with function_clause.
eventmsg_connected(#{clientid := ClientId,
                     username := Username,
                     is_bridge := IsBridge,
                     mountpoint := Mountpoint},
                   #{peername := PeerName,
                     sockname := SockName,
                     clean_start := CleanStart,
                     proto_name := ProtoName,
                     proto_ver := ProtoVer,
                     keepalive := Keepalive,
                     connected_at := ConnectedAt,
                     conn_props := ConnProps,
                     receive_maximum := RcvMax,
                     expiry_interval := ExpiryInterval}) ->
    Columns = #{
        clientid => ClientId,
        username => Username,
        mountpoint => Mountpoint,
        peername => ntoa(PeerName),
        sockname => ntoa(SockName),
        proto_name => ProtoName,
        proto_ver => ProtoVer,
        keepalive => Keepalive,
        clean_start => CleanStart,
        receive_maximum => RcvMax,
        expiry_interval => ExpiryInterval,
        is_bridge => IsBridge,
        conn_props => printable_maps(ConnProps),
        connected_at => ConnectedAt
    },
    with_basic_columns('client.connected', Columns).
|
|
|
|
%% @doc Build the column map for a 'client.disconnected' event.
%%
%% The disconnect reason is normalised through reason/1. `disconn_props' is
%% optional in ConnInfo and defaults to an empty map.
eventmsg_disconnected(#{clientid := ClientId, username := Username},
                      #{peername := PeerName,
                        sockname := SockName,
                        disconnected_at := DisconnectedAt} = ConnInfo,
                      Reason) ->
    DisconnProps = maps:get(disconn_props, ConnInfo, #{}),
    Columns = #{
        reason => reason(Reason),
        clientid => ClientId,
        username => Username,
        peername => ntoa(PeerName),
        sockname => ntoa(SockName),
        disconn_props => printable_maps(DisconnProps),
        disconnected_at => DisconnectedAt
    },
    with_basic_columns('client.disconnected', Columns).
|
|
|
|
%% @doc Build the column map shared by the 'session.subscribed' and
%% 'session.unsubscribed' events.
%%
%% The properties are read from SubOpts under the key chosen by
%% sub_unsub_prop_key/1 (defaulting to an empty map) and stored in the
%% result under that same key.
eventmsg_sub_or_unsub(Event,
                      #{clientid := ClientId,
                        username := Username,
                        peerhost := PeerHost},
                      Topic,
                      #{qos := QoS} = SubOpts) ->
    PropKey = sub_unsub_prop_key(Event),
    Props = maps:get(PropKey, SubOpts, #{}),
    Columns = #{
        clientid => ClientId,
        username => Username,
        peerhost => ntoa(PeerHost),
        PropKey => printable_maps(Props),
        topic => Topic,
        qos => QoS
    },
    with_basic_columns(Event, Columns).
|
|
|
|
%% @doc Build the column map for a 'message.dropped' event: the same columns
%% as a publish event plus the drop Reason, which is stored as given.
eventmsg_dropped(#message{id = Id, from = ClientId, qos = QoS, flags = Flags,
                          topic = Topic, headers = Headers, payload = Payload,
                          timestamp = Timestamp} = Message,
                 Reason) ->
    MsgId = emqx_guid:to_hexstr(Id),
    Username = emqx_message:get_header(username, Message, undefined),
    PeerHost = ntoa(emqx_message:get_header(peerhost, Message, undefined)),
    Columns = #{
        id => MsgId,
        reason => Reason,
        clientid => ClientId,
        username => Username,
        payload => Payload,
        peerhost => PeerHost,
        topic => Topic,
        qos => QoS,
        flags => Flags,
        %% the column 'headers' will be removed in the next major release
        headers => printable_maps(Headers),
        pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
        publish_received_at => Timestamp
    },
    with_basic_columns('message.dropped', Columns).
|
|
|
|
%% @doc Build the column map for a 'delivery.dropped' event.
%%
%% `clientid', `username' and `peerhost' come from the ClientInfo map given
%% to the hook, while `from_clientid' / `from_username' come from the
%% message itself.
eventmsg_delivery_dropped(#{peerhost := PeerHost,
                            clientid := ReceiverCId,
                            username := ReceiverUsername},
                          #message{id = Id, from = ClientId, qos = QoS,
                                   flags = Flags, topic = Topic,
                                   headers = Headers, payload = Payload,
                                   timestamp = Timestamp} = Message,
                          Reason) ->
    MsgId = emqx_guid:to_hexstr(Id),
    FromUsername = emqx_message:get_header(username, Message, undefined),
    Columns = #{
        id => MsgId,
        reason => Reason,
        from_clientid => ClientId,
        from_username => FromUsername,
        clientid => ReceiverCId,
        username => ReceiverUsername,
        payload => Payload,
        peerhost => ntoa(PeerHost),
        topic => Topic,
        qos => QoS,
        flags => Flags,
        %% the column 'headers' will be removed in the next major release
        headers => printable_maps(Headers),
        pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
        publish_received_at => Timestamp
    },
    with_basic_columns('delivery.dropped', Columns).
|
|
|
|
%% @doc Build the column map for a 'message.delivered' event.
%%
%% `clientid', `username' and `peerhost' come from the ClientInfo map given
%% to the hook, while `from_clientid' / `from_username' come from the
%% message itself.
eventmsg_delivered(#{peerhost := PeerHost,
                     clientid := ReceiverCId,
                     username := ReceiverUsername},
                   #message{id = Id, from = ClientId, qos = QoS,
                            flags = Flags, topic = Topic,
                            headers = Headers, payload = Payload,
                            timestamp = Timestamp} = Message) ->
    MsgId = emqx_guid:to_hexstr(Id),
    FromUsername = emqx_message:get_header(username, Message, undefined),
    Columns = #{
        id => MsgId,
        from_clientid => ClientId,
        from_username => FromUsername,
        clientid => ReceiverCId,
        username => ReceiverUsername,
        payload => Payload,
        peerhost => ntoa(PeerHost),
        topic => Topic,
        qos => QoS,
        flags => Flags,
        %% the column 'headers' will be removed in the next major release
        headers => printable_maps(Headers),
        pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
        publish_received_at => Timestamp
    },
    with_basic_columns('message.delivered', Columns).
|
|
|
|
%% @doc Build the column map for a 'message.acked' event.
%%
%% Same columns as the delivered event, plus `puback_props' read from the
%% message header of that name (default: empty map).
eventmsg_acked(#{peerhost := PeerHost,
                 clientid := ReceiverCId,
                 username := ReceiverUsername},
               #message{id = Id, from = ClientId, qos = QoS,
                        flags = Flags, topic = Topic,
                        headers = Headers, payload = Payload,
                        timestamp = Timestamp} = Message) ->
    MsgId = emqx_guid:to_hexstr(Id),
    FromUsername = emqx_message:get_header(username, Message, undefined),
    Columns = #{
        id => MsgId,
        from_clientid => ClientId,
        from_username => FromUsername,
        clientid => ReceiverCId,
        username => ReceiverUsername,
        payload => Payload,
        peerhost => ntoa(PeerHost),
        topic => Topic,
        qos => QoS,
        flags => Flags,
        %% the column 'headers' will be removed in the next major release
        headers => printable_maps(Headers),
        pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
        puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
        publish_received_at => Timestamp
    },
    with_basic_columns('message.acked', Columns).
|
|
|
|
%% @doc Name of the properties key used for a subscribe / unsubscribe event.
%% Any other event name fails with function_clause.
sub_unsub_prop_key('session.subscribed') ->
    sub_props;
sub_unsub_prop_key('session.unsubscribed') ->
    unsub_props.
|
|
|
|
%% @doc Add the columns common to every event (`event', `timestamp' in
%% milliseconds, and `node') to Data. These three keys overwrite any values
%% already present in Data under the same names.
with_basic_columns(EventName, Data) when is_map(Data) ->
    Basics = #{
        event => EventName,
        timestamp => erlang:system_time(millisecond),
        node => node()
    },
    %% maps:merge/2 prefers the second map on key clashes.
    maps:merge(Data, Basics).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Events publishing and rules applying
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Publish an event message (when enabled) and apply the matching rules.
%%
%% EventName   - hook point atom, e.g. 'client.connected'.
%% GenEventMsg - zero-arity fun that builds the event column map.
%% Env         - hook configuration map as produced by hook_conf/2.
%%
%% When the event is enabled, the event message is JSON-encoded and published
%% to the event topic with the configured QoS, and then the rules registered
%% for that topic are applied. If encoding fails, the failure (including its
%% reason) is logged and the rules are still applied.
%%
%% When the event is not enabled, nothing is published and the event message
%% is only built if at least one rule is registered for the event topic.
may_publish_and_apply(EventName, GenEventMsg, #{enabled := true, qos := QoS}) ->
    EventTopic = event_topic(EventName),
    EventMsg = GenEventMsg(),
    case emqx_json:safe_encode(EventMsg) of
        {ok, Payload} ->
            _ = emqx_broker:safe_publish(make_msg(QoS, EventTopic, Payload)),
            ok;
        {error, Reason} ->
            %% Keep the encoder's reason in the log; without it the failure
            %% cannot be diagnosed from the message dump alone.
            ?LOG(error, "Failed to encode event msg for ~p, reason: ~p, msg: ~p",
                 [EventName, Reason, EventMsg])
    end,
    emqx_rule_runtime:apply_rules(emqx_rule_registry:get_rules_for(EventTopic), EventMsg);
may_publish_and_apply(EventName, GenEventMsg, _Env) ->
    EventTopic = event_topic(EventName),
    case emqx_rule_registry:get_rules_for(EventTopic) of
        [] -> ok;
        Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
    end.
|
|
|
|
%% @doc Build the message used to publish an event: sender `emqx_events',
%% payload flattened to a binary, and both the `sys' and `event' flags set.
%% The `event' flag is what on_message_publish/2 checks to skip such messages.
make_msg(QoS, Topic, Payload) ->
    Msg = emqx_message:make(emqx_events, QoS, Topic, iolist_to_binary(Payload)),
    emqx_message:set_flags(#{sys => true, event => true}, Msg).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Columns
|
|
%%--------------------------------------------------------------------
|
|
%% @doc Column names available for Event, in the order they are listed by
%% columns_with_exam/1 (the example values are dropped).
columns(Event) ->
    {Names, _Examples} = lists:unzip(columns_with_exam(Event)),
    Names.
|
|
|
|
%% @doc Descriptions of all supported events, one map per event, each built
%% by event_info_common/4.
event_info() ->
    [
        event_info_message_publish(),
        event_info_message_deliver(),
        event_info_message_acked(),
        event_info_message_dropped(),
        event_info_delivery_dropped(),
        event_info_client_connected(),
        event_info_client_disconnected(),
        event_info_session_subscribed(),
        event_info_session_unsubscribed()
    ].
|
|
|
|
%% @doc Description of the 'message.publish' event. The title doubles as the
%% description.
event_info_message_publish() ->
    Title = {<<"message publish">>, <<"消息发布"/utf8>>},
    SqlExample = <<"SELECT payload.msg as msg FROM \"t/#\" WHERE msg = 'hello'">>,
    event_info_common('message.publish', Title, Title, SqlExample).
|
|
%% @doc Description of the 'message.delivered' event. The title doubles as
%% the description.
event_info_message_deliver() ->
    Title = {<<"message delivered">>, <<"消息已投递"/utf8>>},
    SqlExample = <<"SELECT * FROM \"$events/message_delivered\" WHERE topic =~ 't/#'">>,
    event_info_common('message.delivered', Title, Title, SqlExample).
|
|
%% @doc Description of the 'message.acked' event. The title doubles as the
%% description.
event_info_message_acked() ->
    Title = {<<"message acked">>, <<"消息应答"/utf8>>},
    SqlExample = <<"SELECT * FROM \"$events/message_acked\" WHERE topic =~ 't/#'">>,
    event_info_common('message.acked', Title, Title, SqlExample).
|
|
%% @doc Description of the 'message.dropped' event (message discarded while
%% being forwarded).
event_info_message_dropped() ->
    Title = {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>},
    Description =
        {<<"messages are discarded during forwarding, usually because there are no subscribers">>,
         <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
    SqlExample = <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>,
    event_info_common('message.dropped', Title, Description, SqlExample).
|
|
%% @doc Description of the 'delivery.dropped' event (message discarded while
%% being delivered to a subscriber).
%%
%% The English description uses "e.g.": a full message queue is one example
%% of a cause, which is also what the Chinese text says.
event_info_delivery_dropped() ->
    event_info_common(
        'delivery.dropped',
        {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>},
        {<<"messages are discarded during delivery, e.g. because the message queue is full">>,
         <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>},
        <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">>
    ).
|
|
%% @doc Description of the 'client.connected' event. The title doubles as
%% the description.
event_info_client_connected() ->
    Title = {<<"client connected">>, <<"连接建立"/utf8>>},
    SqlExample = <<"SELECT * FROM \"$events/client_connected\"">>,
    event_info_common('client.connected', Title, Title, SqlExample).
|
|
%% @doc Description of the 'client.disconnected' event. The title doubles as
%% the description.
%%
%% The example SQL has no WHERE clause: a disconnect event carries no `topic'
%% column (see eventmsg_disconnected/3), so filtering on it would never match.
event_info_client_disconnected() ->
    event_info_common(
        'client.disconnected',
        {<<"client disconnected">>, <<"连接断开"/utf8>>},
        {<<"client disconnected">>, <<"连接断开"/utf8>>},
        <<"SELECT * FROM \"$events/client_disconnected\"">>
    ).
|
|
%% @doc Description of the 'session.subscribed' event. The title doubles as
%% the description.
event_info_session_subscribed() ->
    Title = {<<"session subscribed">>, <<"会话订阅完成"/utf8>>},
    SqlExample = <<"SELECT * FROM \"$events/session_subscribed\" WHERE topic =~ 't/#'">>,
    event_info_common('session.subscribed', Title, Title, SqlExample).
|
|
%% @doc Description of the 'session.unsubscribed' event. The title doubles
%% as the description.
event_info_session_unsubscribed() ->
    Title = {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>},
    SqlExample = <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">>,
    event_info_common('session.unsubscribed', Title, Title, SqlExample).
|
|
|
|
%% @doc Assemble the description map of one event.
%%
%% Event   - hook point atom; it is reported as its event topic.
%% Title   - {English, Chinese} title pair.
%% Descr   - {English, Chinese} description pair.
%% SqlExam - example rule SQL for the event.
event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
    Title = #{en => TitleEN, zh => TitleZH},
    Description = #{en => DescrEN, zh => DescrZH},
    #{
        event => event_topic(Event),
        title => Title,
        description => Description,
        test_columns => test_columns(Event),
        columns => columns(Event),
        sql_example => SqlExam
    }.
|
|
|
|
%% @doc Sample {Column, Value} pairs for each event; event_info_common/4
%% exposes them as `test_columns'.
%%
%% Some events reuse another event's list: 'message.dropped' and
%% 'delivery.dropped' prepend a `reason' entry, while 'message.acked' and
%% 'session.unsubscribed' return the list of their counterpart unchanged.
test_columns('message.dropped') ->
    [{<<"reason">>, <<"no_subscribers">>} | test_columns('message.publish')];
test_columns('message.publish') ->
    [
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
    ];
test_columns('message.acked') ->
    test_columns('message.delivered');
test_columns('message.delivered') ->
    [
        {<<"from_clientid">>, <<"c_emqx_1">>},
        {<<"from_username">>, <<"u_emqx_1">>},
        {<<"clientid">>, <<"c_emqx_2">>},
        {<<"username">>, <<"u_emqx_2">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
    ];
test_columns('delivery.dropped') ->
    [{<<"reason">>, <<"queue_full">>} | test_columns('message.delivered')];
test_columns('client.connected') ->
    [
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"peername">>, <<"127.0.0.1:52918">>}
    ];
test_columns('client.disconnected') ->
    [
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"reason">>, <<"normal">>}
    ];
test_columns('session.unsubscribed') ->
    test_columns('session.subscribed');
test_columns('session.subscribed') ->
    [
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1}
    ].
|
|
|
|
%% @doc Ordered {ColumnName, ExampleValue} pairs for each supported event.
%%
%% columns/1 derives the plain column list from this table, so the order of
%% the entries is significant. Message ids and timestamps are generated on
%% every call, so those example values differ between calls.
columns_with_exam('message.publish') ->
    [
        {<<"event">>, 'message.publish'},
        {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
        {<<"peerhost">>, <<"192.168.0.10">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"flags">>, #{}},
        {<<"headers">>, undefined},
        {<<"publish_received_at">>, erlang:system_time(millisecond)},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ];
columns_with_exam('message.delivered') ->
    [
        {<<"event">>, 'message.delivered'},
        {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
        {<<"from_clientid">>, <<"c_emqx_1">>},
        {<<"from_username">>, <<"u_emqx_1">>},
        {<<"clientid">>, <<"c_emqx_2">>},
        {<<"username">>, <<"u_emqx_2">>},
        {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
        {<<"peerhost">>, <<"192.168.0.10">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"flags">>, #{}},
        {<<"publish_received_at">>, erlang:system_time(millisecond)},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ];
columns_with_exam('message.acked') ->
    [
        {<<"event">>, 'message.acked'},
        {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
        {<<"from_clientid">>, <<"c_emqx_1">>},
        {<<"from_username">>, <<"u_emqx_1">>},
        {<<"clientid">>, <<"c_emqx_2">>},
        {<<"username">>, <<"u_emqx_2">>},
        {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
        {<<"peerhost">>, <<"192.168.0.10">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"flags">>, #{}},
        {<<"publish_received_at">>, erlang:system_time(millisecond)},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ];
columns_with_exam('message.dropped') ->
    [
        {<<"event">>, 'message.dropped'},
        {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
        {<<"reason">>, no_subscribers},
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
        {<<"peerhost">>, <<"192.168.0.10">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"flags">>, #{}},
        {<<"publish_received_at">>, erlang:system_time(millisecond)},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ];
columns_with_exam('delivery.dropped') ->
    [
        {<<"event">>, 'delivery.dropped'},
        {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
        {<<"reason">>, queue_full},
        {<<"from_clientid">>, <<"c_emqx_1">>},
        {<<"from_username">>, <<"u_emqx_1">>},
        {<<"clientid">>, <<"c_emqx_2">>},
        {<<"username">>, <<"u_emqx_2">>},
        {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
        {<<"peerhost">>, <<"192.168.0.10">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"flags">>, #{}},
        {<<"publish_received_at">>, erlang:system_time(millisecond)},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ];
columns_with_exam('client.connected') ->
    [
        {<<"event">>, 'client.connected'},
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"mountpoint">>, undefined},
        {<<"peername">>, <<"192.168.0.10:56431">>},
        {<<"sockname">>, <<"0.0.0.0:1883">>},
        {<<"proto_name">>, <<"MQTT">>},
        {<<"proto_ver">>, 5},
        {<<"keepalive">>, 60},
        {<<"clean_start">>, true},
        {<<"expiry_interval">>, 3600},
        {<<"is_bridge">>, false},
        {<<"connected_at">>, erlang:system_time(millisecond)},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ];
columns_with_exam('client.disconnected') ->
    [
        {<<"event">>, 'client.disconnected'},
        {<<"reason">>, normal},
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"peername">>, <<"192.168.0.10:56431">>},
        {<<"sockname">>, <<"0.0.0.0:1883">>},
        {<<"disconnected_at">>, erlang:system_time(millisecond)},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ];
columns_with_exam('session.subscribed') ->
    [
        {<<"event">>, 'session.subscribed'},
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"peerhost">>, <<"192.168.0.10">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ];
columns_with_exam('session.unsubscribed') ->
    [
        {<<"event">>, 'session.unsubscribed'},
        {<<"clientid">>, <<"c_emqx">>},
        {<<"username">>, <<"u_emqx">>},
        {<<"peerhost">>, <<"192.168.0.10">>},
        {<<"topic">>, <<"t/a">>},
        {<<"qos">>, 1},
        {<<"timestamp">>, erlang:system_time(millisecond)},
        {<<"node">>, node()}
    ].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helper functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Build the configuration map passed to a hook callback.
%%
%% Env is a proplist. An `events' entry of the form {HookPoint, on, QoS}
%% enables publishing of that event with the given QoS; anything else leaves
%% it disabled with QoS 1. `ignore_sys_message' defaults to true.
hook_conf(HookPoint, Env) ->
    IgnoreSys = proplists:get_value(ignore_sys_message, Env, true),
    Events = proplists:get_value(events, Env, []),
    {Enabled, QoS} =
        case lists:keyfind(HookPoint, 1, Events) of
            {_, on, ConfiguredQoS} -> {true, ConfiguredQoS};
            _ -> {false, 1}
        end,
    #{enabled => Enabled, qos => QoS, ignore_sys_message => IgnoreSys}.
|
|
|
|
%% @doc Name of the callback function in this module for hook point Event.
%%
%% The event atom is split at its first dot and turned into
%% `on_<prefix>_<name>', e.g. 'client.connected' -> on_client_connected.
%%
%% Raises error `invalid_event' when Event contains no dot. The second
%% argument of erlang:error/2 must be a list of arguments, so Event is
%% wrapped in a list; passing the bare atom would not raise the intended
%% reason.
hook_fun(Event) ->
    case string:split(atom_to_list(Event), ".") of
        [Prefix, Name] ->
            list_to_atom(lists:append(["on_", Prefix, "_", Name]));
        [_] ->
            error(invalid_event, [Event])
    end.
|
|
|
|
%% @doc Reduce a disconnect reason to a single atom.
%%
%% Atoms pass through; {shutdown, Atom} yields Atom; any other 2-tuple whose
%% first element is an atom yields that first element; everything else
%% becomes `internal_error'.
reason(Atom) when is_atom(Atom) ->
    Atom;
reason({shutdown, Inner}) when is_atom(Inner) ->
    Inner;
reason({Tag, _Details}) when is_atom(Tag) ->
    Tag;
reason(_Other) ->
    internal_error.
|
|
|
|
%% @doc Render an address as a binary.
%%
%% `undefined' is returned unchanged, an {IpAddr, Port} pair becomes
%% <<"Ip:Port">>, and any other term is treated as a bare IP address.
ntoa(undefined) ->
    undefined;
ntoa({IpAddr, Port}) ->
    Host = inet:ntoa(IpAddr),
    iolist_to_binary([Host, ":", integer_to_list(Port)]);
ntoa(IpAddr) ->
    Host = inet:ntoa(IpAddr),
    iolist_to_binary(Host).
|
|
|
|
%% @doc Map a rule topic to its hook point.
%%
%% Topics that start with one of the known `$events/...' prefixes map to the
%% corresponding event; every other topic is treated as 'message.publish'.
event_name(<<"$events/client_connected", _/binary>>) ->
    'client.connected';
event_name(<<"$events/client_disconnected", _/binary>>) ->
    'client.disconnected';
event_name(<<"$events/session_subscribed", _/binary>>) ->
    'session.subscribed';
event_name(<<"$events/session_unsubscribed", _/binary>>) ->
    'session.unsubscribed';
event_name(<<"$events/message_delivered", _/binary>>) ->
    'message.delivered';
event_name(<<"$events/message_acked", _/binary>>) ->
    'message.acked';
event_name(<<"$events/message_dropped", _/binary>>) ->
    'message.dropped';
event_name(<<"$events/delivery_dropped", _/binary>>) ->
    'delivery.dropped';
event_name(_) ->
    'message.publish'.
|
|
|
|
%% @doc Map a hook point to the `$events/...' topic its events use. Unknown
%% hook points fail with function_clause.
event_topic('client.connected') ->
    <<"$events/client_connected">>;
event_topic('client.disconnected') ->
    <<"$events/client_disconnected">>;
event_topic('session.subscribed') ->
    <<"$events/session_subscribed">>;
event_topic('session.unsubscribed') ->
    <<"$events/session_unsubscribed">>;
event_topic('message.delivered') ->
    <<"$events/message_delivered">>;
event_topic('message.acked') ->
    <<"$events/message_acked">>;
event_topic('message.dropped') ->
    <<"$events/message_dropped">>;
event_topic('delivery.dropped') ->
    <<"$events/delivery_dropped">>;
event_topic('message.publish') ->
    <<"$events/message_publish">>.
|
|
|
|
%% @doc Return a copy of a header / property map with selected values
%% converted.
%%
%% `undefined' yields an empty map. Otherwise:
%%   - `peerhost', `peername' and `sockname' values are rendered with ntoa/1;
%%   - a list-valued 'User-Property' is stored as a map, and additionally as
%%     a list of #{key, value} maps under 'User-Property-Pairs';
%%   - every other entry is copied as is.
printable_maps(undefined) ->
    #{};
printable_maps(Headers) ->
    Convert =
        fun (Key, Addr, Acc) when Key =:= peerhost; Key =:= peername; Key =:= sockname ->
                Acc#{Key => ntoa(Addr)};
            ('User-Property', Pairs, Acc) when is_list(Pairs) ->
                PairMaps = [#{key => K, value => V} || {K, V} <- Pairs],
                Acc#{'User-Property' => maps:from_list(Pairs),
                     'User-Property-Pairs' => PairMaps};
            (Key, Value, Acc) ->
                Acc#{Key => Value}
        end,
    maps:fold(Convert, #{}, Headers).
|