Merge pull request #12373 from JimMoen/fix-prom-data-aggre

Follow up #12299
This commit is contained in:
JianBo He 2024-01-25 10:37:47 +08:00 committed by GitHub
commit ff0fd65f9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 268 additions and 189 deletions

View File

@ -21,7 +21,7 @@ endif
# Dashboard version # Dashboard version
# from https://github.com/emqx/emqx-dashboard5 # from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.6.1 export EMQX_DASHBOARD_VERSION ?= v1.6.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.3 export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.8
PROFILE ?= emqx PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise REL_PROFILES := emqx emqx-enterprise

View File

@ -1177,6 +1177,9 @@ format_resource(
) )
). ).
%% FIXME:
%% missing metrics:
%% 'retried.success' and 'retried.failed'
format_metrics(#{ format_metrics(#{
counters := #{ counters := #{
'dropped' := Dropped, 'dropped' := Dropped,

View File

@ -4,6 +4,7 @@
{emqx, {path, "../emqx"}}, {emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}}, {emqx_utils, {path, "../emqx_utils"}},
{emqx_auth, {path, "../emqx_auth"}}, {emqx_auth, {path, "../emqx_auth"}},
{emqx_resource, {path, "../emqx_resource"}},
{prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.2"}}} {prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.2"}}}
]}. ]}.

View File

