fix(influxdb): connector use a fallbacke `pool_size` for influxdb client

This commit is contained in:
JimMoen 2022-08-18 16:00:04 +08:00
parent 9050aa6468
commit 06363e63d9
7 changed files with 41 additions and 33 deletions

View File

@ -68,6 +68,8 @@ ssl_fields() ->
relational_db_fields() -> relational_db_fields() ->
[ [
{database, fun database/1}, {database, fun database/1},
%% TODO: The `pool_size` for drivers will be deprecated. Ues `worker_pool_size` for emqx_resource
%% See emqx_resource.hrl
{pool_size, fun pool_size/1}, {pool_size, fun pool_size/1},
{username, fun username/1}, {username, fun username/1},
{password, fun password/1}, {password, fun password/1},

View File

@ -22,6 +22,17 @@ emqx_resource_schema {
} }
} }
worker_pool_size {
desc {
en: """Resource worker pool size."""
zh: """资源连接池大小。"""
}
label {
en: """Worker Pool Size"""
zh: """资源连接池大小"""
}
}
health_check_interval { health_check_interval {
desc { desc {
en: """Health check interval, in milliseconds.""" en: """Health check interval, in milliseconds."""

View File

@ -49,25 +49,26 @@
%% use auto_restart_interval instead %% use auto_restart_interval instead
auto_retry_interval => integer(), auto_retry_interval => integer(),
%%======================================= Deprecated Opts End %%======================================= Deprecated Opts End
health_check_interval => integer(), worker_pool_size => pos_integer(),
health_check_interval => pos_integer(),
%% We can choose to block the return of emqx_resource:start until %% We can choose to block the return of emqx_resource:start until
%% the resource connected, wait max to `start_timeout` ms. %% the resource connected, wait max to `start_timeout` ms.
start_timeout => integer(), start_timeout => pos_integer(),
%% If `start_after_created` is set to true, the resource is started right %% If `start_after_created` is set to true, the resource is started right
%% after it is created. But note that a `started` resource is not guaranteed %% after it is created. But note that a `started` resource is not guaranteed
%% to be `connected`. %% to be `connected`.
start_after_created => boolean(), start_after_created => boolean(),
%% If the resource disconnected, we can set to retry starting the resource %% If the resource disconnected, we can set to retry starting the resource
%% periodically. %% periodically.
auto_restart_interval => integer(), auto_restart_interval => pos_integer(),
enable_batch => boolean(), enable_batch => boolean(),
batch_size => integer(), batch_size => pos_integer(),
batch_time => integer(), batch_time => pos_integer(),
enable_queue => boolean(), enable_queue => boolean(),
queue_max_bytes => integer(), queue_max_bytes => pos_integer(),
query_mode => async | sync | dynamic, query_mode => async | sync | dynamic,
resume_interval => integer(), resume_interval => pos_integer(),
async_inflight_window => integer() async_inflight_window => pos_integer()
}. }.
-type query_result() :: -type query_result() ::
ok ok
@ -75,6 +76,8 @@
| {error, term()} | {error, term()}
| {resource_down, term()}. | {resource_down, term()}.
-define(WORKER_POOL_SIZE, 16).
-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
-define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>). -define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>).

View File

@ -53,23 +53,23 @@ init([]) ->
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.
start_workers(ResId, Opts) -> start_workers(ResId, Opts) ->
PoolSize = pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
_ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]), _ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]),
lists:foreach( lists:foreach(
fun(Idx) -> fun(Idx) ->
_ = ensure_worker_added(ResId, Idx), _ = ensure_worker_added(ResId, Idx),
ok = ensure_worker_started(ResId, Idx, Opts) ok = ensure_worker_started(ResId, Idx, Opts)
end, end,
lists:seq(1, PoolSize) lists:seq(1, WorkerPoolSize)
). ).
stop_workers(ResId, Opts) -> stop_workers(ResId, Opts) ->
PoolSize = pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
lists:foreach( lists:foreach(
fun(Idx) -> fun(Idx) ->
ensure_worker_removed(ResId, Idx) ensure_worker_removed(ResId, Idx)
end, end,
lists:seq(1, PoolSize) lists:seq(1, WorkerPoolSize)
), ),
ensure_worker_pool_removed(ResId), ensure_worker_pool_removed(ResId),
ok. ok.
@ -77,7 +77,7 @@ stop_workers(ResId, Opts) ->
%%%============================================================================= %%%=============================================================================
%%% Internal %%% Internal
%%%============================================================================= %%%=============================================================================
pool_size(Opts) -> worker_pool_size(Opts) ->
maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)). maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)).
ensure_worker_pool(ResId, Type, Opts) -> ensure_worker_pool(ResId, Type, Opts) ->

