From f2e7563b0c491ef86df0861a613523c6017a40a1 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 5 May 2022 13:08:33 +0800 Subject: [PATCH 1/3] feat(limiter): add limiter view/update api --- .../emqx_limiter/src/emqx_limiter_schema.erl | 6 +- .../src/emqx_mgmt_api_configs.erl | 72 +++++++++++++++---- 2 files changed, 65 insertions(+), 13 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index a47350d5c..3474b91d9 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -29,7 +29,8 @@ to_initial/1, namespace/0, get_bucket_cfg_path/2, - desc/1 + desc/1, + types/0 ]). -define(KILOBYTE, 1024). @@ -187,6 +188,9 @@ to_rate(Str) -> get_bucket_cfg_path(Type, BucketName) -> [limiter, Type, bucket, BucketName]. +types() -> + [bytes_in, message_in, connection, message_routing, batch]. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index f6fc8c045..05da32f9b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -25,6 +25,7 @@ -export([ config/3, + limiter/3, config_reset/3, configs/3, get_full_config/0, @@ -71,7 +72,12 @@ api_spec() -> namespace() -> "configuration". paths() -> - ["/configs", "/configs_reset/:rootname", "/configs/global_zone"] ++ + [ + "/configs", + "/configs_reset/:rootname", + "/configs/global_zone", + "/configs/limiter/:limiter_type" + ] ++ lists:map(fun({Name, _Type}) -> ?PREFIX ++ binary_to_list(Name) end, config_list()). schema("/configs") -> @@ -156,6 +162,42 @@ schema("/configs/global_zone") -> } } }; +schema("/configs/limiter/:limiter_type") -> + Schema = hoconsc:ref(emqx_limiter_schema, limiter_opts), + Parameters = [ + {limiter_type, + hoconsc:mk( + hoconsc:enum(emqx_limiter_schema:types()), + #{ + in => query, + required => true, + example => <<"bytes_in">>, + desc => <<"The limiter type">> + } + )} + ], + #{ + 'operationId' => limiter, + get => #{ + tags => [conf], + description => <<"Get config of this limiter">>, + parameters => Parameters, + responses => #{ + 200 => Schema, + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"config not found">>) + } + }, + put => #{ + tags => [conf], + description => <<"Update config of this limiter">>, + parameters => Parameters, + 'requestBody' => Schema, + responses => #{ + 200 => Schema, + 400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED']) + } + } + }; schema(Path) -> {RootKey, {_Root, Schema}} = find_schema(Path), #{ @@ -201,18 +243,13 @@ fields(Field) -> %%%============================================================================================== %% HTTP API Callbacks -config(get, _Params, Req) -> +config(Method, Params, Req) -> Path = conf_path(Req), - {ok, Conf} = emqx_map_lib:deep_find(Path, get_full_config()), - {200, Conf}; -config(put, #{body := Body}, Req) -> - Path = conf_path(Req), - case emqx_conf:update(Path, Body, ?OPTS) of - {ok, #{raw_config := RawConf}} -> - {200, RawConf}; - {error, Reason} -> - {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}} - end. + do_config(Method, Params, Path). + +limiter(Method, #{query_string := QS} = Params, _Req) -> + #{<<"limiter_type">> := Type} = QS, + do_config(Method, Params, [<<"limiter">>, erlang:atom_to_binary(Type)]). global_zone_configs(get, _Params, _Req) -> Paths = global_zone_roots(), @@ -340,3 +377,14 @@ global_zone_roots() -> global_zone_schema() -> Roots = hocon_schema:roots(emqx_zone_schema), lists:map(fun({RootKey, {_Root, Schema}}) -> {RootKey, Schema} end, Roots). + +do_config(get, _Params, Path) -> + {ok, Conf} = emqx_map_lib:deep_find(Path, get_full_config()), + {200, Conf}; +do_config(put, #{body := Body}, Path) -> + case emqx_conf:update(Path, Body, ?OPTS) of + {ok, #{raw_config := RawConf}} -> + {200, RawConf}; + {error, Reason} -> + {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}} + end. From acb78d53ebc3101e326136244a111a6af5d5025c Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 5 May 2022 15:38:04 +0800 Subject: [PATCH 2/3] fix(limiter): fix rate parser, support this format: xMB/s --- .../emqx_limiter/src/emqx_limiter_schema.erl | 32 +++++++++++++++---- .../src/emqx_dashboard_swagger.erl | 8 ++--- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 3474b91d9..7856f836a 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -199,18 +199,36 @@ ref(Field) -> hoconsc:ref(?MODULE, Field). to_burst_rate(Str) -> to_rate(Str, false, true). +%% rate can be: 10 10MB 10MB/s 10MB/2s infinity +%% e.g. the bytes_in regex tree is: +%% +%% __ infinity +%% | - xMB +%% rate -| | +%% __ ?Size(/?Time) -| - xMB/s +%% | | +%% - xMB/?Time -| +%% - xMB/ys to_rate(Str, CanInfinity, CanZero) -> - Tokens = [string:trim(T) || T <- string:tokens(Str, "/")], - case Tokens of - ["infinity"] when CanInfinity -> + Regex = "^\s*(?:([0-9]+[a-zA-Z]*)(?:/([0-9]*)([m s h d M S H D]{1,2}))?\s*$)|infinity\s*$", + {ok, MP} = re:compile(Regex), + case re:run(Str, MP, [{capture, all_but_first, list}]) of + {match, []} when CanInfinity -> {ok, infinity}; %% if time unit is 1s, it can be omitted - [QuotaStr] -> + {match, [QuotaStr]} -> Fun = fun(Quota) -> {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS} end, to_capacity(QuotaStr, Str, CanZero, Fun); - [QuotaStr, Interval] -> + {match, [QuotaStr, TimeVal, TimeUnit]} -> + Interval = + case TimeVal of + %% for xM/s + [] -> "1" ++ TimeUnit; + %% for xM/ys + _ -> TimeVal ++ TimeUnit + end, Fun = fun(Quota) -> try case emqx_schema:to_duration_ms(Interval) of @@ -246,11 +264,11 @@ check_capacity(_Str, Quota, _CanZero, Cont) -> Cont(Quota). to_capacity(Str) -> - Regex = "^\s*(?:([0-9]+)([a-zA-z]*))|infinity\s*$", + Regex = "^\s*(?:([0-9]+)([a-zA-Z]*))|infinity\s*$", to_quota(Str, Regex). to_initial(Str) -> - Regex = "^\s*([0-9]+)([a-zA-z]*)\s*$", + Regex = "^\s*([0-9]+)([a-zA-Z]*)\s*$", to_quota(Str, Regex). to_quota(Str, Regex) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 9924b085c..db468726b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -667,16 +667,16 @@ typename_to_spec("log_level()", _Mod) -> enum => [debug, info, notice, warning, error, critical, alert, emergency, all] }; typename_to_spec("rate()", _Mod) -> - #{type => string, example => <<"10M/s">>}; + #{type => string, example => <<"10MB">>}; typename_to_spec("capacity()", _Mod) -> - #{type => string, example => <<"100M">>}; + #{type => string, example => <<"100MB">>}; typename_to_spec("burst_rate()", _Mod) -> %% 0/0s = no burst - #{type => string, example => <<"10M/1s">>}; + #{type => string, example => <<"10MB">>}; typename_to_spec("failure_strategy()", _Mod) -> #{type => string, example => <<"force">>}; typename_to_spec("initial()", _Mod) -> - #{type => string, example => <<"0M">>}; + #{type => string, example => <<"0MB">>}; typename_to_spec("bucket_name()", _Mod) -> #{type => string, example => <<"retainer">>}; typename_to_spec(Name, Mod) -> From 1a7c870b7d8a6a3ef5b3e9be401e3e41ae1b34d8 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 5 May 2022 16:06:22 +0800 Subject: [PATCH 3/3] test(limiter): add more cases for rate parser --- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 8ad7cb3f1..bb751f9ee 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -568,11 +568,32 @@ t_decimal(_) -> t_schema_unit(_) -> M = emqx_limiter_schema, ?assertEqual(limiter, M:namespace()), + + %% infinity ?assertEqual({ok, infinity}, M:to_rate(" infinity ")), + + %% xMB ?assertMatch({ok, _}, M:to_rate("100")), - ?assertMatch({error, _}, M:to_rate("0")), + ?assertMatch({ok, _}, M:to_rate(" 100 ")), + ?assertMatch({ok, _}, M:to_rate("100MB")), + + %% xMB/s + ?assertMatch({ok, _}, M:to_rate("100/s")), + ?assertMatch({ok, _}, M:to_rate("100MB/s")), + + %% xMB/ys ?assertMatch({ok, _}, M:to_rate("100/10s")), + ?assertMatch({ok, _}, M:to_rate("100MB/10s")), + + ?assertMatch({error, _}, M:to_rate("infini")), + ?assertMatch({error, _}, M:to_rate("0")), + ?assertMatch({error, _}, M:to_rate("MB")), + ?assertMatch({error, _}, M:to_rate("10s")), + ?assertMatch({error, _}, M:to_rate("100MB/")), + ?assertMatch({error, _}, M:to_rate("100MB/xx")), + ?assertMatch({error, _}, M:to_rate("100MB/1")), ?assertMatch({error, _}, M:to_rate("100/10x")), + ?assertEqual({ok, infinity}, M:to_capacity("infinity")), ?assertEqual({ok, 100}, M:to_capacity("100")), ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),