fix(bridge_mqtt): max_inflight_size -> max_inflight; this enables emqtt.erl to use the value (#3938)
* fix(bridge_mqtt): max_inflight_size -> max_inflight; this enables emqtt.erl to use the value * chore(bridge_mqtt): fix elvis * fix(worker): max_inflight_batches -> max_inflight
This commit is contained in:
parent
739e49218f
commit
56920b6ca6
|
|
@ -149,9 +149,10 @@ bridge.mqtt.aws.retry_interval = 20s
|
||||||
bridge.mqtt.aws.batch_size = 32
|
bridge.mqtt.aws.batch_size = 32
|
||||||
|
|
||||||
## Inflight size.
|
## Inflight size.
|
||||||
|
## 0 means infinity (no limit on the inflight window)
|
||||||
##
|
##
|
||||||
## Value: Integer
|
## Value: Integer
|
||||||
bridge.mqtt.aws.max_inflight_size = 32
|
bridge.mqtt.aws.max_inflight = 32
|
||||||
|
|
||||||
## Base directory for replayq to store messages on disk
|
## Base directory for replayq to store messages on disk
|
||||||
## If this config entry is missing or set to undefined,
|
## If this config entry is missing or set to undefined,
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,7 @@
|
||||||
{datatype, {duration, ms}}
|
{datatype, {duration, ms}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [
|
{mapping, "bridge.mqtt.$name.max_inflight", "emqx_bridge_mqtt.bridges", [
|
||||||
{default, 0},
|
{default, 0},
|
||||||
{datatype, integer}
|
{datatype, integer}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
||||||
|
|
@ -92,7 +92,8 @@ ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end;
|
end;
|
||||||
ensure_subscribed(_Conn, _Topic, _QoS) ->
|
ensure_subscribed(_Conn, _Topic, _QoS) ->
|
||||||
%% return ok for now, next re-connect should should call start with new topic added to config
|
%% return ok for now
|
||||||
|
%% next re-connect should should call start with new topic added to config
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
|
ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
|
||||||
|
|
@ -101,7 +102,8 @@ ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end;
|
end;
|
||||||
ensure_unsubscribed(_, _) ->
|
ensure_unsubscribed(_, _) ->
|
||||||
%% return ok for now, next re-connect should should call start with this topic deleted from config
|
%% return ok for now
|
||||||
|
%% next re-connect should should call start with this topic deleted from config
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
safe_stop(Pid, StopF, Timeout) ->
|
safe_stop(Pid, StopF, Timeout) ->
|
||||||
|
|
@ -172,7 +174,7 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
replvar(Options) ->
|
replvar(Options) ->
|
||||||
replvar([clientid], Options).
|
replvar([clientid, max_inflight], Options).
|
||||||
|
|
||||||
replvar([], Options) ->
|
replvar([], Options) ->
|
||||||
Options;
|
Options;
|
||||||
|
|
@ -186,5 +188,11 @@ replvar([Key|More], Options) ->
|
||||||
|
|
||||||
%% ${node} => node()
|
%% ${node} => node()
|
||||||
feedvar(clientid, ClientId, _) ->
|
feedvar(clientid, ClientId, _) ->
|
||||||
iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))).
|
iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node())));
|
||||||
|
|
||||||
|
feedvar(max_inflight, 0, _) ->
|
||||||
|
infinity;
|
||||||
|
|
||||||
|
feedvar(max_inflight, Size, _) ->
|
||||||
|
Size.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -251,7 +251,7 @@ init_opts(Config) ->
|
||||||
BridgeHandler = maps:get(bridge_handler, Config, ?NO_BRIDGE_HANDLER),
|
BridgeHandler = maps:get(bridge_handler, Config, ?NO_BRIDGE_HANDLER),
|
||||||
Mountpoint = maps:get(forward_mountpoint, Config, undefined),
|
Mountpoint = maps:get(forward_mountpoint, Config, undefined),
|
||||||
ReceiveMountpoint = maps:get(receive_mountpoint, Config, undefined),
|
ReceiveMountpoint = maps:get(receive_mountpoint, Config, undefined),
|
||||||
MaxInflightSize = maps:get(max_inflight_batches, Config, ?DEFAULT_BATCH_SIZE),
|
MaxInflightSize = maps:get(max_inflight, Config, ?DEFAULT_BATCH_SIZE),
|
||||||
BatchSize = maps:get(batch_size, Config, ?DEFAULT_BATCH_SIZE),
|
BatchSize = maps:get(batch_size, Config, ?DEFAULT_BATCH_SIZE),
|
||||||
Name = maps:get(name, Config, undefined),
|
Name = maps:get(name, Config, undefined),
|
||||||
#{start_type => StartType,
|
#{start_type => StartType,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue