From f2b552e29e32e0891ecec8a4e7a2990bb0740d5e Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 17 Aug 2019 17:50:35 +0800 Subject: [PATCH] Ensure stats timer --- src/emqx_channel.erl | 24 ++++++++++++++---------- src/emqx_connection.erl | 38 +++++++++++++++++++++++--------------- src/emqx_ws_connection.erl | 36 +++++++++++++++++++++++------------- 3 files changed, 60 insertions(+), 38 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index d8a69d0e0..b9b18cd8d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -46,6 +46,8 @@ , terminate/2 ]). +-export([ensure_timer/2]). + -export([gc/3]). -import(emqx_access_control, @@ -53,6 +55,8 @@ , check_acl/3 ]). +-import(emqx_misc, [start_timer/2]). + -export_type([channel/0]). -record(channel, { @@ -659,17 +663,17 @@ handle_info(Info, Channel) -> -> {ok, channel()} | {ok, Result :: term(), channel()} | {stop, Reason :: term(), channel()}). +timeout(TRef, {emit_stats, Stats}, Channel = #channel{stats_timer = TRef}) -> + ClientId = info(client_id, Channel), + ok = emqx_cm:set_chan_stats(ClientId, Stats), + {ok, Channel#channel{stats_timer = undefined}}; + timeout(TRef, retry_deliver, Channel = #channel{%%session = Session, retry_timer = TRef}) -> %% case emqx_session:retry(Session) of %% TODO: ... {ok, Channel#channel{retry_timer = undefined}}; -timeout(TRef, emit_stats, Channel = #channel{stats_timer = TRef}) -> - ClientId = info(client_id, Channel), - %% ok = emqx_cm:set_chan_stats(ClientId, stats(Channel)), - {ok, Channel#channel{stats_timer = undefined}}; - timeout(_TRef, Msg, Channel) -> ?LOG(error, "Unexpected timeout: ~p~n", [Msg]), {ok, Channel}. @@ -678,17 +682,17 @@ timeout(_TRef, Msg, Channel) -> %% Ensure timers %%-------------------------------------------------------------------- +ensure_timer(emit_stats, Channel = #channel{stats_timer = undefined, + idle_timeout = IdleTimeout + }) -> + Channel#channel{stats_timer = start_timer(IdleTimeout, emit_stats)}; + ensure_timer(retry, Channel = #channel{session = Session, retry_timer = undefined}) -> Interval = emqx_session:info(retry_interval, Session), TRef = emqx_misc:start_timer(Interval, retry_deliver), Channel#channel{retry_timer = TRef}; -ensure_timer(stats, Channel = #channel{stats_timer = undefined, - idle_timeout = IdleTimeout}) -> - TRef = emqx_misc:start_timer(IdleTimeout, emit_stats), - Channel#channel{stats_timer = TRef}; - %% disabled or timer existed ensure_timer(_Name, Channel) -> Channel. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index c525a1fc7..cfb743a25 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -360,7 +360,8 @@ handle(info, {Inet, _Sock, Data}, State = #state{chan_state = ChanState}) ?LOG(debug, "RECV ~p", [Data]), emqx_pd:update_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), - NChanState = emqx_channel:gc(1, Oct, ChanState), + NChanState = emqx_channel:ensure_timer( + emit_stats, emqx_channel:gc(1, Oct, ChanState)), process_incoming(Data, State#state{chan_state = NChanState}); handle(info, {Error, _Sock, Reason}, State) @@ -398,24 +399,19 @@ handle(info, activate_socket, State) -> shutdown(Reason, NState) end; -handle(info, {inet_reply, _Sock, ok}, State) -> +handle(info, {inet_reply, _Sock, ok}, State = #state{chan_state = ChanState}) -> %% something sent - keep_state(State); + NChanState = emqx_channel:ensure_timer(emit_stats, ChanState), + keep_state(State#state{chan_state = NChanState}); handle(info, {inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle(info, {timeout, TRef, Msg}, State = #state{chan_state = ChanState}) - when is_reference(TRef) -> - case emqx_channel:timeout(TRef, Msg, ChanState) of - {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {ok, Packets, NChanState} -> - handle_outgoing(Packets, fun keep_state/1, - State#state{chan_state = NChanState}); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; +handle(info, {timeout, TRef, emit_stats}, State) when is_reference(TRef) -> + handle_timeout(TRef, {emit_stats, stats(State)}, State); + +handle(info, {timeout, TRef, Msg}, State) when is_reference(TRef) -> + handle_timeout(TRef, Msg, State); handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]), @@ -528,7 +524,19 @@ send(IoData, SuccFun, State = #state{transport = Transport, shutdown(Reason, State) end. -%% TODO: maybe_gc(1, Oct, State) +%%-------------------------------------------------------------------- +%% Handle timeout + +handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> + case emqx_channel:timeout(TRef, Msg, ChanState) of + {ok, NChanState} -> + keep_state(State#state{chan_state = NChanState}); + {ok, Packets, NChanState} -> + handle_outgoing(Packets, fun keep_state/1, + State#state{chan_state = NChanState}); + {stop, Reason, NChanState} -> + stop(Reason, State#state{chan_state = NChanState}) + end. %%-------------------------------------------------------------------- %% Ensure keepalive diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 981fad4c3..530a300f8 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -198,7 +198,8 @@ websocket_handle({binary, Data}, State = #state{chan_state = ChanState}) emqx_pd:update_counter(recv_cnt, 1), emqx_pd:update_counter(recv_oct, Oct), ok = emqx_metrics:inc('bytes.received', Oct), - NChanState = emqx_channel:gc(1, Oct, ChanState), + NChanState = emqx_channel:ensure_timer( + emit_stats, emqx_channel:gc(1, Oct, ChanState)), process_incoming(Data, State#state{chan_state = NChanState}); %% Pings should be replied with pongs, cowboy does it automatically @@ -281,16 +282,11 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> stop(keepalive_error, State) end; -websocket_info({timeout, TRef, Msg}, State = #state{chan_state = ChanState}) - when is_reference(TRef) -> - case emqx_channel:timeout(TRef, Msg, ChanState) of - {ok, NChanState} -> - {ok, State#state{chan_state = NChanState}}; - {ok, Packets, NChanState} -> - reply(enqueue(Packets, State#state{chan_state = NChanState})); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; +websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) -> + handle_timeout(TRef, {emit_stats, stats(State)}, State); + +websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> + handle_timeout(TRef, Msg, State); websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> ?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]), @@ -341,6 +337,19 @@ connected(State = #state{chan_state = ChanState}) -> stop(Reason, NState) end. +%%-------------------------------------------------------------------- +%% Handle timeout + +handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> + case emqx_channel:timeout(TRef, Msg, ChanState) of + {ok, NChanState} -> + {ok, State#state{chan_state = NChanState}}; + {ok, Packets, NChanState} -> + reply(enqueue(Packets, State#state{chan_state = NChanState})); + {stop, Reason, NChanState} -> + stop(Reason, State#state{chan_state = NChanState}) + end. + %%-------------------------------------------------------------------- %% Ensure keepalive @@ -429,9 +438,10 @@ inc_outgoing_stats(Type) -> reply(State = #state{pendings = []}) -> {ok, State}; -reply(State = #state{pendings = Pendings}) -> +reply(State = #state{chan_state = ChanState, pendings = Pendings}) -> Reply = handle_outgoing(Pendings, State), - {reply, Reply, State#state{pendings = []}}. + NChanState = emqx_channel:ensure_timer(emit_stats, ChanState), + {reply, Reply, State#state{chan_state = NChanState, pendings = []}}. stop(Reason, State = #state{pendings = []}) -> {stop, State#state{reason = Reason}};