diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 4fb909483..0cdf15f9b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -254,7 +254,7 @@ cb_gateway_unload(State = #state{gw = Gateway = #{type := GwType}, gw_state = GwState}) -> try #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType), - CbMod:on_gateway_unload(Gateway, GwState, GwState), + CbMod:on_gateway_unload(Gateway, GwState), {ok, State#state{child_pids = [], gw_state = undefined, status = stopped}} @@ -316,6 +316,7 @@ cb_gateway_update(NewGateway, {error, {Class, Reason1, Stk}} end. +start_child_process([]) -> []; start_child_process([Indictor|_] = ChildPidOrSpecs) -> case erlang:is_pid(Indictor) of true -> diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 85d57a51b..938da15ba 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -60,6 +60,7 @@ fields(mqttsn_structs) -> , {idle_timeout, t(duration())} , {predefined, hoconsc:array(ref(mqttsn_predefined))} , {clientinfo_override, t(ref(clientinfo_override))} + , {authentication, t(ref(authentication))} , {listener, t(ref(udp_listener_group))} ]; @@ -78,6 +79,7 @@ fields(lwm2m_structs) -> , {mountpoint, t(string())} , {update_msg_publish_condition, t(union([always, contains_object_list]))} , {translators, t(ref(translators))} + , {authentication, t(ref(authentication))} , {listener, t(ref(udp_listener_group))} ]; @@ -201,11 +203,11 @@ fields(coap) -> fields(coap_structs) -> [ {enable_stats, t(boolean(), undefined, true)} - , {authentication, t(ref(authentication))} , {heartbeat, t(duration(), undefined, "30s")} , {notify_type, t(union([non, con, qos]), undefined, qos)} , {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} , {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} + , {authentication, t(ref(authentication))} , {listener, t(ref(udp_listener_group))} ]; diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 97d62da52..8a4d24691 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -120,7 +120,8 @@ format_listenon({Addr, Port}) when is_tuple(Addr) -> , SocketOpts :: esockd:option() , Cfg :: map() }). -normalize_rawconf(RawConf = #{listener := LisMap}) -> +normalize_rawconf(RawConf) -> + LisMap = maps:get(listener, RawConf, #{}), Cfg0 = maps:without([listener], RawConf), lists:append(maps:fold(fun(Type, Liss, AccIn1) -> Listeners = diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 363af9f16..4fb5cd05a 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -47,6 +47,8 @@ unreg() -> %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- +start_grpc_server(_GwType, undefined) -> + undefined; start_grpc_server(GwType, Options = #{bind := ListenOn}) -> Services = #{protos => [emqx_exproto_pb], services => #{ @@ -60,6 +62,8 @@ start_grpc_server(GwType, Options = #{bind := ListenOn}) -> _ = grpc:start_server(GwType, ListenOn, Services, SvrOptions), ?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwType, ListenOn]). +start_grpc_client_channel(_GwType, undefined) -> + undefined; start_grpc_client_channel(GwType, Options = #{address := UriStr}) -> UriMap = uri_string:parse(UriStr), Scheme = maps:get(scheme, UriMap), @@ -88,10 +92,10 @@ on_gateway_load(_Gateway = #{ type := GwType, PoolSize = emqx_vm:schedulers() * 2, {ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize, {emqx_exproto_gcli, start_link, []}), - _ = start_grpc_client_channel(GwType, maps:get(handler, RawConf)), + _ = start_grpc_client_channel(GwType, maps:get(handler, RawConf, undefined)), %% XXX: How to monitor it ? - _ = start_grpc_server(GwType, maps:get(server, RawConf)), + _ = start_grpc_server(GwType, maps:get(server, RawConf, undefined)), NRawConf = maps:without( [server, handler], diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 03eebfd22..3dfb8546d 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -53,17 +53,17 @@ on_gateway_load(_Gateway = #{ type := GwType, %% We Also need to start `emqx_sn_broadcast` & %% `emqx_sn_registry` process - SnGwId = maps:get(gateway_id, RawConf), - case maps:get(broadcast, RawConf) of + case maps:get(broadcast, RawConf, false) of false -> ok; true -> %% FIXME: Port = 1884, + SnGwId = maps:get(gateway_id, RawConf, undefined), _ = emqx_sn_broadcast:start_link(SnGwId, Port), ok end, - PredefTopics = maps:get(predefined, RawConf), + PredefTopics = maps:get(predefined, RawConf, []), {ok, RegistrySvr} = emqx_sn_registry:start_link(GwType, PredefTopics), NRawConf = maps:without( diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 43c1d9a86..0b241a5c8 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -55,19 +55,19 @@ metrics() -> init_per_group(GrpName, Cfg) -> put(grpname, GrpName), Svrs = emqx_exproto_echo_svr:start(), - emqx_ct_helpers:start_apps([emqx_gateway], fun set_special_cfg/1), + emqx_ct_helpers:start_apps([emqx_authn, emqx_gateway], fun set_special_cfg/1), emqx_logger:set_log_level(debug), [{servers, Svrs}, {listener_type, GrpName} | Cfg]. end_per_group(_, Cfg) -> - emqx_ct_helpers:stop_apps([emqx_gateway]), + emqx_ct_helpers:stop_apps([emqx_gateway, emqx_authn]), emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). set_special_cfg(emqx_gateway) -> LisType = get(grpname), emqx_config:put( [gateway, exproto], - #{authenticator => allow_anonymous, + #{authentication => #{enable => false}, server => #{bind => 9100}, handler => #{address => "http://127.0.0.1:9001"}, listener => listener_confs(LisType) @@ -77,7 +77,7 @@ set_special_cfg(_App) -> listener_confs(Type) -> Default = #{bind => 7993, acceptors => 8}, - #{Type => maps:merge(Default, maps:from_list(socketopts(Type)))}. + #{Type => #{'1' => maps:merge(Default, maps:from_list(socketopts(Type)))}}. %%-------------------------------------------------------------------- %% Tests cases diff --git a/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl index 7887ece0a..da03b17c5 100644 --- a/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl @@ -35,11 +35,11 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Cfg) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_ct_helpers:start_apps([emqx_gateway]), + emqx_ct_helpers:start_apps([emqx_authn, emqx_gateway]), Cfg. end_per_suite(_Cfg) -> - emqx_ct_helpers:stop_apps([emqx_gateway]), + emqx_ct_helpers:stop_apps([emqx_authn, emqx_gateway]), ok. %%-------------------------------------------------------------------- @@ -49,21 +49,15 @@ end_per_suite(_Cfg) -> t_load_unload(_) -> OldCnt = length(emqx_gateway_registry:list()), RgOpts = [{cbkmod, ?MODULE}], - GwOpts = [paramsin], - ok = emqx_gateway_registry:load(test, RgOpts, GwOpts), + ok = emqx_gateway_registry:reg(test, RgOpts), ?assertEqual(OldCnt+1, length(emqx_gateway_registry:list())), #{cbkmod := ?MODULE, - rgopts := RgOpts, - gwopts := GwOpts, - state := #{gwstate := 1}} = emqx_gateway_registry:lookup(test), + rgopts := RgOpts} = emqx_gateway_registry:lookup(test), - {error, already_existed} = emqx_gateway_registry:load(test, [{cbkmod, ?MODULE}], GwOpts), + {error, already_existed} = emqx_gateway_registry:reg(test, [{cbkmod, ?MODULE}]), - ok = emqx_gateway_registry:unload(test), + ok = emqx_gateway_registry:unreg(test), undefined = emqx_gateway_registry:lookup(test), OldCnt = length(emqx_gateway_registry:list()), ok. - -init([paramsin]) -> - {ok, _GwState = #{gwstate => 1}}. diff --git a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl index ff6e416e4..2e3e5a040 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl @@ -134,12 +134,12 @@ groups() -> ]. init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx]), + emqx_ct_helpers:start_apps([emqx_authn]), Config. end_per_suite(Config) -> timer:sleep(300), - emqx_ct_helpers:stop_apps([emqx]), + emqx_ct_helpers:stop_apps([emqx_authn]), Config. init_per_testcase(_AllTestCase, Config) -> diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 2b8475cf7..2b8f62f58 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -84,11 +84,11 @@ all() -> init_per_suite(Config) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_ct_helpers:start_apps([emqx_gateway]), + emqx_ct_helpers:start_apps([emqx_authn, emqx_gateway]), Config. end_per_suite(_) -> - emqx_ct_helpers:stop_apps([emqx_gateway]). + emqx_ct_helpers:stop_apps([emqx_gateway, emqx_authn]). %%-------------------------------------------------------------------- %% Test cases @@ -98,7 +98,7 @@ end_per_suite(_) -> %% Connect t_connect(_) -> - SockName = {'mqttsn#1:udp', 1884}, + SockName = {'mqttsn:udp', 1884}, ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), {ok, Socket} = gen_udp:open(0, [binary]), diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl index 3d05b73c3..328e9fa79 100644 --- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl @@ -45,11 +45,11 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Cfg) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_ct_helpers:start_apps([emqx_gateway]), + emqx_ct_helpers:start_apps([emqx_authn, emqx_gateway]), Cfg. end_per_suite(_Cfg) -> - emqx_ct_helpers:stop_apps([emqx_gateway]), + emqx_ct_helpers:stop_apps([emqx_gateway, emqx_authn]), ok. %%--------------------------------------------------------------------