From da755b88c7aa55e8e3a5eb31cc8a9217b7a362d2 Mon Sep 17 00:00:00 2001 From: tigercl Date: Thu, 28 Feb 2019 18:25:17 +0800 Subject: [PATCH] Add monitors and alarm handler (#2266) * Add monitors and alarm handler --- Makefile | 3 +- etc/emqx.conf | 57 ++++++++++ priv/emqx.schema | 67 ++++++++++++ src/emqx.app.src | 2 +- src/emqx_alarm_handler.erl | 169 ++++++++++++++++++++++++++++++ src/emqx_alarm_mgr.erl | 144 ------------------------- src/emqx_app.erl | 4 + src/emqx_kernel_sup.erl | 1 - src/emqx_logger_handler.erl | 42 ++++++++ src/emqx_os_mon.erl | 153 +++++++++++++++++++++++++++ src/emqx_sys_mon.erl | 3 +- src/emqx_sys_sup.erl | 32 +++--- src/emqx_vm_mon.erl | 118 +++++++++++++++++++++ test/emqx_alarm_handler_SUITE.erl | 145 +++++++++++++++++++++++++ test/emqx_broker_SUITE.erl | 15 +-- test/emqx_mqtt_packet_SUITE.erl | 1 + test/emqx_os_mon_SUITE.erl | 56 ++++++++++ test/emqx_vm_mon_SUITE.erl | 50 +++++++++ 18 files changed, 888 insertions(+), 174 deletions(-) create mode 100644 src/emqx_alarm_handler.erl delete mode 100644 src/emqx_alarm_mgr.erl create mode 100644 src/emqx_logger_handler.erl create mode 100644 src/emqx_os_mon.erl create mode 100644 src/emqx_vm_mon.erl create mode 100644 test/emqx_alarm_handler_SUITE.erl create mode 100644 test/emqx_os_mon_SUITE.erl create mode 100644 test/emqx_vm_mon_SUITE.erl diff --git a/Makefile b/Makefile index 0b9e6cc3e..fe94d309d 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \ - emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message + emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \ + emqx_vm_mon emqx_alarm_handler CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args 
-name $(CT_NODE_NAME) diff --git a/etc/emqx.conf b/etc/emqx.conf index 92d4f59b8..5361ab7da 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2055,4 +2055,61 @@ sysmon.busy_port = false ## Value: true | false sysmon.busy_dist_port = true +## The time interval for the periodic cpu check +## +## Value: Duration +## -h: hour, e.g. '2h' for 2 hours +## -m: minute, e.g. '5m' for 5 minutes +## -s: second, e.g. '30s' for 30 seconds +## +## Default: 60s +os_mon.cpu_check_interval = 60s + +## The threshold, as percentage of system cpu, for how much system cpu can be used before the corresponding alarm is set. +## +## Default: 80% +os_mon.cpu_high_watermark = 80% + +## The threshold, as percentage of system cpu, that system cpu usage must drop below before the corresponding alarm is cleared. +## +## Default: 60% +os_mon.cpu_low_watermark = 60% + +## The time interval for the periodic memory check +## +## Value: Duration +## -h: hour, e.g. '2h' for 2 hours +## -m: minute, e.g. '5m' for 5 minutes +## -s: second, e.g. '30s' for 30 seconds +## +## Default: 60s +os_mon.mem_check_interval = 60s + +## The threshold, as percentage of system memory, for how much system memory can be allocated before the corresponding alarm is set. +## +## Default: 70% +os_mon.sysmem_high_watermark = 70% + +## The threshold, as percentage of system memory, for how much system memory can be allocated by one Erlang process before the corresponding alarm is set. +## +## Default: 5% +os_mon.procmem_high_watermark = 5% + +## The time interval for the periodic process limit check +## +## Value: Duration +## +## Default: 30s +vm_mon.check_interval = 30s + +## The threshold, as percentage of processes, for how many processes can simultaneously exist at the local node before the corresponding alarm is set. 
+## +## Default: 80% +vm_mon.process_high_watermark = 80% + +## The threshold, as percentage of processes, that the number of processes existing at the local node must drop below before the corresponding alarm is cleared. +## +## Default: 60% +vm_mon.process_low_watermark = 60% + {{ additional_configs }} diff --git a/priv/emqx.schema b/priv/emqx.schema index e490e024f..708c2ed13 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1886,3 +1886,70 @@ end}. {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. + +%%-------------------------------------------------------------------- +%% Operating System Monitor +%%-------------------------------------------------------------------- + +{mapping, "os_mon.cpu_check_interval", "emqx.os_mon", [ + {default, 60}, + {datatype, {duration, s}} +]}. + +{mapping, "os_mon.cpu_high_watermark", "emqx.os_mon", [ + {default, "80%"}, + {datatype, {percent, float}} +]}. + +{mapping, "os_mon.cpu_low_watermark", "emqx.os_mon", [ + {default, "60%"}, + {datatype, {percent, float}} +]}. + +{mapping, "os_mon.mem_check_interval", "emqx.os_mon", [ + {default, 60}, + {datatype, {duration, s}} +]}. + +{mapping, "os_mon.sysmem_high_watermark", "emqx.os_mon", [ + {default, "70%"}, + {datatype, {percent, float}} +]}. + +{mapping, "os_mon.procmem_high_watermark", "emqx.os_mon", [ + {default, "5%"}, + {datatype, {percent, float}} +]}. + +{translation, "emqx.os_mon", fun(Conf) -> + [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)}, + {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf)}, + {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf)}, + {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)}, + {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf)}, + {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf)}] +end}. 
+ +%%-------------------------------------------------------------------- +%% VM Monitor +%%-------------------------------------------------------------------- +{mapping, "vm_mon.check_interval", "emqx.vm_mon", [ + {default, 30}, + {datatype, {duration, s}} +]}. + +{mapping, "vm_mon.process_high_watermark", "emqx.vm_mon", [ + {default, "80%"}, + {datatype, {percent, float}} +]}. + +{mapping, "vm_mon.process_low_watermark", "emqx.vm_mon", [ + {default, "60%"}, + {datatype, {percent, float}} +]}. + +{translation, "emqx.vm_mon", fun(Conf) -> + [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, + {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)}, + {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}] +end}. \ No newline at end of file diff --git a/src/emqx.app.src b/src/emqx.app.src index f9c174ae3..c88be22c7 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -4,7 +4,7 @@ {modules,[]}, {registered,[emqx_sup]}, {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy, - replayq]}, + replayq,sasl,os_mon]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl new file mode 100644 index 000000000..8a9260ab9 --- /dev/null +++ b/src/emqx_alarm_handler.erl @@ -0,0 +1,169 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. 
+ +-module(emqx_alarm_handler). + +-behaviour(gen_event). + +-include("emqx.hrl"). +-include("logger.hrl"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([init/1, + handle_event/2, + handle_call/2, + handle_info/2, + terminate/2]). + +-export([load/0, + get_alarms/0]). + +-record(common_alarm, {id, desc}). +-record(alarm_history, {id, clear_at}). + +-define(ALARM_TAB, emqx_alarm). +-define(ALARM_HISTORY_TAB, emqx_alarm_history). + +%%------------------------------------------------------------------------------ +%% Mnesia bootstrap +%%------------------------------------------------------------------------------ + +mnesia(boot) -> + ok = ekka_mnesia:create_table(?ALARM_TAB, [ + {type, set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, common_alarm}, + {attributes, record_info(fields, common_alarm)}]), + ok = ekka_mnesia:create_table(?ALARM_HISTORY_TAB, [ + {type, set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, alarm_history}, + {attributes, record_info(fields, alarm_history)}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?ALARM_TAB), + ok = ekka_mnesia:copy_table(?ALARM_HISTORY_TAB). + +%%---------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------- + +load() -> + gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {?MODULE, []}). + +get_alarms() -> + gen_event:call(alarm_handler, ?MODULE, get_alarms). + +%%---------------------------------------------------------------------- +%% gen_event callbacks +%%---------------------------------------------------------------------- + +init({_Args, {alarm_handler, ExistingAlarms}}) -> + init_tables(ExistingAlarms), + {ok, []}; +init(_) -> + init_tables([]), + {ok, []}. 
+ +handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) -> + handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State); +handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> + ?LOG(notice, "Alarm report: set ~p", [Alarm]), + case encode_alarm(Alarm) of + {ok, Json} -> + emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json)); + {error, Reason} -> + ?LOG(error, "Failed to encode alarm: ~p", [Reason]) + end, + set_alarm_(AlarmId, AlarmDesc), + {ok, State}; +handle_event({clear_alarm, AlarmId}, State) -> + ?LOG(notice, "Alarm report: clear ~p", [AlarmId]), + emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)), + clear_alarm_(AlarmId), + {ok, State}; +handle_event(_, State) -> + {ok, State}. + +handle_info(_, State) -> {ok, State}. + +handle_call(get_alarms, State) -> + {ok, get_alarms_(), State}; +handle_call(_Query, State) -> {ok, {error, bad_query}, State}. + +terminate(swap, _State) -> + {emqx_alarm_handler, get_alarms_()}; +terminate(_, _) -> + ok. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +init_tables(ExistingAlarms) -> + mnesia:clear_table(?ALARM_TAB), + lists:foreach(fun({Id, _Desc}) -> + set_alarm_history(Id) + end, ExistingAlarms). + +encode_alarm({AlarmId, #alarm{severity = Severity, + title = Title, + summary = Summary, + timestamp = Ts}}) -> + emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, + {desc, [{severity, Severity}, + {title, iolist_to_binary(Title)}, + {summary, iolist_to_binary(Summary)}, + {ts, emqx_time:now_secs(Ts)}]}]); +encode_alarm({AlarmId, AlarmDesc}) -> + emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, + {desc, maybe_to_binary(AlarmDesc)}]). 
+ +alarm_msg(Topic, Payload) -> + Msg = emqx_message:make(?MODULE, Topic, Payload), + emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, + emqx_message:set_flag(sys, Msg)). + +topic(alert, AlarmId) -> + emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); +topic(clear, AlarmId) -> + emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>). + +maybe_to_binary(Data) when is_binary(Data) -> + Data; +maybe_to_binary(Data) -> + iolist_to_binary(io_lib:format("~p", [Data])). + +set_alarm_(Id, Desc) -> + mnesia:dirty_write(?ALARM_TAB, #common_alarm{id = Id, desc = Desc}). + +clear_alarm_(Id) -> + mnesia:dirty_delete(?ALARM_TAB, Id), + set_alarm_history(Id). + +get_alarms_() -> + Alarms = ets:tab2list(?ALARM_TAB), + [{Id, Desc} || #common_alarm{id = Id, desc = Desc} <- Alarms]. + +set_alarm_history(Id) -> + mnesia:dirty_write(?ALARM_HISTORY_TAB, #alarm_history{id = Id, + clear_at = undefined}). + + diff --git a/src/emqx_alarm_mgr.erl b/src/emqx_alarm_mgr.erl deleted file mode 100644 index cfb99678e..000000000 --- a/src/emqx_alarm_mgr.erl +++ /dev/null @@ -1,144 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_alarm_mgr). - --behaviour(gen_event). - --include("emqx.hrl"). --include("logger.hrl"). - --export([start_link/0]). --export([alarm_fun/0, get_alarms/0, set_alarm/1, clear_alarm/1]). 
--export([add_alarm_handler/1, add_alarm_handler/2, delete_alarm_handler/1]). - -%% gen_event callbacks --export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, - code_change/3]). - --define(ALARM_MGR, ?MODULE). - -start_link() -> - start_with( - fun(Pid) -> - gen_event:add_handler(Pid, ?MODULE, []) - end). - -start_with(Fun) -> - case gen_event:start_link({local, ?ALARM_MGR}) of - {ok, Pid} -> Fun(Pid), {ok, Pid}; - Error -> Error - end. - -alarm_fun() -> alarm_fun(false). - -alarm_fun(Bool) -> - fun(alert, _Alarm) when Bool =:= true -> alarm_fun(true); - (alert, Alarm) when Bool =:= false -> set_alarm(Alarm), alarm_fun(true); - (clear, AlarmId) when Bool =:= true -> clear_alarm(AlarmId), alarm_fun(false); - (clear, _AlarmId) when Bool =:= false -> alarm_fun(false) - end. - --spec(set_alarm(emqx_types:alarm()) -> ok). -set_alarm(Alarm) when is_record(Alarm, alarm) -> - gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}). - --spec(clear_alarm(any()) -> ok). -clear_alarm(AlarmId) when is_binary(AlarmId) -> - gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}). - --spec(get_alarms() -> list(emqx_types:alarm())). -get_alarms() -> - gen_event:call(?ALARM_MGR, ?MODULE, get_alarms). - -add_alarm_handler(Module) when is_atom(Module) -> - gen_event:add_handler(?ALARM_MGR, Module, []). - -add_alarm_handler(Module, Args) when is_atom(Module) -> - gen_event:add_handler(?ALARM_MGR, Module, Args). - -delete_alarm_handler(Module) when is_atom(Module) -> - gen_event:delete_handler(?ALARM_MGR, Module, []). - -%%------------------------------------------------------------------------------ -%% Default Alarm handler -%%------------------------------------------------------------------------------ - -init(_) -> {ok, #{alarms => []}}. 
- -handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)-> - handle_event({set_alarm, Alarm#alarm{timestamp = os:timestamp()}}, State); - -handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alarms}) -> - case encode_alarm(Alarm) of - {ok, Json} -> - emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json)); - {error, Reason} -> - ?ERROR("[AlarmMgr] Failed to encode alarm: ~p", [Reason]) - end, - {ok, State#{alarms := [Alarm|Alarms]}}; - -handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) -> - case emqx_json:safe_encode([{id, AlarmId}, {ts, os:system_time(second)}]) of - {ok, Json} -> - emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json)); - {error, Reason} -> - ?ERROR("[AlarmMgr] Failed to encode clear: ~p", [Reason]) - end, - {ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate}; - -handle_event(Event, State)-> - ?ERROR("[AlarmMgr] unexpected event: ~p", [Event]), - {ok, State}. - -handle_info(Info, State) -> - ?ERROR("[AlarmMgr] unexpected info: ~p", [Info]), - {ok, State}. - -handle_call(get_alarms, State = #{alarms := Alarms}) -> - {ok, Alarms, State}; - -handle_call(Req, State) -> - ?ERROR("[AlarmMgr] unexpected call: ~p", [Req]), - {ok, ignored, State}. - -terminate(swap, State) -> - {?MODULE, State}; -terminate(_, _) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - -encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title, - summary = Summary, timestamp = Ts}) -> - emqx_json:safe_encode([{id, AlarmId}, {severity, Severity}, - {title, iolist_to_binary(Title)}, - {summary, iolist_to_binary(Summary)}, - {ts, emqx_time:now_secs(Ts)}]). 
- -alarm_msg(Type, AlarmId, Json) -> - Msg = emqx_message:make(?ALARM_MGR, topic(Type, AlarmId), Json), - emqx_message:set_headers( #{'Content-Type' => <<"application/json">>}, - emqx_message:set_flag(sys, Msg)). - -topic(alert, AlarmId) -> - emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); -topic(clear, AlarmId) -> - emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>). - diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 30483dbde..e1583e128 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -40,6 +40,10 @@ start(_Type, _Args) -> emqx_listeners:start(), start_autocluster(), register(emqx, self()), + + emqx_alarm_handler:load(), + emqx_logger_handler:init(), + print_vsn(), {ok, Sup}. diff --git a/src/emqx_kernel_sup.erl b/src/emqx_kernel_sup.erl index bd98ccbdf..cfed226cd 100644 --- a/src/emqx_kernel_sup.erl +++ b/src/emqx_kernel_sup.erl @@ -26,7 +26,6 @@ start_link() -> init([]) -> {ok, {{one_for_one, 10, 100}, [child_spec(emqx_pool_sup, supervisor), - child_spec(emqx_alarm_mgr, worker), child_spec(emqx_hooks, worker), child_spec(emqx_stats, worker), child_spec(emqx_metrics, worker), diff --git a/src/emqx_logger_handler.erl b/src/emqx_logger_handler.erl new file mode 100644 index 000000000..5a8dc08d1 --- /dev/null +++ b/src/emqx_logger_handler.erl @@ -0,0 +1,42 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_logger_handler). 
+ +-export([log/2]). +-export([init/0]). + +init() -> + logger:add_handler(emqx_logger_handler, + emqx_logger_handler, + #{level => error, + filters => [{easy_filter, {fun filter_by_level/2, []}}], + filters_default => stop}). + +-spec log(LogEvent, Config) -> ok when LogEvent :: logger:log_event(), Config :: logger:handler_config(). +log(#{msg := {report, #{report := [{supervisor, SupName}, + {errorContext, Error}, + {reason, Reason}, + {offender, _}]}}}, _Config) -> + alarm_handler:set_alarm({supervisor_report, [{supervisor, SupName}, + {errorContext, Error}, + {reason, Reason}]}), + ok; +log(_LogEvent, _Config) -> + ok. + +filter_by_level(LogEvent = #{level := error}, _Extra) -> + LogEvent; +filter_by_level(_LogEvent, _Extra) -> + stop. diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl new file mode 100644 index 000000000..ba2cc35da --- /dev/null +++ b/src/emqx_os_mon.erl @@ -0,0 +1,153 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_os_mon). + +-behaviour(gen_server). + +-include("logger.hrl"). + +-export([start_link/1]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). 
+ +-export([get_cpu_check_interval/0, + set_cpu_check_interval/1, + get_cpu_high_watermark/0, + set_cpu_high_watermark/1, + get_cpu_low_watermark/0, + set_cpu_low_watermark/1, + get_mem_check_interval/0, + set_mem_check_interval/1, + get_sysmem_high_watermark/0, + set_sysmem_high_watermark/1, + get_procmem_high_watermark/0, + set_procmem_high_watermark/1]). + +-define(OS_MON, ?MODULE). + +%%---------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------- + +start_link(Opts) -> + gen_server:start_link({local, ?OS_MON}, ?MODULE, [Opts], []). + +get_cpu_check_interval() -> + call(get_cpu_check_interval). + +set_cpu_check_interval(Seconds) -> + call({set_cpu_check_interval, Seconds}). + +get_cpu_high_watermark() -> + call(get_cpu_high_watermark). + +set_cpu_high_watermark(Float) -> + call({set_cpu_high_watermark, Float}). + +get_cpu_low_watermark() -> + call(get_cpu_low_watermark). + +set_cpu_low_watermark(Float) -> + call({set_cpu_low_watermark, Float}). + +get_mem_check_interval() -> + memsup:get_check_interval() div 1000. + +set_mem_check_interval(Seconds) -> + memsup:set_check_interval(Seconds div 60). + +get_sysmem_high_watermark() -> + memsup:get_sysmem_high_watermark() / 100. + +set_sysmem_high_watermark(Float) -> + memsup:set_sysmem_high_watermark(Float). + +get_procmem_high_watermark() -> + memsup:get_procmem_high_watermark() / 100. + +set_procmem_high_watermark(Float) -> + memsup:set_procmem_high_watermark(Float). 
+ +%%---------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------- + +init([Opts]) -> + _ = cpu_sup:util(), + set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)), + set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)), + set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)), + {ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts, 0.80), + cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts, 0.60), + cpu_check_interval => proplists:get_value(cpu_check_interval, Opts, 60), + timer => undefined})}. + +handle_call(get_cpu_check_interval, _From, State) -> + {reply, maps:get(cpu_check_interval, State, undefined), State}; +handle_call({set_cpu_check_interval, Seconds}, _From, State) -> + {reply, ok, State#{cpu_check_interval := Seconds}}; + +handle_call(get_cpu_high_watermark, _From, State) -> + {reply, maps:get(cpu_high_watermark, State, undefined), State}; +handle_call({set_cpu_high_watermark, Float}, _From, State) -> + {reply, ok, State#{cpu_high_watermark := Float}}; + +handle_call(get_cpu_low_watermark, _From, State) -> + {reply, maps:get(cpu_low_watermark, State, undefined), State}; +handle_call({set_cpu_low_watermark, Float}, _From, State) -> + {reply, ok, State#{cpu_low_watermark := Float}}; + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. 
+ +handle_info({timeout, Timer, check}, State = #{timer := Timer, + cpu_high_watermark := CPUHighWatermark, + cpu_low_watermark := CPULowWatermark}) -> + case cpu_sup:util() of + 0 -> + {noreply, State#{timer := undefined}}; + {error, Reason} -> + ?LOG(warning, "Failed to get cpu utilization: ~p", [Reason]), + {noreply, ensure_check_timer(State)}; + Busy when Busy / 100 >= CPUHighWatermark -> + alarm_handler:set_alarm({cpu_high_watermark, Busy}), + {noreply, ensure_check_timer(State)}; + Busy when Busy / 100 < CPULowWatermark -> + alarm_handler:clear_alarm(cpu_high_watermark), + {noreply, ensure_check_timer(State)} + end. + +terminate(_Reason, #{timer := Timer}) -> + emqx_misc:cancel_timer(Timer). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------- +call(Req) -> + gen_server:call(?OS_MON, Req, infinity). + +ensure_check_timer(State = #{cpu_check_interval := Interval}) -> + State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. \ No newline at end of file diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 44a256f3f..11a3c8ebe 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -163,5 +163,6 @@ safe_publish(Event, WarnMsg) -> emqx_broker:safe_publish(sysmon_msg(Topic, iolist_to_binary(WarnMsg))). sysmon_msg(Topic, Payload) -> - emqx_message:make(?SYSMON, #{sys => true}, Topic, Payload). + Msg = emqx_message:make(?SYSMON, Topic, Payload), + emqx_message:set_flag(sys, Msg). diff --git a/src/emqx_sys_sup.erl b/src/emqx_sys_sup.erl index 24609acdb..9341b8528 100644 --- a/src/emqx_sys_sup.erl +++ b/src/emqx_sys_sup.erl @@ -24,17 +24,23 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). 
init([]) -> - Sys = #{id => sys, - start => {emqx_sys, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_sys]}, - Sysmon = #{id => sys_mon, - start => {emqx_sys_mon, start_link, [emqx_config:get_env(sysmon, [])]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_sys_mon]}, - {ok, {{one_for_one, 10, 100}, [Sys, Sysmon]}}. + {ok, {{one_for_one, 10, 100}, [child_spec(emqx_sys, worker), + child_spec(emqx_sys_mon, worker, [emqx_config:get_env(sysmon, [])]), + child_spec(emqx_os_mon, worker, [emqx_config:get_env(os_mon, [])]), + child_spec(emqx_vm_mon, worker, [emqx_config:get_env(vm_mon, [])])]}}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +child_spec(M, worker) -> + child_spec(M, worker, []). + +child_spec(M, worker, A) -> + #{id => M, + start => {M, start_link, A}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [M]}. diff --git a/src/emqx_vm_mon.erl b/src/emqx_vm_mon.erl new file mode 100644 index 000000000..a4562198e --- /dev/null +++ b/src/emqx_vm_mon.erl @@ -0,0 +1,118 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_vm_mon). + +-behaviour(gen_server). + +-export([start_link/1]). 
+ +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-export([get_check_interval/0, + set_check_interval/1, + get_process_high_watermark/0, + set_process_high_watermark/1, + get_process_low_watermark/0, + set_process_low_watermark/1]). + +-define(VM_MON, ?MODULE). + +%%---------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------- + +start_link(Opts) -> + gen_server:start_link({local, ?VM_MON}, ?MODULE, [Opts], []). + +get_check_interval() -> + call(get_check_interval). + +set_check_interval(Seconds) -> + call({set_check_interval, Seconds}). + +get_process_high_watermark() -> + call(get_process_high_watermark). + +set_process_high_watermark(Float) -> + call({set_process_high_watermark, Float}). + +get_process_low_watermark() -> + call(get_process_low_watermark). + +set_process_low_watermark(Float) -> + call({set_process_low_watermark, Float}). + +%%---------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------- + +init([Opts]) -> + {ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts, 30), + process_high_watermark => proplists:get_value(process_high_watermark, Opts, 0.70), + process_low_watermark => proplists:get_value(process_low_watermark, Opts, 0.50), + timer => undefined})}. 
+ +handle_call(get_check_interval, _From, State) -> + {reply, maps:get(check_interval, State, undefined), State}; +handle_call({set_check_interval, Seconds}, _From, State) -> + {reply, ok, State#{check_interval := Seconds}}; + +handle_call(get_process_high_watermark, _From, State) -> + {reply, maps:get(process_high_watermark, State, undefined), State}; +handle_call({set_process_high_watermark, Float}, _From, State) -> + {reply, ok, State#{process_high_watermark := Float}}; + +handle_call(get_process_low_watermark, _From, State) -> + {reply, maps:get(process_low_watermark, State, undefined), State}; +handle_call({set_process_low_watermark, Float}, _From, State) -> + {reply, ok, State#{process_low_watermark := Float}}; + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({timeout, Timer, check}, State = #{timer := Timer, + process_high_watermark := ProcHighWatermark, + process_low_watermark := ProcLowWatermark}) -> + ProcessCount = erlang:system_info(process_count), + case ProcessCount / erlang:system_info(process_limit) of + Percent when Percent >= ProcHighWatermark -> + alarm_handler:set_alarm({too_many_processes, ProcessCount}); + Percent when Percent < ProcLowWatermark -> + alarm_handler:clear_alarm(too_many_processes) + end, + {noreply, ensure_check_timer(State)}. + +terminate(_Reason, #{timer := Timer}) -> + emqx_misc:cancel_timer(Timer). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------- +call(Req) -> + gen_server:call(?VM_MON, Req, infinity). + +ensure_check_timer(State = #{check_interval := Interval}) -> + State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. 
\ No newline at end of file diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl new file mode 100644 index 000000000..e58385ebb --- /dev/null +++ b/test/emqx_alarm_handler_SUITE.erl @@ -0,0 +1,145 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_alarm_handler_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). + +-include("emqx_mqtt.hrl"). +-include("emqx.hrl"). + +all() -> [t_alarm_handler, t_logger_handler]. + +init_per_suite(Config) -> + [start_apps(App, {SchemaFile, ConfigFile}) || + {App, SchemaFile, ConfigFile} + <- [{emqx, local_path("priv/emqx.schema"), + local_path("etc/emqx.conf")}]], + Config. + +end_per_suite(_Config) -> + application:stop(emqx). + +local_path(RelativePath) -> + filename:join([get_base_dir(), RelativePath]). + +get_base_dir() -> + {file, Here} = code:is_loaded(?MODULE), + filename:dirname(filename:dirname(Here)). + +start_apps(App, {SchemaFile, ConfigFile}) -> + read_schema_configs(App, {SchemaFile, ConfigFile}), + set_special_configs(App), + application:ensure_all_started(App). 
+
+%% Populate the application environment of App from a cuttlefish schema and
+%% config file: the config is parsed, mapped through the schema, and every
+%% {Par, Value} pair found under App is stored with application:set_env/3.
+%% Both file paths are printed to the Common Test log first.
+read_schema_configs(App, {SchemaFile, ConfigFile}) ->
+    ct:pal("Read configs - SchemaFile: ~p, ConfigFile: ~p", [SchemaFile, ConfigFile]),
+    Schema = cuttlefish_schema:files([SchemaFile]),
+    Conf = conf_parse:file(ConfigFile),
+    NewConfig = cuttlefish_generator:map(Schema, Conf),
+    Vals = proplists:get_value(App, NewConfig, []),
+    [application:set_env(App, Par, Value) || {Par, Value} <- Vals].
+
+%% Hook for per-application overrides; this suite needs none.
+set_special_configs(_App) ->
+    ok.
+
+%% Open a raw, passive-mode binary connection to 127.0.0.1:1883, run
+%% DoFun(Sock) and always close the socket afterwards, even if DoFun raises.
+%% NOTE(review): 3000 is presumably the connect timeout in milliseconds --
+%% confirm against emqx_client_sock:connect/4.
+with_connection(DoFun) ->
+    {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
+                                          [binary, {packet, raw}, {active, false}],
+                                          3000),
+    try
+        DoFun(Sock)
+    after
+        emqx_client_sock:close(Sock)
+    end.
+
+%% End-to-end check of the alarm handler over a raw MQTT v5 connection:
+%% setting and clearing an alarm must each produce a PUBLISH on the matching
+%% system topic and be reflected by emqx_alarm_handler:get_alarms/0.
+%% NOTE(review): every gen_tcp:recv(Sock, 0) below has no timeout and is
+%% matched as if it returned exactly one complete packet; the test blocks
+%% forever if an expected packet never arrives.
+t_alarm_handler(_) ->
+    with_connection(
+        fun(Sock) ->
+            %% 1. CONNECT with protocol version 5 and expect a successful CONNACK.
+            emqx_client_sock:send(Sock,
+                                  raw_send_serialize(
+                                      ?CONNECT_PACKET(
+                                          #mqtt_packet_connect{
+                                              proto_ver = ?MQTT_PROTO_V5}),
+                                      #{version => ?MQTT_PROTO_V5}
+                                  )),
+            {ok, Data} = gen_tcp:recv(Sock, 0),
+            {ok, ?CONNACK_PACKET(?RC_SUCCESS), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
+
+            %% 2. SUBSCRIBE (packet id 1) to the alert and clear topics of the
+            %%    test alarm; the SUBACK must carry reason code 2 for both.
+            Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>),
+            Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>),
+            SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
+            emqx_client_sock:send(Sock,
+                                  raw_send_serialize(
+                                      ?SUBSCRIBE_PACKET(
+                                          1,
+                                          [{Topic1, SubOpts},
+                                           {Topic2, SubOpts}]),
+                                      #{version => ?MQTT_PROTO_V5})),
+
+            {ok, Data2} = gen_tcp:recv(Sock, 0),
+            {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
+
+            %% 3. Raise the alarm through the standard alarm_handler API.
+            alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test,
+                                                            severity = error,
+                                                            title="alarm title",
+                                                            summary="alarm summary"}}),
+
+            %% 4. A QoS 0 PUBLISH must arrive on the alert topic ...
+            {ok, Data3} = gen_tcp:recv(Sock, 0),
+
+            {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
+
+            %%    ... and the alarm must be listed by the handler.
+            ?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())),
+
+            %% 5. Clear the alarm; a QoS 0 PUBLISH must arrive on the clear topic
+            %%    and the alarm must no longer be listed.
+            alarm_handler:clear_alarm(alarm_for_test),
+
+            {ok, Data4} = gen_tcp:recv(Sock, 0),
+
+            {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5),
+
+            ?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms()))
+
+        end).
+
+%% Log an error-level supervisor `start_error' report and assert that a
+%% `supervisor_report' alarm is then listed by emqx_alarm_handler.
+%% NOTE(review): the assertion runs immediately after logger:log/3, so it
+%% presumably relies on the logger handler having processed the event by
+%% then -- confirm.
+t_logger_handler(_) ->
+    %% Mock a supervisor report: pid/id/mfargs are placeholder atoms, only the
+    %% report term is logged.
+    logger:log(error, #{label => {supervisor, start_error},
+                        report => [{supervisor, {local, tmp_sup}},
+                                   {errorContext, shutdown},
+                                   {reason, reached_max_restart_intensity},
+                                   {offender, [{pid, meck},
+                                               {id, meck},
+                                               {mfargs, {meck, start_link, []}},
+                                               {restart_type, permanent},
+                                               {shutdown, 5000},
+                                               {child_type, worker}]}]},
+               #{logger_formatter => #{title => "SUPERVISOR REPORT"},
+                 report_cb => fun logger:format_otp_report/1}),
+    ?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())).
+
+%% Serialize an MQTT packet with the default options of emqx_frame.
+raw_send_serialize(Packet) ->
+    emqx_frame:serialize(Packet).
+
+%% Serialize an MQTT packet with explicit options (e.g. the protocol version).
+raw_send_serialize(Packet, Opts) ->
+    emqx_frame:serialize(Packet, Opts).
+
+%% Parse received bytes P with a fresh parser state (`none') for ProtoVersion,
+%% limited to ?MAX_PACKET_SIZE.
+raw_recv_parse(P, ProtoVersion) ->
+    emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
+                                 version => ProtoVersion}}).
+
diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl
index 81796dbdc..e479bbaba 100644
--- a/test/emqx_broker_SUITE.erl
+++ b/test/emqx_broker_SUITE.erl
@@ -29,8 +29,7 @@ all() ->
     [{group, pubsub},
      {group, session},
      {group, metrics},
-     {group, stats},
-     {group, alarms}].
+     {group, stats}].
 
 groups() ->
     [
@@ -41,8 +40,7 @@ groups() ->
       'pubsub#', 'pubsub+']},
      {session, [sequence], [start_session]},
      {metrics, [sequence], [inc_dec_metric]},
-     {stats, [sequence], [set_get_stat]},
-     {alarms, [sequence], [set_alarms]}
+     {stats, [sequence], [set_get_stat]}
     ].
 
 init_per_suite(Config) ->
@@ -171,12 +169,3 @@ inc_dec_metric(_) ->
 set_get_stat(_) ->
     emqx_stats:setstat('retained/max', 99),
     99 = emqx_stats:getstat('retained/max').
-
-set_alarms(_) ->
-    AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
-    emqx_alarm_mgr:set_alarm(AlarmTest),
-    Alarms = emqx_alarm_mgr:get_alarms(),
-    ct:log("Alarms Length: ~p ~n", [length(Alarms)]),
-    ?assertEqual(1, length(Alarms)),
-    emqx_alarm_mgr:clear_alarm(<<"1">>),
-    [] = emqx_alarm_mgr:get_alarms().
diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl
index 580fec4c2..8ac76bfd9 100644
--- a/test/emqx_mqtt_packet_SUITE.erl
+++ b/test/emqx_mqtt_packet_SUITE.erl
@@ -10,6 +10,7 @@
 %% distributed under the License is distributed on an "AS IS" BASIS,
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
+%% limitations under the License.
 
 -module(emqx_mqtt_packet_SUITE).
 
diff --git a/test/emqx_os_mon_SUITE.erl b/test/emqx_os_mon_SUITE.erl
new file mode 100644
index 000000000..67a3959a5
--- /dev/null
+++ b/test/emqx_os_mon_SUITE.erl
@@ -0,0 +1,56 @@
+%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_os_mon_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include_lib("common_test/include/ct.hrl").
+
+%% Test cases run by this suite.
+all() -> [t_api].
+
+%% Start the OTP os_mon application (and its dependencies) before the tests;
+%% the Common Test config is passed through unchanged.
+init_per_suite(Config) ->
+    application:ensure_all_started(os_mon),
+    Config.
+
+%% Stop the os_mon application once the suite has finished.
+end_per_suite(_Config) ->
+    application:stop(os_mon).
+
+%% Exercise the getter/setter API of emqx_os_mon: the values handed to
+%% start_link/1 must be readable back, and updated CPU watermarks must be
+%% returned by the corresponding getters.
+t_api(_) ->
+    %% Replace the default alarm_handler event handler with emqx_alarm_handler.
+    gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
+    MonitorOpts = [{cpu_check_interval, 1},
+                   {cpu_high_watermark, 0.05},
+                   {cpu_low_watermark, 0.80},
+                   {mem_check_interval, 60},
+                   {sysmem_high_watermark, 0.70},
+                   {procmem_high_watermark, 0.05}],
+    {ok, _Pid} = emqx_os_mon:start_link(MonitorOpts),
+
+    %% Every start option is reported back unchanged.
+    ?assertEqual(1, emqx_os_mon:get_cpu_check_interval()),
+    ?assertEqual(0.05, emqx_os_mon:get_cpu_high_watermark()),
+    ?assertEqual(0.80, emqx_os_mon:get_cpu_low_watermark()),
+    ?assertEqual(60, emqx_os_mon:get_mem_check_interval()),
+    ?assertEqual(0.7, emqx_os_mon:get_sysmem_high_watermark()),
+    ?assertEqual(0.05, emqx_os_mon:get_procmem_high_watermark()),
+    %% NOTE(review): the alarm assertions below are disabled in this patch;
+    %% the reason is not recorded here -- confirm before re-enabling.
+    % timer:sleep(2000),
+    % ?assertEqual(true, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())),
+
+    %% Updated CPU watermarks are visible through the getters.
+    emqx_os_mon:set_cpu_high_watermark(0.8),
+    emqx_os_mon:set_cpu_low_watermark(0.75),
+    ?assertEqual(0.8, emqx_os_mon:get_cpu_high_watermark()),
+    ?assertEqual(0.75, emqx_os_mon:get_cpu_low_watermark()),
+    % timer:sleep(3000),
+    % ?assertEqual(false, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())),
+    ok.
diff --git a/test/emqx_vm_mon_SUITE.erl b/test/emqx_vm_mon_SUITE.erl
new file mode 100644
index 000000000..41a717293
--- /dev/null
+++ b/test/emqx_vm_mon_SUITE.erl
@@ -0,0 +1,50 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2013-2019 EMQ Enterprise, Inc. (http://emqtt.io)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_vm_mon_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include_lib("common_test/include/ct.hrl").
+
+%% Test cases run by this suite.
+all() -> [t_api].
+
+%% Start sasl (and its dependencies) before the tests; the Common Test config
+%% is passed through unchanged.
+init_per_suite(Config) ->
+    application:ensure_all_started(sasl),
+    Config.
+
+%% Stop sasl once the suite has finished.
+end_per_suite(_Config) ->
+    application:stop(sasl).
+
+%% Drive emqx_vm_mon through its public API: start it with a 1-second check
+%% interval and a high watermark of 0, expect the `too_many_processes' alarm
+%% to be listed, then raise both watermarks and expect it to be gone again.
+%% Finally the check interval setter/getter pair is verified.
+t_api(_) ->
+    %% Replace the default alarm_handler event handler with emqx_alarm_handler.
+    gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
+    StartOpts = [{check_interval, 1},
+                 {process_high_watermark, 0},
+                 {process_low_watermark, 0.6}],
+    {ok, _Pid} = emqx_vm_mon:start_link(StartOpts),
+
+    %% Give the monitor time to run a check with the zero high watermark.
+    timer:sleep(2000),
+    ?assertEqual(true, is_alarm_set(too_many_processes)),
+
+    %% Raise the thresholds and make sure the getters see the new values.
+    emqx_vm_mon:set_process_high_watermark(0.8),
+    emqx_vm_mon:set_process_low_watermark(0.75),
+    ?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()),
+    ?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()),
+
+    %% Wait for further checks; the alarm is expected to have been cleared.
+    timer:sleep(3000),
+    ?assertEqual(false, is_alarm_set(too_many_processes)),
+
+    emqx_vm_mon:set_check_interval(20),
+    ?assertEqual(20, emqx_vm_mon:get_check_interval()),
+    ok.
+
+%% True when an alarm keyed by Name is returned by alarm_handler:get_alarms/0.
+is_alarm_set(Name) ->
+    lists:keymember(Name, 1, alarm_handler:get_alarms()).