@ -5,7 +5,7 @@
{vsn, "5.0.19"}, {vsn, "5.0.19"},
{modules, []}, {modules, []},
{registered, [emqx_prometheus_sup]}, {registered, [emqx_prometheus_sup]},
{applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_management]}, {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]},
{mod, {emqx_prometheus_app, []}}, {mod, {emqx_prometheus_app, []}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -24,7 +24,7 @@
-behaviour(emqx_prometheus_cluster). -behaviour(emqx_prometheus_cluster).
-export([ -export([
fetch_data_from_local_node/0, fetch_from_local_node/1,
fetch_cluster_consistented_data/0, fetch_cluster_consistented_data/0,
aggre_or_zip_init_acc/0, aggre_or_zip_init_acc/0,
logic_sum_metrics/0 logic_sum_metrics/0
@ -90,8 +90,6 @@
-define(HTTP_OPTIONS, [{autoredirect, true}, {timeout, 60000}]). -define(HTTP_OPTIONS, [{autoredirect, true}, {timeout, 60000}]).
-define(LOGICAL_SUM_METRIC_NAMES, []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -194,6 +192,11 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) ->
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
%% TODO: license expiry epoch and cert expiry epoch should be cached %% TODO: license expiry epoch and cert expiry epoch should be cached
ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)), ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)),
ok = add_collect_family(
Callback,
stats_metric_cluster_consistened_meta(),
?MG(stats_data_cluster_consistented, RawData)
),
ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)), ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)),
ok = add_collect_family(Callback, cluster_metric_meta(), ?MG(cluster_data, RawData)), ok = add_collect_family(Callback, cluster_metric_meta(), ?MG(cluster_data, RawData)),
@ -216,8 +219,8 @@ collect_mf(_Registry, _Callback) ->
collect(<<"json">>) -> collect(<<"json">>) ->
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
(maybe_license_collect_json_data(RawData))#{ (maybe_license_collect_json_data(RawData))#{
stats => collect_json_data(?MG(stats_data, RawData)), stats => collect_stats_json_data(RawData),
metrics => collect_json_data(?MG(vm_data, RawData)), metrics => collect_vm_json_data(?MG(vm_data, RawData)),
packets => collect_json_data(?MG(emqx_packet_data, RawData)), packets => collect_json_data(?MG(emqx_packet_data, RawData)),
messages => collect_json_data(?MG(emqx_message_data, RawData)), messages => collect_json_data(?MG(emqx_message_data, RawData)),
delivery => collect_json_data(?MG(emqx_delivery_data, RawData)), delivery => collect_json_data(?MG(emqx_delivery_data, RawData)),
@ -243,24 +246,25 @@ add_collect_family(Name, Data, Callback, Type) ->
Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)). Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)).
%% behaviour %% behaviour
fetch_data_from_local_node() -> fetch_from_local_node(Mode) ->
{node(self()), #{ {node(self()), #{
stats_data => stats_data(), stats_data => stats_data(Mode),
vm_data => vm_data(), vm_data => vm_data(Mode),
cluster_data => cluster_data(), cluster_data => cluster_data(Mode),
%% Metrics %% Metrics
emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta()), emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta(), Mode),
emqx_message_data => emqx_metric_data(message_metric_meta()), emqx_message_data => emqx_metric_data(message_metric_meta(), Mode),
emqx_delivery_data => emqx_metric_data(delivery_metric_meta()), emqx_delivery_data => emqx_metric_data(delivery_metric_meta(), Mode),
emqx_client_data => emqx_metric_data(client_metric_meta()), emqx_client_data => emqx_metric_data(client_metric_meta(), Mode),
emqx_session_data => emqx_metric_data(session_metric_meta()), emqx_session_data => emqx_metric_data(session_metric_meta(), Mode),
emqx_olp_data => emqx_metric_data(olp_metric_meta()), emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode),
emqx_acl_data => emqx_metric_data(acl_metric_meta()), emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode),
emqx_authn_data => emqx_metric_data(authn_metric_meta()) emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode)
}}. }}.
fetch_cluster_consistented_data() -> fetch_cluster_consistented_data() ->
(maybe_license_fetch_data())#{ (maybe_license_fetch_data())#{
stats_data_cluster_consistented => stats_data_cluster_consistented(),
cert_data => cert_data() cert_data => cert_data()
}. }.
@ -280,7 +284,7 @@ aggre_or_zip_init_acc() ->
}. }.
logic_sum_metrics() -> logic_sum_metrics() ->
?LOGICAL_SUM_METRIC_NAMES. [].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Collector %% Collector
@ -469,42 +473,57 @@ emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D)).
stats_metric_meta() -> stats_metric_meta() ->
[ [
%% connections %% connections
{emqx_connections_count, counter, 'connections.count'}, {emqx_connections_count, gauge, 'connections.count'},
{emqx_connections_max, counter, 'connections.max'}, {emqx_connections_max, gauge, 'connections.max'},
{emqx_live_connections_count, counter, 'live_connections.count'}, {emqx_live_connections_count, gauge, 'live_connections.count'},
{emqx_live_connections_max, counter, 'live_connections.max'}, {emqx_live_connections_max, gauge, 'live_connections.max'},
%% sessions %% sessions
{emqx_sessions_count, counter, 'sessions.count'}, {emqx_sessions_count, gauge, 'sessions.count'},
{emqx_sessions_max, counter, 'sessions.max'}, {emqx_sessions_max, gauge, 'sessions.max'},
{emqx_channels_count, counter, 'channels.count'}, {emqx_channels_count, gauge, 'channels.count'},
{emqx_channels_max, counter, 'channels.max'}, {emqx_channels_max, gauge, 'channels.max'},
%% pub/sub stats %% pub/sub stats
{emqx_topics_count, counter, 'topics.count'}, {emqx_suboptions_count, gauge, 'suboptions.count'},
{emqx_topics_max, counter, 'topics.max'}, {emqx_suboptions_max, gauge, 'suboptions.max'},
{emqx_suboptions_count, counter, 'suboptions.count'}, {emqx_subscribers_count, gauge, 'subscribers.count'},
{emqx_suboptions_max, counter, 'suboptions.max'}, {emqx_subscribers_max, gauge, 'subscribers.max'},
{emqx_subscribers_count, counter, 'subscribers.count'}, {emqx_subscriptions_count, gauge, 'subscriptions.count'},
{emqx_subscribers_max, counter, 'subscribers.max'}, {emqx_subscriptions_max, gauge, 'subscriptions.max'},
{emqx_subscriptions_count, counter, 'subscriptions.count'}, {emqx_subscriptions_shared_count, gauge, 'subscriptions.shared.count'},
{emqx_subscriptions_max, counter, 'subscriptions.max'}, {emqx_subscriptions_shared_max, gauge, 'subscriptions.shared.max'},
{emqx_subscriptions_shared_count, counter, 'subscriptions.shared.count'},
{emqx_subscriptions_shared_max, counter, 'subscriptions.shared.max'},
%% retained
{emqx_retained_count, counter, 'retained.count'},
{emqx_retained_max, counter, 'retained.max'},
%% delayed %% delayed
{emqx_delayed_count, counter, 'delayed.count'}, {emqx_delayed_count, gauge, 'delayed.count'},
{emqx_delayed_max, counter, 'delayed.max'} {emqx_delayed_max, gauge, 'delayed.max'}
]. ].
stats_data() -> stats_metric_cluster_consistened_meta() ->
[
%% topics
{emqx_topics_max, gauge, 'topics.max'},
{emqx_topics_count, gauge, 'topics.count'},
%% retained
{emqx_retained_count, gauge, 'retained.count'},
{emqx_retained_max, gauge, 'retained.max'}
].
stats_data(Mode) ->
Stats = emqx_stats:getstats(),
lists:foldl(
fun({Name, _Type, MetricKAtom}, AccIn) ->
AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Stats)}]}
end,
#{},
stats_metric_meta()
).
stats_data_cluster_consistented() ->
Stats = emqx_stats:getstats(), Stats = emqx_stats:getstats(),
lists:foldl( lists:foldl(
fun({Name, _Type, MetricKAtom}, AccIn) -> fun({Name, _Type, MetricKAtom}, AccIn) ->
AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]} AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]}
end, end,
#{}, #{},
stats_metric_meta() stats_metric_cluster_consistened_meta()
). ).
%%======================================== %%========================================
@ -521,11 +540,18 @@ vm_metric_meta() ->
{emqx_vm_used_memory, gauge, 'used_memory'} {emqx_vm_used_memory, gauge, 'used_memory'}
]. ].
vm_data() -> vm_data(Mode) ->
VmStats = emqx_mgmt:vm_stats(), VmStats = emqx_mgmt:vm_stats(),
lists:foldl( lists:foldl(
fun({Name, _Type, MetricKAtom}, AccIn) -> fun({Name, _Type, MetricKAtom}, AccIn) ->
AccIn#{Name => [{[], ?C(MetricKAtom, VmStats)}]} Labels =
case Mode of
node ->
[];
_ ->
[{node, node(self())}]
end,
AccIn#{Name => [{Labels, ?C(MetricKAtom, VmStats)}]}
end, end,
#{}, #{},
vm_metric_meta() vm_metric_meta()
@ -541,23 +567,23 @@ cluster_metric_meta() ->
{emqx_cluster_nodes_stopped, gauge, undefined} {emqx_cluster_nodes_stopped, gauge, undefined}
]. ].
cluster_data() -> cluster_data(Mode) ->
Running = emqx:cluster_nodes(running), Running = emqx:cluster_nodes(running),
Stopped = emqx:cluster_nodes(stopped), Stopped = emqx:cluster_nodes(stopped),
#{ #{
emqx_cluster_nodes_running => [{[], length(Running)}], emqx_cluster_nodes_running => [{with_node_label(Mode, []), length(Running)}],
emqx_cluster_nodes_stopped => [{[], length(Stopped)}] emqx_cluster_nodes_stopped => [{with_node_label(Mode, []), length(Stopped)}]
}. }.
%%======================================== %%========================================
%% Metrics %% Metrics
%%======================================== %%========================================
emqx_metric_data(MetricNameTypeKeyL) -> emqx_metric_data(MetricNameTypeKeyL, Mode) ->
Metrics = emqx_metrics:all(), Metrics = emqx_metrics:all(),
lists:foldl( lists:foldl(
fun({Name, _Type, MetricKAtom}, AccIn) -> fun({Name, _Type, MetricKAtom}, AccIn) ->
AccIn#{Name => [{[], ?C(MetricKAtom, Metrics)}]} AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Metrics)}]}
end, end,
#{}, #{},
MetricNameTypeKeyL MetricNameTypeKeyL
@ -870,10 +896,25 @@ date_to_expiry_epoch(DateTime) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `application/json` %% merge / zip formatting funcs for type `application/json`
collect_stats_json_data(RawData) ->
StatsData = ?MG(stats_data, RawData),
StatsClData = ?MG(stats_data_cluster_consistented, RawData),
D = maps:merge(StatsData, StatsClData),
collect_json_data(D).
%% always return json array %% always return json array
collect_cert_json_data(Data) -> collect_cert_json_data(Data) ->
collect_json_data_(Data). collect_json_data_(Data).
collect_vm_json_data(Data) ->
DataListPerNode = collect_json_data_(Data),
case {?GET_PROM_DATA_MODE(), DataListPerNode} of
{?PROM_DATA_MODE__NODE, [NData | _]} ->
NData;
{_, _} ->
DataListPerNode
end.
collect_json_data(Data0) -> collect_json_data(Data0) ->
DataListPerNode = collect_json_data_(Data0), DataListPerNode = collect_json_data_(Data0),
case {?GET_PROM_DATA_MODE(), DataListPerNode} of case {?GET_PROM_DATA_MODE(), DataListPerNode} of
@ -913,6 +954,13 @@ zip_json_prom_stats_metrics(Key, Points, AllResultedAcc) ->
metrics_name(MetricsAll) -> metrics_name(MetricsAll) ->
[Name || {Name, _, _} <- MetricsAll]. [Name || {Name, _, _} <- MetricsAll].
with_node_label(?PROM_DATA_MODE__NODE, Labels) ->
Labels;
with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) ->
Labels;
with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
[{node, node(self())} | Labels].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% bpapi %% bpapi

View File

@ -26,7 +26,7 @@
%% for bpapi %% for bpapi
-behaviour(emqx_prometheus_cluster). -behaviour(emqx_prometheus_cluster).
-export([ -export([
fetch_data_from_local_node/0, fetch_from_local_node/1,
fetch_cluster_consistented_data/0, fetch_cluster_consistented_data/0,
aggre_or_zip_init_acc/0, aggre_or_zip_init_acc/0,
logic_sum_metrics/0 logic_sum_metrics/0
@ -64,8 +64,8 @@
| emqx_authz_status | emqx_authz_status
| emqx_authz_nomatch | emqx_authz_nomatch
| emqx_authz_total | emqx_authz_total
| emqx_authz_success | emqx_authz_allow
| emqx_authz_failed. | emqx_authz_deny.
%% Please don't remove this attribute, prometheus uses it to %% Please don't remove this attribute, prometheus uses it to
%% automatically register collectors. %% automatically register collectors.
@ -126,10 +126,10 @@ collect_metrics(Name, Metrics) ->
collect_auth(Name, Metrics). collect_auth(Name, Metrics).
%% behaviour %% behaviour
fetch_data_from_local_node() -> fetch_from_local_node(Mode) ->
{node(self()), #{ {node(self()), #{
authn_data => authn_data(), authn_data => authn_data(Mode),
authz_data => authz_data() authz_data => authz_data(Mode)
}}. }}.
fetch_cluster_consistented_data() -> fetch_cluster_consistented_data() ->
@ -186,9 +186,9 @@ collect_auth(K = emqx_authz_nomatch, Data) ->
counter_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_auth(K = emqx_authz_total, Data) -> collect_auth(K = emqx_authz_total, Data) ->
counter_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_auth(K = emqx_authz_success, Data) -> collect_auth(K = emqx_authz_allow, Data) ->
counter_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_auth(K = emqx_authz_failed, Data) -> collect_auth(K = emqx_authz_deny, Data) ->
counter_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
%%==================== %%====================
%% Authz rules count %% Authz rules count
@ -224,38 +224,41 @@ authn_metric_meta() ->
authn_metric(names) -> authn_metric(names) ->
emqx_prometheus_cluster:metric_names(authn_metric_meta()). emqx_prometheus_cluster:metric_names(authn_metric_meta()).
-spec authn_data() -> #{Key => [Point]} when -spec authn_data(atom()) -> #{Key => [Point]} when
Key :: authn_metric_name(), Key :: authn_metric_name(),
Point :: {[Label], Metric}, Point :: {[Label], Metric},
Label :: IdLabel, Label :: IdLabel,
IdLabel :: {id, AuthnName :: binary()}, IdLabel :: {id, AuthnName :: binary()},
Metric :: number(). Metric :: number().
authn_data() -> authn_data(Mode) ->
Authns = emqx_config:get([authentication]), Authns = emqx_config:get([authentication]),
lists:foldl( lists:foldl(
fun(Key, AccIn) -> fun(Key, AccIn) ->
AccIn#{Key => authn_backend_to_points(Key, Authns)} AccIn#{Key => authn_backend_to_points(Mode, Key, Authns)}
end, end,
#{}, #{},
authn_metric(names) authn_metric(names)
). ).
-spec authn_backend_to_points(Key, list(Authn)) -> list(Point) when -spec authn_backend_to_points(atom(), Key, list(Authn)) -> list(Point) when
Key :: authn_metric_name(), Key :: authn_metric_name(),
Authn :: map(), Authn :: map(),
Point :: {[Label], Metric}, Point :: {[Label], Metric},
Label :: IdLabel, Label :: IdLabel,
IdLabel :: {id, AuthnName :: binary()}, IdLabel :: {id, AuthnName :: binary()},
Metric :: number(). Metric :: number().
authn_backend_to_points(Key, Authns) -> authn_backend_to_points(Mode, Key, Authns) ->
do_authn_backend_to_points(Key, Authns, []). do_authn_backend_to_points(Mode, Key, Authns, []).
do_authn_backend_to_points(_K, [], AccIn) -> do_authn_backend_to_points(_Mode, _K, [], AccIn) ->
lists:reverse(AccIn); lists:reverse(AccIn);
do_authn_backend_to_points(K, [Authn | Rest], AccIn) -> do_authn_backend_to_points(Mode, K, [Authn | Rest], AccIn) ->
Id = authenticator_id(Authn), Id = authenticator_id(Authn),
Point = {[{id, Id}], do_metric(K, Authn, lookup_authn_metrics_local(Id))}, Point = {
do_authn_backend_to_points(K, Rest, [Point | AccIn]). with_node_label(Mode, [{id, Id}]),
do_metric(K, Authn, lookup_authn_metrics_local(Id))
},
do_authn_backend_to_points(Mode, K, Rest, [Point | AccIn]).
lookup_authn_metrics_local(Id) -> lookup_authn_metrics_local(Id) ->
case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of
@ -310,45 +313,48 @@ authz_metric_meta() ->
{emqx_authz_status, gauge}, {emqx_authz_status, gauge},
{emqx_authz_nomatch, counter}, {emqx_authz_nomatch, counter},
{emqx_authz_total, counter}, {emqx_authz_total, counter},
{emqx_authz_success, counter}, {emqx_authz_allow, counter},
{emqx_authz_failed, counter} {emqx_authz_deny, counter}
]. ].
authz_metric(names) -> authz_metric(names) ->
emqx_prometheus_cluster:metric_names(authz_metric_meta()). emqx_prometheus_cluster:metric_names(authz_metric_meta()).
-spec authz_data() -> #{Key => [Point]} when -spec authz_data(atom()) -> #{Key => [Point]} when
Key :: authz_metric_name(), Key :: authz_metric_name(),
Point :: {[Label], Metric}, Point :: {[Label], Metric},
Label :: TypeLabel, Label :: TypeLabel,
TypeLabel :: {type, AuthZType :: binary()}, TypeLabel :: {type, AuthZType :: binary()},
Metric :: number(). Metric :: number().
authz_data() -> authz_data(Mode) ->
Authzs = emqx_config:get([authorization, sources]), Authzs = emqx_config:get([authorization, sources]),
lists:foldl( lists:foldl(
fun(Key, AccIn) -> fun(Key, AccIn) ->
AccIn#{Key => authz_backend_to_points(Key, Authzs)} AccIn#{Key => authz_backend_to_points(Mode, Key, Authzs)}
end, end,
#{}, #{},
authz_metric(names) authz_metric(names)
). ).
-spec authz_backend_to_points(Key, list(Authz)) -> list(Point) when -spec authz_backend_to_points(atom(), Key, list(Authz)) -> list(Point) when
Key :: authz_metric_name(), Key :: authz_metric_name(),
Authz :: map(), Authz :: map(),
Point :: {[Label], Metric}, Point :: {[Label], Metric},
Label :: TypeLabel, Label :: TypeLabel,
TypeLabel :: {type, AuthZType :: binary()}, TypeLabel :: {type, AuthZType :: binary()},
Metric :: number(). Metric :: number().
authz_backend_to_points(Key, Authzs) -> authz_backend_to_points(Mode, Key, Authzs) ->
do_authz_backend_to_points(Key, Authzs, []). do_authz_backend_to_points(Mode, Key, Authzs, []).
do_authz_backend_to_points(_K, [], AccIn) -> do_authz_backend_to_points(_Mode, _K, [], AccIn) ->
lists:reverse(AccIn); lists:reverse(AccIn);
do_authz_backend_to_points(K, [Authz | Rest], AccIn) -> do_authz_backend_to_points(Mode, K, [Authz | Rest], AccIn) ->
Type = maps:get(type, Authz), Type = maps:get(type, Authz),
Point = {[{type, Type}], do_metric(K, Authz, lookup_authz_metrics_local(Type))}, Point = {
do_authz_backend_to_points(K, Rest, [Point | AccIn]). with_node_label(Mode, [{type, Type}]),
do_metric(K, Authz, lookup_authz_metrics_local(Type))
},
do_authz_backend_to_points(Mode, K, Rest, [Point | AccIn]).
lookup_authz_metrics_local(Type) -> lookup_authz_metrics_local(Type) ->
case emqx_authz_api_sources:lookup_from_local_node(Type) of case emqx_authz_api_sources:lookup_from_local_node(Type) of
@ -357,8 +363,8 @@ lookup_authz_metrics_local(Type) ->
emqx_authz_status => emqx_prometheus_cluster:status_to_number(Status), emqx_authz_status => emqx_prometheus_cluster:status_to_number(Status),
emqx_authz_nomatch => ?MG0(nomatch, Counters), emqx_authz_nomatch => ?MG0(nomatch, Counters),
emqx_authz_total => ?MG0(total, Counters), emqx_authz_total => ?MG0(total, Counters),
emqx_authz_success => ?MG0(success, Counters), emqx_authz_allow => ?MG0(allow, Counters),
emqx_authz_failed => ?MG0(failed, Counters) emqx_authz_deny => ?MG0(deny, Counters)
}; };
{error, _Reason} -> {error, _Reason} ->
maps:from_keys(authz_metric(names) -- [emqx_authz_enable], 0) maps:from_keys(authz_metric(names) -- [emqx_authz_enable], 0)
@ -479,5 +485,14 @@ mnesia_size(Tab) ->
do_metric(emqx_authn_enable, #{enable := B}, _) -> do_metric(emqx_authn_enable, #{enable := B}, _) ->
emqx_prometheus_cluster:boolean_to_number(B); emqx_prometheus_cluster:boolean_to_number(B);
do_metric(emqx_authz_enable, #{enable := B}, _) ->
emqx_prometheus_cluster:boolean_to_number(B);
do_metric(K, _, Metrics) -> do_metric(K, _, Metrics) ->
?MG0(K, Metrics). ?MG0(K, Metrics).
with_node_label(?PROM_DATA_MODE__NODE, Labels) ->
Labels;
with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) ->
Labels;
with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
[{node, node(self())} | Labels].

View File

@ -16,6 +16,7 @@
-module(emqx_prometheus_cluster). -module(emqx_prometheus_cluster).
-include("emqx_prometheus.hrl"). -include("emqx_prometheus.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([ -export([
raw_data/2, raw_data/2,
@ -23,7 +24,7 @@
collect_json_data/2, collect_json_data/2,
aggre_cluster/3, aggre_cluster/3,
with_node_name_label/2, %% with_node_name_label/2,
point_to_map_fun/1, point_to_map_fun/1,
@ -34,33 +35,35 @@
-callback fetch_cluster_consistented_data() -> map(). -callback fetch_cluster_consistented_data() -> map().
-callback fetch_data_from_local_node() -> {node(), map()}. -callback fetch_from_local_node(atom()) -> {node(), map()}.
-callback aggre_or_zip_init_acc() -> map(). -callback aggre_or_zip_init_acc() -> map().
-callback logic_sum_metrics() -> list().
-define(MG(K, MAP), maps:get(K, MAP)). -define(MG(K, MAP), maps:get(K, MAP)).
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)). -define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
raw_data(Module, undefined) -> raw_data(Module, undefined) ->
%% TODO: for push gateway, the format mode should be configurable %% TODO: for push gateway, the format mode should be configurable
raw_data(Module, ?PROM_DATA_MODE__NODE); raw_data(Module, ?PROM_DATA_MODE__NODE);
raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED) -> raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED = Mode) ->
AllNodesMetrics = aggre_cluster(Module), AllNodesMetrics = aggre_cluster(Module, Mode),
Cluster = Module:fetch_cluster_consistented_data(), Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(AllNodesMetrics, Cluster); maps:merge(AllNodesMetrics, Cluster);
raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) -> raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED = Mode) ->
AllNodesMetrics = with_node_name_label(Module), AllNodesMetrics = zip_cluster_data(Module, Mode),
Cluster = Module:fetch_cluster_consistented_data(), Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(AllNodesMetrics, Cluster); maps:merge(AllNodesMetrics, Cluster);
raw_data(Module, ?PROM_DATA_MODE__NODE) -> raw_data(Module, ?PROM_DATA_MODE__NODE = Mode) ->
{_Node, LocalNodeMetrics} = Module:fetch_data_from_local_node(), {_Node, LocalNodeMetrics} = Module:fetch_from_local_node(Mode),
Cluster = Module:fetch_cluster_consistented_data(), Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(LocalNodeMetrics, Cluster). maps:merge(LocalNodeMetrics, Cluster).
metrics_data_from_all_nodes(Module) -> fetch_data_from_all_nodes(Module, Mode) ->
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
_ResL = emqx_prometheus_proto_v2:raw_prom_data( _ResL = emqx_prometheus_proto_v2:raw_prom_data(
Nodes, Module, fetch_data_from_local_node, [] Nodes, Module, fetch_from_local_node, [Mode]
). ).
collect_json_data(Data, Func) when is_function(Func, 3) -> collect_json_data(Data, Func) when is_function(Func, 3) ->
@ -74,17 +77,13 @@ collect_json_data(Data, Func) when is_function(Func, 3) ->
collect_json_data(_, _) -> collect_json_data(_, _) ->
error(badarg). error(badarg).
aggre_cluster(Module) -> aggre_cluster(Module, Mode) ->
do_aggre_cluster( do_aggre_cluster(
Module:logic_sum_metrics(), Module:logic_sum_metrics(),
metrics_data_from_all_nodes(Module), fetch_data_from_all_nodes(Module, Mode),
Module:aggre_or_zip_init_acc() Module:aggre_or_zip_init_acc()
). ).
with_node_name_label(Module) ->
ResL = metrics_data_from_all_nodes(Module),
do_with_node_name_label(ResL, Module:aggre_or_zip_init_acc()).
aggre_cluster(LogicSumKs, ResL, Init) -> aggre_cluster(LogicSumKs, ResL, Init) ->
do_aggre_cluster(LogicSumKs, ResL, Init). do_aggre_cluster(LogicSumKs, ResL, Init).
@ -119,61 +118,65 @@ aggre_metric(LogicSumKs, NodeMetrics, AccIn0) ->
do_aggre_metric(K, LogicSumKs, NodeMetrics, AccL) -> do_aggre_metric(K, LogicSumKs, NodeMetrics, AccL) ->
lists:foldl( lists:foldl(
fun({Labels, Metric}, AccIn) -> fun(Point = {_Labels, _Metric}, AccIn) ->
NMetric = sum(K, LogicSumKs, Point, AccIn)
case lists:member(K, LogicSumKs) of
true ->
logic_sum(Metric, ?PG0(Labels, AccIn));
false ->
Metric + ?PG0(Labels, AccIn)
end,
[{Labels, NMetric} | AccIn]
end, end,
AccL, AccL,
NodeMetrics NodeMetrics
). ).
with_node_name_label(ResL, Init) -> sum(K, LogicSumKs, {Labels, Metric} = Point, MetricAccL) ->
do_with_node_name_label(ResL, Init). case lists:keytake(Labels, 1, MetricAccL) of
{value, {Labels, MetricAcc}, NMetricAccL} ->
NPoint = {Labels, do_sum(K, LogicSumKs, Metric, MetricAcc)},
[NPoint | NMetricAccL];
false ->
[Point | MetricAccL]
end.
do_with_node_name_label([], AccIn) -> do_sum(K, LogicSumKs, Metric, MetricAcc) ->
case lists:member(K, LogicSumKs) of
true ->
logic_sum(Metric, MetricAcc);
false ->
Metric + MetricAcc
end.
zip_cluster_data(Module, Mode) ->
zip_cluster(
fetch_data_from_all_nodes(Module, Mode),
Module:aggre_or_zip_init_acc()
).
zip_cluster([], AccIn) ->
AccIn; AccIn;
do_with_node_name_label([{ok, {NodeName, NodeMetric}} | Rest], AccIn) -> zip_cluster([{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
do_with_node_name_label( zip_cluster(
Rest, Rest,
maps:fold( maps:fold(
fun(K, V, AccIn0) -> fun(K, V, AccIn0) ->
AccIn0#{ AccIn0#{
K => zip_with_node_name(NodeName, V, ?MG(K, AccIn0)) K => do_zip_cluster(V, ?MG(K, AccIn0))
} }
end, end,
AccIn, AccIn,
NodeMetric NodeMetric
) )
); );
do_with_node_name_label([{_, _} | Rest], AccIn) -> zip_cluster([{_, _} | Rest], AccIn) ->
do_with_node_name_label(Rest, AccIn). zip_cluster(Rest, AccIn).
zip_with_node_name(NodeName, NodeMetrics, AccIn0) -> do_zip_cluster(NodeMetrics, AccIn0) ->
lists:foldl( lists:foldl(
fun(K, AccIn) -> fun(K, AccIn) ->
NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)), AccMetricL = ?MG(K, AccIn),
NAccL = ?MG(K, NodeMetrics) ++ AccMetricL,
AccIn#{K => NAccL} AccIn#{K => NAccL}
end, end,
AccIn0, AccIn0,
maps:keys(NodeMetrics) maps:keys(NodeMetrics)
). ).
do_zip_with_node_name(NodeName, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NLabels = [{node, NodeName} | Labels],
[{NLabels, Metric} | AccIn]
end,
AccL,
NodeMetrics
).
point_to_map_fun(Key) -> point_to_map_fun(Key) ->
fun({Lables, Metric}, AccIn2) -> fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables), LablesKVMap = maps:from_list(Lables),
@ -192,11 +195,11 @@ logic_sum(_, _) ->
boolean_to_number(true) -> 1; boolean_to_number(true) -> 1;
boolean_to_number(false) -> 0. boolean_to_number(false) -> 0.
status_to_number(connected) -> 1; status_to_number(?status_connected) -> 1;
%% for auth status_to_number(?status_connecting) -> 0;
status_to_number(stopped) -> 0; status_to_number(?status_disconnected) -> 0;
%% for data_integration status_to_number(?rm_status_stopped) -> 0;
status_to_number(disconnected) -> 0. status_to_number(_) -> 0.
metric_names(MetricWithType) when is_list(MetricWithType) -> metric_names(MetricWithType) when is_list(MetricWithType) ->
[Name || {Name, _Type} <- MetricWithType]. [Name || {Name, _Type} <- MetricWithType].

