diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index bdf43885a..59eb3f0fb 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -39,6 +39,7 @@ on_start/2, on_stop/2, on_query/3, + on_query_async/4, on_get_status/2 ]). @@ -190,6 +191,16 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), ok. +on_query_async( + _InstId, + {send_message, Msg}, + {ReplayFun, Args}, + #{name := InstanceId} +) -> + ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), + emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}), + ok. + on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> AutoReconn = maps:get(auto_reconnect, Conf, true), case emqx_connector_mqtt_worker:status(InstanceId) of diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 372405b59..7571c59b8 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -21,6 +21,7 @@ -export([ start/1, send/2, + send_async/3, stop/1, ping/1 ]). @@ -32,7 +33,6 @@ %% callbacks for emqtt -export([ - handle_puback/2, handle_publish/3, handle_disconnected/2 ]). @@ -134,44 +134,11 @@ safe_stop(Pid, StopF, Timeout) -> exit(Pid, kill) end. -send(Conn, Msgs) -> - send(Conn, Msgs, []). +send(#{client_pid := ClientPid}, Msg) -> + emqtt:publish(ClientPid, Msg). -send(_Conn, [], []) -> - %% all messages in the batch are QoS-0 - Ref = make_ref(), - %% QoS-0 messages do not have packet ID - %% the batch ack is simulated with a loop-back message - self() ! 
{batch_ack, Ref}, - {ok, Ref}; -send(_Conn, [], PktIds) -> - %% PktIds is not an empty list if there is any non-QoS-0 message in the batch, - %% And the worker should wait for all acks - {ok, PktIds}; -send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) -> - case emqtt:publish(ClientPid, Msg) of - ok -> - send(Conn, Rest, PktIds); - {ok, PktId} -> - send(Conn, Rest, [PktId | PktIds]); - {error, Reason} -> - %% NOTE: There is no partial success of a batch and recover from the middle - %% only to retry all messages in one batch - {error, Reason} - end. - -handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) when - RC =:= ?RC_SUCCESS; - RC =:= ?RC_NO_MATCHING_SUBSCRIBERS --> - Parent ! {batch_ack, PktId}, - ok; -handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> - ?SLOG(warning, #{ - msg => "publish_to_remote_node_falied", - packet_id => PktId, - reason_code => RC - }). +send_async(#{client_pid := ClientPid}, Msg, Callback) -> + emqtt:publish_async(ClientPid, Msg, Callback). handle_publish(Msg, undefined, _Opts) -> ?SLOG(error, #{ @@ -200,7 +167,6 @@ handle_disconnected(Reason, Parent) -> make_hdlr(Parent, Vars, Opts) -> #{ - puback => {fun ?MODULE:handle_puback/2, [Parent]}, publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} }. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 3f3a4b9ce..5e4fa8f72 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -91,16 +91,14 @@ ensure_stopped/1, status/1, ping/1, - send_to_remote/2 + send_to_remote/2, + send_to_remote_async/3 ]). -export([get_forwards/1]). -export([get_subscriptions/1]). -%% Internal --export([msg_marshaller/1]). 
- -export_type([ config/0, ack_ref/0 @@ -133,12 +131,6 @@ %% mountpoint: The topic mount point for messages sent to remote node/cluster %% `undefined', `<<>>' or `""' to disable %% forwards: Local topics to subscribe. -%% replayq.batch_bytes_limit: Max number of bytes to collect in a batch for each -%% send call towards emqx_bridge_connect -%% replayq.batch_count_limit: Max number of messages to collect in a batch for -%% each send call towards emqx_bridge_connect -%% replayq.dir: Directory where replayq should persist messages -%% replayq.seg_bytes: Size in bytes for each replayq segment file %% %% Find more connection specific configs in the callback modules %% of emqx_bridge_connect behaviour. @@ -173,9 +165,14 @@ ping(Name) -> gen_statem:call(name(Name), ping). send_to_remote(Pid, Msg) when is_pid(Pid) -> - gen_statem:cast(Pid, {send_to_remote, Msg}); + gen_statem:call(Pid, {send_to_remote, Msg}); send_to_remote(Name, Msg) -> - gen_statem:cast(name(Name), {send_to_remote, Msg}). + gen_statem:call(name(Name), {send_to_remote, Msg}). + +send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) -> + gen_statem:cast(Pid, {send_to_remote_async, Msg, Callback}); +send_to_remote_async(Name, Msg, Callback) -> + gen_statem:cast(name(Name), {send_to_remote_async, Msg, Callback}). %% @doc Return all forwards (local subscriptions). -spec get_forwards(id()) -> [topic()]. @@ -194,12 +191,10 @@ init(#{name := Name} = ConnectOpts) -> name => Name }), erlang:process_flag(trap_exit, true), - Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), State = init_state(ConnectOpts), self() ! idle, {ok, idle, State#{ - connect_opts => pre_process_opts(ConnectOpts), - replayq => Queue + connect_opts => pre_process_opts(ConnectOpts) }}. 
init_state(Opts) -> @@ -212,32 +207,11 @@ init_state(Opts) -> start_type => StartType, reconnect_interval => ReconnDelayMs, mountpoint => format_mountpoint(Mountpoint), - inflight => [], max_inflight => MaxInflightSize, connection => undefined, name => Name }. -open_replayq(Name, QCfg) -> - Dir = maps:get(dir, QCfg, undefined), - SegBytes = maps:get(seg_bytes, QCfg, ?DEFAULT_SEG_BYTES), - MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE), - QueueConfig = - case Dir =:= undefined orelse Dir =:= "" of - true -> - #{mem_only => true}; - false -> - #{ - dir => filename:join([Dir, node(), Name]), - seg_bytes => SegBytes, - max_total_size => MaxTotalSize - } - end, - replayq:open(QueueConfig#{ - sizer => fun emqx_connector_mqtt_msg:estimate_size/1, - marshaller => fun ?MODULE:msg_marshaller/1 - }). - pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) -> ConnectOpts#{ subscriptions => pre_process_in_out(in, InConf), @@ -276,9 +250,8 @@ pre_process_conf(Key, Conf) -> code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. -terminate(_Reason, _StateName, #{replayq := Q} = State) -> +terminate(_Reason, _StateName, State) -> _ = disconnect(State), - _ = replayq:close(Q), maybe_destroy_session(State). maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) -> @@ -322,15 +295,18 @@ connecting(#{reconnect_interval := ReconnectDelayMs} = State) -> {keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}} end. 
-connected(state_timeout, connected, #{inflight := Inflight} = State) ->
-    case retry_inflight(State#{inflight := []}, Inflight) of
-        {ok, NewState} ->
-            {keep_state, NewState, {next_event, internal, maybe_send}};
-        {error, NewState} ->
-            {keep_state, NewState}
+connected(state_timeout, connected, State) ->
+    %% nothing to do: no inflight window is kept any more, so there is nothing to retry
+    {keep_state, State};
+connected({call, From}, {send_to_remote, Msg}, State) ->
+    case do_send(State, Msg) of
+        {ok, NState} ->
+            {keep_state, NState, [{reply, From, ok}]};
+        {error, Reason} ->
+            {keep_state_and_data, [{reply, From, {error, Reason}}]}
     end;
-connected(internal, maybe_send, State) ->
-    {_, NewState} = pop_and_send(State),
-    {keep_state, NewState};
+connected(cast, {send_to_remote_async, Msg, Callback}, State) ->
+    _ = do_send_async(State, Msg, Callback), % do_send_async/3 never returns an updated state
+    keep_state_and_data;
 connected(
     info,
@@ -345,9 +321,6 @@ connected(
         false ->
             keep_state_and_data
     end;
-connected(info, {batch_ack, Ref}, State) ->
-    NewState = handle_batch_ack(State, Ref),
-    {keep_state, NewState, {next_event, internal, maybe_send}};
 connected(Type, Content, State) ->
     common(connected, Type, Content, State).
 
@@ -368,9 +341,6 @@ common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; -common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) -> - NewQ = replayq:append(Q, [Msg]), - {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(StateName, Type, Content, #{name := Name} = State) -> ?SLOG(notice, #{ msg => "bridge_discarded_event", @@ -384,13 +354,12 @@ common(StateName, Type, Content, #{name := Name} = State) -> do_connect( #{ connect_opts := ConnectOpts, - inflight := Inflight, name := Name } = State ) -> case emqx_connector_mqtt_mod:start(ConnectOpts) of {ok, Conn} -> - ?tp(info, connected, #{name => Name, inflight => length(Inflight)}), + ?tp(info, connected, #{name => Name}), {ok, State#{connection => Conn}}; {error, Reason} -> ConnectOpts1 = obfuscate(ConnectOpts), @@ -402,39 +371,7 @@ do_connect( {error, Reason, State} end. -%% Retry all inflight (previously sent but not acked) batches. -retry_inflight(State, []) -> - {ok, State}; -retry_inflight(State, [#{q_ack_ref := QAckRef, msg := Msg} | Rest] = OldInf) -> - case do_send(State, QAckRef, Msg) of - {ok, State1} -> - retry_inflight(State1, Rest); - {error, #{inflight := NewInf} = State1} -> - {error, State1#{inflight := NewInf ++ OldInf}} - end. - -pop_and_send(#{inflight := Inflight, max_inflight := Max} = State) -> - pop_and_send_loop(State, Max - length(Inflight)). 
- -pop_and_send_loop(State, 0) -> - ?tp(debug, inflight_full, #{}), - {ok, State}; -pop_and_send_loop(#{replayq := Q} = State, N) -> - case replayq:is_empty(Q) of - true -> - ?tp(debug, replayq_drained, #{}), - {ok, State}; - false -> - BatchSize = 1, - Opts = #{count_limit => BatchSize, bytes_limit => 999999999}, - {Q1, QAckRef, [Msg]} = replayq:pop(Q, Opts), - case do_send(State#{replayq := Q1}, QAckRef, Msg) of - {ok, NewState} -> pop_and_send_loop(NewState, N - 1); - {error, NewState} -> {error, NewState} - end - end. - -do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) -> +do_send(#{connect_opts := #{forwards := undefined}}, Msg) -> ?SLOG(error, #{ msg => "cannot_forward_messages_to_remote_broker" @@ -443,98 +380,68 @@ do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) -> }); do_send( #{ - inflight := Inflight, connection := Connection, mountpoint := Mountpoint, connect_opts := #{forwards := Forwards} } = State, - QAckRef, Msg ) -> Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), - ExportMsg = fun(Message) -> - emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) - end, + ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars), ?SLOG(debug, #{ msg => "publish_to_remote_broker", message => Msg, vars => Vars }), - case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of - {ok, Refs} -> - {ok, State#{ - inflight := Inflight ++ - [ - #{ - q_ack_ref => QAckRef, - send_ack_ref => map_set(Refs), - msg => Msg - } - ] - }}; + case emqx_connector_mqtt_mod:send(Connection, ExportMsg) of + ok -> + {ok, State}; + {ok, #{reason_code := RC}} when + RC =:= ?RC_SUCCESS; + RC =:= ?RC_NO_MATCHING_SUBSCRIBERS + -> + {ok, State}; + {ok, #{reason_code := RC, reason_code_name := RCN}} -> + ?SLOG(warning, #{ + msg => "publish_to_remote_node_falied", + message => Msg, + reason_code => RC, + reason_code_name => RCN + }), + {error, RCN}; {error, Reason} -> ?SLOG(info, #{ msg => "mqtt_bridge_produce_failed", reason => 
Reason }), - {error, State} + {error, Reason} end. -%% map as set, ack-reference -> 1 -map_set(Ref) when is_reference(Ref) -> - %% QoS-0 or RPC call returns a reference - map_set([Ref]); -map_set(List) -> - map_set(List, #{}). - -map_set([], Set) -> Set; -map_set([H | T], Set) -> map_set(T, Set#{H => 1}). - -handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) -> - Inflight1 = do_ack(Inflight0, Ref), - Inflight = drop_acked_batches(Q, Inflight1), - State#{inflight := Inflight}. - -do_ack([], Ref) -> - ?SLOG(debug, #{ - msg => "stale_batch_ack_reference", - ref => Ref - }), - []; -do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) -> - case maps:is_key(Ref, Refs) of - true -> - NewRefs = maps:without([Ref], Refs), - [First#{send_ack_ref := NewRefs} | Rest]; - false -> - [First | do_ack(Rest, Ref)] - end. - -%% Drop the consecutive header of the inflight list having empty send_ack_ref -drop_acked_batches(_Q, []) -> - ?tp(debug, inflight_drained, #{}), - []; -drop_acked_batches( - Q, - [ - #{ - send_ack_ref := Refs, - q_ack_ref := QAckRef - } - | Rest - ] = All +do_send_async(#{connect_opts := #{forwards := undefined}}, Msg, _Callback) -> + %% TODO: eval callback with undefined error + ?SLOG(error, #{ + msg => + "cannot_forward_messages_to_remote_broker" + "_as_'egress'_is_not_configured", + messages => Msg + }); +do_send_async( + #{ + connection := Connection, + mountpoint := Mountpoint, + connect_opts := #{forwards := Forwards} + }, + Msg, + Callback ) -> - case maps:size(Refs) of - 0 -> - %% all messages are acked by bridge target - %% now it's safe to ack replayq (delete from disk) - ok = replayq:ack(Q, QAckRef), - %% continue to check more sent batches - drop_acked_batches(Q, Rest); - _ -> - %% the head (oldest) inflight batch is not acked, keep waiting - All - end. 
+ Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), + ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars), + ?SLOG(debug, #{ + msg => "publish_to_remote_broker", + message => Msg, + vars => Vars + }), + emqx_connector_mqtt_mod:send_async(Connection, ExportMsg, Callback). disconnect(#{connection := Conn} = State) when Conn =/= undefined -> emqx_connector_mqtt_mod:stop(Conn), @@ -542,10 +449,6 @@ disconnect(#{connection := Conn} = State) when Conn =/= undefined -> disconnect(State) -> State. -%% Called only when replayq needs to dump it to disk. -msg_marshaller(Bin) when is_binary(Bin) -> emqx_connector_mqtt_msg:from_binary(Bin); -msg_marshaller(Msg) -> emqx_connector_mqtt_msg:to_binary(Msg). - format_mountpoint(undefined) -> undefined; format_mountpoint(Prefix) ->