From a0f8e4f3280dbfb6a260084e175f919824f7ee45 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 8 Feb 2024 12:37:45 +0800 Subject: [PATCH] test: rafator rabbitmq test SUITE, delete redundance code --- .../emqx_bridge_rabbitmq_pubsub_schema.erl | 1 + .../test/emqx_bridge_rabbitmq_SUITE.erl | 423 ------------------ .../emqx_bridge_rabbitmq_connector_SUITE.erl | 156 ++----- .../test/emqx_bridge_rabbitmq_test_utils.erl | 203 +++++++++ .../test/emqx_bridge_rabbitmq_v1_SUITE.erl | 221 +++++++++ .../test/emqx_bridge_rabbitmq_v2_SUITE.erl | 291 ++++-------- 6 files changed, 545 insertions(+), 750 deletions(-) delete mode 100644 apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl create mode 100644 apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl create mode 100644 apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v1_SUITE.erl diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl index 81230ee3e..3fb00632c 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl @@ -101,6 +101,7 @@ fields(action_parameters) -> hoconsc:mk( binary(), #{ + default => <<"">>, desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template") } )} diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl deleted file mode 100644 index 7698608d3..000000000 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl +++ /dev/null @@ -1,423 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_bridge_rabbitmq_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include_lib("emqx_connector/include/emqx_connector.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("stdlib/include/assert.hrl"). --include_lib("amqp_client/include/amqp_client.hrl"). - -%% See comment in -%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to -%% run this without bringing up the whole CI infrastructure --define(TYPE, <<"rabbitmq">>). - -rabbit_mq_host() -> - <<"rabbitmq">>. - -rabbit_mq_port() -> - 5672. - -rabbit_mq_exchange() -> - <<"messages">>. - -rabbit_mq_queue() -> - <<"test_queue">>. - -rabbit_mq_routing_key() -> - <<"test_routing_key">>. - -get_channel_connection(Config) -> - proplists:get_value(channel_connection, Config). - -get_rabbitmq(Config) -> - proplists:get_value(rabbitmq, Config). - -get_tls(Config) -> - proplists:get_value(tls, Config). - -%%------------------------------------------------------------------------------ -%% Common Test Setup, Tear down and Testcase List -%%------------------------------------------------------------------------------ - -all() -> - [ - {group, tcp}, - {group, tls} - ]. - -groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - [ - {tcp, AllTCs}, - {tls, AllTCs} - ]. - -init_per_suite(Config) -> - Config. - -end_per_suite(_Config) -> - ok. - -init_per_group(tcp, Config) -> - RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"), - RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")), - case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of - true -> - Config1 = common_init_per_group(#{ - host => RabbitMQHost, port => RabbitMQPort, tls => false - }), - Config1 ++ Config; - false -> - case os:getenv("IS_CI") of - "yes" -> - throw(no_rabbitmq); - _ -> - {skip, no_rabbitmq} - end - end; -init_per_group(tls, Config) -> - RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"), - RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")), - case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of - true -> - Config1 = common_init_per_group(#{ - host => RabbitMQHost, port => RabbitMQPort, tls => true - }), - Config1 ++ Config; - false -> - case os:getenv("IS_CI") of - "yes" -> - throw(no_rabbitmq); - _ -> - {skip, no_rabbitmq} - end - end; -init_per_group(_Group, Config) -> - Config. - -common_init_per_group(Opts) -> - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource]), - {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(amqp_client), - emqx_mgmt_api_test_util:init_suite(), - #{host := Host, port := Port, tls := UseTLS} = Opts, - ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS), - [ - {channel_connection, ChannelConnection}, - {rabbitmq, #{server => Host, port => Port}}, - {tls, UseTLS} - ]. - -setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) -> - SSLOptions = - case UseTLS of - false -> none; - true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS)) - end, - %% Create an exchange and a queue - {ok, Connection} = - amqp_connection:start(#amqp_params_network{ - host = Host, - port = Port, - ssl_options = SSLOptions - }), - {ok, Channel} = amqp_connection:open_channel(Connection), - %% Create an exchange - #'exchange.declare_ok'{} = - amqp_channel:call( - Channel, - #'exchange.declare'{ - exchange = rabbit_mq_exchange(), - type = <<"topic">> - } - ), - %% Create a queue - #'queue.declare_ok'{} = - amqp_channel:call( - Channel, - #'queue.declare'{queue = rabbit_mq_queue()} - ), - %% Bind the queue to the exchange - #'queue.bind_ok'{} = - amqp_channel:call( - Channel, - #'queue.bind'{ - queue = rabbit_mq_queue(), - exchange = rabbit_mq_exchange(), - routing_key = rabbit_mq_routing_key() - } - ), - #{ - connection => Connection, - channel => Channel - }. - -end_per_group(_Group, Config) -> - #{ - connection := Connection, - channel := Channel - } = get_channel_connection(Config), - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), - _ = application:stop(emqx_connector), - _ = application:stop(emqx_bridge), - %% Close the channel - ok = amqp_channel:close(Channel), - %% Close the connection - ok = amqp_connection:close(Connection). - -init_per_testcase(_, Config) -> - Config. - -end_per_testcase(_, _Config) -> - ok. - -rabbitmq_config(UseTLS, Config) -> - BatchSize = maps:get(batch_size, Config, 1), - BatchTime = maps:get(batch_time_ms, Config, 0), - Name = atom_to_binary(?MODULE), - Server = maps:get(server, Config, rabbit_mq_host()), - Port = maps:get(port, Config, rabbit_mq_port()), - Template = maps:get(payload_template, Config, <<"">>), - ConfigString = - io_lib:format( - "bridges.rabbitmq.~s {\n" - " enable = true\n" - " ssl = ~s\n" - " server = \"~s\"\n" - " port = ~p\n" - " username = \"guest\"\n" - " password = \"guest\"\n" - " routing_key = \"~s\"\n" - " exchange = \"~s\"\n" - " payload_template = \"~s\"\n" - " resource_opts = {\n" - " batch_size = ~b\n" - " batch_time = ~bms\n" - " }\n" - "}\n", - [ - Name, - hocon_pp:do(ssl_options(UseTLS), #{embedded => true}), - Server, - Port, - rabbit_mq_routing_key(), - rabbit_mq_exchange(), - Template, - BatchSize, - BatchTime - ] - ), - ct:pal(ConfigString), - parse_and_check(ConfigString, <<"rabbitmq">>, Name). - -ssl_options(true) -> - CertsDir = filename:join([ - emqx_common_test_helpers:proj_root(), - ".ci", - "docker-compose-file", - "certs" - ]), - #{ - enable => true, - cacertfile => filename:join([CertsDir, "ca.crt"]), - certfile => filename:join([CertsDir, "client.pem"]), - keyfile => filename:join([CertsDir, "client.key"]) - }; -ssl_options(false) -> - #{ - enable => false - }. - -parse_and_check(ConfigString, BridgeType, Name) -> - {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, - RetConfig. - -create_bridge(Name, UseTLS, Config) -> - BridgeConfig = rabbitmq_config(UseTLS, Config), - {ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig), - emqx_bridge_resource:bridge_id(?TYPE, Name). - -delete_bridge(Name) -> - ok = emqx_bridge:remove(?TYPE, Name). - -%%------------------------------------------------------------------------------ -%% Test Cases -%%------------------------------------------------------------------------------ - -t_create_delete_bridge(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - create_bridge(Name, UseTLS, RabbitMQ), - Bridges = emqx_bridge:list(), - Any = fun(#{name := BName}) -> BName =:= Name end, - ?assert(lists:any(Any, Bridges), Bridges), - ok = delete_bridge(Name), - BridgesAfterDelete = emqx_bridge:list(), - ?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete), - ok. - -t_create_delete_bridge_non_existing_server(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - UseTLS = get_tls(Config), - create_bridge(Name, UseTLS, #{server => <<"non_existing_server">>, port => 3174}), - %% Check that the new bridge is in the list of bridges - Bridges = emqx_bridge:list(), - Any = fun(#{name := BName}) -> BName =:= Name end, - ?assert(lists:any(Any, Bridges)), - ok = delete_bridge(Name), - BridgesAfterDelete = emqx_bridge:list(), - ?assertNot(lists:any(Any, BridgesAfterDelete)), - ok. - -t_send_message_query(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{batch_size => 1}), - Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, - %% This will use the SQL template included in the bridge - emqx_bridge:send_message(BridgeID, Payload), - %% Check that the data got to the database - ?assertEqual(Payload, receive_simple_test_message(Config)), - ok = delete_bridge(Name), - ok. - -t_send_message_query_with_template(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - BridgeID = create_bridge(Name, UseTLS, RabbitMQ#{ - batch_size => 1, - payload_template => - << - "{" - " \\\"key\\\": ${key}," - " \\\"data\\\": \\\"${data}\\\"," - " \\\"timestamp\\\": ${timestamp}," - " \\\"secret\\\": 42" - "}" - >> - }), - Payload = #{ - <<"key">> => 7, - <<"data">> => <<"RabbitMQ">>, - <<"timestamp">> => 10000 - }, - emqx_bridge:send_message(BridgeID, Payload), - %% Check that the data got to the database - ExpectedResult = Payload#{ - <<"secret">> => 42 - }, - ?assertEqual(ExpectedResult, receive_simple_test_message(Config)), - ok = delete_bridge(Name), - ok. - -t_send_simple_batch(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - BridgeConf = RabbitMQ#{batch_size => 100}, - UseTLS = get_tls(Config), - BridgeID = create_bridge(Name, UseTLS, BridgeConf), - Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, - emqx_bridge:send_message(BridgeID, Payload), - ?assertEqual(Payload, receive_simple_test_message(Config)), - ok = delete_bridge(Name), - ok. - -t_send_simple_batch_with_template(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - BridgeConf = - RabbitMQ#{ - batch_size => 100, - payload_template => - << - "{" - " \\\"key\\\": ${key}," - " \\\"data\\\": \\\"${data}\\\"," - " \\\"timestamp\\\": ${timestamp}," - " \\\"secret\\\": 42" - "}" - >> - }, - BridgeID = create_bridge(Name, UseTLS, BridgeConf), - Payload = #{ - <<"key">> => 7, - <<"data">> => <<"RabbitMQ">>, - <<"timestamp">> => 10000 - }, - emqx_bridge:send_message(BridgeID, Payload), - ExpectedResult = Payload#{ - <<"secret">> => 42 - }, - ?assertEqual(ExpectedResult, receive_simple_test_message(Config)), - ok = delete_bridge(Name), - ok. - -t_heavy_batching(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - NumberOfMessages = 20000, - RabbitMQ = get_rabbitmq(Config), - UseTLS = get_tls(Config), - BridgeConf = RabbitMQ#{ - batch_size => 10173, - batch_time_ms => 50 - }, - BridgeID = create_bridge(Name, UseTLS, BridgeConf), - SendMessage = fun(Key) -> - Payload = #{<<"key">> => Key}, - emqx_bridge:send_message(BridgeID, Payload) - end, - [SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)], - AllMessages = lists:foldl( - fun(_, Acc) -> - Message = receive_simple_test_message(Config), - #{<<"key">> := Key} = Message, - Acc#{Key => true} - end, - #{}, - lists:seq(1, NumberOfMessages) - ), - ?assertEqual(NumberOfMessages, maps:size(AllMessages)), - ok = delete_bridge(Name), - ok. - -receive_simple_test_message(Config) -> - #{channel := Channel} = get_channel_connection(Config), - #'basic.consume_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call( - Channel, - #'basic.consume'{ - queue = rabbit_mq_queue() - } - ), - receive - %% This is the first message received - #'basic.consume_ok'{} -> - ok - end, - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> - %% Ack the message - amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), - %% Cancel the consumer - #'basic.cancel_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), - emqx_utils_json:decode(Content#amqp_msg.payload) - after 5000 -> - ?assert(false, "Did not receive message within 5 second") - end. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl index ee5a2609b..56cdc8b0d 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_rabbitmq_connector_SUITE). @@ -12,6 +12,18 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-import(emqx_bridge_rabbitmq_test_utils, [ + rabbit_mq_exchange/0, + rabbit_mq_routing_key/0, + rabbit_mq_queue/0, + rabbit_mq_host/0, + rabbit_mq_port/0, + get_rabbitmq/1, + ssl_options/1, + get_channel_connection/1, + parse_and_check/4, + receive_message_from_rabbitmq/1 +]). %% This test SUITE requires a running RabbitMQ instance. If you don't want to %% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script @@ -21,99 +33,24 @@ %% %% docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3.11-management -rabbit_mq_host() -> - list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")). - -rabbit_mq_port() -> - list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")). - -rabbit_mq_password() -> - <<"guest">>. - -rabbit_mq_exchange() -> - <<"test_exchange">>. - -rabbit_mq_queue() -> - <<"test_queue">>. - -rabbit_mq_routing_key() -> - <<"test_routing_key">>. - all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, tcp}, + {group, tls} + ]. -init_per_suite(Config) -> - Host = rabbit_mq_host(), - Port = rabbit_mq_port(), - ct:pal("rabbitmq:~p~n", [{Host, Port}]), - case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of - true -> - Apps = emqx_cth_suite:start( - [emqx_conf, emqx_connector, emqx_bridge_rabbitmq], - #{work_dir => emqx_cth_suite:work_dir(Config)} - ), - ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port), - [{channel_connection, ChannelConnection}, {suite_apps, Apps} | Config]; - false -> - case os:getenv("IS_CI") of - "yes" -> - throw(no_rabbitmq); - _ -> - {skip, no_rabbitmq} - end - end. +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {tcp, AllTCs}, + {tls, AllTCs} + ]. -setup_rabbit_mq_exchange_and_queue(Host, Port) -> - %% Create an exchange and a queue - {ok, Connection} = - amqp_connection:start(#amqp_params_network{ - host = binary_to_list(Host), - port = Port - }), - {ok, Channel} = amqp_connection:open_channel(Connection), - %% Create an exchange - #'exchange.declare_ok'{} = - amqp_channel:call( - Channel, - #'exchange.declare'{ - exchange = rabbit_mq_exchange(), - type = <<"topic">> - } - ), - %% Create a queue - #'queue.declare_ok'{} = - amqp_channel:call( - Channel, - #'queue.declare'{queue = rabbit_mq_queue()} - ), - %% Bind the queue to the exchange - #'queue.bind_ok'{} = - amqp_channel:call( - Channel, - #'queue.bind'{ - queue = rabbit_mq_queue(), - exchange = rabbit_mq_exchange(), - routing_key = rabbit_mq_routing_key() - } - ), - #{ - connection => Connection, - channel => Channel - }. +init_per_group(Group, Config) -> + emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config). -get_channel_connection(Config) -> - proplists:get_value(channel_connection, Config). - -end_per_suite(Config) -> - #{ - connection := Connection, - channel := Channel - } = get_channel_connection(Config), - %% Close the channel - ok = amqp_channel:close(Channel), - %% Close the connection - ok = amqp_connection:close(Connection), - ok = emqx_cth_suite:stop(?config(suite_apps, Config)). +end_per_group(Group, Config) -> + emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config). % %%------------------------------------------------------------------------------ % %% Testcases @@ -143,7 +80,6 @@ t_start_passfile(Config) -> ). perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> - #{channel := Channel} = get_channel_connection(TestConfig), CheckedConfig = check_config(InitialConfig), #{ id := PoolName, @@ -159,7 +95,7 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> emqx_resource:get_instance(ResourceID), ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)), %% Perform query as further check that the resource is working as expected - perform_query(ResourceID, Channel), + perform_query(ResourceID, TestConfig), ?assertEqual(ok, emqx_resource:stop(ResourceID)), %% Resource will be listed still, but state will be changed and healthcheck will fail %% as the worker no longer exists. @@ -181,7 +117,7 @@ perform_lifecycle_check(ResourceID, InitialConfig, TestConfig) -> emqx_resource:get_instance(ResourceID), ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)), %% Check that everything is working again by performing a query - perform_query(ResourceID, Channel), + perform_query(ResourceID, TestConfig), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(ResourceID)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), @@ -214,37 +150,12 @@ perform_query(PoolName, Channel) -> ?assertEqual(ok, emqx_resource_manager:add_channel(PoolName, ChannelId, ActionConfig)), ok = emqx_resource:query(PoolName, {ChannelId, payload()}), %% Get the message from queue: - ok = receive_simple_test_message(Channel), + SendData = test_data(), + RecvData = receive_message_from_rabbitmq(Channel), + ?assertMatch(SendData, RecvData), ?assertEqual(ok, emqx_resource_manager:remove_channel(PoolName, ChannelId)), ok. -receive_simple_test_message(Channel) -> - #'basic.consume_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call( - Channel, - #'basic.consume'{ - queue = rabbit_mq_queue() - } - ), - receive - %% This is the first message received - #'basic.consume_ok'{} -> - ok - end, - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> - Expected = test_data(), - ?assertEqual(Expected, emqx_utils_json:decode(Content#amqp_msg.payload)), - %% Ack the message - amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), - %% Cancel the consumer - #'basic.cancel_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), - ok - after 5000 -> - ?assert(false, "Did not receive message within 5 second") - end. - rabbitmq_config() -> rabbitmq_config(#{}). @@ -278,3 +189,6 @@ rabbitmq_action_config() -> wait_for_publish_confirmations => true } }. + +rabbit_mq_password() -> + <<"guest">>. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl new file mode 100644 index 000000000..47df47976 --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl @@ -0,0 +1,203 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rabbitmq_test_utils). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +init_per_group(tcp, Config) -> + RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"), + RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")), + case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of + true -> + Config1 = common_init_per_group(#{ + host => RabbitMQHost, port => RabbitMQPort, tls => false + }), + Config1 ++ Config; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_rabbitmq); + _ -> + {skip, no_rabbitmq} + end + end; +init_per_group(tls, Config) -> + RabbitMQHost = os:getenv("RABBITMQ_TLS_HOST", "rabbitmq"), + RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_TLS_PORT", "5671")), + case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of + true -> + Config1 = common_init_per_group(#{ + host => RabbitMQHost, port => RabbitMQPort, tls => true + }), + Config1 ++ Config; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_rabbitmq); + _ -> + {skip, no_rabbitmq} + end + end; +init_per_group(_Group, Config) -> + Config. + +common_init_per_group(Opts) -> + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine + ]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(amqp_client), + emqx_mgmt_api_test_util:init_suite(), + #{host := Host, port := Port, tls := UseTLS} = Opts, + ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS), + [ + {channel_connection, ChannelConnection}, + {rabbitmq, #{server => Host, port => Port, tls => UseTLS}} + ]. + +setup_rabbit_mq_exchange_and_queue(Host, Port, UseTLS) -> + SSLOptions = + case UseTLS of + false -> none; + true -> emqx_tls_lib:to_client_opts(ssl_options(UseTLS)) + end, + %% Create an exchange and a queue + {ok, Connection} = + amqp_connection:start(#amqp_params_network{ + host = Host, + port = Port, + ssl_options = SSLOptions + }), + {ok, Channel} = amqp_connection:open_channel(Connection), + %% Create an exchange + #'exchange.declare_ok'{} = + amqp_channel:call( + Channel, + #'exchange.declare'{ + exchange = rabbit_mq_exchange(), + type = <<"topic">> + } + ), + %% Create a queue + #'queue.declare_ok'{} = + amqp_channel:call( + Channel, + #'queue.declare'{queue = rabbit_mq_queue()} + ), + %% Bind the queue to the exchange + #'queue.bind_ok'{} = + amqp_channel:call( + Channel, + #'queue.bind'{ + queue = rabbit_mq_queue(), + exchange = rabbit_mq_exchange(), + routing_key = rabbit_mq_routing_key() + } + ), + #{ + connection => Connection, + channel => Channel + }. + +end_per_group(_Group, Config) -> + #{ + connection := Connection, + channel := Channel + } = get_channel_connection(Config), + amqp_channel:call(Channel, #'queue.purge'{queue = rabbit_mq_queue()}), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(emqx_bridge), + %% Close the channel + ok = amqp_channel:close(Channel), + %% Close the connection + ok = amqp_connection:close(Connection). + +rabbit_mq_host() -> + list_to_binary(os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq")). + +rabbit_mq_port() -> + list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")). + +rabbit_mq_exchange() -> + <<"messages">>. + +rabbit_mq_queue() -> + <<"test_queue">>. + +rabbit_mq_routing_key() -> + <<"test_routing_key">>. + +get_rabbitmq(Config) -> + proplists:get_value(rabbitmq, Config). + +get_channel_connection(Config) -> + proplists:get_value(channel_connection, Config). + +ssl_options(true) -> + CertsDir = filename:join([ + emqx_common_test_helpers:proj_root(), + ".ci", + "docker-compose-file", + "certs" + ]), + #{ + enable => true, + cacertfile => filename:join([CertsDir, "ca.crt"]), + certfile => filename:join([CertsDir, "client.pem"]), + keyfile => filename:join([CertsDir, "client.key"]) + }; +ssl_options(false) -> + #{ + enable => false + }. + +parse_and_check(Key, Mod, Conf, Name) -> + ConfStr = hocon_pp:do(Conf, #{}), + ct:pal(ConfStr), + {ok, RawConf} = hocon:binary(ConfStr, #{format => map}), + hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), + #{Key := #{<<"rabbitmq">> := #{Name := RetConf}}} = RawConf, + RetConf. + +receive_message_from_rabbitmq(Config) -> + #{channel := Channel} = get_channel_connection(Config), + #'basic.consume_ok'{consumer_tag = ConsumerTag} = + amqp_channel:call( + Channel, + #'basic.consume'{ + queue = rabbit_mq_queue() + } + ), + receive + %% This is the first message received + #'basic.consume_ok'{} -> + ok + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> + %% Ack the message + amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), + %% Cancel the consumer + #'basic.cancel_ok'{consumer_tag = ConsumerTag} = + amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), + Payload = Content#amqp_msg.payload, + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {ok, Msg} -> Msg; + {error, _} -> ?assert(false, {"Failed to decode the message", Payload}) + end + after 5000 -> + ?assert(false, "Did not receive message within 5 second") + end. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v1_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v1_SUITE.erl new file mode 100644 index 000000000..48756c616 --- /dev/null +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v1_SUITE.erl @@ -0,0 +1,221 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rabbitmq_v1_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +%% See comment in +%% apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl for how to +%% run this without bringing up the whole CI infrastructure +-define(TYPE, <<"rabbitmq">>). +-import(emqx_bridge_rabbitmq_test_utils, [ + rabbit_mq_exchange/0, + rabbit_mq_routing_key/0, + rabbit_mq_queue/0, + rabbit_mq_host/0, + rabbit_mq_port/0, + get_rabbitmq/1, + ssl_options/1, + get_channel_connection/1, + parse_and_check/4, + receive_message_from_rabbitmq/1 +]). +%%------------------------------------------------------------------------------ +%% Common Test Setup, Tear down and Testcase List +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, tcp}, + {group, tls} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {tcp, AllTCs}, + {tls, AllTCs} + ]. + +init_per_group(Group, Config) -> + emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config). + +end_per_group(Group, Config) -> + emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config). + +create_bridge(Name, Config) -> + BridgeConfig = rabbitmq_config(Config), + {ok, _} = emqx_bridge:create(?TYPE, Name, BridgeConfig), + emqx_bridge_resource:bridge_id(?TYPE, Name). + +delete_bridge(Name) -> + ok = emqx_bridge:remove(?TYPE, Name). + +%%------------------------------------------------------------------------------ +%% Test Cases +%%------------------------------------------------------------------------------ + +t_create_delete_bridge(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + create_bridge(Name, RabbitMQ), + Bridges = emqx_bridge:list(), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Bridges), Bridges), + ok = delete_bridge(Name), + BridgesAfterDelete = emqx_bridge:list(), + ?assertNot(lists:any(Any, BridgesAfterDelete), BridgesAfterDelete), + ok. + +t_create_delete_bridge_non_existing_server(_Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_bridge(Name, #{server => <<"non_existing_server">>, port => 3174}), + %% Check that the new bridge is in the list of bridges + Bridges = emqx_bridge:list(), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Bridges)), + ok = delete_bridge(Name), + BridgesAfterDelete = emqx_bridge:list(), + ?assertNot(lists:any(Any, BridgesAfterDelete)), + ok. + +t_send_message_query(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeID = create_bridge(Name, RabbitMQ#{batch_size => 1}), + Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, + %% This will use the SQL template included in the bridge + emqx_bridge:send_message(BridgeID, Payload), + %% Check that the data got to the database + ?assertEqual(Payload, receive_message_from_rabbitmq(Config)), + ok = delete_bridge(Name), + ok. + +t_send_message_query_with_template(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeID = create_bridge(Name, RabbitMQ#{ + batch_size => 1, + payload_template => payload_template() + }), + Payload = #{ + <<"key">> => 7, + <<"data">> => <<"RabbitMQ">>, + <<"timestamp">> => 10000 + }, + emqx_bridge:send_message(BridgeID, Payload), + %% Check that the data got to the database + ExpectedResult = Payload#{ + <<"secret">> => 42 + }, + ?assertEqual(ExpectedResult, receive_message_from_rabbitmq(Config)), + ok = delete_bridge(Name), + ok. + +t_send_simple_batch(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeConf = RabbitMQ#{batch_size => 100}, + BridgeID = create_bridge(Name, BridgeConf), + Payload = #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}, + emqx_bridge:send_message(BridgeID, Payload), + ?assertEqual(Payload, receive_message_from_rabbitmq(Config)), + ok = delete_bridge(Name), + ok. + +t_send_simple_batch_with_template(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + RabbitMQ = get_rabbitmq(Config), + BridgeConf = + RabbitMQ#{ + batch_size => 100, + payload_template => payload_template() + }, + BridgeID = create_bridge(Name, BridgeConf), + Payload = #{ + <<"key">> => 7, + <<"data">> => <<"RabbitMQ">>, + <<"timestamp">> => 10000 + }, + emqx_bridge:send_message(BridgeID, Payload), + ExpectedResult = Payload#{<<"secret">> => 42}, + ?assertEqual(ExpectedResult, receive_message_from_rabbitmq(Config)), + ok = delete_bridge(Name), + ok. + +t_heavy_batching(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + NumberOfMessages = 20000, + RabbitMQ = get_rabbitmq(Config), + BridgeConf = RabbitMQ#{ + batch_size => 10173, + batch_time_ms => 50 + }, + BridgeID = create_bridge(Name, BridgeConf), + SendMessage = fun(Key) -> + Payload = #{<<"key">> => Key}, + emqx_bridge:send_message(BridgeID, Payload) + end, + [SendMessage(Key) || Key <- lists:seq(1, NumberOfMessages)], + AllMessages = lists:foldl( + fun(_, Acc) -> + Message = receive_message_from_rabbitmq(Config), + #{<<"key">> := Key} = Message, + Acc#{Key => true} + end, + #{}, + lists:seq(1, NumberOfMessages) + ), + ?assertEqual(NumberOfMessages, maps:size(AllMessages)), + ok = delete_bridge(Name), + ok. + +rabbitmq_config(Config) -> + UseTLS = maps:get(tls, Config, false), + BatchSize = maps:get(batch_size, Config, 1), + BatchTime = maps:get(batch_time_ms, Config, 0), + Name = atom_to_binary(?MODULE), + Server = maps:get(server, Config, rabbit_mq_host()), + Port = maps:get(port, Config, rabbit_mq_port()), + Template = maps:get(payload_template, Config, <<"">>), + Bridge = + #{ + <<"bridges">> => #{ + <<"rabbitmq">> => #{ + Name => #{ + <<"enable">> => true, + <<"ssl">> => ssl_options(UseTLS), + <<"server">> => Server, + <<"port">> => Port, + <<"username">> => <<"guest">>, + <<"password">> => <<"guest">>, + <<"routing_key">> => rabbit_mq_routing_key(), + <<"exchange">> => rabbit_mq_exchange(), + <<"payload_template">> => Template, + <<"resource_opts">> => #{ + <<"batch_size">> => BatchSize, + <<"batch_time">> => BatchTime + } + } + } + } + }, + parse_and_check(<<"bridges">>, emqx_bridge_schema, Bridge, Name). + +payload_template() -> + << + "{" + " \"key\": ${key}," + " \"data\": \"${data}\"," + " \"timestamp\": ${timestamp}," + " \"secret\": 42" + "}" + >>. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl index 5a17c2914..8b11f732a 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_rabbitmq_v2_SUITE). @@ -12,203 +12,108 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). --define(TYPE, <<"rabbitmq">>). +-import(emqx_bridge_rabbitmq_test_utils, [ + rabbit_mq_exchange/0, + rabbit_mq_routing_key/0, + rabbit_mq_queue/0, + rabbit_mq_host/0, + rabbit_mq_port/0, + get_rabbitmq/1, + get_tls/1, + ssl_options/1, + get_channel_connection/1, + parse_and_check/4, + receive_message_from_rabbitmq/1 +]). -import(emqx_common_test_helpers, [on_exit/1]). -rabbit_mq_host() -> - <<"rabbitmq">>. - -rabbit_mq_port() -> - 5672. - -rabbit_mq_exchange() -> - <<"messages">>. - -rabbit_mq_queue() -> - <<"test_queue">>. - -rabbit_mq_routing_key() -> - <<"test_routing_key">>. - -get_channel_connection(Config) -> - proplists:get_value(channel_connection, Config). - -get_rabbitmq(Config) -> - proplists:get_value(rabbitmq, Config). - -%%------------------------------------------------------------------------------ -%% Common Test Setup, Tear down and Testcase List -%%------------------------------------------------------------------------------ +-define(TYPE, <<"rabbitmq">>). all() -> - emqx_common_test_helpers:all(?MODULE). - -init_per_suite(Config) -> - RabbitMQHost = os:getenv("RABBITMQ_PLAIN_HOST", "rabbitmq"), - RabbitMQPort = list_to_integer(os:getenv("RABBITMQ_PLAIN_PORT", "5672")), - case emqx_common_test_helpers:is_tcp_server_available(RabbitMQHost, RabbitMQPort) of - true -> - Config1 = common_init(#{ - host => RabbitMQHost, port => RabbitMQPort - }), - Name = atom_to_binary(?MODULE), - Config2 = [{connector, Name} | Config1 ++ Config], - create_connector(Name, get_rabbitmq(Config2)), - Config2; - false -> - case os:getenv("IS_CI") of - "yes" -> - throw(no_rabbitmq); - _ -> - {skip, no_rabbitmq} - end - end. - -common_init(Opts) -> - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([ - emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine - ]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource]), - {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(amqp_client), - emqx_mgmt_api_test_util:init_suite(), - #{host := Host, port := Port} = Opts, - ChannelConnection = setup_rabbit_mq_exchange_and_queue(Host, Port), [ - {channel_connection, ChannelConnection}, - {rabbitmq, #{server => Host, port => Port}} + {group, tcp}, + {group, tls} ]. -setup_rabbit_mq_exchange_and_queue(Host, Port) -> - %% Create an exchange and a queue - {ok, Connection} = - amqp_connection:start(#amqp_params_network{ - host = Host, - port = Port, - ssl_options = none - }), - {ok, Channel} = amqp_connection:open_channel(Connection), - %% Create an exchange - #'exchange.declare_ok'{} = - amqp_channel:call( - Channel, - #'exchange.declare'{ - exchange = rabbit_mq_exchange(), - type = <<"topic">> - } - ), - %% Create a queue - #'queue.declare_ok'{} = - amqp_channel:call( - Channel, - #'queue.declare'{queue = rabbit_mq_queue()} - ), - %% Bind the queue to the exchange - #'queue.bind_ok'{} = - amqp_channel:call( - Channel, - #'queue.bind'{ - queue = rabbit_mq_queue(), - exchange = rabbit_mq_exchange(), - routing_key = rabbit_mq_routing_key() - } - ), - #{ - connection => Connection, - channel => Channel - }. +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {tcp, AllTCs}, + {tls, AllTCs} + ]. -end_per_suite(Config) -> - delete_connector(proplists:get_value(connector, Config)), - #{ - connection := Connection, - channel := Channel - } = get_channel_connection(Config), - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]), - ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), - _ = application:stop(emqx_connector), - _ = application:stop(emqx_bridge), - %% Close the channel - ok = amqp_channel:close(Channel), - %% Close the connection - ok = amqp_connection:close(Connection). +init_per_group(Group, Config) -> + Config1 = emqx_bridge_rabbitmq_test_utils:init_per_group(Group, Config), + Name = atom_to_binary(?MODULE), + create_connector(Name, get_rabbitmq(Config1)), + Config1. + +end_per_group(Group, Config) -> + Name = atom_to_binary(?MODULE), + delete_connector(Name), + emqx_bridge_rabbitmq_test_utils:end_per_group(Group, Config). rabbitmq_connector(Config) -> + UseTLS = maps:get(tls, Config, false), Name = atom_to_binary(?MODULE), Server = maps:get(server, Config, rabbit_mq_host()), Port = maps:get(port, Config, rabbit_mq_port()), - ConfigStr = - io_lib:format( - "connectors.rabbitmq.~s {\n" - " enable = true\n" - " ssl = {enable = false}\n" - " server = \"~s\"\n" - " port = ~p\n" - " username = \"guest\"\n" - " password = \"guest\"\n" - "}\n", - [ - Name, - Server, - Port - ] - ), - ct:pal(ConfigStr), - parse_and_check(<<"connectors">>, emqx_connector_schema, ConfigStr, <<"rabbitmq">>, Name). + Connector = #{ + <<"connectors">> => #{ + <<"rabbitmq">> => #{ + Name => #{ + <<"enable">> => true, + <<"ssl">> => ssl_options(UseTLS), + <<"server">> => Server, + <<"port">> => Port, + <<"username">> => <<"guest">>, + <<"password">> => <<"guest">> + } + } + } + }, + parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name). rabbitmq_source() -> Name = atom_to_binary(?MODULE), - ConfigStr = - io_lib:format( - "sources.rabbitmq.~s {\n" - "connector = ~s\n" - "enable = true\n" - "parameters {\n" - "no_ack = true\n" - "queue = ~s\n" - "wait_for_publish_confirmations = true\n" - "}}\n", - [ - Name, - Name, - rabbit_mq_queue() - ] - ), - ct:pal(ConfigStr), - parse_and_check(<<"sources">>, emqx_bridge_v2_schema, ConfigStr, <<"rabbitmq">>, Name). + Source = #{ + <<"sources">> => #{ + <<"rabbitmq">> => #{ + Name => #{ + <<"enable">> => true, + <<"connector">> => Name, + <<"parameters">> => #{ + <<"no_ack">> => true, + <<"queue">> => rabbit_mq_queue(), + <<"wait_for_publish_confirmations">> => true + } + } + } + } + }, + parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name). rabbitmq_action() -> Name = atom_to_binary(?MODULE), - ConfigStr = - io_lib:format( - "actions.rabbitmq.~s {\n" - "connector = ~s\n" - "enable = true\n" - "parameters {\n" - "exchange: ~s\n" - "payload_template: \"${.payload}\"\n" - "routing_key: ~s\n" - "delivery_mode: non_persistent\n" - "publish_confirmation_timeout: 30s\n" - "wait_for_publish_confirmations = true\n" - "}}\n", - [ - Name, - Name, - rabbit_mq_exchange(), - rabbit_mq_routing_key() - ] - ), - ct:pal(ConfigStr), - parse_and_check(<<"actions">>, emqx_bridge_v2_schema, ConfigStr, <<"rabbitmq">>, Name). - -parse_and_check(Key, Mod, ConfigStr, Type, Name) -> - {ok, RawConf} = hocon:binary(ConfigStr, #{format => map}), - hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), - #{Key := #{Type := #{Name := RetConfig}}} = RawConf, - RetConfig. + Action = #{ + <<"actions">> => #{ + <<"rabbitmq">> => #{ + Name => #{ + <<"connector">> => Name, + <<"enable">> => true, + <<"parameters">> => #{ + <<"exchange">> => rabbit_mq_exchange(), + <<"payload_template">> => <<"${.payload}">>, + <<"routing_key">> => rabbit_mq_routing_key(), + <<"delivery_mode">> => <<"non_persistent">>, + <<"publish_confirmation_timeout">> => <<"30s">>, + <<"wait_for_publish_confirmations">> => true + } + } + } + } + }, + parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name). create_connector(Name, Config) -> Connector = rabbitmq_connector(Config), @@ -308,7 +213,7 @@ t_action(Config) -> Payload = payload(), PayloadBin = emqx_utils_json:encode(Payload), {ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]), - Msg = receive_test_message_from_rabbitmq(Config), + Msg = receive_message_from_rabbitmq(Config), ?assertMatch(Payload, Msg), ok = emqtt:disconnect(C1), ok = delete_action(Name), @@ -354,29 +259,3 @@ send_test_message_to_rabbitmq(Config) -> } ), ok. - -receive_test_message_from_rabbitmq(Config) -> - #{channel := Channel} = get_channel_connection(Config), - #'basic.consume_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call( - Channel, - #'basic.consume'{ - queue = rabbit_mq_queue() - } - ), - receive - %% This is the first message received - #'basic.consume_ok'{} -> - ok - end, - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -> - %% Ack the message - amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), - %% Cancel the consumer - #'basic.cancel_ok'{consumer_tag = ConsumerTag} = - amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}), - emqx_utils_json:decode(Content#amqp_msg.payload) - after 5000 -> - ?assert(false, "Did not receive message within 5 second") - end.