View File

@ -31,7 +31,7 @@
%% for bpapi %% for bpapi
-behaviour(emqx_prometheus_cluster). -behaviour(emqx_prometheus_cluster).
-export([ -export([
fetch_data_from_local_node/0, fetch_from_local_node/1,
fetch_cluster_consistented_data/0, fetch_cluster_consistented_data/0,
aggre_or_zip_init_acc/0, aggre_or_zip_init_acc/0,
logic_sum_metrics/0 logic_sum_metrics/0
@ -69,21 +69,24 @@
%% Callback for emqx_prometheus_cluster %% Callback for emqx_prometheus_cluster
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
fetch_data_from_local_node() -> -define(ROOT_KEY_ACTIONS, actions).
fetch_from_local_node(Mode) ->
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(), Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS),
Connectors = emqx_connector:list(),
{node(self()), #{ {node(self()), #{
rule_metric_data => rule_metric_data(Rules), rule_metric_data => rule_metric_data(Mode, Rules),
action_metric_data => action_metric_data(Bridges), action_metric_data => action_metric_data(Mode, Bridges),
connector_metric_data => connector_metric_data(Bridges) connector_metric_data => connector_metric_data(Mode, Connectors)
}}. }}.
fetch_cluster_consistented_data() -> fetch_cluster_consistented_data() ->
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(), Connectors = emqx_connector:list(),
(maybe_collect_schema_registry())#{ (maybe_collect_schema_registry())#{
rules_ov_data => rules_ov_data(Rules), rules_ov_data => rules_ov_data(Rules),
connectors_ov_data => connectors_ov_data(Bridges) connectors_ov_data => connectors_ov_data(Connectors)
}. }.
aggre_or_zip_init_acc() -> aggre_or_zip_init_acc() ->
@ -293,10 +296,10 @@ connectors_ov_metric_meta() ->
connectors_ov_metric(names) -> connectors_ov_metric(names) ->
emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()). emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()).
connectors_ov_data(Brdiges) -> connectors_ov_data(Connectors) ->
#{ #{
%% Both Bridge V1 and V2 %% Both Bridge V1 and V2
emqx_connectors_count => erlang:length(Brdiges) emqx_connectors_count => erlang:length(Connectors)
}. }.
%%======================================== %%========================================
@ -325,26 +328,26 @@ rule_metric_meta() ->
rule_metric(names) -> rule_metric(names) ->
emqx_prometheus_cluster:metric_names(rule_metric_meta()). emqx_prometheus_cluster:metric_names(rule_metric_meta()).
rule_metric_data(Rules) -> rule_metric_data(Mode, Rules) ->
lists:foldl( lists:foldl(
fun(#{id := Id} = Rule, AccIn) -> fun(#{id := Id} = Rule, AccIn) ->
merge_acc_with_rules(Id, get_metric(Rule), AccIn) merge_acc_with_rules(Mode, Id, get_metric(Rule), AccIn)
end, end,
maps:from_keys(rule_metric(names), []), maps:from_keys(rule_metric(names), []),
Rules Rules
). ).
merge_acc_with_rules(Id, RuleMetrics, PointsAcc) -> merge_acc_with_rules(Mode, Id, RuleMetrics, PointsAcc) ->
maps:fold( maps:fold(
fun(K, V, AccIn) -> fun(K, V, AccIn) ->
AccIn#{K => [rule_point(Id, V) | ?MG(K, AccIn)]} AccIn#{K => [rule_point(Mode, Id, V) | ?MG(K, AccIn)]}
end, end,
PointsAcc, PointsAcc,
RuleMetrics RuleMetrics
). ).
rule_point(Id, V) -> rule_point(Mode, Id, V) ->
{[{id, Id}], V}. {with_node_label(Mode, [{id, Id}]), V}.
get_metric(#{id := Id, enable := Bool} = _Rule) -> get_metric(#{id := Id, enable := Bool} = _Rule) ->
case emqx_metrics_worker:get_metrics(rule_metrics, Id) of case emqx_metrics_worker:get_metrics(rule_metrics, Id) of
@ -393,52 +396,48 @@ action_metric_meta() ->
action_metric(names) -> action_metric(names) ->
emqx_prometheus_cluster:metric_names(action_metric_meta()). emqx_prometheus_cluster:metric_names(action_metric_meta()).
action_metric_data(Bridges) -> action_metric_data(Mode, Bridges) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name} = _Bridge, AccIn) -> fun(#{type := Type, name := Name} = _Bridge, AccIn) ->
Id = emqx_bridge_resource:bridge_id(Type, Name), Id = emqx_bridge_resource:bridge_id(Type, Name),
merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) merge_acc_with_bridges(Mode, Id, get_bridge_metric(Type, Name), AccIn)
end, end,
maps:from_keys(action_metric(names), []), maps:from_keys(action_metric(names), []),
Bridges Bridges
). ).
merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) -> merge_acc_with_bridges(Mode, Id, BridgeMetrics, PointsAcc) ->
maps:fold( maps:fold(
fun(K, V, AccIn) -> fun(K, V, AccIn) ->
AccIn#{K => [action_point(Id, V) | ?MG(K, AccIn)]} AccIn#{K => [action_point(Mode, Id, V) | ?MG(K, AccIn)]}
end, end,
PointsAcc, PointsAcc,
BridgeMetrics BridgeMetrics
). ).
action_point(Id, V) -> action_point(Mode, Id, V) ->
{[{id, Id}], V}. {with_node_label(Mode, [{id, Id}]), V}.
get_bridge_metric(Type, Name) -> get_bridge_metric(Type, Name) ->
case emqx_bridge:get_metrics(Type, Name) of #{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name),
#{counters := Counters, gauges := Gauges} -> #{
#{ emqx_action_matched => ?MG0(matched, Counters),
emqx_action_matched => ?MG0(matched, Counters), emqx_action_dropped => ?MG0(dropped, Counters),
emqx_action_dropped => ?MG0(dropped, Counters), emqx_action_success => ?MG0(success, Counters),
emqx_action_success => ?MG0(success, Counters), emqx_action_failed => ?MG0(failed, Counters),
emqx_action_failed => ?MG0(failed, Counters), emqx_action_inflight => ?MG0(inflight, Gauges),
emqx_action_inflight => ?MG0(inflight, Gauges), emqx_action_received => ?MG0(received, Counters),
emqx_action_received => ?MG0(received, Counters), emqx_action_late_reply => ?MG0(late_reply, Counters),
emqx_action_late_reply => ?MG0(late_reply, Counters), emqx_action_retried => ?MG0(retried, Counters),
emqx_action_retried => ?MG0(retried, Counters), emqx_action_retried_success => ?MG0('retried.success', Counters),
emqx_action_retried_success => ?MG0('retried.success', Counters), emqx_action_retried_failed => ?MG0('retried.failed', Counters),
emqx_action_retried_failed => ?MG0('retried.failed', Counters), emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters),
emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters), emqx_action_dropped_resource_not_found => ?MG0('dropped.resource_not_found', Counters),
emqx_action_dropped_resource_not_found => ?MG0( emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters),
'dropped.resource_not_found', Counters emqx_action_dropped_other => ?MG0('dropped.other', Counters),
), emqx_action_dropped_expired => ?MG0('dropped.expired', Counters),
emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters), emqx_action_queuing => ?MG0(queuing, Gauges)
emqx_action_dropped_other => ?MG0('dropped.other', Counters), }.
emqx_action_dropped_expired => ?MG0('dropped.expired', Counters),
emqx_action_queuing => ?MG0(queuing, Gauges)
}
end.
%%==================== %%====================
%% Connector Metric %% Connector Metric
@ -453,29 +452,29 @@ connector_metric_meta() ->
connectr_metric(names) -> connectr_metric(names) ->
emqx_prometheus_cluster:metric_names(connector_metric_meta()). emqx_prometheus_cluster:metric_names(connector_metric_meta()).
connector_metric_data(Bridges) -> connector_metric_data(Mode, Connectors) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name} = Bridge, AccIn) -> fun(#{type := Type, name := Name} = Connector, AccIn) ->
Id = emqx_bridge_resource:bridge_id(Type, Name), Id = emqx_connector_resource:connector_id(Type, Name),
merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn)
end, end,
maps:from_keys(connectr_metric(names), []), maps:from_keys(connectr_metric(names), []),
Bridges Connectors
). ).
merge_acc_with_connectors(Id, ConnectorMetrics, PointsAcc) -> merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
maps:fold( maps:fold(
fun(K, V, AccIn) -> fun(K, V, AccIn) ->
AccIn#{K => [connector_point(Id, V) | ?MG(K, AccIn)]} AccIn#{K => [connector_point(Mode, Id, V) | ?MG(K, AccIn)]}
end, end,
PointsAcc, PointsAcc,
ConnectorMetrics ConnectorMetrics
). ).
connector_point(Id, V) -> connector_point(Mode, Id, V) ->
{[{id, Id}], V}. {with_node_label(Mode, [{id, Id}]), V}.
get_connector_status(#{resource_data := ResourceData} = _Bridge) -> get_connector_status(#{resource_data := ResourceData} = _Connector) ->
Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
Status = ?MG(status, ResourceData), Status = ?MG(status, ResourceData),
#{ #{
@ -532,3 +531,13 @@ zip_json_data_integration_metrics(Key, Points, [] = _AccIn) ->
zip_json_data_integration_metrics(Key, Points, AllResultedAcc) -> zip_json_data_integration_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points), ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult). lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper funcs
with_node_label(?PROM_DATA_MODE__NODE, Labels) ->
Labels;
with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) ->
Labels;
with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
[{node, node(self())} | Labels].