From 509ab6f35a44182deefb6c60339cc923b314729d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 16 Jan 2024 14:19:31 +0100 Subject: [PATCH] feat(api): add /sessions_count api to count sessions --- apps/emqx/src/emqx_cm_registry.erl | 2 +- ...leaner.erl => emqx_cm_registry_keeper.erl} | 61 +++++++++++++++++-- apps/emqx/src/emqx_cm_sup.erl | 4 +- apps/emqx/src/emqx_schema.erl | 4 +- .../src/emqx_mgmt_api_clients.erl | 35 ++++++++++- rel/i18n/emqx_mgmt_api_clients.hocon | 7 +++ rel/i18n/emqx_schema.hocon | 2 +- 7 files changed, 103 insertions(+), 12 deletions(-) rename apps/emqx/src/{emqx_cm_registry_cleaner.erl => emqx_cm_registry_keeper.erl} (65%) diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 0236cbc06..1fd140388 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -283,4 +283,4 @@ fold_hist(F, List) -> %% Return the session registration history retain duration. -spec retain_duration() -> non_neg_integer(). retain_duration() -> - emqx:get_config([broker, session_registration_history_retain]). + emqx:get_config([broker, session_history_retain]). diff --git a/apps/emqx/src/emqx_cm_registry_cleaner.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl similarity index 65% rename from apps/emqx/src/emqx_cm_registry_cleaner.erl rename to apps/emqx/src/emqx_cm_registry_keeper.erl index 41f5bfc6b..8d697732a 100644 --- a/apps/emqx/src/emqx_cm_registry_cleaner.erl +++ b/apps/emqx/src/emqx_cm_registry_keeper.erl @@ -15,10 +15,13 @@ %%-------------------------------------------------------------------- %% @doc This module implements the global session registry history cleaner. --module(emqx_cm_registry_cleaner). +-module(emqx_cm_registry_keeper). -behaviour(gen_server). --export([start_link/0]). +-export([ + start_link/0, + count/1 +]). %% gen_server callbacks -export([ @@ -30,8 +33,15 @@ code_change/3 ]). +-include_lib("stdlib/include/ms_transform.hrl"). -include("emqx_cm.hrl"). +-define(CACHE_COUNT_THRESHOLD, 1000). +-define(MIN_COUNT_INTERVAL_SECONDS, 5). +-define(CLEANUP_CHUNK_SIZE, 10000). + +-define(IS_HIST_ENABLED(RETAIN), (RETAIN > 0)). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -44,6 +54,45 @@ init(_) -> ignore end. +%% @doc Count the number of sessions. +%% Include sessions which are expired since the given timestamp if `since' is greater than 0. +-spec count(non_neg_integer()) -> non_neg_integer(). +count(Since) -> + Retain = retain_duration(), + Now = now_ts(), + %% Get table size if hist is not enabled or + %% Since is before the earliest possible retention time. + IsCountAll = (not ?IS_HIST_ENABLED(Retain) orelse (Now - Retain >= Since)), + case IsCountAll of + true -> + mnesia:table_info(?CHAN_REG_TAB, size); + false -> + %% make a gen call to avoid many callers doing the same concurrently + gen_server:call(?MODULE, {count, Since}, infinity) + end. + +handle_call({count, Since}, _From, State) -> + {LastCountTime, LastCount} = + case State of + #{last_count_time := T, last_count := C} -> + {T, C}; + _ -> + {0, 0} + end, + Now = now_ts(), + Total = mnesia:table_info(?CHAN_REG_TAB, size), + %% Always count if the table is small enough + %% or when the last count is too old + IsTableSmall = (Total < ?CACHE_COUNT_THRESHOLD), + IsLastCountOld = (Now - LastCountTime > ?MIN_COUNT_INTERVAL_SECONDS), + case IsTableSmall orelse IsLastCountOld of + true -> + Count = do_count(Since), + CountFinishedAt = now_ts(), + {reply, Count, State#{last_count_time => CountFinishedAt, last_count => Count}}; + false -> + {reply, LastCount, State} + end; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -84,7 +133,7 @@ cleanup_one_chunk(NextClientId) -> IsExpired = fun(#channel{pid = Ts}) -> is_integer(Ts) andalso (Ts < Now - Retain) end, - cleanup_loop(NextClientId, 10000, IsExpired). + cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired). cleanup_loop(ClientId, 0, _IsExpired) -> ClientId; @@ -114,7 +163,7 @@ is_hist_enabled() -> %% Return the session registration history retain duration in seconds. -spec retain_duration() -> non_neg_integer(). retain_duration() -> - emqx:get_config([broker, session_registration_history_retain]). + emqx:get_config([broker, session_history_retain]). cleanup_delay() -> Default = timer:minutes(2), @@ -137,3 +186,7 @@ send_delay_start(Delay) -> now_ts() -> erlang:system_time(seconds). + +do_count(Since) -> + Ms = ets:fun2ms(fun(#channel{pid = V}) -> is_pid(V) orelse (is_integer(V) andalso (V >= Since)) end), + ets:select_count(?CHAN_REG_TAB, Ms). diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl index 348c3fca0..3306b7ccd 100644 --- a/apps/emqx/src/emqx_cm_sup.erl +++ b/apps/emqx/src/emqx_cm_sup.erl @@ -49,7 +49,7 @@ init([]) -> Locker = child_spec(emqx_cm_locker, 5000, worker), CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]), Registry = child_spec(emqx_cm_registry, 5000, worker), - RegistryCleaner = child_spec(emqx_cm_registry_cleaner, 5000, worker), + RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), Children = @@ -59,7 +59,7 @@ init([]) -> Locker, CmPool, Registry, - RegistryCleaner, + RegistryKeeper, Manager, DSSessionGCSup ], diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ec4edfb6b..d5989687d 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1356,13 +1356,13 @@ fields("broker") -> desc => ?DESC(broker_enable_session_registry) } )}, - {session_registration_history_retain, + {session_history_retain, sc( duration_s(), #{ default => <<"0s">>, importance => ?IMPORTANCE_LOW, - desc => ?DESC("broker_session_registration_history_retain") + desc => ?DESC("broker_session_history_retain") } )}, {session_locking_strategy, diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index f394ffefa..935c690fe 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -45,7 +45,8 @@ subscribe_batch/2, unsubscribe/2, unsubscribe_batch/2, - set_keepalive/2 + set_keepalive/2, + sessions_count/2 ]). -export([ @@ -96,7 +97,8 @@ paths() -> "/clients/:clientid/subscribe/bulk", "/clients/:clientid/unsubscribe", "/clients/:clientid/unsubscribe/bulk", - "/clients/:clientid/keepalive" + "/clients/:clientid/keepalive", + "/sessions_count" ]. schema("/clients") -> @@ -385,6 +387,30 @@ schema("/clients/:clientid/keepalive") -> ) } } + }; +schema("/sessions_count") -> + #{ + 'operationId' => sessions_count, + get => #{ + description => ?DESC(get_sessions_count), + tags => ?TAGS, + parameters => [ + {since, + hoconsc:mk(non_neg_integer(), #{ + in => query, + required => false, + default => 0, + desc => + <<"Include sessions expired after this time (UNIX Epoch in seconds precesion)">>, + example => 1705391625 + })} + ], + responses => #{ + 200 => hoconsc:mk(binary(), #{ + desc => <<"Number of sessions">> + }) + } + } }. fields(clients) -> @@ -1059,3 +1085,8 @@ client_example() -> <<"recv_cnt">> => 4, <<"recv_msg.qos0">> => 0 }. + +sessions_count(get, #{query_string := QString}) -> + Since = maps:get(<<"since">>, QString, undefined), + Count = emqx_cm_registry_keeper:count(Since), + {200, integer_to_binary(Count)}. diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index 64d4e5279..1e9193df6 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -60,4 +60,11 @@ set_keepalive_seconds.desc: set_keepalive_seconds.label: """Set the online client keepalive by seconds""" +get_sessions_count.desc: +"""Get the number of sessions. By default it returns the number of non-expired sessions. +if `broker.session_history_retain` is set to a duration greater than `0s`, +this API can also count expired sessions by providing the `since` parameter.""" +get_sessions_count.label: +"""Count number of sessions""" + } diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 94ba275f3..4c9f1b83e 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1429,7 +1429,7 @@ Advantages of Disabling
- Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system. - Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster.""" -broker_session_registration_history_retain.desc: +broker_session_history_retain.desc: """The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance. This retained history can be used to monitor how many sessions were registered in the past configured duration. Note: This config has no effect if `enable_session_registry` is set to `false`.