From 149ef6d7cc64a898b739003731c87e9d6c5e5a4a Mon Sep 17 00:00:00 2001 From: lafirest Date: Mon, 2 Aug 2021 16:55:57 +0800 Subject: [PATCH] style(emqx_gateway): improve some emqx_coap code --- apps/emqx_gateway/etc/emqx_gateway.conf | 4 +- .../src/coap/emqx_coap_channel.erl | 53 +++---- .../emqx_gateway/src/coap/emqx_coap_frame.erl | 24 +-- .../src/coap/emqx_coap_observe_res.erl | 19 +-- .../src/coap/emqx_coap_session.erl | 74 ++++----- apps/emqx_gateway/src/coap/emqx_coap_tm.erl | 143 +++++++++--------- .../src/coap/emqx_coap_transport.erl | 42 ++--- .../src/coap/include/emqx_coap.hrl | 2 +- 8 files changed, 179 insertions(+), 182 deletions(-) diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index b6dbff834..cdb776af1 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -36,7 +36,7 @@ gateway: { subscribe_qos: qos0 publish_qos: qos1 listener.udp.1: { - bind: 5687 + bind: 5683 } } @@ -49,7 +49,7 @@ gateway: { subscribe_qos: qos2 publish_qos: coap listener.udp.1: { - bind: 5683 + bind: 5687 } } diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index d5b9b7293..c208d00f8 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -67,9 +67,9 @@ -define(DISCONNECT_WAIT_TIME, timer:seconds(10)). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). -%%%=================================================================== -%%% API -%%%=================================================================== +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). @@ -101,7 +101,7 @@ init(ConnInfo = #{peername := {PeerHost, _}, ClientInfo = set_peercert_infos( Peercert, #{ zone => default - , protocol => 'mqtt-coap' + , protocol => 'coap' , peerhost => PeerHost , sockport => SockPort , clientid => emqx_guid:to_base62(emqx_guid:gen()) @@ -132,8 +132,8 @@ auth_subscribe(Topic, clientinfo := ClientInfo}) -> emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic). -transfer_result(Result, From, Value) -> - ?TRANSFER_RESULT(Result, [out], From, Value). +transfer_result(From, Value, Result) -> + ?TRANSFER_RESULT([out], From, Value, Result). %%-------------------------------------------------------------------- %% Handle incoming packet @@ -147,17 +147,17 @@ handle_in(#coap_message{method = post, <<>> -> handle_command(Msg, Channel); _ -> - call_session(Channel, received, [Msg]) + call_session(received, [Msg], Channel) end; handle_in(Msg, Channel) -> - call_session(ensure_keepalive_timer(Channel), received, [Msg]). + call_session(received, [Msg], ensure_keepalive_timer(Channel)). %%-------------------------------------------------------------------- %% Handle Delivers from broker to client %%-------------------------------------------------------------------- handle_deliver(Delivers, Channel) -> - call_session(Channel, deliver, [Delivers]). + call_session(deliver, [Delivers], Channel). %%-------------------------------------------------------------------- %% Handle timeout @@ -165,14 +165,14 @@ handle_deliver(Delivers, Channel) -> handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) -> case emqx_keepalive:check(NewVal, KeepAlive) of {ok, NewKeepAlive} -> - Channel2 = ensure_keepalive_timer(Channel, fun make_timer/4), + Channel2 = ensure_keepalive_timer(fun make_timer/4, Channel), {ok, Channel2#channel{keepalive = NewKeepAlive}}; {error, timeout} -> {shutdown, timeout, Channel} end; handle_timeout(_, {transport, Msg}, Channel) -> - call_session(Channel, timeout, [Msg]); + call_session(timeout, [Msg], Channel); handle_timeout(_, disconnect, Channel) -> {shutdown, normal, Channel}; @@ -207,9 +207,9 @@ handle_info(Info, Channel) -> terminate(_Reason, _Channel) -> ok. -%%%=================================================================== -%%% Internal functions -%%%=================================================================== +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- set_peercert_infos(NoSSL, ClientInfo) when NoSSL =:= nossl; NoSSL =:= undefined -> @@ -232,9 +232,9 @@ make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) -> Channel#channel{timers = Timers#{Name => TRef}}. ensure_keepalive_timer(Channel) -> - ensure_keepalive_timer(Channel, fun ensure_timer/4). + ensure_keepalive_timer(fun ensure_timer/4, Channel). -ensure_keepalive_timer(#channel{config = Cfg} = Channel, Fun) -> +ensure_keepalive_timer(Fun, #channel{config = Cfg} = Channel) -> Interval = maps:get(heartbeat, Cfg), Fun(keepalive, Interval, keepalive, Channel). @@ -285,9 +285,9 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx, conninfo = ConnInfo}) -> ConnProps = #{}, case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of - Error = {error, _Reason} -> Error; - _NConnProps -> - {ok, Input, Channel} + Error = {error, _Reason} -> Error; + _NConnProps -> + {ok, Input, Channel} end. enrich_clientinfo({Queries, Msg}, @@ -339,11 +339,10 @@ ensure_connected(Channel = #channel{ctx = Ctx, Channel#channel{conninfo = NConnInfo}. process_connect(Channel = #channel{ctx = Ctx, - session = Session, conninfo = ConnInfo, clientinfo = ClientInfo}, Msg) -> - SessFun = fun(_,_) -> Session end, + SessFun = fun(_,_) -> emqx_coap_session:new() end, case emqx_gateway_ctx:open_session( Ctx, true, @@ -367,14 +366,16 @@ run_hooks(Ctx, Name, Args, Acc) -> emqx_hooks:run_fold(Name, Args, Acc). reply(Channel, Method, Payload, Req) -> - call_session(Channel, reply, [Req, Method, Payload]). + call_session(reply, [Req, Method, Payload], Channel). ack(Channel, Method, Payload, Req) -> - call_session(Channel, piggyback, [Req, Method, Payload]). + call_session(piggyback, [Req, Method, Payload], Channel). -call_session(#channel{session = Session, - config = Cfg} = Channel, F, A) -> - case erlang:apply(emqx_coap_session, F, [Session, Cfg | A]) of +call_session(F, + A, + #channel{session = Session, + config = Cfg} = Channel) -> + case erlang:apply(emqx_coap_session, F, A ++ [Cfg, Session]) of #{out := Out, session := Session2} -> {ok, {outgoing, Out}, Channel#channel{session = Session2}}; diff --git a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl index 039190646..9a53f3e01 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl @@ -54,9 +54,9 @@ -define(OPTION_PROXY_SCHEME, 39). -define(OPTION_SIZE1, 60). -%%%=================================================================== -%%% API -%%%=================================================================== +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- initial_parse_state(_) -> #{}. @@ -64,9 +64,9 @@ initial_parse_state(_) -> serialize_opts() -> #{}. -%%%=================================================================== -%%% serialize_pkt -%%%=================================================================== +%%-------------------------------------------------------------------- +%% serialize_pkt +%%-------------------------------------------------------------------- %% empty message serialize_pkt(#coap_message{type = Type, method = undefined, id = MsgId}, _Opts) -> <>; @@ -223,9 +223,9 @@ method_to_class_code({error, proxying_not_supported}) -> {5, 05}; method_to_class_code(Method) -> erlang:throw({bad_method, Method}). -%%%=================================================================== -%%% parse -%%%=================================================================== +%%-------------------------------------------------------------------- +%% parse +%%-------------------------------------------------------------------- parse(<>, ParseState) -> {ok, #coap_message{ type = decode_type(Type) @@ -410,9 +410,9 @@ is_message(#coap_message{}) -> is_message(_) -> false. -%%%=================================================================== -%%% Internal functions -%%%=================================================================== +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- -spec is_repeatable_option(message_option_name()) -> boolean(). is_repeatable_option(if_match) -> true; is_repeatable_option(etag) -> true; diff --git a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl index 199ad0658..3cf925448 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl @@ -17,7 +17,7 @@ -module(emqx_coap_observe_res). %% API --export([ new/0, insert/3, remove/2 +-export([ new_manager/0, insert/3, remove/2 , res_changed/2, foreach/2]). -export_type([manager/0]). @@ -26,6 +26,7 @@ -type topic() :: binary(). -type token() :: binary(). -type seq_id() :: 0 .. ?MAX_SEQ_ID. + -type res() :: #{ token := token() , seq_id := seq_id() }. @@ -35,12 +36,12 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- --spec new() -> manager(). -new() -> +-spec new_manager() -> manager(). +new_manager() -> #{}. --spec insert(manager(), topic(), token()) -> manager(). -insert(Manager, Topic, Token) -> +-spec insert(topic(), token(), manager()) -> manager(). +insert(Topic, Token, Manager) -> case maps:get(Topic, Manager, undefined) of undefined -> Manager#{Topic => new_res(Token)}; @@ -48,12 +49,12 @@ insert(Manager, Topic, Token) -> Manager end. --spec remove(manager(), topic()) -> manager(). -remove(Manager, Topic) -> +-spec remove(topic(), manager()) -> manager(). +remove(Topic, Manager) -> maps:remove(Topic, Manager). --spec res_changed(manager(), topic()) -> undefined | {token(), seq_id(), manager()}. -res_changed(Manager, Topic) -> +-spec res_changed(topic(), manager()) -> undefined | {token(), seq_id(), manager()}. +res_changed(Topic, Manager) -> case maps:get(Topic, Manager, undefined) of undefined -> undefined; diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index dac4ac924..8b9eed14c 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -47,52 +47,52 @@ new() -> _ = emqx_misc:rand_seed(), #session{ transport_manager = emqx_coap_tm:new() - , observe_manager = emqx_coap_observe_res:new() + , observe_manager = emqx_coap_observe_res:new_manager() , next_msg_id = rand:uniform(?MAX_MESSAGE_ID)}. %%%------------------------------------------------------------------- %%% Process Message %%%------------------------------------------------------------------- -received(Session, Cfg, #coap_message{type = ack} = Msg) -> - handle_response(Session, Cfg, Msg); +received(#coap_message{type = ack} = Msg, Cfg, Session) -> + handle_response(Msg, Cfg, Session); -received(Session, Cfg, #coap_message{type = reset} = Msg) -> - handle_response(Session, Cfg, Msg); +received(#coap_message{type = reset} = Msg, Cfg, Session) -> + handle_response(Msg, Cfg, Session); -received(Session, Cfg, #coap_message{method = Method} = Msg) when is_atom(Method) -> - handle_request(Session, Cfg, Msg); +received(#coap_message{method = Method} = Msg, Cfg, Session) when is_atom(Method) -> + handle_request(Msg, Cfg, Session); -received(Session, Cfg, Msg) -> - handle_response(Session, Cfg, Msg). +received(Msg, Cfg, Session) -> + handle_response(Msg, Cfg, Session). -reply(Session, Cfg, Req, Method) -> - reply(Session, Cfg, Req, Method, <<>>). +reply(Req, Method, Cfg, Session) -> + reply(Req, Method, <<>>, Cfg, Session). -reply(Session, Cfg, Req, Method, Payload) -> +reply(Req, Method, Payload, Cfg, Session) -> Response = emqx_coap_message:response(Method, Payload, Req), - handle_out(Session, Cfg, Response). + handle_out(Response, Cfg, Session). -ack(Session, Cfg, Req) -> - piggyback(Session, Cfg, Req, <<>>). +ack(Req, Cfg, Session) -> + piggyback(Req, <<>>, Cfg, Session). -piggyback(Session, Cfg, Req, Payload) -> +piggyback(Req, Payload, Cfg, Session) -> Response = emqx_coap_message:ack(Req), Response2 = emqx_coap_message:set_payload(Payload, Response), - handle_out(Session, Cfg, Response2). + handle_out(Response2, Cfg, Session). -deliver(Session, Cfg, Delivers) -> +deliver(Delivers, Cfg, Session) -> Fun = fun({_, Topic, Message}, #{out := OutAcc, session := #session{observe_manager = OM, next_msg_id = MsgId} = SAcc} = Acc) -> - case emqx_coap_observe_res:res_changed(OM, Topic) of + case emqx_coap_observe_res:res_changed(Topic, OM) of undefined -> Acc; {Token, SeqId, OM2} -> Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Cfg), SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId), observe_manager = OM2}, - #{out := Out} = Result = call_transport_manager(SAcc2, Cfg, Msg, handle_out), + #{out := Out} = Result = call_transport_manager(handle_out, Msg, Cfg, SAcc2), Result#{out := [Out | OutAcc]} end end, @@ -101,35 +101,35 @@ deliver(Session, Cfg, Delivers) -> session => Session}, Delivers). -timeout(Session, Cfg, Timer) -> - call_transport_manager(Session, Cfg, Timer, ?FUNCTION_NAME). +timeout(Timer, Cfg, Session) -> + call_transport_manager(?FUNCTION_NAME, Timer, Cfg, Session). -transfer_result(Result, From, Value) -> - ?TRANSFER_RESULT(Result, [out, subscribe], From, Value). +transfer_result(From, Value, Result) -> + ?TRANSFER_RESULT([out, subscribe], From, Value, Result). %%%------------------------------------------------------------------- %%% Internal functions %%%------------------------------------------------------------------- -handle_request(Session, Cfg, Msg) -> - call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME). +handle_request(Msg, Cfg, Session) -> + call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session). -handle_response(Session, Cfg, Msg) -> - call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME). +handle_response(Msg, Cfg, Session) -> + call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session). -handle_out(Session, Cfg, Msg) -> - call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME). +handle_out(Msg, Cfg, Session) -> + call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session). -call_transport_manager(#session{transport_manager = TM} = Session, - Cfg, +call_transport_manager(Fun, Msg, - Fun) -> + Cfg, + #session{transport_manager = TM} = Session) -> try - Result = emqx_coap_tm:Fun(Msg, TM, Cfg), + Result = emqx_coap_tm:Fun(Msg, Cfg, TM), {ok, _, Session2} = emqx_misc:pipeline([fun process_tm/2, fun process_subscribe/2], Result, Session), - emqx_coap_channel:transfer_result(Result, session, Session2) + emqx_coap_channel:transfer_result(session, Session2, Result) catch Type:Reason:Stack -> ?ERROR("process transmission with, message:~p failed~n Type:~p,Reason:~p~n,StackTrace:~p~n", [Msg, Type, Reason, Stack]), @@ -146,10 +146,10 @@ process_subscribe(#{subscribe := Sub}, #session{observe_manager = OM} = Session undefined -> {ok, Session}; {Topic, Token} -> - OM2 = emqx_coap_observe_res:insert(OM, Topic, Token), + OM2 = emqx_coap_observe_res:insert(Topic, Token, OM), {ok, Session#session{observe_manager = OM2}}; Topic -> - OM2 = emqx_coap_observe_res:remove(OM, Topic), + OM2 = emqx_coap_observe_res:remove(Topic, OM), {ok, Session#session{observe_manager = OM2}} end; process_subscribe(_, Session) -> diff --git a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl index 677292529..8830d7447 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% transport manager +%% the transport state machine manager -module(emqx_coap_tm). -export([ new/0 @@ -23,23 +23,23 @@ , handle_out/3 , timeout/3]). --export_type([manager/0, event_result/2]). +-export_type([manager/0, event_result/1]). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). -type direction() :: in | out. --type transport_id() :: {direction(), non_neg_integer()}. +-type state_machine_id() :: {direction(), non_neg_integer()}. --record(transport, { id :: transport_id() - , state :: atom() - , timers :: maps:map() - , data :: any()}). --type transport() :: #transport{}. +-record(state_machine, { id :: state_machine_id() + , state :: atom() + , timers :: maps:map() + , transport :: emqx_coap_transport:transport()}). +-type state_machine() :: #state_machine{}. -type message_id() :: 0 .. ?MAX_MESSAGE_ID. --type manager() :: #{message_id() => transport()}. +-type manager() :: #{message_id() => state_machine()}. -type ttimeout() :: {state_timeout, pos_integer(), any()} | {stop_timeout, pos_integer()}. @@ -47,32 +47,31 @@ -type topic() :: binary(). -type token() :: binary(). -type sub_register() :: {topic(), token()} | topic(). - --type event_result(State, Data) :: +-type event_result(State) :: #{next => State, outgoing => emqx_coap_message(), timeouts => list(ttimeout()), has_sub => undefined | sub_register(), - data => Data}. + transport => emqx_coap_transport:transprot()}. -%%%=================================================================== -%%% API -%%%=================================================================== +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- new() -> #{}. -handle_request(#coap_message{id = MsgId} = Msg, TM, Cfg) -> +handle_request(#coap_message{id = MsgId} = Msg, Cfg, TM) -> Id = {in, MsgId}, case maps:get(Id, TM, undefined) of - undefined -> - Data = emqx_coap_transport:new(), - Transport = new_transport(Id, Data), - process_event(in, Msg, TM, Transport, Cfg); - TP -> - process_event(in, Msg, TM, TP, Cfg) + undefined -> + Transport = emqx_coap_transport:new(), + Machine = new_state_machine(Id, Transport), + process_event(in, Msg, TM, Machine, Cfg); + Machine -> + process_event(in, Msg, TM, Machine, Cfg) end. -handle_response(#coap_message{type = Type, id = MsgId} = Msg, TM, Cfg) -> +handle_response(#coap_message{type = Type, id = MsgId} = Msg, Cfg, TM) -> Id = {out, MsgId}, case maps:get(Id, TM, undefined) of undefined -> @@ -83,56 +82,50 @@ handle_response(#coap_message{type = Type, id = MsgId} = Msg, TM, Cfg) -> #{out => #coap_message{type = reset, id = MsgId}} end; - TP -> - process_event(in, Msg, TM, TP, Cfg) + Machine -> + process_event(in, Msg, TM, Machine, Cfg) end. -handle_out(#coap_message{id = MsgId} = Msg, TM, Cfg) -> +handle_out(#coap_message{id = MsgId} = Msg, Cfg, TM) -> Id = {out, MsgId}, case maps:get(Id, TM, undefined) of undefined -> - Data = emqx_coap_transport:new(), - Transport = new_transport(Id, Data), - process_event(out, Msg, TM, Transport, Cfg); + Transport = emqx_coap_transport:new(), + Machine = new_state_machine(Id, Transport), + process_event(out, Msg, TM, Machine, Cfg); _ -> ?WARN("Repeat sending message with id:~p~n", [Id]), ?EMPTY_RESULT end. -timeout({Id, Type, Msg}, TM, Cfg) -> +timeout({Id, Type, Msg}, Cfg, TM) -> case maps:get(Id, TM, undefined) of undefined -> ?EMPTY_RESULT; - #transport{timers = Timers} = TP -> + #state_machine{timers = Timers} = Machine -> %% maybe timer has been canceled case maps:is_key(Type, Timers) of true -> - process_event(Type, Msg, TM, TP, Cfg); + process_event(Type, Msg, TM, Machine, Cfg); _ -> ?EMPTY_RESULT end end. %%-------------------------------------------------------------------- -%% @doc -%% @spec -%% @end +%% Internal functions %%-------------------------------------------------------------------- - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== -new_transport(Id, Data) -> - #transport{id = Id, - state = idle, - timers = #{}, - data = Data}. +new_state_machine(Id, Transport) -> + #state_machine{id = Id, + state = idle, + timers = #{}, + transport = Transport}. process_event(stop_timeout, _, TM, - #transport{id = Id, - timers = Timers}, + #state_machine{id = Id, + timers = Timers}, _) -> lists:foreach(fun({_, Ref}) -> emqx_misc:cancel_timer(Ref) @@ -143,42 +136,42 @@ process_event(stop_timeout, process_event(Event, Msg, TM, - #transport{id = Id, - state = State, - data = Data} = TP, + #state_machine{id = Id, + state = State, + transport = Transport} = Machine, Cfg) -> - Result = emqx_coap_transport:State(Event, Msg, Data, Cfg), - {ok, _, TP2} = emqx_misc:pipeline([fun process_state_change/2, - fun process_data_change/2, - fun process_timeouts/2], - Result, - TP), - TM2 = TM#{Id => TP2}, - emqx_coap_session:transfer_result(Result, tm, TM2). + Result = emqx_coap_transport:State(Event, Msg, Transport, Cfg), + {ok, _, Machine2} = emqx_misc:pipeline([fun process_state_change/2, + fun process_transport_change/2, + fun process_timeouts/2], + Result, + Machine), + TM2 = TM#{Id => Machine2}, + emqx_coap_session:transfer_result(tm, TM2, Result). -process_state_change(#{next := Next}, TP) -> - {ok, cancel_state_timer(TP#transport{state = Next})}; -process_state_change(_, TP) -> - {ok, TP}. +process_state_change(#{next := Next}, Machine) -> + {ok, cancel_state_timer(Machine#state_machine{state = Next})}; +process_state_change(_, Machine) -> + {ok, Machine}. -cancel_state_timer(#transport{timers = Timers} = TP) -> +cancel_state_timer(#state_machine{timers = Timers} = Machine) -> case maps:get(state_timer, Timers, undefined) of undefined -> - TP; + Machine; Ref -> _ = emqx_misc:cancel_timer(Ref), - TP#transport{timers = maps:remove(state_timer, Timers)} + Machine#state_machine{timers = maps:remove(state_timer, Timers)} end. -process_data_change(#{data := Data}, TP) -> - {ok, TP#transport{data = Data}}; -process_data_change(_, TP) -> - {ok, TP}. +process_transport_change(#{transport := Transport}, Machine) -> + {ok, Machine#state_machine{transport = Transport}}; +process_transport_change(_, Machine) -> + {ok, Machine}. -process_timeouts(#{timeouts := []}, TP) -> - {ok, TP}; +process_timeouts(#{timeouts := []}, Machine) -> + {ok, Machine}; process_timeouts(#{timeouts := Timeouts}, - #transport{id = Id, timers = Timers} = TP) -> + #state_machine{id = Id, timers = Timers} = Machine) -> NewTimers = lists:foldl(fun({state_timeout, _, _} = Timer, Acc) -> process_timer(Id, Timer, Acc); ({stop_timeout, I}, Acc) -> @@ -186,11 +179,11 @@ process_timeouts(#{timeouts := Timeouts}, end, Timers, Timeouts), - {ok, TP#transport{timers = NewTimers}}; + {ok, Machine#state_machine{timers = NewTimers}}; -process_timeouts(_, TP) -> - {ok, TP}. +process_timeouts(_, Machine) -> + {ok, Machine}. process_timer(Id, {Type, Interval, Msg}, Timers) -> - Ref = emqx_misc:start_timer(Interval, {transport, {Id, Type, Msg}}), + Ref = emqx_misc:start_timer(Interval, {state_machine, {Id, Type, Msg}}), Timers#{Type => Ref}. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl index 7363b6254..b4c8ae333 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl @@ -9,21 +9,23 @@ -define(EXCHANGE_LIFETIME, 247000). -define(NON_LIFETIME, 145000). --record(data, { cache :: undefined | emqx_coap_message() - , retry_interval :: non_neg_integer() - , retry_count :: non_neg_integer() - }). +-record(transport, { cache :: undefined | emqx_coap_message() + , retry_interval :: non_neg_integer() + , retry_count :: non_neg_integer() + }). --type data() :: #data{}. +-type transport() :: #transport{}. -export([ new/0, idle/4, maybe_reset/4 , maybe_resend/4, wait_ack/4, until_stop/4]). --spec new() -> data(). +-export_type([transport/0]). + +-spec new() -> transport(). new() -> - #data{cache = undefined, - retry_interval = 0, - retry_count = 0}. + #transport{cache = undefined, + retry_interval = 0, + retry_count = 0}. idle(in, #coap_message{type = non, id = MsgId, method = Method} = Msg, @@ -50,14 +52,14 @@ idle(in, #coap_message{id = MsgId, type = con, method = Method} = Msg, - Data, + Transport, #{resource := Resource} = Cfg) -> Ret = #{next => maybe_resend, timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]}, case Method of undefined -> ResetMsg = #coap_message{type = reset, id = MsgId}, - Ret#{data => Data#data{cache = ResetMsg}, + Ret#{transport => Transport#transport{cache = ResetMsg}, out => ResetMsg}; _ -> {RetMsg, SubInfo} = @@ -72,7 +74,7 @@ idle(in, end, RetMsg2 = RetMsg#coap_message{type = ack}, Ret#{out => RetMsg2, - data => Data#data{cache = RetMsg2}, + transport => Transport#transport{cache = RetMsg2}, subscribe => SubInfo} end; @@ -81,11 +83,11 @@ idle(out, #coap_message{type = non} = Msg, _, _) -> out => Msg, timeouts => [{stop_timeout, ?NON_LIFETIME}]}; -idle(out, Msg, Data, _) -> +idle(out, Msg, Transport, _) -> _ = emqx_misc:rand_seed(), Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR), #{next => wait_ack, - data => Data#data{cache = Msg}, + transport => Transport#transport{cache = Msg}, out => Msg, timeouts => [ {state_timeout, Timeout, ack_timeout} , {stop_timeout, ?EXCHANGE_LIFETIME}]}. @@ -99,7 +101,7 @@ maybe_reset(in, Message, _, _) -> end, ?EMPTY_RESULT. -maybe_resend(in, _, _, #data{cache = Cache}) -> +maybe_resend(in, _, _, #transport{cache = Cache}) -> #{out => Cache}. wait_ack(in, #coap_message{type = Type}, _, _) -> @@ -115,14 +117,14 @@ wait_ack(in, #coap_message{type = Type}, _, _) -> wait_ack(state_timeout, ack_timeout, _, - #data{cache = Msg, - retry_interval = Timeout, - retry_count = Count} =Data) -> + #transport{cache = Msg, + retry_interval = Timeout, + retry_count = Count} =Transport) -> case Count < ?MAX_RETRANSMIT of true -> Timeout2 = Timeout * 2, - #{data => Data#data{retry_interval = Timeout2, - retry_count = Count + 1}, + #{transport => Transport#transport{retry_interval = Timeout2, + retry_count = Count + 1}, out => Msg, timeouts => [{state_timeout, Timeout2, ack_timeout}]}; _ -> diff --git a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl index 0e0b33365..911d10a22 100644 --- a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl +++ b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl @@ -23,7 +23,7 @@ -define(MAXIMUM_MAX_AGE, 4294967295). -define(EMPTY_RESULT, #{}). --define(TRANSFER_RESULT(R1, Keys, From, Value), +-define(TRANSFER_RESULT(Keys, From, Value, R1), begin R2 = maps:with(Keys, R1), R2#{From => Value}