From 90ba2977fe30b307aa10f70b33dfda434828a44d Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 7 Feb 2024 21:33:11 +0800 Subject: [PATCH] feat: don't publish mqtt message in rabbitmq's source --- .../src/emqx_bridge_rabbitmq_connector.erl | 16 +- .../emqx_bridge_rabbitmq_pubsub_schema.erl | 44 +- .../emqx_bridge_rabbitmq_source_worker.erl | 75 ++-- .../test/emqx_bridge_rabbitmq_v2_SUITE.erl | 382 ++++++++++++++++++ .../emqx_bridge_rabbitmq_pubsub_schema.hocon | 15 - 5 files changed, 438 insertions(+), 94 deletions(-) create mode 100644 apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index f7a1e533f..134ba15b6 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -372,21 +372,7 @@ preproc_parameter(#{config_root := actions, parameters := Parameter}) -> config_root => actions }; preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) -> - #{ - payload_template := PayloadTmpl, - qos := QosTmpl, - topic := TopicTmpl - } = Parameter, - Parameter#{ - payload_template => emqx_placeholder:preproc_tmpl(PayloadTmpl), - qos => preproc_qos(QosTmpl), - topic => emqx_placeholder:preproc_tmpl(TopicTmpl), - hookpoints => Hooks, - config_root => sources - }. - -preproc_qos(Qos) when is_integer(Qos) -> Qos; -preproc_qos(Qos) -> emqx_placeholder:preproc_tmpl(Qos). + Parameter#{hookpoints => Hooks, config_root => sources}. delivery_mode(non_persistent) -> 1; delivery_mode(persistent) -> 2. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl index 34c945937..81230ee3e 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl @@ -126,39 +126,6 @@ fields(subscriber_source) -> ); fields(source_parameters) -> [ - {wait_for_publish_confirmations, - hoconsc:mk( - boolean(), - #{ - default => true, - desc => ?DESC(?CONNECTOR_SCHEMA, "wait_for_publish_confirmations") - } - )}, - {topic, - ?HOCON( - binary(), - #{ - required => true, - validator => fun emqx_schema:non_empty_string/1, - desc => ?DESC("source_topic") - } - )}, - {qos, - ?HOCON( - ?UNION([emqx_schema:qos(), binary()]), - #{ - default => 0, - desc => ?DESC("source_qos") - } - )}, - {payload_template, - ?HOCON( - binary(), - #{ - required => false, - desc => ?DESC("source_payload_template") - } - )}, {queue, ?HOCON( binary(), @@ -167,6 +134,14 @@ fields(source_parameters) -> desc => ?DESC("source_queue") } )}, + {wait_for_publish_confirmations, + hoconsc:mk( + boolean(), + #{ + default => true, + desc => ?DESC(?CONNECTOR_SCHEMA, "wait_for_publish_confirmations") + } + )}, {no_ack, ?HOCON( boolean(), @@ -260,9 +235,6 @@ source_examples(Method) -> _ConnectorType = rabbitmq, #{ parameters => #{ - topic => <<"${payload.mqtt_topic}">>, - qos => <<"${payload.mqtt_qos}">>, - payload_template => <<"${payload.mqtt_payload}">>, queue => <<"test_queue">>, no_ack => true } diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl index b102faf5d..d0d43641b 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl @@ -37,28 +37,13 @@ handle_cast(_Request, State) -> handle_info( {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{ payload = Payload, - props = #'P_basic'{message_id = MessageId, headers = Headers} + props = PBasic }}, {Channel, InstanceId, Params} = State ) -> - #{ - hookpoints := Hooks, - payload_template := PayloadTmpl, - qos := QoSTmpl, - topic := TopicTmpl, - no_ack := NoAck - } = Params, - MQTTMsg = emqx_message:make( - make_message_id(MessageId), - InstanceId, - render(Payload, QoSTmpl), - render(Payload, TopicTmpl), - render(Payload, PayloadTmpl), - #{}, - make_headers(Headers) - ), - _ = emqx:publish(MQTTMsg), - lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [MQTTMsg]) end, Hooks), + Message = to_map(PBasic, Payload), + #{hookpoints := Hooks, no_ack := NoAck} = Params, + lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [Message]) end, Hooks), (NoAck =:= false) andalso amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}), emqx_resource_metrics:received_inc(InstanceId), @@ -68,18 +53,52 @@ handle_info(#'basic.cancel_ok'{}, State) -> handle_info(_Info, State) -> {noreply, State}. +to_map(PBasic, Payload) -> + #'P_basic'{ + content_type = ContentType, + content_encoding = ContentEncoding, + headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, + correlation_id = CorrelationId, + reply_to = ReplyTo, + expiration = Expiration, + message_id = MessageId, + timestamp = Timestamp, + type = Type, + user_id = UserId, + app_id = AppId, + cluster_id = ClusterId + } = PBasic, + Message = #{ + <<"payload">> => make_payload(Payload), + <<"content_type">> => ContentType, + <<"content_encoding">> => ContentEncoding, + <<"headers">> => make_headers(Headers), + <<"delivery_mode">> => DeliveryMode, + <<"priority">> => Priority, + <<"correlation_id">> => CorrelationId, + <<"reply_to">> => ReplyTo, + <<"expiration">> => Expiration, + <<"message_id">> => MessageId, + <<"timestamp">> => Timestamp, + <<"type">> => Type, + <<"user_id">> => UserId, + <<"app_id">> => AppId, + <<"cluster_id">> => ClusterId + }, + maps:filtermap(fun(_K, V) -> V =/= undefined andalso V =/= <<"undefined">> end, Message). + terminate(_Reason, _State) -> ok. -render(_Message, QoS) when is_integer(QoS) -> QoS; -render(Message, PayloadTmpl) -> - Opts = #{return => full_binary}, - emqx_placeholder:proc_tmpl(PayloadTmpl, Message, Opts). - -make_message_id(undefined) -> emqx_guid:gen(); -make_message_id(Id) -> Id. - make_headers(undefined) -> - #{}; + undefined; make_headers(Headers) when is_list(Headers) -> maps:from_list([{Key, Value} || {Key, _Type, Value} <- Headers]). + +make_payload(Payload) -> + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {ok, Map} -> Map; + {error, _} -> Payload + end. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl new file mode 100644 index 000000000..5a17c2914 --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl @@ -0,0 +1,382 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rabbitmq_v2_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-define(TYPE, <<"rabbitmq">>). +-import(emqx_common_test_helpers, [on_exit/1]). + +rabbit_mq_host() -> + <<"rabbitmq">>. + +rabbit_mq_port() -> + 5672. + +rabbit_mq_exchange() -> + <<"messages">>. + +rabbit_mq_queue() -> + <<"test_queue">>. + +rabbit_mq_routing_key() -> + <<"test_routing_key">>. + +get_channel_connection(Config) -> + proplists:get_value(channel_connection, Config). + +get_rabbitmq(Config) -> + proplists:get_value(rabbitmq, Config). + +%%------------------------------------------------------------------------------ +%% Common Test Setup, Tear down and Testcase List +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"), + RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")), + case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of + true -> + Config1 = common_init(#{ + host => RabbitMQHost, port => RabbitMQPort + }), + Name = atom_to_binary(?MODULE), + Config2 = [{connector, Name} | Config1 ++ Config], + create_connector(Name, get_rabbitmq(Config2)), + Config2; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_rabbitmq); + _ -> + {skip, no_rabbitmq} + end + end. + +common_init(Opts) -> + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine + ]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(amqp_client), + emqx_mgmt_api_test_util:init_suite(), + #{host := Host, port := Port} = Opts, + ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port), + [ + {channel_connection, ChannelConnection}, + {rabbitmq, #{server => Host, port => Port}} + ]. + +setup_rabbit_mq_exchange_and_queue(Host, Port) -> + %% Create an exchange and a queue + {ok, Connection} = + amqp_connection:start(#amqp_params_network{ + host = Host, + port = Port, + ssl_options = none + }), + {ok, Channel} = amqp_connection:open_channel(Connection), + %% Create an exchange + #'exchange.declare_ok'{} = + amqp_channel:call( + Channel, + #'exchange.declare'{ + exchange = rabbit_mq_exchange(), + type = <<"topic">> + } + ), + %% Create a queue + #'queue.declare_ok'{} = + amqp_channel:call( + Channel, + #'queue.declare'{queue = rabbit_mq_queue()} + ), + %% Bind the queue to the exchange + #'queue.bind_ok'{} = + amqp_channel:call( + Channel, + #'queue.bind'{ + queue = rabbit_mq_queue(), + exchange = rabbit_mq_exchange(), + routing_key = rabbit_mq_routing_key() + } + ), + #{ + connection => Connection, + channel => Channel + }. + +end_per_suite(Config) -> + delete_connector(proplists:get_value(connector, Config)), + #{ + connection := Connection, + channel := Channel + } = get_channel_connection(Config), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(emqx_bridge), + %% Close the channel + ok = amqp_channel:close(Channel), + %% Close the connection + ok = amqp_connection:close(Connection). + +rabbitmq_connector(Config) -> + Name = atom_to_binary(?MODULE), + Server = maps:get(server, Config, rabbit_mq_host()), + Port = maps:get(port, Config, rabbit_mq_port()), + ConfigStr = + io_lib:format( + "connectors.rabbitmq.~s {\n" + " enable = true\n" + " ssl = {enable = false}\n" + " server = \"~s\"\n" + " port = ~p\n" + " username = \"guest\"\n" + " password = \"guest\"\n" + "}\n", + [ + Name, + Server, + Port + ] + ), + ct:pal(ConfigStr), + parse_and_check(<<"connectors">>, emqx_connector_schema, ConfigStr, <<"rabbitmq">>, Name). + +rabbitmq_source() -> + Name = atom_to_binary(?MODULE), + ConfigStr = + io_lib:format( + "sources.rabbitmq.~s {\n" + "connector = ~s\n" + "enable = true\n" + "parameters {\n" + "no_ack = true\n" + "queue = ~s\n" + "wait_for_publish_confirmations = true\n" + "}}\n", + [ + Name, + Name, + rabbit_mq_queue() + ] + ), + ct:pal(ConfigStr), + parse_and_check(<<"sources">>, emqx_bridge_v2_schema, ConfigStr, <<"rabbitmq">>, Name). + +rabbitmq_action() -> + Name = atom_to_binary(?MODULE), + ConfigStr = + io_lib:format( + "actions.rabbitmq.~s {\n" + "connector = ~s\n" + "enable = true\n" + "parameters {\n" + "exchange: ~s\n" + "payload_template: \"${.payload}\"\n" + "routing_key: ~s\n" + "delivery_mode: non_persistent\n" + "publish_confirmation_timeout: 30s\n" + "wait_for_publish_confirmations = true\n" + "}}\n", + [ + Name, + Name, + rabbit_mq_exchange(), + rabbit_mq_routing_key() + ] + ), + ct:pal(ConfigStr), + parse_and_check(<<"actions">>, emqx_bridge_v2_schema, ConfigStr, <<"rabbitmq">>, Name). + +parse_and_check(Key, Mod, ConfigStr, Type, Name) -> + {ok, RawConf} = hocon:binary(ConfigStr, #{format => map}), + hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), + #{Key := #{Type := #{Name := RetConfig}}} = RawConf, + RetConfig. + +create_connector(Name, Config) -> + Connector = rabbitmq_connector(Config), + {ok, _} = emqx_connector:create(?TYPE, Name, Connector). + +delete_connector(Name) -> + ok = emqx_connector:remove(?TYPE, Name). + +create_source(Name) -> + Source = rabbitmq_source(), + {ok, _} = emqx_bridge_v2:create(sources, ?TYPE, Name, Source). + +delete_source(Name) -> + ok = emqx_bridge_v2:remove(sources, ?TYPE, Name). + +create_action(Name) -> + Action = rabbitmq_action(), + {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action). + +delete_action(Name) -> + ok = emqx_bridge_v2:remove(actions, ?TYPE, Name). + +%%------------------------------------------------------------------------------ +%% Test Cases +%%------------------------------------------------------------------------------ + +t_source(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_source(Name), + Sources = emqx_bridge_v2:list(sources), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Sources), Sources), + Topic = <<"tesldkafd">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"$bridges/rabbitmq:", Name/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [ + #{ + args => #{ + topic => Topic, + mqtt_properties => #{}, + payload => <<"${payload}">>, + qos => 0, + retain => false, + user_properties => [] + }, + function => republish + } + ], + description => <<"bridge_v2 republish rule">> + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + {ok, C1} = emqtt:start_link([{clean_start, true}]), + {ok, _} = emqtt:connect(C1), + {ok, #{}, [0]} = emqtt:subscribe(C1, Topic, [{qos, 0}, {rh, 0}]), + send_test_message_to_rabbitmq(Config), + PayloadBin = emqx_utils_json:encode(payload()), + ?assertMatch( + [ + #{ + dup := false, + properties := undefined, + topic := Topic, + qos := 0, + payload := PayloadBin, + retain := false + } + ], + receive_messages(1) + ), + ok = emqtt:disconnect(C1), + ok = delete_source(Name), + SourcesAfterDelete = emqx_bridge_v2:list(sources), + ?assertNot(lists:any(Any, SourcesAfterDelete), SourcesAfterDelete), + ok. + +t_action(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_action(Name), + Actions = emqx_bridge_v2:list(actions), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Actions), Actions), + Topic = <<"lkadfdaction">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"", Topic/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [<<"rabbitmq:", Name/binary>>], + description => <<"bridge_v2 send msg to rabbitmq action">> + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + {ok, C1} = emqtt:start_link([{clean_start, true}]), + {ok, _} = emqtt:connect(C1), + Payload = payload(), + PayloadBin = emqx_utils_json:encode(Payload), + {ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]), + Msg = receive_test_message_from_rabbitmq(Config), + ?assertMatch(Payload, Msg), + ok = emqtt:disconnect(C1), + ok = delete_action(Name), + ActionsAfterDelete = emqx_bridge_v2:list(actions), + ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), + ok. + +receive_messages(Count) -> + receive_messages(Count, []). +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + ct:log("Msg: ~p ~n", [Msg]), + receive_messages(Count - 1, [Msg | Msgs]); + Other -> + ct:log("Other Msg: ~p~n", [Other]), + receive_messages(Count, Msgs) + after 2000 -> + Msgs + end. + +payload() -> + #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}. + +send_test_message_to_rabbitmq(Config) -> + #{channel := Channel} = get_channel_connection(Config), + MessageProperties = #'P_basic'{ + headers = [], + delivery_mode = 1 + }, + Method = #'basic.publish'{ + exchange = rabbit_mq_exchange(), + routing_key = rabbit_mq_routing_key() + }, + amqp_channel:cast( + Channel, + Method, + #amqp_msg{ + payload = emqx_utils_json:encode(payload()), + props = MessageProperties + } + ), + ok. + +receive_test_message_from_rabbitmq(Config) -> + #{channel := Channel} = get_channel_connection(Config), + #'basic.consume_ok'{consumer_tag = ConsumerTag} = + amqp_channel:call( + Channel, + #'basic.consume'{ + queue = rabbit_mq_queue() + } + ), + receive + %% This is the first message received + #'basic.consume_ok'{} -> + ok + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> + %% Ack the message + amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), + %% Cancel the consumer + #'basic.cancel_ok'{consumer_tag = ConsumerTag} = + amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), + emqx_utils_json:decode(Content#amqp_msg.payload) + after 5000 -> + ?assert(false, "Did not receive message within 5 second") + end. diff --git a/rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon b/rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon index a73394386..82f1781e9 100644 --- a/rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon +++ b/rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon @@ -21,21 +21,6 @@ source_parameters.desc: source_parameters.label: """Source Parameters""" -source_topic.desc: -"""Topic used for constructing MQTT messages, supporting templates.""" -source_topic.label: -"""Source Topic""" - -source_qos.desc: -"""The QoS level of the MQTT message, supporting templates.""" -source_qos.label: -"""QoS""" - -source_payload_template.desc: -"""The template used to construct the payload of the MQTT message.""" -source_payload_template.label: -"""Source Payload Template""" - source_queue.desc: """The queue name of the RabbitMQ broker.""" source_queue.label: