From bfbf377a4584dce74aaf4fd84225e3a320c7e9c0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 17 Dec 2021 10:21:55 +0800 Subject: [PATCH] feat(coap): support subscribe/unsubscribe operations --- .../src/coap/emqx_coap_channel.erl | 47 +++++++++++++++++-- .../src/coap/emqx_coap_medium.erl | 4 +- .../coap/handler/emqx_coap_pubsub_handler.erl | 2 +- apps/emqx_gateway/src/emqx_gateway.erl | 2 - apps/emqx_gateway/src/emqx_gateway_conf.erl | 2 + 5 files changed, 48 insertions(+), 9 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index bfd505750..0d35716b4 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -226,11 +226,50 @@ handle_call({send_request, Msg}, From, Channel) -> Result = call_session(handle_out, {{send_request, From}, Msg}, Channel), erlang:setelement(1, Result, noreply); -handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) -> - {reply, {error, nosupport}, Channel}; +handle_call({subscribe, Topic, SubOpts}, _From, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{clientid := ClientId, + mountpoint := Mountpoint}, + session = Session}) -> + Token = maps:get(token, + maps:get(sub_props, SubOpts, #{}), + undefined), + NSubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + SubOpts), + MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), + _ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts), -handle_call({unsubscribe, _Topic}, _From, Channel) -> - {reply, {error, noimpl}, Channel}; + _ = run_hooks(Ctx, 'session.subscribed', + [ClientInfo, MountedTopic, NSubOpts]), + %% modifty session state + SubReq = {Topic, Token}, + TempMsg = #coap_message{}, + Result = emqx_coap_session:process_subscribe( + SubReq, TempMsg, #{}, Session), + NSession = maps:get(session, Result), + {reply, ok, Channel#channel{session = NSession}}; + +handle_call({unsubscribe, Topic}, _From, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{mountpoint := Mountpoint}, + session = Session}) -> + MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), + ok = emqx_broker:unsubscribe(MountedTopic), + _ = run_hooks(Ctx, 'session.unsubscribe', + [ClientInfo, MountedTopic, #{}]), + + %% modifty session state + UnSubReq = Topic, + TempMsg = #coap_message{}, + Result = emqx_coap_session:process_subscribe( + UnSubReq, TempMsg, #{}, Session), + NSession = maps:get(session, Result), + {reply, ok, Channel#channel{session = NSession}}; handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> Subs = emqx_coap_session:info(subscriptions, Session), diff --git a/apps/emqx_gateway/src/coap/emqx_coap_medium.erl b/apps/emqx_gateway/src/coap/emqx_coap_medium.erl index 8dafc7bbb..020e38496 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_medium.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_medium.erl @@ -53,8 +53,8 @@ out(Msg, Result) -> proto_out(Proto) -> proto_out(Proto, #{}). -proto_out(Proto, Resut) -> - Resut#{proto => Proto}. +proto_out(Proto, Result) -> + Result#{proto => Proto}. reply(Method, Req) when not is_record(Method, coap_message) -> reply(Method, <<>>, Req); diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl index a5ee55b16..608eae92a 100644 --- a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -146,7 +146,7 @@ subscribe(#coap_message{token = Token} = Msg, Topic, Ctx, CInfo) -> SubOpts = get_sub_opts(Msg), MountTopic = mount(CInfo, Topic), emqx_broker:subscribe(MountTopic, ClientId, SubOpts), - run_hooks(Ctx, 'session.subscribed', [CInfo, Topic, SubOpts]), + run_hooks(Ctx, 'session.subscribed', [CInfo, MountTopic, SubOpts]), ?SUB(MountTopic, Token, Msg); _ -> reply({error, unauthorized}, Msg) diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index 96cc5d4ae..1992104a0 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -16,8 +16,6 @@ -module(emqx_gateway). --behaviour(emqx_config_handler). - -include("include/emqx_gateway.hrl"). %% Gateway APIs diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 351093e0f..b25bc4aba 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -17,6 +17,8 @@ %% @doc The gateway configuration management module -module(emqx_gateway_conf). +-behaviour(emqx_config_handler). + %% Load/Unload -export([ load/0 , unload/0