From 1cd226c18bd756dc569dda7a37530b89ad2b8ed9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 28 Dec 2021 10:12:39 +0800 Subject: [PATCH 1/4] fix(bridge): make direction defaults to egress if not provided --- apps/emqx_bridge/src/emqx_bridge_http_schema.erl | 1 + apps/emqx_bridge/src/emqx_bridge_schema.erl | 1 + 2 files changed, 2 insertions(+) diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index 43cace332..540a6a070 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -87,6 +87,7 @@ basic_config() -> , {direction, mk(egress, #{ desc => "The direction of this bridge, MUST be egress" + , default => egress })} ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 3acfbcdef..82fc79ebf 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -71,6 +71,7 @@ metrics_status_fields() -> direction_field(Dir, Desc) -> {direction, mk(Dir, #{ nullable => false + , default => egress , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" ++ Desc })}. From eb992ad2ad15d2b5d70b1b1b4592da342c2f560f Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 28 Dec 2021 11:35:47 +0800 Subject: [PATCH 2/4] fix(bridge): add test cases for sending msgs via http bridge --- .../test/emqx_bridge_api_SUITE.erl | 62 +++++++++++++++---- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 65baf7051..7724d467c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -47,7 +47,7 @@ groups() -> []. suite() -> - [{timetrap,{seconds,30}}]. + [{timetrap,{seconds,60}}]. init_per_suite(Config) -> ok = emqx_config:put([emqx_dashboard], #{ @@ -84,7 +84,7 @@ start_http_server(HandleFun) -> spawn_link(fun() -> {Port, Sock} = listen_on_random_port(), Parent ! {port, Port}, - loop(Sock, HandleFun) + loop(Sock, HandleFun, Parent) end), receive {port, Port} -> Port @@ -95,40 +95,49 @@ start_http_server(HandleFun) -> listen_on_random_port() -> Min = 1024, Max = 65000, Port = rand:uniform(Max - Min) + Min, - case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}]) of + case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}, binary]) of {ok, Sock} -> {Port, Sock}; {error, eaddrinuse} -> listen_on_random_port() end. -loop(Sock, HandleFun) -> +loop(Sock, HandleFun, Parent) -> {ok, Conn} = gen_tcp:accept(Sock), - Handler = spawn(fun () -> HandleFun(Conn) end), + Handler = spawn(fun () -> HandleFun(Conn, Parent) end), gen_tcp:controlling_process(Conn, Handler), - loop(Sock, HandleFun). + loop(Sock, HandleFun, Parent). make_response(CodeStr, Str) -> B = iolist_to_binary(Str), iolist_to_binary( io_lib:fwrite( - "HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s", + "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s", [CodeStr, size(B), B])). -handle_fun_200_ok(Conn) -> +handle_fun_200_ok(Conn, Parent) -> case gen_tcp:recv(Conn, 0) of - {ok, Request} -> + {ok, ReqStr} -> + ct:pal("the http handler got request: ~p", [ReqStr]), + Req = parse_http_request(ReqStr), + Parent ! {http_server, received, Req}, gen_tcp:send(Conn, make_response("200 OK", "Request OK")), - self() ! {http_server, received, Request}, - handle_fun_200_ok(Conn); + handle_fun_200_ok(Conn, Parent); {error, closed} -> gen_tcp:close(Conn) end. +parse_http_request(ReqStr0) -> + [Method, ReqStr1] = string:split(ReqStr0, " ", leading), + [Path, ReqStr2] = string:split(ReqStr1, " ", leading), + [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), + [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading), + #{method => Method, path => Path, body => Body}. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_http_crud_apis(_) -> - Port = start_http_server(fun handle_fun_200_ok/1), + Port = start_http_server(fun handle_fun_200_ok/2), %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -163,6 +172,20 @@ t_http_crud_apis(_) -> , <<"message">> := <<"bridge already exists">> }, jsx:decode(RetMsg)), + %% send an message to emqx and the message should be forwarded to the HTTP server + Body = <<"my msg">>, + emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), + ?assert( + receive + {http_server, received, #{method := <<"POST">>, path := <<"/path1">>, + body := Body}} -> + true; + Msg -> + ct:pal("error: http got unexpected request: ~p", [Msg]), + false + after 100 -> + false + end), %% update the request-path of the bridge URL2 = ?URL(Port, "path2"), {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]), @@ -201,6 +224,19 @@ t_http_crud_apis(_) -> , <<"url">> := URL2 }, jsx:decode(Bridge3Str)), + %% send an message to emqx again, check the path has been changed + emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), + ?assert( + receive + {http_server, received, #{path := <<"/path2">>}} -> + true; + Msg2 -> + ct:pal("error: http got unexpected request: ~p", [Msg2]), + false + after 100 -> + false + end), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -215,7 +251,7 @@ t_http_crud_apis(_) -> ok. t_start_stop_bridges(_) -> - Port = start_http_server(fun handle_fun_200_ok/1), + Port = start_http_server(fun handle_fun_200_ok/2), URL1 = ?URL(Port, "abc"), {ok, 201, Bridge} = request(post, uri(["bridges"]), ?HTTP_BRIDGE(URL1)#{ From 4406589980fe8d7dd4d886fcc0254f98ed54c57f Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 28 Dec 2021 14:12:28 +0800 Subject: [PATCH 3/4] fix(bridge): time unit error for MQTT keepalive --- apps/emqx_connector/src/emqx_connector_mqtt.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index f8d17ce32..bcf558117 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -208,7 +208,7 @@ basic_config(#{ username => User, password => Password, clean_start => CleanStart, - keepalive => KeepAlive, + keepalive => ms_to_s(KeepAlive), retry_interval => RetryIntv, max_inflight => MaxInflight, ssl => EnableSsl, @@ -216,5 +216,8 @@ basic_config(#{ if_record_metrics => true }. +ms_to_s(Ms) -> + erlang:ceil(Ms / 1000). + clientid(Id) -> iolist_to_binary([Id, ":", atom_to_list(node())]). From 07997ab8652e8b3f709b05197a3294693d1b00b8 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 28 Dec 2021 14:13:26 +0800 Subject: [PATCH 4/4] fix(bridge): Bridges should send a JSON message if payload template not set --- apps/emqx_connector/src/emqx_connector_http.erl | 7 ++++++- apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl | 9 +++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 2b9bd48aa..21c06284d 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -266,11 +266,16 @@ process_request(#{ } = Conf, Msg) -> Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)) , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg) - , body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg) + , body => process_request_body(BodyTks, Msg) , headers => maps:to_list(proc_headers(HeadersTks, Msg)) , request_timeout => ReqTimeout }. +process_request_body([], Msg) -> + emqx_json:encode(Msg); +process_request_body(BodyTks, Msg) -> + emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg). + proc_headers(HeaderTks, Msg) -> maps:fold(fun(K, V, Acc) -> Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) => diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index eb483dcc5..1357037ee 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -66,7 +66,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = replace_vars_in_str(PayloadToken, MapMsg), + Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), #mqtt_msg{qos = QoS, @@ -82,13 +82,18 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = replace_vars_in_str(PayloadToken, MapMsg), + Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). +process_payload([], Msg) -> + emqx_json:encode(Msg); +process_payload(Tks, Msg) -> + replace_vars_in_str(Tks, Msg). + %% Replace a string contains vars to another string in which the placeholders are replace by the %% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: %% "a: 1".