diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index d15b8605a..bed769ad6 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -638,7 +638,8 @@ process_connection({open, Req}, Result, Channel); process_connection({close, Msg}, _, Channel, _) -> Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), - {shutdown, close, Reply, Channel}. + NChannel = ensure_disconnected(normal, Channel), + {shutdown, normal, Reply, NChannel}. process_subscribe({Sub, Msg}, Result, #channel{session = Session} = Channel, Iter) -> Result2 = emqx_coap_session:process_subscribe(Sub, Msg, Result, Session), diff --git a/apps/emqx_gateway/test/emqx_coap_SUITE.erl b/apps/emqx_gateway/test/emqx_coap_SUITE.erl index 8bac1ffbd..ba3d77329 100644 --- a/apps/emqx_gateway/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_coap_SUITE.erl @@ -292,12 +292,41 @@ t_clients_get_subscription_api(_) -> end, with_connection(Fun). +t_on_offline_event(_) -> + Fun = fun(Channel) -> + emqx_hooks:add('client.connected', {emqx_sys, on_client_connected, []}), + emqx_hooks:add('client.disconnected', {emqx_sys, on_client_disconnected, []}), + + ConnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/connected">>, + emqx_broker:subscribe(ConnectedSub), + timer:sleep(100), + + Token = connection(Channel), + ?assertMatch(#message{}, receive_deliver(500)), + + DisconnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/disconnected">>, + emqx_broker:subscribe(DisconnectedSub), + timer:sleep(100), + + disconnection(Channel, Token), + + ?assertMatch(#message{}, receive_deliver(500)), + + emqx_broker:unsubscribe(ConnectedSub), + emqx_broker:unsubscribe(DisconnectedSub), + + emqx_hooks:del('client.connected', {emqx_sys, on_client_connected}), + emqx_hooks:del('client.disconnected', {emqx_sys, on_client_disconnected}), + timer:sleep(500) + end, + do(Fun). + %%-------------------------------------------------------------------- %% helpers connection(Channel) -> URI = ?MQTT_PREFIX ++ - "/connection?clientid=client1&username=admin&password=public", + "/connection?clientid=client1&username=admin&password=public", Req = make_req(post), {ok, created, Data} = do_request(Channel, URI, Req), #coap_content{payload = BinToken} = Data, @@ -378,3 +407,11 @@ with_connection(Action) -> timer:sleep(100) end, do(Fun). + +receive_deliver(Wait) -> + receive + {deliver, _, Msg} -> + Msg + after Wait -> + {error, timeout} + end. diff --git a/apps/emqx_modules/test/emqx_event_message_SUITE.erl b/apps/emqx_modules/test/emqx_event_message_SUITE.erl index c74db36f6..beb9175fe 100644 --- a/apps/emqx_modules/test/emqx_event_message_SUITE.erl +++ b/apps/emqx_modules/test/emqx_event_message_SUITE.erl @@ -42,9 +42,12 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([emqx_modules]), - ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE), + load_config(), Config. +load_config() -> + ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE). + end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_modules]).