From 7f1b4cef271d85a1470e6742dfa991822c4632c0 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 20 Feb 2024 09:36:01 +0800 Subject: [PATCH 1/4] feat: pulsar bridge v2 --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + apps/emqx_bridge/src/emqx_bridge_api.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 8 +- .../src/schema/emqx_bridge_enterprise.erl | 2 +- .../src/emqx_bridge_mqtt_connector_schema.erl | 2 +- .../src/emqx_bridge_mqtt_pubsub_schema.erl | 2 +- .../src/emqx_bridge_pulsar.app.src | 2 +- .../src/emqx_bridge_pulsar.erl | 48 ++- .../src/emqx_bridge_pulsar_action_info.erl | 54 ++++ ...r.erl => emqx_bridge_pulsar_connector.erl} | 144 +++++---- .../emqx_bridge_pulsar_connector_schema.erl | 71 +++++ .../src/emqx_bridge_pulsar_pubsub_schema.erl | 123 ++++++++ ...=> emqx_bridge_pulsar_connector_SUITE.erl} | 41 ++- .../emqx_bridge_rabbitmq_pubsub_schema.erl | 2 +- .../src/schema/emqx_connector_ee_schema.erl | 14 + .../src/schema/emqx_connector_schema.erl | 3 + rel/i18n/emqx_bridge_pulsar.hocon | 291 +++++++++--------- .../emqx_bridge_pulsar_pubsub_schema.hocon | 38 +++ 18 files changed, 584 insertions(+), 264 deletions(-) create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl rename apps/emqx_bridge_pulsar/src/{emqx_bridge_pulsar_impl_producer.erl => emqx_bridge_pulsar_connector.erl} (81%) create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl create mode 100644 apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl rename apps/emqx_bridge_pulsar/test/{emqx_bridge_pulsar_impl_producer_SUITE.erl => emqx_bridge_pulsar_connector_SUITE.erl} (97%) create mode 100644 rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 12fda5d51..36ac10716 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -110,6 +110,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_es_action_info, emqx_bridge_opents_action_info, emqx_bridge_rabbitmq_action_info, + emqx_bridge_pulsar_action_info, emqx_bridge_greptimedb_action_info, emqx_bridge_tdengine_action_info, emqx_bridge_s3_action_info diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 35b964b83..69b17a843 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -761,7 +761,7 @@ is_bridge_enabled(BridgeType, BridgeName) -> end. is_bridge_enabled_v1(BridgeType, BridgeName) -> - %% we read from the transalted config because the defaults are populated here. + %% we read from the translated config because the defaults are populated here. try emqx:get_config([bridges, BridgeType, binary_to_existing_atom(BridgeName)]) of ConfMap -> maps:get(enable, ConfMap, false) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 56fe0029a..be20b9a7b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1659,8 +1659,11 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> connector_conf := ConnectorRawConf, bridge_v2_type := BridgeV2Type, bridge_v2_name := _BridgeName, - bridge_v2_conf := BridgeV2RawConf + bridge_v2_conf := BridgeV2RawConf0 } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), + BridgeV2RawConf = emqx_action_info:action_convert_from_connector( + BridgeType, ConnectorRawConf, BridgeV2RawConf0 + ), create_dry_run_helper( ensure_atom_root_key(ConfRootKey), BridgeV2Type, ConnectorRawConf, BridgeV2RawConf ) @@ -1928,7 +1931,8 @@ convert_from_connectors(ConfRootKey, Conf) -> convert_from_connector(ConfRootKey, Type, Name, Action = #{<<"connector">> := ConnectorName}) -> case get_connector_info(ConnectorName, Type) of {ok, Connector} -> - Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action), + TypeAtom = to_existing_atom(Type), + Action1 = emqx_action_info:action_convert_from_connector(TypeAtom, Connector, Action), {ok, Action1}; {error, not_found} -> {error, #{ diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index faac94dcb..233d87fa1 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -123,7 +123,7 @@ resource_type(dynamo) -> emqx_bridge_dynamo_connector; resource_type(rocketmq) -> emqx_bridge_rocketmq_connector; resource_type(sqlserver) -> emqx_bridge_sqlserver_connector; resource_type(opents) -> emqx_bridge_opents_connector; -resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; +resource_type(pulsar_producer) -> emqx_bridge_pulsar_connector; resource_type(oracle) -> emqx_oracle; resource_type(iotdb) -> emqx_bridge_iotdb_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl index 6d5ae4f4c..7233e9e6c 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl @@ -308,7 +308,7 @@ fields(Field) when Fields = fields("specific_connector_config"), emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); fields(What) -> - error({emqx_bridge_mqtt_connector_schema, missing_field_handler, What}). + error({?MODULE, missing_field_handler, What}). ingress_pool_size(desc) -> ?DESC("ingress_pool_size"); diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl index 2b9bb05bd..60cf634c4 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl @@ -124,7 +124,7 @@ fields(Field) when -> emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields("mqtt_subscriber_source")); fields(What) -> - error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}). + error({?MODULE, missing_field_handler, What}). %% v2: api schema %% The parameter equls to %% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1 diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index c9abebf8b..ce7c313ae 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.1.8"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index f9f37846e..291c656ef 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -31,7 +31,21 @@ roots() -> []. fields(pulsar_producer) -> - fields(config) ++ fields(producer_opts); + fields(config) ++ + emqx_bridge_pulsar_pubsub_schema:fields(action_parameters) ++ + fields(producer_opts) ++ + [ + {local_topic, + mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})}, + {resource_opts, + mk( + ref(producer_resource_opts), + #{ + required => false, + desc => ?DESC(emqx_resource_schema, "creation_opts") + } + )} + ]; fields(config) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -85,10 +99,6 @@ fields(producer_opts) -> mk(emqx_schema:bytesize(), #{ default => <<"1MB">>, desc => ?DESC("producer_send_buffer") })}, - {sync_timeout, - mk(emqx_schema:timeout_duration_ms(), #{ - default => <<"3s">>, desc => ?DESC("producer_sync_timeout") - })}, {retention_period, mk( %% not used in a `receive ... after' block, just timestamp comparison @@ -100,26 +110,13 @@ fields(producer_opts) -> emqx_schema:bytesize(), #{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")} )}, - {local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})}, {pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})}, {strategy, mk( hoconsc:enum([random, roundrobin, key_dispatch]), #{default => random, desc => ?DESC("producer_strategy")} )}, - {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})}, - {message, - mk(ref(producer_pulsar_message), #{ - required => false, desc => ?DESC("producer_message_opts") - })}, - {resource_opts, - mk( - ref(producer_resource_opts), - #{ - required => false, - desc => ?DESC(emqx_resource_schema, "creation_opts") - } - )} + {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})} ]; fields(producer_buffer) -> [ @@ -144,12 +141,6 @@ fields(producer_buffer) -> desc => ?DESC("buffer_memory_overload_protection") })} ]; -fields(producer_pulsar_message) -> - [ - {key, - mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC("producer_key_template")})}, - {value, mk(string(), #{default => <<"${.}">>, desc => ?DESC("producer_value_template")})} - ]; fields(producer_resource_opts) -> SupportedOpts = [ health_check_interval, @@ -225,8 +216,8 @@ producer_strategy_key_validator( producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf)); producer_strategy_key_validator(#{ <<"strategy">> := key_dispatch, - <<"message">> := #{<<"key">> := ""} -}) -> + <<"message">> := #{<<"key">> := Key} +}) when Key =:= "" orelse Key =:= <<>> -> {error, "Message key cannot be empty when `key_dispatch` strategy is used"}; producer_strategy_key_validator(_) -> ok. @@ -248,8 +239,7 @@ struct_names() -> [ auth_basic, auth_token, - producer_buffer, - producer_pulsar_message + producer_buffer ]. override_default(OriginalFn, NewDefault) -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl new file mode 100644 index 000000000..f51ed7884 --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl @@ -0,0 +1,54 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_pulsar_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + is_action/1, + action_convert_from_connector/2 +]). + +is_action(_) -> true. + +bridge_v1_type_name() -> pulsar_producer. + +action_type_name() -> pulsar. + +connector_type_name() -> pulsar. + +schema_module() -> emqx_bridge_pulsar_pubsub_schema. + +action_convert_from_connector(ConnectorConfig, ActionConfig) -> + Dispatch = emqx_utils_conv:bin(maps:get(<<"strategy">>, ConnectorConfig, <<>>)), + case Dispatch of + <<"key_dispatch">> -> + case emqx_utils_maps:deep_find([<<"parameters">>, <<"message">>], ActionConfig) of + {ok, Message} -> + Validator = + #{ + <<"strategy">> => key_dispatch, + <<"message">> => emqx_utils_maps:binary_key_map(Message) + }, + case emqx_bridge_pulsar:producer_strategy_key_validator(Validator) of + ok -> + ActionConfig; + {error, Reason} -> + throw(#{ + reason => Reason, + kind => validation_error + }) + end; + {not_found, _, _} -> + %% no message field, use the default message template + ActionConfig + end; + _ -> + ActionConfig + end. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl similarity index 81% rename from apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl rename to apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 2098cfeba..7b080d0e6 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_pulsar_impl_producer). +-module(emqx_bridge_pulsar_connector). -include("emqx_bridge_pulsar.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -13,8 +13,12 @@ callback_mode/0, query_mode/1, on_start/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, on_stop/2, on_get_status/2, + on_get_channel_status/3, on_query/3, on_query_async/4 ]). @@ -23,8 +27,7 @@ -type state() :: #{ pulsar_client_id := pulsar_client_id(), producers := pulsar_producers:producers(), - sync_timeout := erlang:timeout(), - message_template := message_template() + channels := map() }. -type buffer_mode() :: memory | disk | hybrid. -type compression_mode() :: no_compression | snappy | zlib. @@ -77,16 +80,12 @@ query_mode(_Config) -> -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> - #{ - bridge_name := BridgeName, - servers := Servers0, - ssl := SSL - } = Config, + #{servers := Servers0, ssl := SSL} = Config, Servers = format_servers(Servers0), - ClientId = make_client_id(InstanceId, BridgeName), + ClientId = make_client_id(InstanceId), ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId), SSLOpts = emqx_tls_lib:to_client_opts(SSL), - ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)), + ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(10)), ClientOpts = #{ connect_timeout => ConnectTimeout, ssl_opts => SSLOpts, @@ -119,6 +118,30 @@ on_start(InstanceId, Config) -> end, start_producer(Config, InstanceId, ClientId, ClientOpts). +on_add_channel( + _InstanceId, + #{channels := Channels} = State, + ChannelId, + #{parameters := #{message := Message, sync_timeout := SyncTimeout}} +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + false -> + Parameters = #{ + message => compile_message_template(Message), + sync_timeout => SyncTimeout + }, + NewChannels = maps:put(ChannelId, Parameters, Channels), + {ok, State#{channels => NewChannels}} + end. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> + {ok, State#{channels => maps:remove(ChannelId, Channels)}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + -spec on_stop(resource_id(), state()) -> ok. on_stop(InstanceId, _State) -> case emqx_resource:get_allocated_resources(InstanceId) of @@ -174,76 +197,77 @@ on_get_status(_InstanceId, _State) -> %% create the bridge is not quite finished, `State = undefined'. connecting. --spec on_query(resource_id(), {send_message, map()}, state()) -> +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> connected; + false -> {error, channel_not_exists} + end. + +-spec on_query(resource_id(), tuple(), state()) -> {ok, term()} | {error, timeout} | {error, term()}. -on_query(_InstanceId, {send_message, Message}, State) -> - #{ - producers := Producers, - sync_timeout := SyncTimeout, - message_template := MessageTemplate - } = State, - PulsarMessage = render_message(Message, MessageTemplate), - try - pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) - catch - error:timeout -> - {error, timeout} +on_query(_InstanceId, {ChannelId, Message}, State) -> + #{producers := Producers, channels := Channels} = State, + case maps:find(ChannelId, Channels) of + error -> + {error, channel_not_exists}; + {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout}} -> + PulsarMessage = render_message(Message, MessageTmpl), + try + pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) + catch + error:timeout -> + {error, timeout} + end end. -spec on_query_async( - resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() + resource_id(), tuple(), {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()}. -on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) -> - ?tp_span( - pulsar_producer_on_query_async, - #{instance_id => _InstanceId, message => Message}, - do_on_query_async(Message, AsyncReplyFn, State) - ). +on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> + #{producers := Producers, channels := Channels} = State, + case maps:find(ChannelId, Channels) of + error -> + {error, channel_not_exists}; + {ok, #{message := MessageTmpl}} -> + ?tp_span( + pulsar_producer_on_query_async, + #{instance_id => _InstanceId, message => Message}, + on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) + ) + end. -do_on_query_async(Message, AsyncReplyFn, State) -> - #{ - producers := Producers, - message_template := MessageTemplate - } = State, - PulsarMessage = render_message(Message, MessageTemplate), +on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) -> + PulsarMessage = render_message(Message, MessageTmpl), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). %%------------------------------------------------------------------------------------- %% Internal fns %%------------------------------------------------------------------------------------- --spec to_bin(atom() | string() | binary()) -> binary(). -to_bin(A) when is_atom(A) -> - atom_to_binary(A); -to_bin(L) when is_list(L) -> - list_to_binary(L); -to_bin(B) when is_binary(B) -> - B. - -spec format_servers(binary()) -> [string()]. format_servers(Servers0) -> - Servers1 = emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS), lists:map( fun(#{scheme := Scheme, hostname := Host, port := Port}) -> Scheme ++ "://" ++ Host ++ ":" ++ integer_to_list(Port) end, - Servers1 + emqx_schema:parse_servers(Servers0, ?PULSAR_HOST_OPTIONS) ). --spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id(). -make_client_id(InstanceId, BridgeName) -> +-spec make_client_id(resource_id()) -> pulsar_client_id(). +make_client_id(InstanceId) -> case is_dry_run(InstanceId) of true -> pulsar_producer_probe; false -> + {pulsar, Name} = emqx_connector_resource:parse_connector_id(InstanceId), ClientIdBin = iolist_to_binary([ - <<"pulsar_producer:">>, - to_bin(BridgeName), + <<"pulsar:">>, + emqx_utils_conv:bin(Name), <<":">>, - to_bin(node()) + emqx_utils_conv:bin(node()) ]), binary_to_atom(ClientIdBin) end. @@ -252,10 +276,8 @@ make_client_id(InstanceId, BridgeName) -> is_dry_run(InstanceId) -> TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), case TestIdStart of - nomatch -> - false; - _ -> - string:equal(TestIdStart, InstanceId) + nomatch -> false; + _ -> string:equal(TestIdStart, InstanceId) end. conn_opts(#{authentication := none}) -> @@ -275,11 +297,11 @@ conn_opts(#{authentication := #{jwt := JWT}}) -> -spec replayq_dir(pulsar_client_id()) -> string(). replayq_dir(ClientId) -> - filename:join([emqx:data_dir(), "pulsar", to_bin(ClientId)]). + filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]). -spec producer_name(pulsar_client_id()) -> atom(). producer_name(ClientId) -> - ClientIdBin = to_bin(ClientId), + ClientIdBin = emqx_utils_conv:bin(ClientId), binary_to_atom( iolist_to_binary([ <<"producer-">>, @@ -303,12 +325,10 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> }, compression := Compression, max_batch_bytes := MaxBatchBytes, - message := MessageTemplateOpts, pulsar_topic := PulsarTopic0, retention_period := RetentionPeriod, send_buffer := SendBuffer, - strategy := Strategy, - sync_timeout := SyncTimeout + strategy := Strategy } = Config, {OffloadMode, ReplayQDir} = case BufferMode of @@ -330,7 +350,6 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> }, ProducerName = producer_name(ClientId), ?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}), - MessageTemplate = compile_message_template(MessageTemplateOpts), ProducerOpts0 = #{ batch_size => BatchSize, @@ -353,8 +372,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> State = #{ pulsar_client_id => ClientId, producers => Producers, - sync_timeout => SyncTimeout, - message_template => MessageTemplate + channels => #{} }, ?tp(pulsar_producer_bridge_started, #{}), {ok, State} diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl new file mode 100644 index 000000000..953318e0a --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_connector_schema). + +-export([namespace/0, roots/0, fields/1, desc/1]). +-export([connector_examples/1, connector_example_values/0]). + +-include("emqx_bridge_pulsar.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-define(TYPE, pulsar). + +namespace() -> ?TYPE. + +roots() -> []. + +fields("config_connector") -> + lists:keydelete(enable, 1, emqx_bridge_schema:common_bridge_fields()) ++ + emqx_bridge_pulsar:fields(config) ++ + emqx_bridge_pulsar:fields(producer_opts) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields("post") -> + emqx_connector_schema:type_and_name_fields(?TYPE) ++ fields("config_connector"); +fields("put") -> + fields("config_connector"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("config_connector"). + +desc("config_connector") -> + ?DESC(emqx_bridge_pulsar, "config_connector"); +desc(connector_resource_opts) -> + ?DESC(emqx_bridge_pulsar, connector_resource_opts); +desc(_) -> + undefined. + +connector_examples(Method) -> + [ + #{ + <<"pulsar">> => + #{ + summary => <<"Pulsar Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"pulsar_connector">>, + type => ?TYPE, + enable => true, + servers => <<"pulsar://127.0.0.1:6650">>, + authentication => none, + connect_timeout => <<"5s">>, + batch_size => 10, + compression => no_compression, + send_buffer => <<"1MB">>, + retention_period => <<"100s">>, + max_batch_bytes => <<"32MB">>, + pulsar_topic => <<"test_topic">>, + strategy => random, + buffer => #{mode => memory}, + ssl => #{enable => false} + }. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl new file mode 100644 index 000000000..a705ed560 --- /dev/null +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -0,0 +1,123 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_pubsub_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1, desc/1, namespace/0]). + +-export([bridge_v2_examples/1]). + +-define(ACTION_TYPE, pulsar). +-define(CONNECTOR_SCHEMA, emqx_bridge_rabbitmq_connector_schema). + +namespace() -> "pulsar". + +roots() -> []. + +fields(action) -> + {pulsar, + ?HOCON( + ?MAP(name, ?R_REF(publisher_action)), + #{ + desc => <<"Pulsar Action Config">>, + required => false + } + )}; +fields(publisher_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + ?HOCON( + ?R_REF(action_parameters), + #{ + required => true, + desc => ?DESC(action_parameters) + } + ), + #{resource_opts_ref => ?R_REF(action_resource_opts)} + ); +fields(action_parameters) -> + [ + {sync_timeout, + ?HOCON(emqx_schema:timeout_duration_ms(), #{ + default => <<"3s">>, desc => ?DESC("producer_sync_timeout") + })}, + {message, + ?HOCON(?R_REF(producer_pulsar_message), #{ + required => false, desc => ?DESC("producer_message_opts") + })} + ]; +fields(producer_pulsar_message) -> + [ + {key, + ?HOCON(string(), #{ + default => <<"${.clientid}">>, + desc => ?DESC("producer_key_template") + })}, + {value, + ?HOCON(string(), #{ + default => <<"${.}">>, + desc => ?DESC("producer_value_template") + })} + ]; +fields(action_resource_opts) -> + UnsupportedOpts = [ + batch_size, + batch_time, + worker_pool_size, + request_ttl, + inflight_window, + max_buffer_bytes, + query_mode + ], + lists:filter( + fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, + emqx_bridge_v2_schema:action_resource_opts_fields() + ); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(publisher_action)); +fields(What) -> + error({?MODULE, missing_field_handler, What}). + +desc("config") -> + ?DESC("desc_config"); +desc(action_resource_opts) -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc(action_parameters) -> + ?DESC(action_parameters); +desc(producer_pulsar_message) -> + ?DESC("producer_message_opts"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; +desc(publisher_action) -> + ?DESC(publisher_action); +desc(_) -> + undefined. + +bridge_v2_examples(Method) -> + [ + #{ + <<"pulsar">> => #{ + summary => <<"Pulsar Producer Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, + _ActionType = ?ACTION_TYPE, + _ConnectorType = pulsar, + #{ + parameters => #{ + sync_timeout => <<"5s">>, + message => #{ + key => <<"${.clientid}">>, + value => <<"${.}">> + } + } + } + ) + } + } + ]. diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl similarity index 97% rename from apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl rename to apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl index dfc5af3a7..c9b25cc71 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_pulsar_impl_producer_SUITE). +-module(emqx_bridge_pulsar_connector_SUITE). -compile(nowarn_export_all). -compile(export_all). @@ -550,7 +550,6 @@ kill_resource_managers() -> t_start_and_produce_ok(Config) -> MQTTTopic = ?config(mqtt_topic, Config), - ResourceId = resource_id(Config), ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), QoS = 0, Payload = emqx_guid:to_hexstr(emqx_guid:gen()), @@ -600,6 +599,13 @@ t_start_and_produce_ok(Config) -> _Sleep = 100, _Attempts0 = 20, begin + BridgeId = emqx_bridge_resource:bridge_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + ConnectorId = emqx_bridge_resource:resource_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>, ?assertMatch( #{ counters := #{ @@ -612,7 +618,7 @@ t_start_and_produce_ok(Config) -> success := 2 } }, - emqx_resource_manager:get_metrics(ResourceId) + emqx_resource:get_metrics(Id) ), ?assertEqual( 1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success') @@ -631,17 +637,22 @@ t_start_and_produce_ok(Config) -> %% Under normal operations, the bridge will be called async via %% `simple_async_query'. t_sync_query(Config) -> - ResourceId = resource_id(Config), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), ?check_trace( begin ?assertMatch({ok, _}, create_bridge_api(Config)), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - Message = {send_message, #{payload => Payload}}, + BridgeId = emqx_bridge_resource:bridge_id(<<"pulsar">>, ?config(pulsar_name, Config)), + ConnectorId = emqx_bridge_resource:resource_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>, + Message = {Id, #{payload => Payload}}, ?assertMatch( {ok, #{sequence_id := _}}, emqx_resource:simple_sync_query(ResourceId, Message) ), @@ -688,13 +699,13 @@ t_create_via_http(Config) -> t_start_stop(Config) -> PulsarName = ?config(pulsar_name, Config), - ResourceId = resource_id(Config), ?check_trace( begin ?assertMatch( {ok, _}, create_bridge(Config) ), + ResourceId = resource_id(Config), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( @@ -745,11 +756,11 @@ t_on_get_status(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = resource_id(Config), ?assertMatch( {ok, _}, create_bridge(Config) ), + ResourceId = resource_id(Config), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( @@ -777,7 +788,6 @@ t_start_when_down(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = resource_id(Config), ?check_trace( begin emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> @@ -787,6 +797,7 @@ t_start_when_down(Config) -> ), ok end), + ResourceId = resource_id(Config), %% Should recover given enough time. ?retry( _Sleep = 1_000, @@ -902,7 +913,6 @@ t_failure_to_start_producer(Config) -> %% die for whatever reason. t_producer_process_crash(Config) -> MQTTTopic = ?config(mqtt_topic, Config), - ResourceId = resource_id(Config), QoS = 0, ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), @@ -934,6 +944,7 @@ t_producer_process_crash(Config) -> ok after 1_000 -> ct:fail("pid didn't die") end, + ResourceId = resource_id(Config), ?retry( _Sleep0 = 50, _Attempts0 = 50, @@ -995,8 +1006,8 @@ t_resource_manager_crash_after_producers_started(Config) -> Producers =/= undefined, 10_000 ), - ?assertMatch(ok, delete_bridge(Config)), ?assertEqual([], get_pulsar_producers()), + ?assertMatch({error, bridge_not_found}, delete_bridge(Config)), ok end, [] @@ -1028,8 +1039,8 @@ t_resource_manager_crash_before_producers_started(Config) -> #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, 10_000 ), - ?assertMatch(ok, delete_bridge(Config)), ?assertEqual([], get_pulsar_producers()), + ?assertMatch({error, bridge_not_found}, delete_bridge(Config)), ok end, [] @@ -1046,7 +1057,7 @@ t_strategy_key_validation(Config) -> <<"reason">> := <<"Message key cannot be empty", _/binary>> } }}}, - probe_bridge_api( + create_bridge_api( Config, #{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}} ) @@ -1060,7 +1071,7 @@ t_strategy_key_validation(Config) -> <<"reason">> := <<"Message key cannot be empty", _/binary>> } }}}, - create_bridge_api( + probe_bridge_api( Config, #{<<"strategy">> => <<"key_dispatch">>, <<"message">> => #{<<"key">> => <<>>}} ) @@ -1075,7 +1086,6 @@ do_t_cluster(Config) -> ?check_trace( begin MQTTTopic = ?config(mqtt_topic, Config), - ResourceId = resource_id(Config), Nodes = [N1, N2 | _] = cluster(Config), ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), QoS = 0, @@ -1095,6 +1105,7 @@ do_t_cluster(Config) -> ), 25_000 ), + ResourceId = erpc:call(N1, ?MODULE, resource_id, [Config]), lists:foreach( fun(N) -> ?retry( @@ -1147,12 +1158,12 @@ t_resilience(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = resource_id(Config), ?check_trace( begin {ok, _} = create_bridge(Config), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + ResourceId = resource_id(Config), ?retry( _Sleep0 = 1_000, _Attempts0 = 20, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl index 3fb00632c..9a9741226 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl @@ -170,7 +170,7 @@ fields(Field) when -> emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(subscriber_source)); fields(What) -> - error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}). + error({?MODULE, missing_field_handler, What}). %% v2: api schema %% The parameter equals to %% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1 diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 23bc5a8b4..15b196af4 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -74,6 +74,8 @@ resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; resource_type(tdengine) -> emqx_bridge_tdengine_connector; +resource_type(pulsar) -> + emqx_bridge_pulsar_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(s3) -> @@ -94,6 +96,8 @@ connector_impl_module(elasticsearch) -> emqx_bridge_es_connector; connector_impl_module(opents) -> emqx_bridge_opents_connector; +connector_impl_module(pulsar) -> + emqx_bridge_pulsar_connector; connector_impl_module(tdengine) -> emqx_bridge_tdengine_connector; connector_impl_module(rabbitmq) -> @@ -317,6 +321,14 @@ connector_structs() -> required => false } )}, + {pulsar, + mk( + hoconsc:map(name, ref(emqx_bridge_pulsar_connector_schema, "config_connector")), + #{ + desc => <<"Pulsar Connector Config">>, + required => false + } + )}, {rabbitmq, mk( hoconsc:map(name, ref(emqx_bridge_rabbitmq_connector_schema, "config_connector")), @@ -361,6 +373,7 @@ schema_modules() -> emqx_bridge_iotdb_connector, emqx_bridge_es_connector, emqx_bridge_rabbitmq_connector_schema, + emqx_bridge_pulsar_connector_schema, emqx_bridge_opents_connector, emqx_bridge_greptimedb, emqx_bridge_tdengine_connector, @@ -410,6 +423,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), api_ref(emqx_bridge_rabbitmq_connector_schema, <<"rabbitmq">>, Method), + api_ref(emqx_bridge_pulsar_connector_schema, <<"pulsar">>, Method), api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"), api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method), api_ref(emqx_bridge_s3, <<"s3">>, Method ++ "_connector") diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index fc68bbd9d..430a74bdb 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -174,6 +174,8 @@ connector_type_to_bridge_types(opents) -> [opents]; connector_type_to_bridge_types(greptimedb) -> [greptimedb]; +connector_type_to_bridge_types(pulsar) -> + [pulsar_producer, pulsar]; connector_type_to_bridge_types(tdengine) -> [tdengine]; connector_type_to_bridge_types(rabbitmq) -> @@ -269,6 +271,7 @@ split_bridge_to_connector_and_action( #{<<"connector">> := ConnectorName0} -> ConnectorName0; _ -> generate_connector_name(ConnectorsMap, BridgeName, 0) end, + OrgActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeType), {ActionMap, ActionType, ActionOrSource} = case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of diff --git a/rel/i18n/emqx_bridge_pulsar.hocon b/rel/i18n/emqx_bridge_pulsar.hocon index e1b6153d3..913ab8d2a 100644 --- a/rel/i18n/emqx_bridge_pulsar.hocon +++ b/rel/i18n/emqx_bridge_pulsar.hocon @@ -1,180 +1,173 @@ emqx_bridge_pulsar { - auth_basic { - desc = "Parameters for basic authentication." - label = "Basic auth params" - } - auth_basic_password { - desc = "Basic authentication password." - label = "Password" - } +config_connector.desc: +"""Pulsar connector config""" +config_connector.label: +"""Pulsar Connector""" - auth_basic_username { - desc = "Basic authentication username." - label = "Username" - } +connector_resource_opts.desc: +"""Pulsar connector resource options""" +connector_resource_opts.label: +"""Resource Options""" - auth_token { - desc = "Parameters for token authentication." - label = "Token auth params" - } +auth_basic.desc: + """Parameters for basic authentication.""" +auth_basic.label: +"""Basic auth params""" - auth_token_jwt { - desc = "JWT authentication token." - label = "JWT" - } +auth_basic_password.desc: +"""Basic authentication password.""" +auth_basic_password.label: +"""Password""" - authentication { - desc = "Authentication configs." - label = "Authentication" - } +auth_basic_username.desc: +"""Basic authentication username.""" +auth_basic_username.label: +"""Username""" - buffer_memory_overload_protection { - desc = "Applicable when buffer mode is set to memory\n" - "EMQX will drop old buffered messages under high memory pressure." - " The high memory threshold is defined in config sysmon.os.sysmem_high_watermark." - " NOTE: This config only works on Linux." - label = "Memory Overload Protection" - } +auth_token.desc: +"""Parameters for token authentication.""" +auth_token.label: +"""Token auth params""" - buffer_mode { - desc = "Message buffer mode.\n" - "memory: Buffer all messages in memory. The messages will be lost" - " in case of EMQX node restart\ndisk: Buffer all messages on disk." - " The messages on disk are able to survive EMQX node restart.\n" - "hybrid: Buffer message in memory first, when up to certain limit" - " (see segment_bytes config for more information), then start offloading" - " messages to disk, Like memory mode, the messages will be lost in" - " case of EMQX node restart." - label = "Buffer Mode" - } +auth_token_jwt.desc: +"""JWT authentication token.""" +auth_token_jwt.label: +"""JWT""" - buffer_per_partition_limit { - desc = "Number of bytes allowed to buffer for each Pulsar partition." - " When this limit is exceeded, old messages will be dropped in a trade for credits" - " for new messages to be buffered." - label = "Per-partition Buffer Limit" - } +authentication.desc: +"""Authentication configs.""" +authentication.label: +"""Authentication""" - buffer_segment_bytes { - desc = "Applicable when buffer mode is set to disk or hybrid.\n" - "This value is to specify the size of each on-disk buffer file." - label = "Segment File Bytes" - } +buffer_memory_overload_protection.desc: +"""Applicable when buffer mode is set to memory +EMQX will drop old buffered messages under high memory pressure. +The high memory threshold is defined in config sysmon.os.sysmem_high_watermark. + NOTE: This config only works on Linux.""" +buffer_memory_overload_protection.label: +"""Memory Overload Protection""" - config_enable { - desc = "Enable (true) or disable (false) this Pulsar bridge." - label = "Enable or Disable" - } +buffer_mode.desc: +"""Message buffer mode. +memory: Buffer all messages in memory. The messages will be lost + in case of EMQX node restart\ndisk: Buffer all messages on disk. + The messages on disk are able to survive EMQX node restart. +hybrid: Buffer message in memory first, when up to certain limit + (see segment_bytes config for more information), then start offloading + messages to disk, Like memory mode, the messages will be lost in + case of EMQX node restart.""" +buffer_mode.label: +"""Buffer Mode""" - connect_timeout { - desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)." - label = "Connect Timeout" - } +buffer_per_partition_limit.desc: +"""Number of bytes allowed to buffer for each Pulsar partition. + When this limit is exceeded, old messages will be dropped in a trade for credits + for new messages to be buffered.""" + buffer_per_partition_limit.label: +"""Per-partition Buffer Limit""" - desc_name { - desc = "Action name, a human-readable identifier." - label = "Action Name" - } +desc_name.desc: +"""Action name, a human-readable identifier.""" +desc_name.label: +"""Action Name""" - desc_type { - desc = "The Bridge Type" - label = "Bridge Type" - } +buffer_segment_bytes.desc: +"""Applicable when buffer mode is set to disk or hybrid. +This value is to specify the size of each on-disk buffer file.""" +buffer_segment_bytes.label: +"""Segment File Bytes""" - producer_batch_size { - desc = "Maximum number of individual requests to batch in a Pulsar message." - label = "Batch size" - } +config_enable.desc: +"""Enable (true) or disable (false) this Pulsar bridge.""" +config_enable.label: +"""Enable or Disable""" - producer_buffer { - desc = "Configure producer message buffer.\n\n" - "Tell Pulsar producer how to buffer messages when EMQX has more messages to" - " send than Pulsar can keep up, or when Pulsar is down." - label = "Message Buffer" - } +connect_timeout.desc: +"""Maximum wait time for TCP connection establishment (including authentication time if enabled).""" +connect_timeout.label: +"""Connect Timeout""" - producer_compression { - desc = "Compression method." - label = "Compression" - } +desc_name.desc: +"""Bridge name, used as a human-readable description of the bridge.""" +desc_name.label: +"""Bridge Name""" - producer_key_template { - desc = "Template to render Pulsar message key." - label = "Message Key" - } +desc_type.desc: +"""The Bridge Type""" +desc_type.label: +"""Bridge Type""" - producer_local_topic { - desc = "MQTT topic or topic filter as data source (bridge input)." - " If rule action is used as data source, this config should be left empty," - " otherwise messages will be duplicated in Pulsar." - label = "Source MQTT Topic" - } +producer_batch_size.desc: +"""Maximum number of individual requests to batch in a Pulsar message.""" +producer_batch_size.label: +"""Batch size""" - producer_max_batch_bytes { - desc = "Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers" - " default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in" - " order to compensate Pulsar message encoding overheads (especially when each individual" - " message is very small). When a single message is over the limit, it is still" - " sent (as a single element batch)." - label = "Max Batch Bytes" - } +producer_buffer.desc: +"""Configure producer message buffer." +Tell Pulsar producer how to buffer messages when EMQX has more messages to" + send than Pulsar can keep up, or when Pulsar is down.""" +producer_buffer.label: +"""Message Buffer""" - producer_message_opts { - desc = "Template to render a Pulsar message." - label = "Pulsar Message Template" - } +producer_compression.desc: +"""Compression method.""" +producer_compression.label: +"""Compression""" - producer_pulsar_message { - desc = "Template to render a Pulsar message." - label = "Pulsar Message Template" - } +producer_local_topic.desc: +"""MQTT topic or topic filter as data source (bridge input) + If rule action is used as data source, this config should be left empty, + otherwise messages will be duplicated in Pulsar.""" +producer_local_topic.label: +"""Source MQTT Topic""" - producer_pulsar_topic { - desc = "Pulsar topic name" - label = "Pulsar topic name" - } +producer_max_batch_bytes.desc: +"""Maximum bytes to collect in a Pulsar message batch. Most of the Pulsar brokers + default to a limit of 5 MB batch size. EMQX's default value is less than 5 MB in + order to compensate Pulsar message encoding overheads (especially when each individual + message is very small). When a single message is over the limit, it is still + sent (as a single element batch).""" +producer_max_batch_bytes.label: +"""Max Batch Bytes""" - producer_retention_period { - desc = "The amount of time messages will be buffered while there is no connection to" - " the Pulsar broker. Longer times mean that more memory/disk will be used" - label = "Retention Period" - } - producer_send_buffer { - desc = "Fine tune the socket send buffer. The default value is tuned for high throughput." - label = "Socket Send Buffer Size" - } +producer_pulsar_topic.desc: +"""Pulsar topic name""" +producer_pulsar_topic.label: +"""Pulsar topic name""" - producer_strategy { - desc = "Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions.\n" - "\n" - "random: Randomly pick a partition for each message.\n" - "roundrobin: Pick each available producer in turn for each message.\n" - "key_dispatch: Hash Pulsar message key of the first message in a batch" - " to a partition number." - label = "Partition Strategy" - } +producer_retention_period.desc: +"""The amount of time messages will be buffered while there is no connection to + the Pulsar broker. Longer times mean that more memory/disk will be used""" +producer_retention_period.label: +"""Retention Period""" - producer_sync_timeout { - desc = "Maximum wait time for receiving a receipt from Pulsar when publishing synchronously." - label = "Sync publish timeout" - } +producer_send_buffer.desc: +"""Fine tune the socket send buffer. The default value is tuned for high throughput.""" +producer_send_buffer.label: +"""Socket Send Buffer Size""" - producer_value_template { - desc = "Template to render Pulsar message value." - label = "Message Value" - } +producer_strategy.desc: +"""Partition strategy is to tell the producer how to dispatch messages to Pulsar partitions. - pulsar_producer_struct { - desc = "Configuration for a Pulsar bridge." - label = "Pulsar Bridge Configuration" - } +random: Randomly pick a partition for each message. +roundrobin: Pick each available producer in turn for each message. +key_dispatch: Hash Pulsar message key of the first message in a batch + to a partition number.""" +producer_strategy.label: +"""Partition Strategy""" + +pulsar_producer_struct.desc: +"""Configuration for a Pulsar bridge.""" +pulsar_producer_struct.label: +"""Pulsar Bridge Configuration""" + +servers.desc: +"""A comma separated list of Pulsar URLs in the form scheme://host[:port] + for the client to connect to. The supported schemes are pulsar:// (default) + and pulsar+ssl://. The default port is 6650.""" +servers.label: +"""Servers""" - servers { - desc = "A comma separated list of Pulsar URLs in the form scheme://host[:port]" - " for the client to connect to. The supported schemes are pulsar:// (default)" - " and pulsar+ssl://. The default port is 6650." - label = "Servers" - } } diff --git a/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon b/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon new file mode 100644 index 000000000..a359bc755 --- /dev/null +++ b/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon @@ -0,0 +1,38 @@ +emqx_bridge_pulsar_pubsub_schema { + +action_parameters.desc: +"""Action specific configs.""" +action_parameters.label: +"""Action""" + +publisher_action.desc: +"""Publish message to pulsar topic""" +publisher_action.label: +"""Publish Action """ + +producer_sync_timeout.desc: +"""Maximum wait time for receiving a receipt from Pulsar when publishing synchronously.""" +producer_sync_timeout.label: +"""Sync publish timeout""" + +producer_key_template.desc: +"""Template to render Pulsar message key.""" +producer_key_template.label: +"""Message Key""" + +producer_value_template.desc: +"""Template to render Pulsar message value.""" +producer_value_template.label: +"""Message Value""" + +producer_message_opts.desc: +"""Template to render a Pulsar message.""" +producer_message_opts.label: +"""Pulsar Message Template""" + +producer_pulsar_message.desc: +"""Template to render a Pulsar message.""" +producer_pulsar_message.label: +"""Pulsar Message Template""" + +} From 650f9659a418c129d7783e2ad87349b7626c7aa6 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 27 Feb 2024 13:32:30 +0800 Subject: [PATCH 2/4] fix: create pulsar producer when on_add_channel --- .../src/emqx_bridge_pulsar.erl | 35 ++- .../src/emqx_bridge_pulsar_action_info.erl | 31 +-- .../src/emqx_bridge_pulsar_connector.erl | 202 +++++++++--------- .../emqx_bridge_pulsar_connector_schema.erl | 13 +- .../src/emqx_bridge_pulsar_pubsub_schema.erl | 16 +- .../emqx_bridge_pulsar_connector_SUITE.erl | 55 +++-- 6 files changed, 183 insertions(+), 169 deletions(-) diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 291c656ef..626ad55cb 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -33,7 +33,6 @@ roots() -> fields(pulsar_producer) -> fields(config) ++ emqx_bridge_pulsar_pubsub_schema:fields(action_parameters) ++ - fields(producer_opts) ++ [ {local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})}, @@ -85,6 +84,7 @@ fields(config) -> ] ++ emqx_connector_schema_lib:ssl_fields(); fields(producer_opts) -> [ + {pulsar_topic, mk(string(), #{required => true, desc => ?DESC("producer_pulsar_topic")})}, {batch_size, mk( pos_integer(), @@ -110,7 +110,6 @@ fields(producer_opts) -> emqx_schema:bytesize(), #{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")} )}, - {pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})}, {strategy, mk( hoconsc:enum([random, roundrobin, key_dispatch]), @@ -202,7 +201,37 @@ conn_bridge_examples(_Method) -> #{ <<"pulsar_producer">> => #{ summary => <<"Pulsar Producer Bridge">>, - value => #{todo => true} + value => #{ + <<"authentication">> => <<"none">>, + <<"batch_size">> => 1, + <<"buffer">> => + #{ + <<"memory_overload_protection">> => true, + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"10MB">>, + <<"segment_bytes">> => <<"5MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"enable">> => true, + <<"local_topic">> => <<"mqtt/topic/-576460752303423482">>, + <<"max_batch_bytes">> => <<"900KB">>, + <<"message">> => + #{<<"key">> => <<"${.clientid}">>, <<"value">> => <<"${.}">>}, + <<"name">> => <<"pulsar_example_name">>, + <<"pulsar_topic">> => <<"pulsar_example_topic">>, + <<"retention_period">> => <<"infinity">>, + <<"send_buffer">> => <<"1MB">>, + <<"servers">> => <<"pulsar://127.0.0.1:6650">>, + <<"ssl">> => + #{ + <<"enable">> => false, + <<"server_name_indication">> => <<"auto">>, + <<"verify">> => <<"verify_none">> + }, + <<"strategy">> => <<"key_dispatch">>, + <<"sync_timeout">> => <<"5s">>, + <<"type">> => <<"pulsar_producer">> + } } } ]. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl index f51ed7884..6d15687f6 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl @@ -11,8 +11,7 @@ action_type_name/0, connector_type_name/0, schema_module/0, - is_action/1, - action_convert_from_connector/2 + is_action/1 ]). is_action(_) -> true. @@ -24,31 +23,3 @@ action_type_name() -> pulsar. connector_type_name() -> pulsar. schema_module() -> emqx_bridge_pulsar_pubsub_schema. - -action_convert_from_connector(ConnectorConfig, ActionConfig) -> - Dispatch = emqx_utils_conv:bin(maps:get(<<"strategy">>, ConnectorConfig, <<>>)), - case Dispatch of - <<"key_dispatch">> -> - case emqx_utils_maps:deep_find([<<"parameters">>, <<"message">>], ActionConfig) of - {ok, Message} -> - Validator = - #{ - <<"strategy">> => key_dispatch, - <<"message">> => emqx_utils_maps:binary_key_map(Message) - }, - case emqx_bridge_pulsar:producer_strategy_key_validator(Validator) of - ok -> - ActionConfig; - {error, Reason} -> - throw(#{ - reason => Reason, - kind => validation_error - }) - end; - {not_found, _, _} -> - %% no message field, use the default message template - ActionConfig - end; - _ -> - ActionConfig - end. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 7b080d0e6..c88716abc 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -25,13 +25,11 @@ -type pulsar_client_id() :: atom(). -type state() :: #{ - pulsar_client_id := pulsar_client_id(), - producers := pulsar_producers:producers(), - channels := map() + client_id := pulsar_client_id(), + channels := map(), + client_opts := map() }. --type buffer_mode() :: memory | disk | hybrid. --type compression_mode() :: no_compression | snappy | zlib. --type partition_strategy() :: random | roundrobin | key_dispatch. + -type message_template_raw() :: #{ key := binary(), value := binary() @@ -42,25 +40,9 @@ }. -type config() :: #{ authentication := _, - batch_size := pos_integer(), bridge_name := atom(), - buffer := #{ - mode := buffer_mode(), - per_partition_limit := emqx_schema:bytesize(), - segment_bytes := emqx_schema:bytesize(), - memory_overload_protection := boolean() - }, - compression := compression_mode(), - connect_timeout := emqx_schema:duration_ms(), - max_batch_bytes := emqx_schema:bytesize(), - message := message_template_raw(), - pulsar_topic := binary(), - retention_period := infinity | emqx_schema:duration_ms(), - send_buffer := emqx_schema:bytesize(), servers := binary(), - ssl := _, - strategy := partition_strategy(), - sync_timeout := emqx_schema:duration_ms() + ssl := _ }. %% Allocatable resources @@ -116,54 +98,57 @@ on_start(InstanceId, Config) -> end, throw(Message) end, - start_producer(Config, InstanceId, ClientId, ClientOpts). + {ok, #{channels => #{}, client_id => ClientId, client_opts => ClientOpts}}. on_add_channel( - _InstanceId, - #{channels := Channels} = State, + InstanceId, + #{channels := Channels, client_id := ClientId, client_opts := ClientOpts} = State, ChannelId, - #{parameters := #{message := Message, sync_timeout := SyncTimeout}} + #{parameters := #{message := Message, sync_timeout := SyncTimeout} = Params} ) -> case maps:is_key(ChannelId, Channels) of true -> - {error, already_exists}; + {error, channel_already_exists}; false -> + {ok, Producers} = start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params), Parameters = #{ message => compile_message_template(Message), - sync_timeout => SyncTimeout + sync_timeout => SyncTimeout, + producers => Producers }, NewChannels = maps:put(ChannelId, Parameters, Channels), {ok, State#{channels => NewChannels}} end. -on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> - {ok, State#{channels => maps:remove(ChannelId, Channels)}}. +on_remove_channel(InstanceId, State, ChannelId) -> + #{channels := Channels, client_id := ClientId} = State, + case maps:find(ChannelId, Channels) of + {ok, #{producers := Producers}} -> + stop_producers(ClientId, Producers), + emqx_resource:deallocate_resource(InstanceId, {?pulsar_producers, ChannelId}), + {ok, State#{channels => maps:remove(ChannelId, Channels)}}; + error -> + {ok, State} + end. on_get_channels(InstanceId) -> emqx_bridge_v2:get_channels_for_connector(InstanceId). -spec on_stop(resource_id(), state()) -> ok. on_stop(InstanceId, _State) -> - case emqx_resource:get_allocated_resources(InstanceId) of - #{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} -> - stop_producers(ClientId, Producers), + Resources0 = emqx_resource:get_allocated_resources(InstanceId), + case maps:take(?pulsar_client_id, Resources0) of + {ClientId, Resources} -> + maps:foreach( + fun({?pulsar_producers, _BridgeV2Id}, Producers) -> + stop_producers(ClientId, Producers) + end, + Resources + ), stop_client(ClientId), - ?tp(pulsar_bridge_stopped, #{ - instance_id => InstanceId, - pulsar_client_id => ClientId, - pulsar_producers => Producers - }), - ok; - #{?pulsar_client_id := ClientId} -> - stop_client(ClientId), - ?tp(pulsar_bridge_stopped, #{ - instance_id => InstanceId, - pulsar_client_id => ClientId, - pulsar_producers => undefined - }), - ok; - _ -> ?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}), + ok; + error -> ok end. @@ -172,35 +157,32 @@ on_stop(InstanceId, _State) -> %% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost. -spec on_get_status(resource_id(), state()) -> connected | connecting. on_get_status(_InstanceId, State = #{}) -> - #{ - pulsar_client_id := ClientId, - producers := Producers - } = State, + #{client_id := ClientId} = State, case pulsar_client_sup:find_client(ClientId) of {ok, Pid} -> try pulsar_client:get_status(Pid) of - true -> - get_producer_status(Producers); - false -> - connecting + true -> ?status_connected; + false -> ?status_connecting catch error:timeout -> - connecting; + ?status_connecting; exit:{noproc, _} -> - connecting + ?status_connecting end; {error, _} -> - connecting + ?status_connecting end; on_get_status(_InstanceId, _State) -> %% If a health check happens just after a concurrent request to %% create the bridge is not quite finished, `State = undefined'. - connecting. + ?status_connecting. on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> - case maps:is_key(ChannelId, Channels) of - true -> connected; - false -> {error, channel_not_exists} + case maps:find(ChannelId, Channels) of + {ok, #{producers := Producers}} -> + get_producer_status(Producers); + error -> + {error, channel_not_exists} end. -spec on_query(resource_id(), tuple(), state()) -> @@ -208,11 +190,11 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> | {error, timeout} | {error, term()}. on_query(_InstanceId, {ChannelId, Message}, State) -> - #{producers := Producers, channels := Channels} = State, + #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> {error, channel_not_exists}; - {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout}} -> + {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} -> PulsarMessage = render_message(Message, MessageTmpl), try pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) @@ -227,11 +209,11 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> ) -> {ok, pid()}. on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> - #{producers := Producers, channels := Channels} = State, + #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> {error, channel_not_exists}; - {ok, #{message := MessageTmpl}} -> + {ok, #{message := MessageTmpl, producers := Producers}} -> ?tp_span( pulsar_producer_on_query_async, #{instance_id => _InstanceId, message => Message}, @@ -299,18 +281,22 @@ conn_opts(#{authentication := #{jwt := JWT}}) -> replayq_dir(ClientId) -> filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]). --spec producer_name(pulsar_client_id()) -> atom(). -producer_name(ClientId) -> - ClientIdBin = emqx_utils_conv:bin(ClientId), - binary_to_atom( - iolist_to_binary([ - <<"producer-">>, - ClientIdBin - ]) - ). +producer_name(InstanceId, ChannelId) -> + case is_dry_run(InstanceId) of + %% do not create more atom + true -> + pulsar_producer_probe_worker; + false -> + ChannelIdBin = emqx_utils_conv:bin(ChannelId), + binary_to_atom( + iolist_to_binary([ + <<"producer-">>, + ChannelIdBin + ]) + ) + end. --spec start_producer(config(), resource_id(), pulsar_client_id(), map()) -> {ok, state()}. -start_producer(Config, InstanceId, ClientId, ClientOpts) -> +start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) -> #{ conn_opts := ConnOpts, ssl_opts := SSLOpts @@ -325,16 +311,16 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> }, compression := Compression, max_batch_bytes := MaxBatchBytes, - pulsar_topic := PulsarTopic0, + pulsar_topic := PulsarTopic, retention_period := RetentionPeriod, send_buffer := SendBuffer, strategy := Strategy - } = Config, + } = Params, {OffloadMode, ReplayQDir} = case BufferMode of memory -> {false, false}; - disk -> {false, replayq_dir(ClientId)}; - hybrid -> {true, replayq_dir(ClientId)} + disk -> {false, replayq_dir(ChannelId)}; + hybrid -> {true, replayq_dir(ChannelId)} end, MemOLP = case os:type() of @@ -348,7 +334,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> replayq_seg_bytes => SegmentBytes, drop_if_highmem => MemOLP }, - ProducerName = producer_name(ClientId), + ProducerName = producer_name(InstanceId, ChannelId), ?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}), ProducerOpts0 = #{ @@ -363,19 +349,17 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> tcp_opts => [{sndbuf, SendBuffer}] }, ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0), - PulsarTopic = binary_to_list(PulsarTopic0), ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}), try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers), + ok = emqx_resource:allocate_resource( + InstanceId, + {?pulsar_producers, ChannelId}, + Producers + ), ?tp(pulsar_producer_producers_allocated, #{}), - State = #{ - pulsar_client_id => ClientId, - producers => Producers, - channels => #{} - }, ?tp(pulsar_producer_bridge_started, #{}), - {ok, State} + {ok, Producers} catch Kind:Error:Stacktrace -> ?tp( @@ -388,7 +372,10 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> stacktrace => Stacktrace } ), - stop_client(ClientId), + ?tp(pulsar_bridge_producer_stopped, #{ + pulsar_client_id => ClientId, + producers => undefined + }), throw(failed_to_start_pulsar_producer) end. @@ -412,7 +399,10 @@ stop_producers(ClientId, Producers) -> _ = log_when_error( fun() -> ok = pulsar:stop_and_delete_supervised_producers(Producers), - ?tp(pulsar_bridge_producer_stopped, #{pulsar_client_id => ClientId}), + ?tp(pulsar_bridge_producer_stopped, #{ + pulsar_client_id => ClientId, + producers => Producers + }), ok end, #{ @@ -467,15 +457,19 @@ get_producer_status(Producers) -> do_get_producer_status(Producers, 0). do_get_producer_status(_Producers, TimeSpent) when TimeSpent > ?HEALTH_CHECK_RETRY_TIMEOUT -> - connecting; + ?status_connecting; do_get_producer_status(Producers, TimeSpent) -> - case pulsar_producers:all_connected(Producers) of + try pulsar_producers:all_connected(Producers) of true -> - connected; + ?status_connected; false -> Sleep = 200, timer:sleep(Sleep), do_get_producer_status(Producers, TimeSpent + Sleep) + %% producer crashed with badarg. will recover later + catch + error:badarg -> + ?status_connecting end. partition_strategy(key_dispatch) -> first_key_dispatch; @@ -485,17 +479,17 @@ is_sensitive_key(auth_data) -> true; is_sensitive_key(_) -> false. get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) -> - Iter = maps:iterator(BrokerErrorMap), - do_get_error_message(Iter); + Iterator = maps:iterator(BrokerErrorMap), + do_get_error_message(Iterator); get_error_message(_Error) -> error. -do_get_error_message(Iter) -> - case maps:next(Iter) of - {{_Broker, _Port}, #{message := Message}, _NIter} -> +do_get_error_message(Iterator) -> + case maps:next(Iterator) of + {{_Broker, _Port}, #{message := Message}, _NIterator} -> {ok, Message}; - {_K, _V, NIter} -> - do_get_error_message(NIter); + {_K, _V, NIterator} -> + do_get_error_message(NIterator); none -> error end. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl index 953318e0a..f8b7e3909 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl @@ -18,9 +18,8 @@ namespace() -> ?TYPE. roots() -> []. fields("config_connector") -> - lists:keydelete(enable, 1, emqx_bridge_schema:common_bridge_fields()) ++ - emqx_bridge_pulsar:fields(config) ++ - emqx_bridge_pulsar:fields(producer_opts) ++ + emqx_bridge_schema:common_bridge_fields() ++ + lists:keydelete(enable, 1, emqx_bridge_pulsar:fields(config)) ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields(connector_resource_opts) -> emqx_connector_schema:resource_opts_fields(); @@ -59,13 +58,5 @@ connector_example_values() -> servers => <<"pulsar://127.0.0.1:6650">>, authentication => none, connect_timeout => <<"5s">>, - batch_size => 10, - compression => no_compression, - send_buffer => <<"1MB">>, - retention_period => <<"100s">>, - max_batch_bytes => <<"32MB">>, - pulsar_topic => <<"test_topic">>, - strategy => random, - buffer => #{mode => memory}, ssl => #{enable => false} }. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl index a705ed560..9565aa5bc 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -32,22 +32,23 @@ fields(publisher_action) -> ?R_REF(action_parameters), #{ required => true, - desc => ?DESC(action_parameters) + desc => ?DESC(action_parameters), + validator => fun emqx_bridge_pulsar:producer_strategy_key_validator/1 } ), #{resource_opts_ref => ?R_REF(action_resource_opts)} ); fields(action_parameters) -> [ - {sync_timeout, - ?HOCON(emqx_schema:timeout_duration_ms(), #{ - default => <<"3s">>, desc => ?DESC("producer_sync_timeout") - })}, {message, ?HOCON(?R_REF(producer_pulsar_message), #{ required => false, desc => ?DESC("producer_message_opts") + })}, + {sync_timeout, + ?HOCON(emqx_schema:timeout_duration_ms(), #{ + default => <<"3s">>, desc => ?DESC("producer_sync_timeout") })} - ]; + ] ++ emqx_bridge_pulsar:fields(producer_opts); fields(producer_pulsar_message) -> [ {key, @@ -114,7 +115,8 @@ bridge_v2_examples(Method) -> message => #{ key => <<"${.clientid}">>, value => <<"${.}">> - } + }, + pulsar_topic => <<"test_topic">> } } ) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl index c9b25cc71..b3c351da0 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl @@ -599,13 +599,7 @@ t_start_and_produce_ok(Config) -> _Sleep = 100, _Attempts0 = 20, begin - BridgeId = emqx_bridge_resource:bridge_id( - <<"pulsar">>, ?config(pulsar_name, Config) - ), - ConnectorId = emqx_bridge_resource:resource_id( - <<"pulsar">>, ?config(pulsar_name, Config) - ), - Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>, + Id = get_channel_id(Config), ?assertMatch( #{ counters := #{ @@ -634,6 +628,15 @@ t_start_and_produce_ok(Config) -> ), ok. +get_channel_id(Config) -> + BridgeId = emqx_bridge_resource:bridge_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + ConnectorId = emqx_bridge_resource:resource_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + <<"action:", BridgeId/binary, ":", ConnectorId/binary>>. + %% Under normal operations, the bridge will be called async via %% `simple_async_query'. t_sync_query(Config) -> @@ -900,7 +903,7 @@ t_failure_to_start_producer(Config) -> {{ok, _}, {ok, _}} = ?wait_async_action( create_bridge(Config), - #{?snk_kind := pulsar_bridge_client_stopped}, + #{?snk_kind := pulsar_bridge_producer_stopped}, 20_000 ), ok @@ -928,6 +931,8 @@ t_producer_process_crash(Config) -> #{?snk_kind := pulsar_producer_bridge_started}, 10_000 ), + ResourceId = resource_id(Config), + ChannelId = get_channel_id(Config), [ProducerPid | _] = [ Pid || {_Name, PS, _Type, _Mods} <- supervisor:which_children(pulsar_producers_sup), @@ -944,17 +949,23 @@ t_producer_process_crash(Config) -> ok after 1_000 -> ct:fail("pid didn't die") end, - ResourceId = resource_id(Config), ?retry( _Sleep0 = 50, _Attempts0 = 50, - ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual( + #{error => <<"Not connected for unknown reason">>, status => connecting}, + emqx_resource_manager:channel_health_check(ResourceId, ChannelId) + ) ), + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), %% Should recover given enough time. ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual( + #{error => undefined, status => connected}, + emqx_resource_manager:channel_health_check(ResourceId, ChannelId) + ) ), {_, {ok, _}} = ?wait_async_action( @@ -1002,8 +1013,8 @@ t_resource_manager_crash_after_producers_started(Config) -> {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = ?wait_async_action( create_bridge(Config), - #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when - Producers =/= undefined, + #{?snk_kind := pulsar_bridge_stopped, instance_id := InstanceId} when + InstanceId =/= undefined, 10_000 ), ?assertEqual([], get_pulsar_producers()), @@ -1036,7 +1047,7 @@ t_resource_manager_crash_before_producers_started(Config) -> {{error, {config_update_crashed, _}}, {ok, _}} = ?wait_async_action( create_bridge(Config), - #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, + #{?snk_kind := pulsar_bridge_stopped}, 10_000 ), ?assertEqual([], get_pulsar_producers()), @@ -1236,3 +1247,19 @@ t_resilience(Config) -> [] ), ok. + +get_producers_config(ConnectorId, ChannelId) -> + [ + #{ + state := + #{ + channels := + #{ChannelId := #{producers := Producers}} + } + } + ] = + lists:filter( + fun(#{id := Id}) -> Id =:= ConnectorId end, + emqx_resource_manager:list_all() + ), + Producers. From e9e1daf96201fbef33a6813ba14703f3adc56009 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 29 Feb 2024 11:44:50 +0800 Subject: [PATCH 3/4] chore: update some pulsar's desc --- .../emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl | 6 +++--- .../src/emqx_bridge_pulsar_pubsub_schema.erl | 3 +-- rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index c88716abc..8c39c3671 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -182,7 +182,7 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> {ok, #{producers := Producers}} -> get_producer_status(Producers); error -> - {error, channel_not_exists} + {error, channel_not_found} end. -spec on_query(resource_id(), tuple(), state()) -> @@ -193,7 +193,7 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> - {error, channel_not_exists}; + {error, channel_not_found}; {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} -> PulsarMessage = render_message(Message, MessageTmpl), try @@ -212,7 +212,7 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> - {error, channel_not_exists}; + {error, channel_not_found}; {ok, #{message := MessageTmpl, producers := Producers}} -> ?tp_span( pulsar_producer_on_query_async, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl index 9565aa5bc..ccf985ba8 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -11,7 +11,6 @@ -export([bridge_v2_examples/1]). -define(ACTION_TYPE, pulsar). --define(CONNECTOR_SCHEMA, emqx_bridge_rabbitmq_connector_schema). namespace() -> "pulsar". @@ -94,7 +93,7 @@ desc(action_parameters) -> desc(producer_pulsar_message) -> ?DESC("producer_message_opts"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> - ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; + ["Configuration for Pulsar Producer using `", string:to_upper(Method), "` method."]; desc(publisher_action) -> ?DESC(publisher_action); desc(_) -> diff --git a/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon b/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon index a359bc755..f0708aee3 100644 --- a/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon +++ b/rel/i18n/emqx_bridge_pulsar_pubsub_schema.hocon @@ -6,7 +6,7 @@ action_parameters.label: """Action""" publisher_action.desc: -"""Publish message to pulsar topic""" +"""Publish message to Pulsar topic""" publisher_action.label: """Publish Action """ From 3814203fa2abb2a7f824860b9f14b373a2a403e2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 29 Feb 2024 16:22:33 +0800 Subject: [PATCH 4/4] test(pulsar): add pulsar action v2 testcase --- .../test/emqx_bridge_pulsar_v2_SUITE.erl | 451 ++++++++++++++++++ 1 file changed, 451 insertions(+) create mode 100644 apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl new file mode 100644 index 000000000..69b384ab8 --- /dev/null +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl @@ -0,0 +1,451 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_v2_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(TYPE, <<"pulsar">>). +-define(APPS, [emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]). +-define(RULE_TOPIC, "pulsar/rule"). +-define(RULE_TOPIC_BIN, <>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, plain}, + {group, tls} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {plain, AllTCs}, + {tls, AllTCs} + ]. + +init_per_suite(Config) -> + %% Ensure enterprise bridge module is loaded + _ = emqx_bridge_enterprise:module_info(), + {ok, Cwd} = file:get_cwd(), + PrivDir = ?config(priv_dir, Config), + WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd), + Apps = emqx_cth_suite:start( + lists:flatten([ + ?APPS, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ]), + #{work_dir => WorkDir} + ), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)). + +init_per_group(plain = Type, Config) -> + PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"), + PulsarPort = list_to_integer(os:getenv("PULSAR_PLAIN_PORT", "6652")), + ProxyName = "pulsar_plain", + case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of + true -> + Config1 = common_init_per_group(), + NewConfig = + [ + {proxy_name, ProxyName}, + {pulsar_host, PulsarHost}, + {pulsar_port, PulsarPort}, + {pulsar_type, Type}, + {use_tls, false} + | Config1 ++ Config + ], + create_connector(?MODULE, NewConfig), + NewConfig; + false -> + maybe_skip_without_ci() + end; +init_per_group(tls = Type, Config) -> + PulsarHost = os:getenv("PULSAR_TLS_HOST", "toxiproxy"), + PulsarPort = list_to_integer(os:getenv("PULSAR_TLS_PORT", "6653")), + ProxyName = "pulsar_tls", + case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of + true -> + Config1 = common_init_per_group(), + NewConfig = + [ + {proxy_name, ProxyName}, + {pulsar_host, PulsarHost}, + {pulsar_port, PulsarPort}, + {pulsar_type, Type}, + {use_tls, true} + | Config1 ++ Config + ], + create_connector(?MODULE, NewConfig), + NewConfig; + false -> + maybe_skip_without_ci() + end. + +end_per_group(Group, Config) when + Group =:= plain; + Group =:= tls +-> + common_end_per_group(Config), + ok. + +common_init_per_group() -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + UniqueNum = integer_to_binary(erlang:unique_integer()), + MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {mqtt_topic, MQTTTopic} + ]. + +common_end_per_group(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + ok. + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +end_per_testcase(_Testcase, Config) -> + case proplists:get_bool(skip_does_not_apply, Config) of + true -> + ok; + false -> + ok = emqx_config:delete_override_conf_files(), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges(), + stop_consumer(Config), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + flush_consumed(), + ok + end. + +common_init_per_testcase(TestCase, Config0) -> + ct:timetrap(timer:seconds(60)), + emqx_bridge_v2_testlib:delete_all_bridges(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + PulsarTopic = + << + (atom_to_binary(TestCase))/binary, + UniqueNum/binary + >>, + Config1 = [{pulsar_topic, PulsarTopic} | Config0], + ConsumerConfig = start_consumer(TestCase, Config1), + Config = ConsumerConfig ++ Config1, + ok = snabbkaffe:start_trace(), + Config. + +create_connector(Name, Config) -> + Connector = pulsar_connector(Config), + {ok, _} = emqx_connector:create(?TYPE, Name, Connector). + +delete_connector(Name) -> + ok = emqx_connector:remove(?TYPE, Name). + +create_action(Name, Config) -> + Action = pulsar_action(Config), + {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action). + +delete_action(Name) -> + ok = emqx_bridge_v2:remove(actions, ?TYPE, Name). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_action_probe(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + Action = pulsar_action(Config), + {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), + ?assertMatch({{_, 204, _}, _, _}, Res0), + ok. + +t_action(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_action(Name, Config), + Actions = emqx_bridge_v2:list(actions), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Actions), Actions), + Topic = <<"lkadfdaction">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"", Topic/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [<<"pulsar:", Name/binary>>], + description => <<"bridge_v2 send msg to pulsar action">> + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + MQTTClientID = <<"pulsar_mqtt_clientid">>, + {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]), + {ok, _} = emqtt:connect(C1), + ReqPayload = payload(), + ReqPayloadBin = emqx_utils_json:encode(ReqPayload), + {ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]), + [#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000), + ?assertEqual(MQTTClientID, ClientID), + ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)), + ok = emqtt:disconnect(C1), + InstanceId = instance_id(actions, Name), + #{counters := Counters} = emqx_resource:get_metrics(InstanceId), + ok = delete_action(Name), + ActionsAfterDelete = emqx_bridge_v2:list(actions), + ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), + ?assertMatch( + #{ + dropped := 0, + success := 1, + matched := 1, + failed := 0, + received := 0 + }, + Counters + ), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +pulsar_connector(Config) -> + PulsarHost = ?config(pulsar_host, Config), + PulsarPort = ?config(pulsar_port, Config), + UseTLS = proplists:get_value(use_tls, Config, false), + Name = atom_to_binary(?MODULE), + Prefix = + case UseTLS of + true -> <<"pulsar+ssl://">>; + false -> <<"pulsar://">> + end, + ServerURL = iolist_to_binary([ + Prefix, + PulsarHost, + ":", + integer_to_binary(PulsarPort) + ]), + Connector = #{ + <<"connectors">> => #{ + <<"pulsar">> => #{ + Name => #{ + <<"enable">> => true, + <<"ssl">> => #{ + <<"enable">> => UseTLS, + <<"verify">> => <<"verify_none">>, + <<"server_name_indication">> => <<"auto">> + }, + <<"authentication">> => <<"none">>, + <<"servers">> => ServerURL + } + } + } + }, + parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name). + +pulsar_action(Config) -> + Name = atom_to_binary(?MODULE), + Action = #{ + <<"actions">> => #{ + <<"pulsar">> => #{ + Name => #{ + <<"connector">> => Name, + <<"enable">> => true, + <<"parameters">> => #{ + <<"retention_period">> => <<"infinity">>, + <<"max_batch_bytes">> => <<"1MB">>, + <<"batch_size">> => 100, + <<"strategy">> => <<"random">>, + <<"buffer">> => #{ + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"10MB">>, + <<"segment_bytes">> => <<"5MB">>, + <<"memory_overload_protection">> => true + }, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.}">> + }, + <<"pulsar_topic">> => ?config(pulsar_topic, Config) + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">> + } + } + } + } + }, + parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name). + +parse_and_check(Key, Mod, Conf, Name) -> + ConfStr = hocon_pp:do(Conf, #{}), + ct:pal(ConfStr), + {ok, RawConf} = hocon:binary(ConfStr, #{format => map}), + hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), + #{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf, + RetConf. + +instance_id(Type, Name) -> + ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), + BridgeId = emqx_bridge_resource:bridge_id(?TYPE, Name), + TypeBin = + case Type of + sources -> <<"source:">>; + actions -> <<"action:">> + end, + <>. + +start_consumer(TestCase, Config) -> + PulsarHost = ?config(pulsar_host, Config), + PulsarPort = ?config(pulsar_port, Config), + PulsarTopic = ?config(pulsar_topic, Config), + UseTLS = ?config(use_tls, Config), + Scheme = + case UseTLS of + true -> <<"pulsar+ssl://">>; + false -> <<"pulsar://">> + end, + URL = + binary_to_list( + <> + ), + ConsumerClientId = list_to_atom( + atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()) + ), + CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"), + SSLOpts = #{ + enable => UseTLS, + keyfile => filename:join([CertsPath, "key.pem"]), + certfile => filename:join([CertsPath, "cert.pem"]), + cacertfile => filename:join([CertsPath, "cacert.pem"]) + }, + Opts = #{enable_ssl => UseTLS, ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)}, + {ok, _ClientPid} = pulsar:ensure_supervised_client(ConsumerClientId, [URL], Opts), + ConsumerOpts = Opts#{ + cb_init_args => #{send_to => self()}, + cb_module => pulsar_echo_consumer, + sub_type => 'Shared', + subscription => atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()), + max_consumer_num => 1, + %% Note! This must not coincide with the client + %% id, or else weird bugs will happen, like the + %% consumer never starts... + name => list_to_atom("test_consumer" ++ integer_to_list(erlang:unique_integer())), + consumer_id => 1 + }, + {ok, Consumer} = pulsar:ensure_supervised_consumers( + ConsumerClientId, + PulsarTopic, + ConsumerOpts + ), + %% since connection is async, and there's currently no way to + %% specify the subscription initial position as `Earliest', we + %% need to wait until the consumer is connected to avoid + %% flakiness. + ok = wait_until_consumer_connected(Consumer), + [ + {consumer_client_id, ConsumerClientId}, + {pulsar_consumer, Consumer} + ]. + +stop_consumer(Config) -> + ConsumerClientId = ?config(consumer_client_id, Config), + Consumer = ?config(pulsar_consumer, Config), + ok = pulsar:stop_and_delete_supervised_consumers(Consumer), + ok = pulsar:stop_and_delete_supervised_client(ConsumerClientId), + ok. + +wait_until_consumer_connected(Consumer) -> + ?retry( + _Sleep = 300, + _Attempts0 = 20, + true = pulsar_consumers:all_connected(Consumer) + ), + ok. + +wait_until_producer_connected() -> + wait_until_connected(pulsar_producers_sup, pulsar_producer). + +wait_until_connected(SupMod, Mod) -> + Pids = get_pids(SupMod, Mod), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + begin + true = length(Pids) > 0, + lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) + end + ), + ok. + +get_pulsar_producers() -> + get_pids(pulsar_producers_sup, pulsar_producer). + +get_pids(SupMod, Mod) -> + [ + P + || {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod), + P <- element(2, process_info(SupPid, links)), + case proc_lib:initial_call(P) of + {Mod, init, _} -> true; + _ -> false + end + ]. + +receive_consumed(Timeout) -> + receive + {pulsar_message, #{payloads := Payloads}} -> + lists:map(fun try_decode_json/1, Payloads) + after Timeout -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("no message consumed") + end. + +flush_consumed() -> + receive + {pulsar_message, _} -> flush_consumed() + after 0 -> ok + end. + +try_decode_json(Payload) -> + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {error, _} -> + Payload; + {ok, JSON} -> + JSON + end. + +payload() -> + #{<<"key">> => 42, <<"data">> => <<"pulsar">>, <<"timestamp">> => 10000}. + +maybe_skip_without_ci() -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_pulsar); + _ -> + {skip, no_pulsar} + end.