diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl
index 0236cbc06..1fd140388 100644
--- a/apps/emqx/src/emqx_cm_registry.erl
+++ b/apps/emqx/src/emqx_cm_registry.erl
@@ -283,4 +283,4 @@ fold_hist(F, List) ->
%% Return the session registration history retain duration.
-spec retain_duration() -> non_neg_integer().
retain_duration() ->
- emqx:get_config([broker, session_registration_history_retain]).
+ emqx:get_config([broker, session_history_retain]).
diff --git a/apps/emqx/src/emqx_cm_registry_cleaner.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl
similarity index 65%
rename from apps/emqx/src/emqx_cm_registry_cleaner.erl
rename to apps/emqx/src/emqx_cm_registry_keeper.erl
index 41f5bfc6b..8d697732a 100644
--- a/apps/emqx/src/emqx_cm_registry_cleaner.erl
+++ b/apps/emqx/src/emqx_cm_registry_keeper.erl
@@ -15,10 +15,13 @@
%%--------------------------------------------------------------------
%% @doc This module implements the global session registry history cleaner.
--module(emqx_cm_registry_cleaner).
+-module(emqx_cm_registry_keeper).
-behaviour(gen_server).
--export([start_link/0]).
+-export([
+ start_link/0,
+ count/1
+]).
%% gen_server callbacks
-export([
@@ -30,8 +33,15 @@
code_change/3
]).
+-include_lib("stdlib/include/ms_transform.hrl").
-include("emqx_cm.hrl").
+-define(CACHE_COUNT_THRESHOLD, 1000).
+-define(MIN_COUNT_INTERVAL_SECONDS, 5).
+-define(CLEANUP_CHUNK_SIZE, 10000).
+
+-define(IS_HIST_ENABLED(RETAIN), (RETAIN > 0)).
+
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -44,6 +54,45 @@ init(_) ->
ignore
end.
+%% @doc Count the number of sessions.
+%% Include sessions which are expired since the given timestamp if `since' is greater than 0.
+-spec count(non_neg_integer()) -> non_neg_integer().
+count(Since) ->
+ Retain = retain_duration(),
+ Now = now_ts(),
+ %% Get table size if hist is not enabled or
+ %% Since is before the earliest possible retention time.
+ IsCountAll = (not ?IS_HIST_ENABLED(Retain) orelse (Now - Retain >= Since)),
+ case IsCountAll of
+ true ->
+ mnesia:table_info(?CHAN_REG_TAB, size);
+ false ->
+ %% make a gen call to avoid many callers doing the same concurrently
+ gen_server:call(?MODULE, {count, Since}, infinity)
+ end.
+
+handle_call({count, Since}, _From, State) ->
+ {LastCountTime, LastCount} =
+ case State of
+ #{last_count_time := T, last_count := C} ->
+ {T, C};
+ _ ->
+ {0, 0}
+ end,
+ Now = now_ts(),
+ Total = mnesia:table_info(?CHAN_REG_TAB, size),
+ %% Always count if the table is small enough
+ %% or when the last count is too old
+ IsTableSmall = (Total < ?CACHE_COUNT_THRESHOLD),
+ IsLastCountOld = (Now - LastCountTime > ?MIN_COUNT_INTERVAL_SECONDS),
+ case IsTableSmall orelse IsLastCountOld of
+ true ->
+ Count = do_count(Since),
+ CountFinishedAt = now_ts(),
+ {reply, Count, State#{last_count_time => CountFinishedAt, last_count => Count}};
+ false ->
+ {reply, LastCount, State}
+ end;
handle_call(_Request, _From, State) ->
{reply, ok, State}.
@@ -84,7 +133,7 @@ cleanup_one_chunk(NextClientId) ->
IsExpired = fun(#channel{pid = Ts}) ->
is_integer(Ts) andalso (Ts < Now - Retain)
end,
- cleanup_loop(NextClientId, 10000, IsExpired).
+ cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired).
cleanup_loop(ClientId, 0, _IsExpired) ->
ClientId;
@@ -114,7 +163,7 @@ is_hist_enabled() ->
%% Return the session registration history retain duration in seconds.
-spec retain_duration() -> non_neg_integer().
retain_duration() ->
- emqx:get_config([broker, session_registration_history_retain]).
+ emqx:get_config([broker, session_history_retain]).
cleanup_delay() ->
Default = timer:minutes(2),
@@ -137,3 +186,7 @@ send_delay_start(Delay) ->
now_ts() ->
erlang:system_time(seconds).
+
+do_count(Since) ->
+ Ms = ets:fun2ms(fun(#channel{pid = V}) -> is_pid(V) orelse (is_integer(V) andalso (V >= Since)) end),
+ ets:select_count(?CHAN_REG_TAB, Ms).
diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl
index 348c3fca0..3306b7ccd 100644
--- a/apps/emqx/src/emqx_cm_sup.erl
+++ b/apps/emqx/src/emqx_cm_sup.erl
@@ -49,7 +49,7 @@ init([]) ->
Locker = child_spec(emqx_cm_locker, 5000, worker),
CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]),
Registry = child_spec(emqx_cm_registry, 5000, worker),
- RegistryCleaner = child_spec(emqx_cm_registry_cleaner, 5000, worker),
+ RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker),
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
Children =
@@ -59,7 +59,7 @@ init([]) ->
Locker,
CmPool,
Registry,
- RegistryCleaner,
+ RegistryKeeper,
Manager,
DSSessionGCSup
],
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index ec4edfb6b..d5989687d 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1356,13 +1356,13 @@ fields("broker") ->
desc => ?DESC(broker_enable_session_registry)
}
)},
- {session_registration_history_retain,
+ {session_history_retain,
sc(
duration_s(),
#{
default => <<"0s">>,
importance => ?IMPORTANCE_LOW,
- desc => ?DESC("broker_session_registration_history_retain")
+ desc => ?DESC("broker_session_history_retain")
}
)},
{session_locking_strategy,
diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
index f394ffefa..935c690fe 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
@@ -45,7 +45,8 @@
subscribe_batch/2,
unsubscribe/2,
unsubscribe_batch/2,
- set_keepalive/2
+ set_keepalive/2,
+ sessions_count/2
]).
-export([
@@ -96,7 +97,8 @@ paths() ->
"/clients/:clientid/subscribe/bulk",
"/clients/:clientid/unsubscribe",
"/clients/:clientid/unsubscribe/bulk",
- "/clients/:clientid/keepalive"
+ "/clients/:clientid/keepalive",
+ "/sessions_count"
].
schema("/clients") ->
@@ -385,6 +387,30 @@ schema("/clients/:clientid/keepalive") ->
)
}
}
+ };
+schema("/sessions_count") ->
+ #{
+ 'operationId' => sessions_count,
+ get => #{
+ description => ?DESC(get_sessions_count),
+ tags => ?TAGS,
+ parameters => [
+ {since,
+ hoconsc:mk(non_neg_integer(), #{
+ in => query,
+ required => false,
+ default => 0,
+ desc =>
+ <<"Include sessions expired after this time (UNIX Epoch in seconds precesion)">>,
+ example => 1705391625
+ })}
+ ],
+ responses => #{
+ 200 => hoconsc:mk(binary(), #{
+ desc => <<"Number of sessions">>
+ })
+ }
+ }
}.
fields(clients) ->
@@ -1059,3 +1085,8 @@ client_example() ->
<<"recv_cnt">> => 4,
<<"recv_msg.qos0">> => 0
}.
+
+sessions_count(get, #{query_string := QString}) ->
+ Since = maps:get(<<"since">>, QString, undefined),
+ Count = emqx_cm_registry_keeper:count(Since),
+ {200, integer_to_binary(Count)}.
diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon
index 64d4e5279..1e9193df6 100644
--- a/rel/i18n/emqx_mgmt_api_clients.hocon
+++ b/rel/i18n/emqx_mgmt_api_clients.hocon
@@ -60,4 +60,11 @@ set_keepalive_seconds.desc:
set_keepalive_seconds.label:
"""Set the online client keepalive by seconds"""
+get_sessions_count.desc:
+"""Get the number of sessions. By default it returns the number of non-expired sessions.
+if `broker.session_history_retain` is set to a duration greater than `0s`,
+this API can also count expired sessions by providing the `since` parameter."""
+get_sessions_count.label:
+"""Count number of sessions"""
+
}
diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon
index 94ba275f3..4c9f1b83e 100644
--- a/rel/i18n/emqx_schema.hocon
+++ b/rel/i18n/emqx_schema.hocon
@@ -1429,7 +1429,7 @@ Advantages of Disabling
- Reduced Memory Usage: Turning off the session registry can lower the overall memory footprint of the system.
- Improved Performance: Without the overhead of maintaining a global registry, the node can process client connections faster."""
-broker_session_registration_history_retain.desc:
+broker_session_history_retain.desc:
"""The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance.
This retained history can be used to monitor how many sessions were registered in the past configured duration.
Note: This config has no effect if `enable_session_registry` is set to `false`.