diff --git a/apps/emqx/src/emqx_rule_actions_trans.erl b/apps/emqx/src/emqx_rule_actions_trans.erl deleted file mode 100644 index df1e58797..000000000 --- a/apps/emqx/src/emqx_rule_actions_trans.erl +++ /dev/null @@ -1,66 +0,0 @@ --module(emqx_rule_actions_trans). - --include_lib("syntax_tools/include/merl.hrl"). - --export([parse_transform/2]). - -parse_transform(Forms, _Options) -> - trans(Forms, []). - -trans([], ResAST) -> - lists:reverse(ResAST); -trans([{eof, L} | AST], ResAST) -> - lists:reverse([{eof, L} | ResAST]) ++ AST; -trans([{function, LineNo, FuncName, Arity, Clauses} | AST], ResAST) -> - NewClauses = trans_func_clauses(atom_to_list(FuncName), Clauses), - trans(AST, [{function, LineNo, FuncName, Arity, NewClauses} | ResAST]); -trans([Form | AST], ResAST) -> - trans(AST, [Form | ResAST]). - -trans_func_clauses("on_action_create_" ++ _ = _FuncName , Clauses) -> - NewClauses = [ - begin - Bindings = lists:flatten(get_vars(Args) ++ get_vars(Body, lefth)), - Body2 = append_to_result(Bindings, Body), - {clause, LineNo, Args, Guards, Body2} - end || {clause, LineNo, Args, Guards, Body} <- Clauses], - NewClauses; -trans_func_clauses(_FuncName, Clauses) -> - Clauses. - -get_vars(Exprs) -> - get_vars(Exprs, all). -get_vars(Exprs, Type) -> - do_get_vars(Exprs, [], Type). - -do_get_vars([], Vars, _Type) -> Vars; -do_get_vars([Line | Expr], Vars, all) -> - do_get_vars(Expr, [syntax_vars(erl_syntax:form_list([Line])) | Vars], all); -do_get_vars([Line | Expr], Vars, lefth) -> - do_get_vars(Expr, - case (Line) of - ?Q("_@LeftV = _@@_") -> Vars ++ syntax_vars(LeftV); - _ -> Vars - end, lefth). - -syntax_vars(Line) -> - sets:to_list(erl_syntax_lib:variables(Line)). - -%% append bindings to the return value as the first tuple element. -%% e.g. if the original result is R, then the new result will be {[binding()], R}. -append_to_result(Bindings, Exprs) -> - erl_syntax:revert_forms(do_append_to_result(to_keyword(Bindings), Exprs, [])). - -do_append_to_result(KeyWordVars, [Line], Res) -> - case Line of - ?Q("_@LeftV = _@RightV") -> - lists:reverse([?Q("{[_@KeyWordVars], _@LeftV}"), Line | Res]); - _ -> - lists:reverse([?Q("{[_@KeyWordVars], _@Line}") | Res]) - end; -do_append_to_result(KeyWordVars, [Line | Exprs], Res) -> - do_append_to_result(KeyWordVars, Exprs, [Line | Res]). - -to_keyword(Vars) -> - [erl_syntax:tuple([erl_syntax:atom(Var), merl:var(Var)]) - || Var <- Vars]. diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 97125d79f..6ea493aa2 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -146,7 +146,6 @@ reboot_apps() -> , emqx_management , emqx_retainer , emqx_exhook - , emqx_rule_actions , emqx_authn , emqx_authz ]. diff --git a/apps/emqx_rule_actions/README.md b/apps/emqx_rule_actions/README.md deleted file mode 100644 index c17e1a34a..000000000 --- a/apps/emqx_rule_actions/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# emqx_rule_actions - -This project contains a collection of rule actions/resources. It is mainly for - making unit test easier. Also it's easier for us to create utils that many - modules depends on it. - -## Build ------ - - $ rebar3 compile - diff --git a/apps/emqx_rule_actions/rebar.config b/apps/emqx_rule_actions/rebar.config deleted file mode 100644 index 097c18a3d..000000000 --- a/apps/emqx_rule_actions/rebar.config +++ /dev/null @@ -1,25 +0,0 @@ -{deps, []}. - -{erl_opts, [warn_unused_vars, - warn_shadow_vars, - warn_unused_import, - warn_obsolete_guard, - no_debug_info, - compressed, %% for edge - {parse_transform} - ]}. - -{overrides, [{add, [{erl_opts, [no_debug_info, compressed]}]}]}. - -{edoc_opts, [{preprocess, true}]}. - -{xref_checks, [undefined_function_calls, undefined_functions, - locals_not_used, deprecated_function_calls, - warnings_as_errors, deprecated_functions - ]}. - -{cover_enabled, true}. -{cover_opts, [verbose]}. -{cover_export_enabled, true}. - -{plugins, [rebar3_proper]}. diff --git a/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl deleted file mode 100644 index ce1192579..000000000 --- a/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl +++ /dev/null @@ -1,576 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol - --module(emqx_bridge_mqtt_actions). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). --include_lib("emqx_rule_engine/include/rule_actions.hrl"). - --import(emqx_plugin_libs_rule, [str/1]). - --export([ on_resource_create/2 - , on_get_resource_status/2 - , on_resource_destroy/2 - ]). - -%% Callbacks of ecpool Worker --export([connect/1]). - --export([subscriptions/1]). - --export([ on_action_create_data_to_mqtt_broker/2 - , on_action_data_to_mqtt_broker/2 - ]). - --define(RESOURCE_TYPE_MQTT, 'bridge_mqtt'). --define(RESOURCE_TYPE_RPC, 'bridge_rpc'). - --define(RESOURCE_CONFIG_SPEC_MQTT, #{ - address => #{ - order => 1, - type => string, - required => true, - default => <<"127.0.0.1:1883">>, - title => #{en => <<" Broker Address">>, - zh => <<"远程 broker 地址"/utf8>>}, - description => #{en => <<"The MQTT Remote Address">>, - zh => <<"远程 MQTT Broker 的地址"/utf8>>} - }, - pool_size => #{ - order => 2, - type => number, - required => true, - default => 8, - title => #{en => <<"Pool Size">>, - zh => <<"连接池大小"/utf8>>}, - description => #{en => <<"MQTT Connection Pool Size">>, - zh => <<"连接池大小"/utf8>>} - }, - clientid => #{ - order => 3, - type => string, - required => true, - default => <<"client">>, - title => #{en => <<"ClientId">>, - zh => <<"客户端 Id"/utf8>>}, - description => #{en => <<"ClientId for connecting to remote MQTT broker">>, - zh => <<"连接远程 Broker 的 ClientId"/utf8>>} - }, - append => #{ - order => 4, - type => boolean, - required => false, - default => true, - title => #{en => <<"Append GUID">>, - zh => <<"附加 GUID"/utf8>>}, - description => #{en => <<"Append GUID to MQTT ClientId?">>, - zh => <<"是否将GUID附加到 MQTT ClientId 后"/utf8>>} - }, - username => #{ - order => 5, - type => string, - required => false, - default => <<"">>, - title => #{en => <<"Username">>, zh => <<"用户名"/utf8>>}, - description => #{en => <<"Username for connecting to remote MQTT Broker">>, - zh => <<"连接远程 Broker 的用户名"/utf8>>} - }, - password => #{ - order => 6, - type => password, - required => false, - default => <<"">>, - title => #{en => <<"Password">>, - zh => <<"密码"/utf8>>}, - description => #{en => <<"Password for connecting to remote MQTT Broker">>, - zh => <<"连接远程 Broker 的密码"/utf8>>} - }, - mountpoint => #{ - order => 7, - type => string, - required => false, - default => <<"bridge/aws/${node}/">>, - title => #{en => <<"Bridge MountPoint">>, - zh => <<"桥接挂载点"/utf8>>}, - description => #{ - en => <<"MountPoint for bridge topic:
" - "Example: The topic of messages sent to `topic1` on local node " - "will be transformed to `bridge/aws/${node}/topic1`">>, - zh => <<"桥接主题的挂载点:
" - "示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题" - "会变换为 `bridge/aws/${node}/topic1`"/utf8>> - } - }, - disk_cache => #{ - order => 8, - type => boolean, - required => false, - default => false, - title => #{en => <<"Disk Cache">>, - zh => <<"磁盘缓存"/utf8>>}, - description => #{en => <<"The flag which determines whether messages " - "can be cached on local disk when bridge is " - "disconnected">>, - zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁" - "盘队列上"/utf8>>} - }, - proto_ver => #{ - order => 9, - type => string, - required => false, - default => <<"mqttv4">>, - enum => [<<"mqttv3">>, <<"mqttv4">>, <<"mqttv5">>], - title => #{en => <<"Protocol Version">>, - zh => <<"协议版本"/utf8>>}, - description => #{en => <<"MQTTT Protocol version">>, - zh => <<"MQTT 协议版本"/utf8>>} - }, - keepalive => #{ - order => 10, - type => string, - required => false, - default => <<"60s">> , - title => #{en => <<"Keepalive">>, - zh => <<"心跳间隔"/utf8>>}, - description => #{en => <<"Keepalive">>, - zh => <<"心跳间隔"/utf8>>} - }, - reconnect_interval => #{ - order => 11, - type => string, - required => false, - default => <<"30s">>, - title => #{en => <<"Reconnect Interval">>, - zh => <<"重连间隔"/utf8>>}, - description => #{en => <<"Reconnect interval of bridge:
">>, - zh => <<"重连间隔"/utf8>>} - }, - retry_interval => #{ - order => 12, - type => string, - required => false, - default => <<"20s">>, - title => #{en => <<"Retry interval">>, - zh => <<"重传间隔"/utf8>>}, - description => #{en => <<"Retry interval for bridge QoS1 message delivering">>, - zh => <<"消息重传间隔"/utf8>>} - }, - bridge_mode => #{ - order => 13, - type => boolean, - required => false, - default => false, - title => #{en => <<"Bridge Mode">>, - zh => <<"桥接模式"/utf8>>}, - description => #{en => <<"Bridge mode for MQTT bridge connection">>, - zh => <<"MQTT 连接是否为桥接模式"/utf8>>} - }, - ssl => #{ - order => 14, - type => boolean, - default => false, - title => #{en => <<"Enable SSL">>, - zh => <<"开启SSL链接"/utf8>>}, - description => #{en => <<"Enable SSL or not">>, - zh => <<"是否开启 SSL"/utf8>>} - }, - cacertfile => #{ - order => 15, - type => file, - required => false, - default => <<"etc/certs/cacert.pem">>, - title => #{en => <<"CA certificates">>, - zh => <<"CA 证书"/utf8>>}, - description => #{en => <<"The file path of the CA certificates">>, - zh => <<"CA 证书路径"/utf8>>} - }, - certfile => #{ - order => 16, - type => file, - required => false, - default => <<"etc/certs/client-cert.pem">>, - title => #{en => <<"SSL Certfile">>, - zh => <<"SSL 客户端证书"/utf8>>}, - description => #{en => <<"The file path of the client certfile">>, - zh => <<"客户端证书路径"/utf8>>} - }, - keyfile => #{ - order => 17, - type => file, - required => false, - default => <<"etc/certs/client-key.pem">>, - title => #{en => <<"SSL Keyfile">>, - zh => <<"SSL 密钥文件"/utf8>>}, - description => #{en => <<"The file path of the client keyfile">>, - zh => <<"客户端密钥路径"/utf8>>} - }, - ciphers => #{ - order => 18, - type => string, - required => false, - default => <<"ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,", - "ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,", - "ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,", - "ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,", - "AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,", - "ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,", - "ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,", - "DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,", - "ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,", - "ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,", - "DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA">>, - title => #{en => <<"SSL Ciphers">>, - zh => <<"SSL 加密算法"/utf8>>}, - description => #{en => <<"SSL Ciphers">>, - zh => <<"SSL 加密算法"/utf8>>} - } - }). - --define(RESOURCE_CONFIG_SPEC_RPC, #{ - address => #{ - order => 1, - type => string, - required => true, - default => <<"emqx2@127.0.0.1">>, - title => #{en => <<"EMQ X Node Name">>, - zh => <<"EMQ X 节点名称"/utf8>>}, - description => #{en => <<"EMQ X Remote Node Name">>, - zh => <<"远程 EMQ X 节点名称 "/utf8>>} - }, - mountpoint => #{ - order => 2, - type => string, - required => false, - default => <<"bridge/emqx/${node}/">>, - title => #{en => <<"Bridge MountPoint">>, - zh => <<"桥接挂载点"/utf8>>}, - description => #{en => <<"MountPoint for bridge topic
" - "Example: The topic of messages sent to `topic1` on local node " - "will be transformed to `bridge/aws/${node}/topic1`">>, - zh => <<"桥接主题的挂载点
" - "示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题" - "会变换为 `bridge/aws/${node}/topic1`"/utf8>>} - }, - pool_size => #{ - order => 3, - type => number, - required => true, - default => 8, - title => #{en => <<"Pool Size">>, - zh => <<"连接池大小"/utf8>>}, - description => #{en => <<"MQTT/RPC Connection Pool Size">>, - zh => <<"连接池大小"/utf8>>} - }, - reconnect_interval => #{ - order => 4, - type => string, - required => false, - default => <<"30s">>, - title => #{en => <<"Reconnect Interval">>, - zh => <<"重连间隔"/utf8>>}, - description => #{en => <<"Reconnect Interval of bridge">>, - zh => <<"重连间隔"/utf8>>} - }, - batch_size => #{ - order => 5, - type => number, - required => false, - default => 32, - title => #{en => <<"Batch Size">>, - zh => <<"批处理大小"/utf8>>}, - description => #{en => <<"Batch Size">>, - zh => <<"批处理大小"/utf8>>} - }, - disk_cache => #{ - order => 6, - type => boolean, - required => false, - default => false, - title => #{en => <<"Disk Cache">>, - zh => <<"磁盘缓存"/utf8>>}, - description => #{en => <<"The flag which determines whether messages " - "can be cached on local disk when bridge is " - "disconnected">>, - zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁" - "盘队列上"/utf8>>} - } - }). - --define(ACTION_PARAM_RESOURCE, #{ - type => string, - required => true, - title => #{en => <<"Resource ID">>, zh => <<"资源 ID"/utf8>>}, - description => #{en => <<"Bind a resource to this action">>, - zh => <<"给动作绑定一个资源"/utf8>>} - }). - --resource_type(#{ - name => ?RESOURCE_TYPE_MQTT, - create => on_resource_create, - status => on_get_resource_status, - destroy => on_resource_destroy, - params => ?RESOURCE_CONFIG_SPEC_MQTT, - title => #{en => <<"MQTT Bridge">>, zh => <<"MQTT Bridge"/utf8>>}, - description => #{en => <<"MQTT Message Bridge">>, zh => <<"MQTT 消息桥接"/utf8>>} - }). - - --resource_type(#{ - name => ?RESOURCE_TYPE_RPC, - create => on_resource_create, - status => on_get_resource_status, - destroy => on_resource_destroy, - params => ?RESOURCE_CONFIG_SPEC_RPC, - title => #{en => <<"EMQX Bridge">>, zh => <<"EMQX Bridge"/utf8>>}, - description => #{en => <<"EMQ X RPC Bridge">>, zh => <<"EMQ X RPC 消息桥接"/utf8>>} - }). - --rule_action(#{ - name => data_to_mqtt_broker, - category => data_forward, - for => 'message.publish', - types => [?RESOURCE_TYPE_MQTT, ?RESOURCE_TYPE_RPC], - create => on_action_create_data_to_mqtt_broker, - params => #{'$resource' => ?ACTION_PARAM_RESOURCE, - forward_topic => #{ - order => 1, - type => string, - required => false, - default => <<"">>, - title => #{en => <<"Forward Topic">>, - zh => <<"转发消息主题"/utf8>>}, - description => #{en => <<"The topic used when forwarding the message. " - "Defaults to the topic of the bridge message if not provided.">>, - zh => <<"转发消息时使用的主题。如果未提供,则默认为桥接消息的主题。"/utf8>>} - }, - payload_tmpl => #{ - order => 2, - type => string, - input => textarea, - required => false, - default => <<"">>, - title => #{en => <<"Payload Template">>, - zh => <<"消息内容模板"/utf8>>}, - description => #{en => <<"The payload template, variable interpolation is supported. " - "If using empty template (default), then the payload will be " - "all the available vars in JSON format">>, - zh => <<"消息内容模板,支持变量。" - "若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>} - } - }, - title => #{en => <<"Data bridge to MQTT Broker">>, - zh => <<"桥接数据到 MQTT Broker"/utf8>>}, - description => #{en => <<"Bridge Data to MQTT Broker">>, - zh => <<"桥接数据到 MQTT Broker"/utf8>>} - }). - -on_resource_create(ResId, Params) -> - ?LOG(info, "Initiating Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]), - {ok, _} = application:ensure_all_started(ecpool), - PoolName = pool_name(ResId), - Options = options(Params, PoolName, ResId), - start_resource(ResId, PoolName, Options), - case test_resource_status(PoolName) of - true -> ok; - false -> - on_resource_destroy(ResId, #{<<"pool">> => PoolName}), - error({{?RESOURCE_TYPE_MQTT, ResId}, connection_failed}) - end, - #{<<"pool">> => PoolName}. - -start_resource(ResId, PoolName, Options) -> - case ecpool:start_sup_pool(PoolName, ?MODULE, Options) of - {ok, _} -> - ?LOG(info, "Initiated Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); - {error, {already_started, _Pid}} -> - on_resource_destroy(ResId, #{<<"pool">> => PoolName}), - start_resource(ResId, PoolName, Options); - {error, Reason} -> - ?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), - on_resource_destroy(ResId, #{<<"pool">> => PoolName}), - error({{?RESOURCE_TYPE_MQTT, ResId}, create_failed}) - end. - -test_resource_status(PoolName) -> - IsConnected = fun(Worker) -> - case ecpool_worker:client(Worker) of - {ok, Bridge} -> - try emqx_connector_mqtt_worker:status(Bridge) of - connected -> true; - _ -> false - catch _Error:_Reason -> - false - end; - {error, _} -> - false - end - end, - Status = [IsConnected(Worker) || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - lists:any(fun(St) -> St =:= true end, Status). - --spec(on_get_resource_status(ResId::binary(), Params::map()) -> Status::map()). -on_get_resource_status(_ResId, #{<<"pool">> := PoolName}) -> - IsAlive = test_resource_status(PoolName), - #{is_alive => IsAlive}. - -on_resource_destroy(ResId, #{<<"pool">> := PoolName}) -> - ?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]), - case ecpool:stop_sup_pool(PoolName) of - ok -> - ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]); - {error, Reason} -> - ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), - error({{?RESOURCE_TYPE_MQTT, ResId}, destroy_failed}) - end. - -on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName, - <<"forward_topic">> := ForwardTopic, - <<"payload_tmpl">> := PayloadTmpl}) -> - ?LOG(info, "Initiating Action ~p.", [?FUNCTION_NAME]), - PayloadTks = emqx_plugin_libs_rule:preproc_tmpl(PayloadTmpl), - TopicTks = case ForwardTopic == <<"">> of - true -> undefined; - false -> emqx_plugin_libs_rule:preproc_tmpl(ForwardTopic) - end, - Opts. - -on_action_data_to_mqtt_broker(Msg, _Env = - #{id := Id, clientid := From, flags := Flags, - topic := Topic, timestamp := TimeStamp, qos := QoS, - ?BINDING_KEYS := #{ - 'ActId' := ActId, - 'PoolName' := PoolName, - 'TopicTks' := TopicTks, - 'PayloadTks' := PayloadTks - }}) -> - Topic1 = case TopicTks =:= undefined of - true -> Topic; - false -> emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg) - end, - BrokerMsg = #message{id = Id, - qos = QoS, - from = From, - flags = Flags, - topic = Topic1, - payload = format_data(PayloadTks, Msg), - timestamp = TimeStamp}, - ecpool:with_client(PoolName, - fun(BridgePid) -> - BridgePid ! {deliver, rule_engine, BrokerMsg} - end), - emqx_rule_metrics:inc_actions_success(ActId). - -format_data([], Msg) -> - emqx_json:encode(Msg); - -format_data(Tokens, Msg) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg). - -subscriptions(Subscriptions) -> - scan_binary(<<"[", Subscriptions/binary, "].">>). - -is_node_addr(Addr0) -> - Addr = binary_to_list(Addr0), - case string:tokens(Addr, "@") of - [_NodeName, _Hostname] -> true; - _ -> false - end. - -scan_binary(Bin) -> - TermString = binary_to_list(Bin), - scan_string(TermString). - -scan_string(TermString) -> - {ok, Tokens, _} = erl_scan:string(TermString), - {ok, Term} = erl_parse:parse_term(Tokens), - Term. - -connect(Options) when is_list(Options) -> - connect(maps:from_list(Options)); -connect(Options = #{disk_cache := DiskCache, ecpool_worker_id := Id, pool_name := Pool}) -> - Options0 = case DiskCache of - true -> - DataDir = filename:join([emqx:get_config([node, data_dir]), replayq, Pool, integer_to_list(Id)]), - QueueOption = #{replayq_dir => DataDir}, - Options#{queue => QueueOption}; - false -> - Options - end, - Options1 = case maps:is_key(append, Options0) of - false -> Options0; - true -> - case maps:get(append, Options0, false) of - true -> - ClientId = lists:concat([str(maps:get(clientid, Options0)), "_", str(emqx_guid:to_hexstr(emqx_guid:gen()))]), - Options0#{clientid => ClientId}; - false -> - Options0 - end - end, - Options2 = maps:without([ecpool_worker_id, pool_name, append], Options1), - emqx_connector_mqtt_worker:start_link(Options2#{name => name(Pool, Id)}). -name(Pool, Id) -> - list_to_atom(atom_to_list(Pool) ++ ":" ++ integer_to_list(Id)). -pool_name(ResId) -> - list_to_atom("bridge_mqtt:" ++ str(ResId)). - -options(Options, PoolName, ResId) -> - GetD = fun(Key, Default) -> maps:get(Key, Options, Default) end, - Get = fun(Key) -> GetD(Key, undefined) end, - Address = Get(<<"address">>), - [{max_inflight_batches, 32}, - {forward_mountpoint, str(Get(<<"mountpoint">>))}, - {disk_cache, GetD(<<"disk_cache">>, false)}, - {start_type, auto}, - {reconnect_delay_ms, hocon_postprocess:duration(str(Get(<<"reconnect_interval">>)))}, - {if_record_metrics, false}, - {pool_size, GetD(<<"pool_size">>, 1)}, - {pool_name, PoolName} - ] ++ case is_node_addr(Address) of - true -> - [{address, binary_to_atom(Get(<<"address">>), utf8)}, - {connect_module, emqx_bridge_rpc}, - {batch_size, Get(<<"batch_size">>)}]; - false -> - [{address, binary_to_list(Address)}, - {bridge_mode, GetD(<<"bridge_mode">>, true)}, - {clean_start, true}, - {clientid, str(Get(<<"clientid">>))}, - {append, Get(<<"append">>)}, - {connect_module, emqx_bridge_mqtt}, - {keepalive, hocon_postprocess:duration(str(Get(<<"keepalive">>))) div 1000}, - {username, str(Get(<<"username">>))}, - {password, str(Get(<<"password">>))}, - {proto_ver, mqtt_ver(Get(<<"proto_ver">>))}, - {retry_interval, hocon_postprocess:duration(str(GetD(<<"retry_interval">>, "30s"))) div 1000} - | maybe_ssl(Options, Get(<<"ssl">>), ResId)] - end. - -maybe_ssl(_Options, false, _ResId) -> - []; -maybe_ssl(Options, true, ResId) -> - [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Options, "rules", ResId)}]. - -mqtt_ver(ProtoVer) -> - case ProtoVer of - <<"mqttv3">> -> v3; - <<"mqttv4">> -> v4; - <<"mqttv5">> -> v5; - _ -> v4 - end. diff --git a/apps/emqx_rule_actions/src/emqx_rule_actions.app.src b/apps/emqx_rule_actions/src/emqx_rule_actions.app.src deleted file mode 100644 index 8c2b8d247..000000000 --- a/apps/emqx_rule_actions/src/emqx_rule_actions.app.src +++ /dev/null @@ -1,12 +0,0 @@ -%% -*- mode: erlang -*- -{application, emqx_rule_actions, - [{description, "Rule actions"}, - {vsn, "5.0.0"}, - {registered, []}, - {applications, - [kernel,stdlib,emqx]}, - {env,[]}, - {modules, []}, - {licenses, ["Apache 2.0"]}, - {links, []} - ]}. diff --git a/apps/emqx_rule_actions/src/emqx_web_hook_actions.erl b/apps/emqx_rule_actions/src/emqx_web_hook_actions.erl deleted file mode 100644 index 1b68ad5b0..000000000 --- a/apps/emqx_rule_actions/src/emqx_web_hook_actions.erl +++ /dev/null @@ -1,379 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% Define the default actions. --module(emqx_web_hook_actions). - --export([ on_resource_create/2 - , on_get_resource_status/2 - , on_resource_destroy/2 - ]). - --export([ on_action_create_data_to_webserver/2 - , on_action_data_to_webserver/2 - ]). - --export_type([action_fun/0]). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). --include_lib("emqx_rule_engine/include/rule_actions.hrl"). - --type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())). - --type(url() :: binary()). - --define(RESOURCE_TYPE_WEBHOOK, 'web_hook'). --define(RESOURCE_CONFIG_SPEC, #{ - url => #{order => 1, - type => string, - format => url, - required => true, - title => #{en => <<"Request URL">>, - zh => <<"请求 URL"/utf8>>}, - description => #{en => <<"The URL of the server that will receive the Webhook requests.">>, - zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}}, - connect_timeout => #{order => 2, - type => string, - default => <<"5s">>, - title => #{en => <<"Connect Timeout">>, - zh => <<"连接超时时间"/utf8>>}, - description => #{en => <<"Connect Timeout In Seconds">>, - zh => <<"连接超时时间"/utf8>>}}, - request_timeout => #{order => 3, - type => string, - default => <<"5s">>, - title => #{en => <<"Request Timeout">>, - zh => <<"请求超时时间时间"/utf8>>}, - description => #{en => <<"Request Timeout In Seconds">>, - zh => <<"请求超时时间"/utf8>>}}, - pool_size => #{order => 4, - type => number, - default => 8, - title => #{en => <<"Pool Size">>, zh => <<"连接池大小"/utf8>>}, - description => #{en => <<"Connection Pool">>, - zh => <<"连接池大小"/utf8>>} - }, - cacertfile => #{order => 5, - type => file, - default => <<"">>, - title => #{en => <<"CA Certificate File">>, - zh => <<"CA 证书文件"/utf8>>}, - description => #{en => <<"CA Certificate file">>, - zh => <<"CA 证书文件"/utf8>>}}, - keyfile => #{order => 6, - type => file, - default => <<"">>, - title =>#{en => <<"SSL Key">>, - zh => <<"SSL Key"/utf8>>}, - description => #{en => <<"Your ssl keyfile">>, - zh => <<"SSL 私钥"/utf8>>}}, - certfile => #{order => 7, - type => file, - default => <<"">>, - title => #{en => <<"SSL Cert">>, - zh => <<"SSL Cert"/utf8>>}, - description => #{en => <<"Your ssl certfile">>, - zh => <<"SSL 证书"/utf8>>}}, - verify => #{order => 8, - type => boolean, - default => false, - title => #{en => <<"Verify Server Certfile">>, - zh => <<"校验服务器证书"/utf8>>}, - description => #{en => <<"Whether to verify the server certificate. By default, the client will not verify the server's certificate. If verification is required, please set it to true.">>, - zh => <<"是否校验服务器证书。 默认客户端不会去校验服务器的证书,如果需要校验,请设置成true。"/utf8>>}}, - server_name_indication => #{order => 9, - type => string, - title => #{en => <<"Server Name Indication">>, - zh => <<"服务器名称指示"/utf8>>}, - description => #{en => <<"Specify the hostname used for peer certificate verification, or set to disable to turn off this verification.">>, - zh => <<"指定用于对端证书验证时使用的主机名,或者设置为 disable 以关闭此项验证。"/utf8>>}} -}). - --define(ACTION_PARAM_RESOURCE, #{ - order => 0, - type => string, - required => true, - title => #{en => <<"Resource ID">>, - zh => <<"资源 ID"/utf8>>}, - description => #{en => <<"Bind a resource to this action">>, - zh => <<"给动作绑定一个资源"/utf8>>} -}). - --define(ACTION_DATA_SPEC, #{ - '$resource' => ?ACTION_PARAM_RESOURCE, - method => #{ - order => 1, - type => string, - enum => [<<"POST">>, <<"DELETE">>, <<"PUT">>, <<"GET">>], - default => <<"POST">>, - title => #{en => <<"Method">>, - zh => <<"Method"/utf8>>}, - description => #{en => <<"HTTP Method.\n" - "Note that: the Body option in the Action will be discarded in case of GET or DELETE method.">>, - zh => <<"HTTP Method。\n" - "注意:当方法为 GET 或 DELETE 时,动作中的 Body 选项会被忽略。"/utf8>>}}, - path => #{ - order => 2, - type => string, - required => false, - default => <<"">>, - title => #{en => <<"Path">>, - zh => <<"Path"/utf8>>}, - description => #{en => <<"The path part of the URL, support using ${Var} to get the field value output by the rule.">>, - zh => <<"URL 的路径部分,支持使用 ${Var} 获取规则输出的字段值。\n"/utf8>>} - }, - headers => #{ - order => 3, - type => object, - schema => #{}, - default => #{<<"content-type">> => <<"application/json">>}, - title => #{en => <<"Headers">>, - zh => <<"Headers"/utf8>>}, - description => #{en => <<"HTTP headers.">>, - zh => <<"HTTP headers。"/utf8>>}}, - body => #{ - order => 4, - type => string, - input => textarea, - required => false, - default => <<"">>, - title => #{en => <<"Body">>, - zh => <<"Body"/utf8>>}, - description => #{en => <<"The HTTP body supports the use of ${Var} to obtain the field value output by the rule.\n" - "The content of the default HTTP request body is a JSON string composed of the keys and values of all fields output by the rule.">>, - zh => <<"HTTP 请求体,支持使用 ${Var} 获取规则输出的字段值\n" - "默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}} - }). - --resource_type( - #{name => ?RESOURCE_TYPE_WEBHOOK, - create => on_resource_create, - status => on_get_resource_status, - destroy => on_resource_destroy, - params => ?RESOURCE_CONFIG_SPEC, - title => #{en => <<"WebHook">>, - zh => <<"WebHook"/utf8>>}, - description => #{en => <<"WebHook">>, - zh => <<"WebHook"/utf8>>} -}). - --rule_action(#{name => data_to_webserver, - category => data_forward, - for => '$any', - create => on_action_create_data_to_webserver, - params => ?ACTION_DATA_SPEC, - types => [?RESOURCE_TYPE_WEBHOOK], - title => #{en => <<"Data to Web Server">>, - zh => <<"发送数据到 Web 服务"/utf8>>}, - description => #{en => <<"Forward Messages to Web Server">>, - zh => <<"将数据转发给 Web 服务"/utf8>>} -}). - -%%------------------------------------------------------------------------------ -%% Actions for web hook -%%------------------------------------------------------------------------------ - --spec(on_resource_create(binary(), map()) -> map()). -on_resource_create(ResId, Conf) -> - {ok, _} = application:ensure_all_started(ehttpc), - Options = pool_opts(Conf, ResId), - PoolName = pool_name(ResId), - case test_http_connect(Conf) of - true -> ok; - false -> error({error, check_http_connectivity_failed}) - end, - start_resource(ResId, PoolName, Options), - Conf#{<<"pool">> => PoolName, options => Options}. - -start_resource(ResId, PoolName, Options) -> - case ehttpc_pool:start_pool(PoolName, Options) of - {ok, _} -> - ?LOG(info, "Initiated Resource ~p Successfully, ResId: ~p", - [?RESOURCE_TYPE_WEBHOOK, ResId]); - {error, {already_started, _Pid}} -> - on_resource_destroy(ResId, #{<<"pool">> => PoolName}), - start_resource(ResId, PoolName, Options); - {error, Reason} -> - ?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~0p", - [?RESOURCE_TYPE_WEBHOOK, ResId, Reason]), - error({{?RESOURCE_TYPE_WEBHOOK, ResId}, create_failed}) - end. - --spec(on_get_resource_status(binary(), map()) -> map()). -on_get_resource_status(_ResId, Conf) -> - #{is_alive => test_http_connect(Conf)}. - --spec(on_resource_destroy(binary(), map()) -> ok | {error, Reason::term()}). -on_resource_destroy(ResId, #{<<"pool">> := PoolName}) -> - ?LOG(info, "Destroying Resource ~p, ResId: ~p", [?RESOURCE_TYPE_WEBHOOK, ResId]), - case ehttpc_pool:stop_pool(PoolName) of - ok -> - ?LOG(info, "Destroyed Resource ~p Successfully, ResId: ~p", [?RESOURCE_TYPE_WEBHOOK, ResId]); - {error, Reason} -> - ?LOG(error, "Destroy Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_WEBHOOK, ResId, Reason]), - error({{?RESOURCE_TYPE_WEBHOOK, ResId}, destroy_failed}) - end. - -%% An action that forwards publish messages to a remote web server. --spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> {bindings(), NewParams :: map()}). -on_action_create_data_to_webserver(Id, Params) -> - #{method := Method, - path := Path, - headers := Headers, - body := Body, - pool := Pool, - request_timeout := RequestTimeout} = parse_action_params(Params), - BodyTokens = emqx_plugin_libs_rule:preproc_tmpl(Body), - PathTokens = emqx_plugin_libs_rule:preproc_tmpl(Path), - Params. - -on_action_data_to_webserver(Selected, _Envs = - #{?BINDING_KEYS := #{ - 'Id' := Id, - 'Method' := Method, - 'Headers' := Headers, - 'PathTokens' := PathTokens, - 'BodyTokens' := BodyTokens, - 'Pool' := Pool, - 'RequestTimeout' := RequestTimeout}, - clientid := ClientID}) -> - NBody = format_msg(BodyTokens, Selected), - NPath = emqx_plugin_libs_rule:proc_tmpl(PathTokens, Selected), - Req = create_req(Method, NPath, Headers, NBody), - case ehttpc:request(ehttpc_pool:pick_worker(Pool, ClientID), Method, Req, RequestTimeout) of - {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> - emqx_rule_metrics:inc_actions_success(Id); - {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> - emqx_rule_metrics:inc_actions_success(Id); - {ok, StatusCode, _} -> - ?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]), - emqx_rule_metrics:inc_actions_error(Id); - {ok, StatusCode, _, _} -> - ?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]), - emqx_rule_metrics:inc_actions_error(Id); - {error, Reason} -> - ?LOG(error, "[WebHook Action] HTTP request error: ~p", [Reason]), - emqx_rule_metrics:inc_actions_error(Id) - end. - -format_msg([], Data) -> - emqx_json:encode(Data); -format_msg(Tokens, Data) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Data). - -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - -create_req(Method, Path, Headers, _Body) - when Method =:= get orelse Method =:= delete -> - {Path, Headers}; -create_req(_, Path, Headers, Body) -> - {Path, Headers, Body}. - -parse_action_params(Params = #{<<"url">> := URL}) -> - try - {ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL), - Method = method(maps:get(<<"method">>, Params, <<"POST">>)), - Headers = headers(maps:get(<<"headers">>, Params, undefined)), - NHeaders = ensure_content_type_header(Headers, Method), - #{method => Method, - path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)), - headers => NHeaders, - body => maps:get(<<"body">>, Params, <<>>), - request_timeout => hocon_postprocess:duration(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))), - pool => maps:get(<<"pool">>, Params)} - catch _:_ -> - throw({invalid_params, Params}) - end. - -ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =:= put -> - Headers; -ensure_content_type_header(Headers, _Method) -> - lists:keydelete("content-type", 1, Headers). - -merge_path(CommonPath, <<>>) -> - CommonPath; -merge_path(CommonPath, Path0) -> - case emqx_http_lib:uri_parse(Path0) of - {ok, #{path := Path1, 'query' := Query}} -> - Path2 = filename:join(CommonPath, Path1), - <>; - {ok, #{path := Path1}} -> - filename:join(CommonPath, Path1) - end. - -method(GET) when GET == <<"GET">>; GET == <<"get">> -> get; -method(POST) when POST == <<"POST">>; POST == <<"post">> -> post; -method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put; -method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete. - -headers(undefined) -> []; -headers(Headers) when is_map(Headers) -> - headers(maps:to_list(Headers)); -headers(Headers) when is_list(Headers) -> - [{string:to_lower(str(K)), str(V)} || {K, V} <- Headers]. - -str(Str) when is_list(Str) -> Str; -str(Atom) when is_atom(Atom) -> atom_to_list(Atom); -str(Bin) when is_binary(Bin) -> binary_to_list(Bin). - -pool_opts(Params = #{<<"url">> := URL}, ResId) -> - {ok, #{host := Host, - port := Port, - scheme := Scheme}} = emqx_http_lib:uri_parse(URL), - PoolSize = maps:get(<<"pool_size">>, Params, 32), - ConnectTimeout = - hocon_postprocess:duration(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))), - TransportOpts0 = - case Scheme =:= https of - true -> [get_ssl_opts(Params, ResId)]; - false -> [] - end, - TransportOpts = emqx_misc:ipv6_probe(TransportOpts0), - Opts = case Scheme =:= https of - true -> [{transport_opts, TransportOpts}, {transport, ssl}]; - false -> [{transport_opts, TransportOpts}] - end, - [{host, Host}, - {port, Port}, - {pool_size, PoolSize}, - {pool_type, hash}, - {connect_timeout, ConnectTimeout}, - {retry, 5}, - {retry_timeout, 1000} | Opts]. - -pool_name(ResId) -> - list_to_atom("webhook:" ++ str(ResId)). - -get_ssl_opts(Opts, ResId) -> - emqx_plugin_libs_ssl:save_files_return_opts(Opts, "rules", ResId). - -test_http_connect(Conf) -> - Url = fun() -> maps:get(<<"url">>, Conf) end, - try - emqx_plugin_libs_rule:http_connectivity(Url()) - of - ok -> true; - {error, _Reason} -> - ?LOG(error, "check http_connectivity failed: ~p", [Url()]), - false - catch - Err:Reason:ST -> - ?LOG(error, "check http_connectivity failed: ~p, ~0p", [Conf, {Err, Reason, ST}]), - false - end. diff --git a/apps/emqx_rule_engine/docs/api_examples.md b/apps/emqx_rule_engine/docs/api_examples.md deleted file mode 100644 index f546a3a57..000000000 --- a/apps/emqx_rule_engine/docs/api_examples.md +++ /dev/null @@ -1,197 +0,0 @@ -#Rule-Engine-APIs - -## ENVs - -APPSECRET="88ebdd6569afc:Mjg3MzUyNTI2Mjk2NTcyOTEwMDEwMDMzMTE2NTM1MTkzNjA" - -## Rules - -### test sql - -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules?test' -d \ -'{"rawsql":"select * from \"message.publish\" where topic=\"t/a\"","ctx":{}}' - - - -### create - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules' -d \ -'{"rawsql":"select * from \"t/a\"","actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule"}' - -{"code":0,"data":{"actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule","enabled":true,"id":"rule:bc987915","rawsql":"select * from \"t/a\""}} - -## with a resource id in the action args -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules' -d \ -'{"rawsql":"select * from \"t/a\"","actions":[{"name":"inspect","params":{"$resource":"resource:3a7b44a1"}}],"description":"test-rule"}' - -{"code":0,"data":{"actions":[{"name":"inspect","params":{"$resource":"resource:3a7b44a1","a":1}}],"description":"test-rule","enabled":true,"id":"rule:6fce0ca9","rawsql":"select * from \"t/a\""}} -``` - -### modify - -```shell -## modify all of the params -$ curl -XPUT -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915' -d \ -'{"rawsql":"select * from \"t/a\"","actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule"}' - -## modify some of the params: disable it -$ curl -XPUT -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915' -d \ -'{"enabled": false}' - -## modify some of the params: add fallback actions -$ curl -XPUT -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915' -d \ -'{"actions":[{"name":"inspect","params":{"a":1}, "fallbacks": [{"name":"donothing"}]}]}' -``` - -### show - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915' - -{"code":0,"data":{"actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule","enabled":true,"id":"rule:bc987915","rawsql":"select * from \"t/a\""}} -``` - -### list - -```shell -$ curl -v --basic -u $APPSECRET -k http://localhost:8081/api/v4/rules - -{"code":0,"data":[{"actions":[{"name":"inspect","params":{"a":1}}],"description":"test-rule","enabled":true,"id":"rule:bc987915","rawsql":"select * from \"t/a\""},{"actions":[{"name":"inspect","params":{"$resource":"resource:3a7b44a1","a":1}}],"description":"test-rule","enabled":true,"id":"rule:6fce0ca9","rawsql":"select * from \"t/a\""}]} -``` - -### delete - -```shell -$ curl -XDELETE -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules/rule:bc987915' - -{"code":0} -``` - -## Actions - -### list - -```shell -$ curl -v --basic -u $APPSECRET -k http://localhost:8081/api/v4/actions - -{"code":0,"data":[{"app":"emqx_rule_engine","description":"Republish a MQTT message to a another topic","name":"republish","params":{...},"types":[]},{"app":"emqx_rule_engine","description":"Inspect the details of action params for debug purpose","name":"inspect","params":{},"types":[]},{"app":"emqx_web_hook","description":"Forward Messages to Web Server","name":"data_to_webserver","params":{...},"types":["web_hook"]}]} -``` - -### show - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/actions/inspect' - -{"code":0,"data":{"app":"emqx_rule_engine","description":"Debug Action","name":"inspect","params":{"$resource":"built_in"}}} -``` - -## Resource Types - -### list - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resource_types' - -{"code":0,"data":[{"description":"Debug resource type","name":"built_in","params":{},"provider":"emqx_rule_engine"}]} -``` - -### list all resources of a type - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resource_types/built_in/resources' - -{"code":0,"data":[{"attrs":"undefined","config":{"a":1},"description":"test-rule","id":"resource:71df3086","type":"built_in"}]} -``` - -### show - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resource_types/built_in' - -{"code":0,"data":{"description":"Debug resource type","name":"built_in","params":{},"provider":"emqx_rule_engine"}} -``` - - - -## Resources - -### create - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources' -d \ -'{"type": "built_in", "config": {"a":1}, "description": "test-resource"}' - -{"code":0,"data":{"attrs":"undefined","config":{"a":1},"description":"test-resource","id":"resource:71df3086","type":"built_in"}} -``` - -### start - -```shell -$ curl -XPOST -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources/resource:71df3086' - -{"code":0} -``` - -### list - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources' - -{"code":0,"data":[{"attrs":"undefined","config":{"a":1},"description":"test-resource","id":"resource:71df3086","type":"built_in"}]} -``` - -### show - -```shell -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources/resource:71df3086' - -{"code":0,"data":{"attrs":"undefined","config":{"a":1},"description":"test-resource","id":"resource:71df3086","type":"built_in"}} -``` - -### get resource status - -```shell -curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resource_status/resource:71df3086' - -{"code":0,"data":{"is_alive":true}} -``` - -### delete - -```shell -$ curl -XDELETE -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources/resource:71df3086' - -{"code":0} -``` - -## Rule example using webhook - -``` shell - -$ curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/resources' -d \ -'{"type": "web_hook", "config": {"url": "http://127.0.0.1:9910", "headers": {"token":"axfw34y235wrq234t4ersgw4t"}, "method": "POST"}, "description": "web hook resource-1"}' - -{"code":0,"data":{"attrs":"undefined","config":{"headers":{"token":"axfw34y235wrq234t4ersgw4t"},"method":"POST","url":"http://127.0.0.1:9910"},"description":"web hook resource-1","id":"resource:8531a11f","type":"web_hook"}} - -curl -v --basic -u $APPSECRET -k 'http://localhost:8081/api/v4/rules' -d \ -'{"rawsql":"SELECT clientid as c, username as u.name FROM \"#\"","actions":[{"name":"data_to_webserver","params":{"$resource": "resource:8531a11f"}}],"description":"Forward connected events to webhook"}' - -{"code":0,"data":{"actions":[{"name":"data_to_webserver","params":{"$resource":"resource:8531a11f","headers":{"token":"axfw34y235wrq234t4ersgw4t"},"method":"POST","url":"http://127.0.0.1:9910"}}],"description":"Forward connected events to webhook","enabled":true,"id":"rule:4fe05936","rawsql":"select * from \"#\""}} -``` - -Start a `web server` using `nc`, and then connect to emqx broker using a mqtt client with username = 'Shawn': - -```shell -$ echo -e "HTTP/1.1 200 OK\n\n $(date)" | nc -l 127.0.0.1 9910 - -POST / HTTP/1.1 -content-type: application/json -content-length: 48 -te: -host: 127.0.0.1:9910 -connection: keep-alive -token: axfw34y235wrq234t4ersgw4t - -{"c":"clientId-bP70ymeIyo","u":{"name":"Shawn"} -``` diff --git a/apps/emqx_rule_engine/docs/cli_examples.md b/apps/emqx_rule_engine/docs/cli_examples.md deleted file mode 100644 index 3d854129c..000000000 --- a/apps/emqx_rule_engine/docs/cli_examples.md +++ /dev/null @@ -1,164 +0,0 @@ -#Rule-Engine-CLIs - -## Rules - -### create - -```shell - $ ./bin/emqx_ctl rules create 'SELECT payload FROM "t/#" username="Steven"' '[{"name":"data_to_webserver", "params": {"$resource": "resource:9093f1cb"}}]' --descr="Msg From Steven to WebServer" - -Rule rule:98a75239 created -``` - -### modify - - -```shell - ## update sql, action, description - $ ./bin/emqx_ctl rules update 'rule:98a75239' \ - -s "select * from \"t/a\" " \ - -a '[{"name":"do_nothing", "fallbacks": []' -g continue \ - -d 'Rule for debug2' \ - - ## update sql only - $ ./bin/emqx_ctl rules update 'rule:98a75239' -s 'SELECT * FROM "t/a"' - - ## disable the rule - $ ./bin/emqx_ctl rules update 'rule:98a75239' -e false - -``` - -### show - -```shell -$ ./bin/emqx_ctl rules show rule:98a75239 - -rule(id='rule:98a75239', rawsql='SELECT payload FROM "t/#" username="Steven"', actions=[{"name":"data_to_webserver","params":{"$resource":"resource:9093f1cb","url":"http://host-name/chats"}}], enabled='true', description='Msg From Steven to WebServer') -``` - -### list - -```shell -$ ./bin/emqx_ctl rules list - -rule(id='rule:98a75239', rawsql='SELECT payload FROM "t/#" username="Steven"', actions=[{"name":"data_to_webserver","params":{"$resource":"resource:9093f1cb","url":"http://host-name/chats"}}], enabled='true', description='Msg From Steven to WebServer') - -``` - -### delete - -```shell -$ ./bin/emqx_ctl rules delete 'rule:98a75239' - -ok -``` - -## Actions - -### list - -```shell -$ ./bin/emqx_ctl rule-actions list - -action(name='republish', app='emqx_rule_engine', types=[], params=#{...}, description='Republish a MQTT message to a another topic') -action(name='inspect', app='emqx_rule_engine', types=[], params=#{...}, description='Inspect the details of action params for debug purpose') -action(name='data_to_webserver', app='emqx_web_hook', types=[], params=#{...}, description='Forward Messages to Web Server') -``` - -### show - -```shell -$ ./bin/emqx_ctl rule-actions show 'data_to_webserver' - -action(name='data_to_webserver', app='emqx_web_hook', types=['web_hook'], params=#{...}, description='Forward Messages to Web Server') -``` - -## Resource - -### create - -```shell -$ ./bin/emqx_ctl resources create 'web_hook' -c '{"url": "http://host-name/chats"}' --descr 'Resource towards http://host-name/chats' - -Resource resource:19addfef created -``` - -### list - -```shell -$ ./bin/emqx_ctl resources list - -resource(id='resource:19addfef', type='web_hook', config=#{<<"url">> => <<"http://host-name/chats">>}, attrs=undefined, description='Resource towards http://host-name/chats') - -``` - -### list all resources of a type - -```shell -$ ./bin/emqx_ctl resources list -t 'web_hook' - -resource(id='resource:19addfef', type='web_hook', config=#{<<"url">> => <<"http://host-name/chats">>}, attrs=undefined, description='Resource towards http://host-name/chats') -``` - -### show - -```shell -$ ./bin/emqx_ctl resources show 'resource:19addfef' - -resource(id='resource:19addfef', type='web_hook', config=#{<<"url">> => <<"http://host-name/chats">>}, attrs=undefined, description='Resource towards http://host-name/chats') -``` - -### delete - -```shell -$ ./bin/emqx_ctl resources delete 'resource:19addfef' - -ok -``` - -## Resources Types - -### list - -```shell -$ ./bin/emqx_ctl resource-types list - -resource_type(name='built_in', provider='emqx_rule_engine', params=#{...}, on_create={emqx_rule_actions,on_resource_create}, description='The built in resource type for debug purpose') -resource_type(name='web_hook', provider='emqx_web_hook', params=#{...}, on_create={emqx_web_hook_actions,on_resource_create}, description='WebHook Resource') -``` - -### show - -```shell -$ ./bin/emqx_ctl resource-types show built_in - -resource_type(name='built_in', provider='emqx_rule_engine', params=#{}, description='The built in resource type for debug purpose') -``` - -## Rule example using webhook - -``` shell -1. Create a webhook resource to URL http://127.0.0.1:9910. -./bin/emqx_ctl resources create 'web_hook' --config '{"url": "http://127.0.0.1:9910", "headers": {"token":"axfw34y235wrq234t4ersgw4t"}, "method": "POST"}' -Resource resource:3128243e created - -2. Create a rule using action data_to_webserver, and bind above resource to that action. -./bin/emqx_ctl rules create 'client.connected' 'SELECT clientid as c, username as u.name FROM "#"' '[{"name":"data_to_webserver", "params": {"$resource": "resource:3128243e"}}]' --descr "Forward Connected Events to WebServer" -Rule rule:222b59f7 created -``` - -Start a simple `Web Server` using `nc`, and then connect to emqx broker using a mqtt client with username = 'Shawn': - -```shell -$ echo -e "HTTP/1.1 200 OK\n\n $(date)" | nc -l 127.0.0.1 9910 - -POST / HTTP/1.1 -content-type: application/json -content-length: 48 -te: -host: 127.0.0.1:9910 -connection: keep-alive -token: axfw34y235wrq234t4ersgw4t - -{"c":"clientId-bP70ymeIyo","u":{"name":"Shawn"} -``` diff --git a/apps/emqx_rule_engine/docs/design.md b/apps/emqx_rule_engine/docs/design.md deleted file mode 100644 index 3e2c60c41..000000000 --- a/apps/emqx_rule_engine/docs/design.md +++ /dev/null @@ -1,188 +0,0 @@ - -# EMQ X Rule Engine - -This is the design guide of message routing rule engine for the EMQ X Broker. - -## Concept - -A rule is: - -``` -when - Match | -then - Select and Take ; -``` - -or: - -``` -rule "Rule Name" - when - rule match - select - para1 = val1 - para2 = val2 - then - action(#{para2 => val1, #para2 => val2}) -``` - -## Architecture - -``` - |-----------------| - P ---->| Message Routing |----> S - |-----------------| - | /|\ - \|/ | - |-----------------| - | Rule Engine | - |-----------------| - | | - Backends Services Bridges -``` - -## Design - -``` -Event | Message -> Rules -> Actions -> Resources -``` - -``` - P -> |--------------------| |---------------------------------------| - | Messages (Routing) | -> | Rules (Select Data, Match Conditions) | - S <- |--------------------| |---------------------------------------| - |---------| |-----------| |-------------------------------| - ->| Actions | -> | Resources | -> | (Backends, Bridges, WebHooks) | - |---------| |-----------| |-------------------------------| -``` - - - -## Rule - -A rule consists of a SELECT statement, a topic filter, and a rule action - -Rules consist of the following: - -- Id -- Name -- Topic -- Description -- Action -- Enabled - -The operations on a rule: - -- Create -- Enable -- Disable -- Delete - - - -## Action - -Actions consist of the following: - -- Id -- Name -- For -- App -- Module -- Func -- Args -- Descr - -Define a rule action in ADT: - -``` -action :: Application -> Resource -> Params -> IO () -``` - -A rule action: - -Module:function(Args) - - - -## Resource - -### Resource Name - -``` -backend:mysql:localhost:port:db -backend:redis:localhost: -webhook:url -bridge:kafka: -bridge:rabbit:localhost -``` - -### Resource Properties - -- Name -- Descr or Description -- Config #{} -- Instances -- State: Running | Stopped - -### Resource Management - -1. Create Resource -2. List Resources -3. Lookup Resource -4. Delete Resource -5. Test Resource - -### Resource State (Lifecircle) - -0. Create Resource and Validate a Resource -1. Start/Connect Resource -2. Bind resource name to instance -3. Stop/Disconnect Resource -4. Unbind resource name with instance -5. Is Resource Alive? - -### Resource Type - -The properties and behaviors of resources is defined by resource types. A resoure type is provided(contributed) by a plugin. - -### Resource Type Provider - -Provider of resource type is a EMQ X Plugin. - -### Resource Manager - -``` - Supervisor - | - \|/ -Action ----> Proxy(Batch|Write) ----> Connection -----> ExternalResource - | /|\ - |------------------Fetch----------------| -``` - - - -## REST API - -Rules API -Actions API -Resources API - -## CLI - -``` -rules list -rules show - -rule-actions list -rule-actions show - -resources list -resources show - -resource_templates list -resource_templates show -``` - diff --git a/apps/emqx_rule_engine/include/rule_actions.hrl b/apps/emqx_rule_engine/include/rule_actions.hrl deleted file mode 100644 index e432c4399..000000000 --- a/apps/emqx_rule_engine/include/rule_actions.hrl +++ /dev/null @@ -1,11 +0,0 @@ --compile({parse_transform, emqx_rule_actions_trans}). - --type selected_data() :: map(). --type env_vars() :: map(). --type bindings() :: list({atom(), term()}). - --define(BINDING_KEYS, '__bindings__'). - --define(bound_v(Key, ENVS0), - maps:get(Key, - maps:get(?BINDING_KEYS, ENVS0, #{}))). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 052f916b3..6b3b3061f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -26,5 +26,4 @@ start(_Type, _Args) -> emqx_rule_engine_sup:start_link(). stop(_State) -> - ok = emqx_rule_events:unload(), - ok = emqx_rule_engine_cli:unload(). + ok = emqx_rule_events:unload(). diff --git a/rebar.config.erl b/rebar.config.erl index 4888fafe0..a5386df6b 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -272,7 +272,6 @@ relx_apps(ReleaseType) -> , emqx_exhook , emqx_bridge , emqx_rule_engine - , emqx_rule_actions , emqx_modules , emqx_management , emqx_dashboard