View File

@ -44,6 +44,7 @@ fields("resource_opts") ->
]; ];
fields("creation_opts") -> fields("creation_opts") ->
[ [
{worker_pool_size, fun worker_pool_size/1},
{health_check_interval, fun health_check_interval/1}, {health_check_interval, fun health_check_interval/1},
{start_after_created, fun start_after_created/1}, {start_after_created, fun start_after_created/1},
{start_timeout, fun start_timeout/1}, {start_timeout, fun start_timeout/1},
@ -57,6 +58,12 @@ fields("creation_opts") ->
{max_queue_bytes, fun queue_max_bytes/1} {max_queue_bytes, fun queue_max_bytes/1}
]. ].
worker_pool_size(type) -> pos_integer();
worker_pool_size(desc) -> ?DESC("worker_pool_size");
worker_pool_size(default) -> ?WORKER_POOL_SIZE;
worker_pool_size(required) -> false;
worker_pool_size(_) -> undefined.
health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(type) -> emqx_schema:duration_ms();
health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(desc) -> ?DESC("health_check_interval");
health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;

View File

@ -150,15 +150,5 @@ emqx_ee_connector_influxdb {
zh: """时间精度""" zh: """时间精度"""
} }
} }
pool_size {
desc {
en: """InfluxDB Pool Size. Default value is CPU threads."""
zh: """InfluxDB 连接池大小,默认为 CPU 线程数。"""
}
label {
en: """InfluxDB Pool Size"""
zh: """InfluxDB 连接池大小"""
}
}
} }

View File

@ -135,8 +135,7 @@ fields(basic) ->
{precision, {precision,
mk(enum([ns, us, ms, s, m, h]), #{ mk(enum([ns, us, ms, s, m, h]), #{
required => false, default => ms, desc => ?DESC("precision") required => false, default => ms, desc => ?DESC("precision")
})}, })}
{pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})}
]; ];
fields(influxdb_udp) -> fields(influxdb_udp) ->
fields(basic); fields(basic);
@ -190,15 +189,13 @@ values(udp, put) ->
#{ #{
host => <<"127.0.0.1">>, host => <<"127.0.0.1">>,
port => 8089, port => 8089,
precision => ms, precision => ms
pool_size => 8
}; };
values(api_v1, put) -> values(api_v1, put) ->
#{ #{
host => <<"127.0.0.1">>, host => <<"127.0.0.1">>,
port => 8086, port => 8086,
precision => ms, precision => ms,
pool_size => 8,
database => <<"my_db">>, database => <<"my_db">>,
username => <<"my_user">>, username => <<"my_user">>,
password => <<"my_password">>, password => <<"my_password">>,
@ -209,7 +206,6 @@ values(api_v2, put) ->
host => <<"127.0.0.1">>, host => <<"127.0.0.1">>,
port => 8086, port => 8086,
precision => ms, precision => ms,
pool_size => 8,
bucket => <<"my_bucket">>, bucket => <<"my_bucket">>,
org => <<"my_org">>, org => <<"my_org">>,
token => <<"my_token">>, token => <<"my_token">>,
@ -302,14 +298,13 @@ client_config(
InstId, InstId,
Config = #{ Config = #{
host := Host, host := Host,
port := Port, port := Port
pool_size := PoolSize
} }
) -> ) ->
[ [
{host, binary_to_list(Host)}, {host, binary_to_list(Host)},
{port, Port}, {port, Port},
{pool_size, PoolSize}, {pool_size, erlang:system_info(schedulers)},
{pool, binary_to_atom(InstId, utf8)}, {pool, binary_to_atom(InstId, utf8)},
{precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
] ++ protocol_config(Config). ] ++ protocol_config(Config).