From 9c855ba8c26f41f002740b9675fd82da825a722d Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 25 Aug 2021 11:43:33 +0800 Subject: [PATCH] chore(gw): cleanup the massive schema defination --- apps/emqx_gateway/etc/emqx_gateway.conf | 130 +++++--- .../src/coap/emqx_coap_channel.erl | 8 +- apps/emqx_gateway/src/emqx_gateway.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_api.erl | 290 ++++++++---------- .../src/emqx_gateway_insta_sup.erl | 16 +- apps/emqx_gateway/src/emqx_gateway_intr.erl | 13 +- apps/emqx_gateway/src/emqx_gateway_schema.erl | 78 +++-- apps/emqx_gateway/src/emqx_gateway_sup.erl | 5 +- apps/emqx_gateway/src/emqx_gateway_utils.erl | 11 + .../src/lwm2m/emqx_lwm2m_xml_object_db.erl | 9 +- apps/emqx_gateway/test/emqx_exproto_SUITE.erl | 2 +- apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl | 38 ++- .../test/emqx_sn_protocol_SUITE.erl | 39 ++- apps/emqx_gateway/test/emqx_stomp_SUITE.erl | 18 +- 14 files changed, 338 insertions(+), 321 deletions(-) diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index 15aeb2e29..206c54b93 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -7,6 +7,17 @@ gateway.stomp { + ## How long time the connection will be disconnected if the + ## connection is established but no bytes received + idle_timeout = 30s + + ## To control whether write statistics data into ETS table + ## for dashbord to read. + enable_stats = true + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + mountpoint = "" + frame { max_headers = 10 max_headers_length = 1024 @@ -18,7 +29,7 @@ gateway.stomp { password = "${Packet.headers.passcode}" } - authenticator { + authentication { name = "authenticator1" mechanism = password-based server_type = built-in-database @@ -36,42 +47,57 @@ gateway.stomp { gateway.coap { - enable_stats = false + ## How long time the connection will be disconnected if the + ## connection is established but no bytes received + idle_timeout = 30s - authenticator { + ## To control whether write statistics data into ETS table + ## for dashbord to read. + enable_stats = true + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + mountpoint = "" + + heartbeat = 30s + notify_type = qos + subscribe_qos = qos0 + publish_qos = qos1 + + authentication { name = "authenticator1" mechanism = password-based server_type = built-in-database user_id_type = clientid } - heartbeat = 30s - notify_type = qos - subscribe_qos = qos0 - publish_qos = qos1 listeners.udp.default { bind = 5683 } } gateway.mqttsn { + + ## How long time the connection will be disconnected if the + ## connection is established but no bytes received + idle_timeout = 30s + + ## To control whether write statistics data into ETS table + ## for dashbord to read. + enable_stats = true + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + mountpoint = "" + ## The MQTT-SN Gateway ID in ADVERTISE message. gateway_id = 1 ## Enable broadcast this gateway to WLAN broadcast = true - ## To control whether write statistics data into ETS table - ## for dashbord to read. - enable_stats = true - ## To control whether accept and process the received ## publish message with qos=-1. enable_qos3 = true - ## Idle timeout for a MQTT-SN channel - idle_timeout = 30s - ## The pre-defined topic name corresponding to the pre-defined topic ## id of N. ## Note that the pre-defined topic id of 0 is reserved. @@ -97,7 +123,55 @@ gateway.mqttsn { } } +gateway.lwm2m { + + ## How long time the connection will be disconnected if the + ## connection is established but no bytes received + idle_timeout = 30s + + ## To control whether write statistics data into ETS table + ## for dashbord to read. + enable_stats = true + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + mountpoint = "lwm2m/%e/" + + xml_dir = "{{ platform_etc_dir }}/lwm2m_xml" + + lifetime_min = 1s + lifetime_max = 86400s + qmode_time_windonw = 22 + auto_observe = false + + ## always | contains_object_list + update_msg_publish_condition = contains_object_list + + translators { + command = "dn/#" + response = "up/resp" + notify = "up/notify" + register = "up/resp" + update = "up/resp" + } + + listeners.udp.default { + bind = 5783 + } +} + gateway.exproto { + + ## How long time the connection will be disconnected if the + ## connection is established but no bytes received + idle_timeout = 30s + + ## To control whether write statistics data into ETS table + ## for dashbord to read. + enable_stats = true + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + mountpoint = "" + ## The gRPC server to accept requests server { bind = 9100 @@ -119,35 +193,7 @@ gateway.exproto { max_connections = 10240 max_conn_rate = 1000 } - #listeners.ssl.default: {} #listeners.udp.default: {} #listeners.dtls.default: {} } - -gateway.lwm2m { - - xml_dir = "{{ platform_etc_dir }}/lwm2m_xml" - - lifetime_min = 1s - lifetime_max = 86400s - qmode_time_windonw = 22 - auto_observe = false - - mountpoint = "lwm2m/%e/" - - ## always | contains_object_list - update_msg_publish_condition = contains_object_list - - translators { - command = "dn/#" - response = "up/resp" - notify = "up/notify" - register = "up/resp" - update = "up/resp" - } - - listeners.udp.default { - bind = 5783 - } -} diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index ccf42343c..510432441 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -106,7 +106,7 @@ init(ConnInfo = #{peername := {PeerHost, _}, #{ctx := Ctx} = Config) -> Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Config, undefined), - EnableAuth = is_authenticator_enabled(Config), + EnableAuth = is_authentication_enabled(Config), ClientInfo = set_peercert_infos( Peercert, #{ zone => default @@ -134,8 +134,8 @@ init(ConnInfo = #{peername := {PeerHost, _}, , keepalive = emqx_keepalive:init(maps:get(heartbeat, Config)) }. -is_authenticator_enabled(Cfg) -> - case maps:get(authenticator, Cfg, #{enable => false}) of +is_authentication_enabled(Cfg) -> + case maps:get(authentication, Cfg, #{enable => false}) of AuthCfg when is_map(AuthCfg) -> maps:get(enable, AuthCfg, true); _ -> false @@ -297,7 +297,7 @@ handle_result(_, _, _, Channel) -> {ok, Channel}. check_auth_state(Msg, #channel{config = Cfg} = Channel) -> - Enable = is_authenticator_enabled(Cfg), + Enable = is_authentication_enabled(Cfg), check_token(Enable, Msg, Channel). check_token(true, diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index d2ab66362..4f80bfe3b 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -51,7 +51,7 @@ load(Name, RawConf) -> }, emqx_gateway_sup:load_gateway(Gateway). --spec unload(gateway_name()) -> ok | {error, any()}. +-spec unload(gateway_name()) -> ok | {error, not_found}. unload(Name) -> emqx_gateway_sup:unload_gateway(Name). diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index e617af65e..28cb45b47 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -20,6 +20,9 @@ -compile(nowarn_unused_function). +-import(emqx_mgmt_util, [ schema/1 + ]). + %% minirest behaviour callbacks -export([api_spec/0]). @@ -41,21 +44,109 @@ } ]). --define(EXAMPLE_STOMP_GATEWAY_CONF, #{ - frame => #{ - max_headers => 10, - max_headers_length => 1024, - max_body_length => 8192 - }, - listener => #{ - tcp => #{<<"default-stomp-listener">> => #{ - bind => <<"61613">> - }} - } - }). +%% XXX: This is whole confs for all type gateways. It is used to fill the +%% default configurations and generate the swagger-schema +%% +%% NOTE: It is a temporary measure to generate swagger-schema +-define(COAP_GATEWAY_CONFS, +#{<<"authentication">> => + #{<<"mechanism">> => <<"password-based">>, + <<"name">> => <<"authenticator1">>, + <<"server_type">> => <<"built-in-database">>, + <<"user_id_type">> => <<"clientid">>}, + <<"enable">> => true, + <<"enable_stats">> => true,<<"heartbeat">> => <<"30s">>, + <<"idle_timeout">> => <<"30s">>, + <<"listeners">> => + #{<<"udp">> => #{<<"default">> => #{<<"bind">> => 5683}}}, + <<"mountpoint">> => <<>>,<<"notify_type">> => <<"qos">>, + <<"publish_qos">> => <<"qos1">>, + <<"subscribe_qos">> => <<"qos0">>} +). --define(EXAMPLE_MQTTSN_GATEWAY_CONF, #{ - }). +-define(EXPROTO_GATEWAY_CONFS, +#{<<"enable">> => true, + <<"enable_stats">> => true, + <<"handler">> => + #{<<"address">> => <<"http://127.0.0.1:9001">>}, + <<"idle_timeout">> => <<"30s">>, + <<"listeners">> => + #{<<"tcp">> => + #{<<"default">> => + #{<<"acceptors">> => 8,<<"bind">> => 7993, + <<"max_conn_rate">> => 1000, + <<"max_connections">> => 10240}}}, + <<"mountpoint">> => <<>>, + <<"server">> => #{<<"bind">> => 9100}} +). + +-define(LWM2M_GATEWAY_CONFS, +#{<<"auto_observe">> => false, + <<"enable">> => true, + <<"enable_stats">> => true, + <<"idle_timeout">> => <<"30s">>, + <<"lifetime_max">> => <<"86400s">>, + <<"lifetime_min">> => <<"1s">>, + <<"listeners">> => + #{<<"udp">> => #{<<"default">> => #{<<"bind">> => 5783}}}, + <<"mountpoint">> => <<"lwm2m/%e/">>, + <<"qmode_time_windonw">> => 22, + <<"translators">> => + #{<<"command">> => <<"dn/#">>,<<"notify">> => <<"up/notify">>, + <<"register">> => <<"up/resp">>, + <<"response">> => <<"up/resp">>, + <<"update">> => <<"up/resp">>}, + <<"update_msg_publish_condition">> => + <<"contains_object_list">>, + <<"xml_dir">> => <<"etc/lwm2m_xml">>} +). + +-define(MQTTSN_GATEWAY_CONFS, +#{<<"broadcast">> => true, + <<"clientinfo_override">> => + #{<<"password">> => <<"abc">>, + <<"username">> => <<"mqtt_sn_user">>}, + <<"enable">> => true, + <<"enable_qos3">> => true,<<"enable_stats">> => true, + <<"gateway_id">> => 1,<<"idle_timeout">> => <<"30s">>, + <<"listeners">> => + #{<<"udp">> => + #{<<"default">> => + #{<<"bind">> => 1884,<<"max_conn_rate">> => 1000, + <<"max_connections">> => 10240000}}}, + <<"mountpoint">> => <<>>, + <<"predefined">> => + [#{<<"id">> => 1, + <<"topic">> => <<"/predefined/topic/name/hello">>}, + #{<<"id">> => 2, + <<"topic">> => <<"/predefined/topic/name/nice">>}]} +). + +-define(STOMP_GATEWAY_CONFS, +#{<<"authentication">> => + #{<<"mechanism">> => <<"password-based">>, + <<"name">> => <<"authenticator1">>, + <<"server_type">> => <<"built-in-database">>, + <<"user_id_type">> => <<"clientid">>}, + <<"clientinfo_override">> => + #{<<"password">> => <<"${Packet.headers.passcode}">>, + <<"username">> => <<"${Packet.headers.login}">>}, + <<"enable">> => true, + <<"enable_stats">> => true, + <<"frame">> => + #{<<"max_body_length">> => 8192,<<"max_headers">> => 10, + <<"max_headers_length">> => 1024}, + <<"idle_timeout">> => <<"30s">>, + <<"listeners">> => + #{<<"tcp">> => + #{<<"default">> => + #{<<"acceptors">> => 16,<<"active_n">> => 100, + <<"bind">> => 61613,<<"max_conn_rate">> => 1000, + <<"max_connections">> => 1024000}}}, + <<"mountpoint">> => <<>>} +). + +%% --- END -define(EXAMPLE_GATEWAY_STATS, #{ max_connection => 10240000, @@ -121,7 +212,7 @@ metadata(gateway_insta) -> summary => <<"Not Found">>, value => #{ code => <<"NOT_FOUND">>, - message => <<"gateway xxx not found">> + message => <<"The gateway not found">> } } } @@ -140,46 +231,13 @@ metadata(gateway_insta) -> parameters => [UriNameParamDef], responses => #{ <<"404">> => NameNotFoundRespDef, - <<"200">> => #{ - description => <<"OK">>, - content => #{ - 'application/json' => #{ - schema => minirest:ref(<<"gateway_conf">>), - examples => #{ - simple1 => #{ - summary => <<"Stomp Gateway">>, - value => emqx_json:encode(?EXAMPLE_STOMP_GATEWAY_CONF) - }, - simple2 => #{ - summary => <<"MQTT-SN Gateway">>, - value => emqx_json:encode(?EXAMPLE_MQTTSN_GATEWAY_CONF) - } - } - } - } - } + <<"200">> => schema(schema_for_gateway_conf()) } }, put => #{ description => <<"Update the gateway configurations/status">>, parameters => [UriNameParamDef], - requestBody => #{ - content => #{ - 'application/json' => #{ - schema => minirest:ref(<<"gateway_conf">>), - examples => #{ - simple1 => #{ - summary => <<"Stom Gateway">>, - value => emqx_json:encode(?EXAMPLE_STOMP_GATEWAY_CONF) - }, - simple2 => #{ - summary => <<"MQTT-SN Gateway">>, - value => emqx_json:encode(?EXAMPLE_MQTTSN_GATEWAY_CONF) - } - } - } - } - }, + requestBody => schema(schema_for_gateway_conf()), responses => #{ <<"404">> => NameNotFoundRespDef, <<"204">> => #{description => <<"Created">>} @@ -210,7 +268,6 @@ metadata(gateway_insta_stats) -> schemas() -> [ #{<<"gateway_overrview">> => schema_for_gateway_overrview()} - , #{<<"gateway_conf">> => schema_for_gateway_conf()} , #{<<"gateway_stats">> => schema_for_gateway_stats()} ]. @@ -262,109 +319,13 @@ schema_for_gateway_overrview() -> schema_for_gateway_conf() -> #{oneOf => - [ schema_for_gateway_conf_stomp() - , schema_for_gateway_conf_mqttsn() - , schema_for_gateway_conf_coap() - , schema_for_gateway_conf_lwm2m() - , schema_for_gateway_conf_exproto() + [ emqx_mgmt_api_configs:gen_schema(?STOMP_GATEWAY_CONFS) + , emqx_mgmt_api_configs:gen_schema(?MQTTSN_GATEWAY_CONFS) + , emqx_mgmt_api_configs:gen_schema(?COAP_GATEWAY_CONFS) + , emqx_mgmt_api_configs:gen_schema(?LWM2M_GATEWAY_CONFS) + , emqx_mgmt_api_configs:gen_schema(?EXPROTO_GATEWAY_CONFS) ]}. -schema_for_clientinfo_override() -> - #{type => object, - properties => #{ - clientid => #{type => string}, - username => #{type => string}, - password => #{type => string} - }}. - -schema_for_authenticator() -> - %% TODO. - #{type => object, properties => #{ - a_key => #{type => string} - }}. - -schema_for_tcp_listener() -> - %% TODO. - #{type => object, properties => #{ - a_key => #{type => string} - }}. - -schema_for_udp_listener() -> - %% TODO. - #{type => object, properties => #{ - a_key => #{type => string} - }}. - -%% It should be generated by _schema.erl module -%% and emqx_gateway.conf -schema_for_gateway_conf_stomp() -> - #{type => object, - properties => #{ - frame => #{ - type => object, - properties => #{ - max_headers => #{type => integer}, - max_headers_length => #{type => integer}, - max_body_length => #{type => integer} - } - }, - clientinfo_override => schema_for_clientinfo_override(), - authenticator => schema_for_authenticator(), - listener => schema_for_tcp_listener() - } - }. - -schema_for_gateway_conf_mqttsn() -> - #{type => object, - properties => #{ - gateway_id => #{type => integer}, - broadcast => #{type => boolean}, - enable_stats => #{type => boolean}, - enable_qos3 => #{type => boolean}, - idle_timeout => #{type => integer}, - predefined => #{ - type => array, - items => #{ - type => object, - properties => #{ - id => #{type => integer}, - topic => #{type => string} - } - } - }, - clientinfo_override => schema_for_clientinfo_override(), - authenticator => schema_for_authenticator(), - listener => schema_for_udp_listener() - }}. - - -schema_for_gateway_conf_coap() -> - #{type => object, - properties => #{ - clientinfo_override => schema_for_clientinfo_override(), - authenticator => schema_for_authenticator(), - listener => schema_for_udp_listener() - }}. - -schema_for_gateway_conf_lwm2m() -> - #{type => object, - properties => #{ - clientinfo_override => schema_for_clientinfo_override(), - authenticator => schema_for_authenticator(), - listener => schema_for_udp_listener() - }}. - -schema_for_gateway_conf_exproto() -> - #{type => object, - properties => #{ - clientinfo_override => schema_for_clientinfo_override(), - authenticator => schema_for_authenticator(), - listener => #{oneOf => [schema_for_tcp_listener(), - schema_for_udp_listener() - ] - } - }}. - schema_for_gateway_stats() -> #{type => object, properties => #{ @@ -382,13 +343,26 @@ gateway(get, Request) -> end, {200, emqx_gateway_intr:gateways(Status)}. -gateway_insta(delete, _Request) -> - {200, ok}; -gateway_insta(get, _Request) -> - {200, ok}; -gateway_insta(put, _Request) -> +gateway_insta(delete, Request) -> + Name = binary_to_existing_atom(cowboy_req:binding(name, Request)), + case emqx_gateway:unload(Name) of + ok -> + {200, ok}; + {error, not_found} -> + {404, <<"Not Found">>}; + {error, Reason} -> + {500, Reason} + end; +gateway_insta(get, Request) -> + Name = binary_to_existing_atom(cowboy_req:binding(name, Request)), + case emqx_gateway:lookup(Name) of + #{rawconf := RawConf} -> + {200, RawConf}; + undefined -> + {404, <<"Not Found">>} + end; +gateway_insta(post, _Request) -> {200, ok}. gateway_insta_stats(get, _Req) -> {401, <<"Implement it later (maybe 5.1)">>}. - diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 47f0104a9..0abcb6426 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -106,14 +106,14 @@ init([Gateway, Ctx0, _GwDscrptr]) -> end. do_init_context(GwName, RawConf, Ctx) -> - Auth = case maps:get(authenticator, RawConf, #{enable => false}) of + Auth = case maps:get(authentication, RawConf, #{enable => false}) of #{enable := false} -> undefined; AuthCfg when is_map(AuthCfg) -> case maps:get(enable, AuthCfg, true) of false -> undefined; _ -> - create_authenticator_for_gateway_insta(GwName, AuthCfg) + create_authentication_for_gateway_insta(GwName, AuthCfg) end; _ -> undefined @@ -121,7 +121,7 @@ do_init_context(GwName, RawConf, Ctx) -> Ctx#{auth => Auth}. do_deinit_context(Ctx) -> - cleanup_authenticator_for_gateway_insta(maps:get(auth, Ctx)), + cleanup_authentication_for_gateway_insta(maps:get(auth, Ctx)), ok. handle_call(info, _From, State = #state{gw = Gateway}) -> @@ -227,24 +227,24 @@ code_change(_OldVsn, State, _Extra) -> %% Internal funcs %%-------------------------------------------------------------------- -create_authenticator_for_gateway_insta(GwName, AuthCfg) -> +create_authentication_for_gateway_insta(GwName, AuthCfg) -> ChainId = atom_to_binary(GwName, utf8), case emqx_authn:create_chain(#{id => ChainId}) of {ok, _ChainInfo} -> case emqx_authn:create_authenticator(ChainId, AuthCfg) of {ok, _} -> ChainId; {error, Reason} -> - logger:error("Failed to create authenticator ~p", [Reason]), - throw({bad_autheticator, Reason}) + logger:error("Failed to create authentication ~p", [Reason]), + throw({bad_authentication, Reason}) end; {error, Reason} -> logger:error("Failed to create authentication chain: ~p", [Reason]), throw({bad_chain, {ChainId, Reason}}) end. -cleanup_authenticator_for_gateway_insta(undefined) -> +cleanup_authentication_for_gateway_insta(undefined) -> ok; -cleanup_authenticator_for_gateway_insta(ChainId) -> +cleanup_authentication_for_gateway_insta(ChainId) -> case emqx_authn:delete_chain(ChainId) of ok -> ok; {error, {not_found, _}} -> diff --git a/apps/emqx_gateway/src/emqx_gateway_intr.erl b/apps/emqx_gateway/src/emqx_gateway_intr.erl index b2a8b0484..2a0c15646 100644 --- a/apps/emqx_gateway/src/emqx_gateway_intr.erl +++ b/apps/emqx_gateway/src/emqx_gateway_intr.erl @@ -40,7 +40,7 @@ gateways(Status) -> case emqx_gateway:lookup(GwName) of undefined -> #{name => GwName, status => unloaded}; GwInfo = #{rawconf := RawConf} -> - GwInfo0 = unix_ts_to_rfc3339( + GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339( [created_at, started_at, stopped_at], GwInfo), GwInfo1 = maps:with([name, @@ -76,14 +76,3 @@ get_listeners_status(GwName, RawConf) -> %% @private listener_name(GwName, Type, LisName) -> list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). - -%% @private -unix_ts_to_rfc3339(Keys, Map) when is_list(Keys) -> - lists:foldl(fun(K, Acc) -> unix_ts_to_rfc3339(K, Acc) end, Map, Keys); -unix_ts_to_rfc3339(Key, Map) -> - case maps:get(Key, Map, undefined) of - undefined -> Map; - Ts -> - Map#{Key => - emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)} - end. diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 9a0e75a37..0abeb1354 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -62,10 +62,8 @@ fields(gateway) -> fields(stomp_structs) -> [ {frame, t(ref(stomp_frame))} - , {clientinfo_override, t(ref(clientinfo_override))} - , {authenticator, t(authenticator(), undefined, undefined)} , {listeners, t(ref(tcp_listener_group))} - ]; + ] ++ gateway_common_options(); fields(stomp_frame) -> [ {max_headers, t(integer(), undefined, 10)} @@ -76,39 +74,40 @@ fields(stomp_frame) -> fields(mqttsn_structs) -> [ {gateway_id, t(integer())} , {broadcast, t(boolean())} - , {enable_stats, t(boolean())} , {enable_qos3, t(boolean())} - , {idle_timeout, t(duration())} , {predefined, hoconsc:array(ref(mqttsn_predefined))} - , {clientinfo_override, t(ref(clientinfo_override))} - , {authenticator, t(authenticator(), undefined, undefined)} , {listeners, t(ref(udp_listener_group))} - ]; + ] ++ gateway_common_options(); fields(mqttsn_predefined) -> [ {id, t(integer())} - , {topic, t(string())} + , {topic, t(binary())} ]; +fields(coap_structs) -> + [ {heartbeat, t(duration(), undefined, "30s")} + , {notify_type, t(union([non, con, qos]), undefined, qos)} + , {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} + , {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} + , {listeners, t(ref(udp_listener_group))} + ] ++ gateway_common_options(); + fields(lwm2m_structs) -> - [ {xml_dir, t(string())} + [ {xml_dir, t(binary())} , {lifetime_min, t(duration())} , {lifetime_max, t(duration())} , {qmode_time_windonw, t(integer())} , {auto_observe, t(boolean())} - , {mountpoint, t(string())} , {update_msg_publish_condition, t(union([always, contains_object_list]))} , {translators, t(ref(translators))} - , {authenticator, t(authenticator(), undefined, undefined)} , {listeners, t(ref(udp_listener_group))} - ]; + ] ++ gateway_common_options(); fields(exproto_structs) -> [ {server, t(ref(exproto_grpc_server))} , {handler, t(ref(exproto_grpc_handler))} - , {authenticator, t(authenticator(), undefined, undefined)} , {listeners, t(ref(udp_tcp_listener_group))} - ]; + ] ++ gateway_common_options(); fields(exproto_grpc_server) -> [ {bind, t(union(ip_port(), integer()))} @@ -116,18 +115,18 @@ fields(exproto_grpc_server) -> ]; fields(exproto_grpc_handler) -> - [ {address, t(string())} + [ {address, t(binary())} %% TODO: ssl ]; fields(clientinfo_override) -> - [ {username, t(string())} - , {password, t(string())} - , {clientid, t(string())} + [ {username, t(binary())} + , {password, t(binary())} + , {clientid, t(binary())} ]; fields(translators) -> - [{"$name", t(string())}]; + [{"$name", t(binary())}]; fields(udp_listener_group) -> [ {udp, t(ref(udp_listener))} @@ -164,7 +163,6 @@ fields(listener_settings) -> , {max_connections, t(integer(), undefined, 1024)} , {max_conn_rate, t(integer())} , {active_n, t(integer(), undefined, 100)} - %, {zone, t(string())} %, {rate_limit, t(comma_separated_list())} , {access, t(ref(access))} , {proxy_protocol, t(boolean())} @@ -208,27 +206,14 @@ fields(dtls_listener_settings) -> , reuse_sessions => true}) ++ fields(listener_settings); fields(access) -> - [ {"$id", #{type => string(), + [ {"$id", #{type => binary(), nullable => true}}]; -fields(coap) -> - [{"$id", t(ref(coap_structs))}]; - -fields(coap_structs) -> - [ {enable_stats, t(boolean(), undefined, true)} - , {heartbeat, t(duration(), undefined, "30s")} - , {notify_type, t(union([non, con, qos]), undefined, qos)} - , {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} - , {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} - , {authenticator, t(authenticator(), undefined, undefined)} - , {listeners, t(ref(udp_listener_group))} - ]; - fields(ExtraField) -> Mod = list_to_atom(ExtraField++"_schema"), Mod:fields(ExtraField). -authenticator() -> +authentication() -> hoconsc:union( [ undefined , hoconsc:ref(emqx_authn_mnesia, config) @@ -252,6 +237,15 @@ authenticator() -> % %translations(_) -> []. +gateway_common_options() -> + [ {enable, t(boolean(), undefined, true)} + , {enable_stats, t(boolean(), undefined, true)} + , {idle_timeout, t(duration(), undefined, "30s")} + , {mountpoint, t(binary())} + , {clientinfo_override, t(ref(clientinfo_override))} + , {authentication, t(authentication(), undefined, undefined)} + ]. + %%-------------------------------------------------------------------- %% Helpers @@ -289,9 +283,9 @@ ssl(Mapping, Defaults) -> end end, D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end, [ {"enable", t(boolean(), M("enable"), D("enable"))} - , {"cacertfile", t(string(), M("cacertfile"), D("cacertfile"))} - , {"certfile", t(string(), M("certfile"), D("certfile"))} - , {"keyfile", t(string(), M("keyfile"), D("keyfile"))} + , {"cacertfile", t(binary(), M("cacertfile"), D("cacertfile"))} + , {"certfile", t(binary(), M("certfile"), D("certfile"))} + , {"keyfile", t(binary(), M("keyfile"), D("keyfile"))} , {"verify", t(union(verify_peer, verify_none), M("verify"), D("verify"))} , {"fail_if_no_peer_cert", t(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))} , {"secure_renegotiate", t(boolean(), M("secure_renegotiate"), D("secure_renegotiate"))} @@ -299,12 +293,12 @@ ssl(Mapping, Defaults) -> , {"honor_cipher_order", t(boolean(), M("honor_cipher_order"), D("honor_cipher_order"))} , {"handshake_timeout", t(duration(), M("handshake_timeout"), D("handshake_timeout"))} , {"depth", t(integer(), M("depth"), D("depth"))} - , {"password", hoconsc:t(string(), #{mapping => M("key_password"), + , {"password", hoconsc:t(binary(), #{mapping => M("key_password"), default => D("key_password"), sensitive => true })} - , {"dhfile", t(string(), M("dhfile"), D("dhfile"))} - , {"server_name_indication", t(union(disable, string()), M("server_name_indication"), + , {"dhfile", t(binary(), M("dhfile"), D("dhfile"))} + , {"server_name_indication", t(union(disable, binary()), M("server_name_indication"), D("server_name_indication"))} , {"tls_versions", t(comma_separated_list(), M("tls_versions"), D("tls_versions"))} , {"ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))} diff --git a/apps/emqx_gateway/src/emqx_gateway_sup.erl b/apps/emqx_gateway/src/emqx_gateway_sup.erl index 87e41d93b..57ac7e7c7 100644 --- a/apps/emqx_gateway/src/emqx_gateway_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_sup.erl @@ -52,7 +52,10 @@ load_gateway(Gateway = #{name := GwName}) -> emqx_gateway_gw_sup:create_insta(GwSup, Gateway, GwDscrptr) end. --spec unload_gateway(gateway_name()) -> ok | {error, not_found}. +-spec unload_gateway(gateway_name()) + -> ok + | {error, not_found} + | {error, any()}. unload_gateway(GwName) -> case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of false -> {error, not_found}; diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 593729d67..a4978ee8b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -28,6 +28,7 @@ -export([ apply/2 , format_listenon/1 + , unix_ts_to_rfc3339/2 ]). -export([ normalize_rawconf/1 @@ -107,6 +108,16 @@ format_listenon({Addr, Port}) when is_list(Addr) -> format_listenon({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). +unix_ts_to_rfc3339(Keys, Map) when is_list(Keys) -> + lists:foldl(fun(K, Acc) -> unix_ts_to_rfc3339(K, Acc) end, Map, Keys); +unix_ts_to_rfc3339(Key, Map) -> + case maps:get(Key, Map, undefined) of + undefined -> Map; + Ts -> + Map#{Key => + emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)} + end. + -spec normalize_rawconf(rawconf()) -> list({ Type :: udp | tcp | ssl | dtls , Name :: atom() diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl index ea5f878d3..1d7fb6d5e 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl @@ -49,6 +49,7 @@ %% API Function Definitions %% ------------------------------------------------------------------ +-spec start_link(binary() | string()) -> {ok, pid()} | ignore | {error, any()}. start_link(XmlDir) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []). @@ -85,10 +86,10 @@ stop() -> %% gen_server Function Definitions %% ------------------------------------------------------------------ -init([XmlDir]) -> +init([XmlDir0]) -> _ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]), _ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]), - load(XmlDir), + load(to_list(XmlDir0)), {ok, #state{}}. handle_call(_Request, _From, State) -> @@ -140,3 +141,7 @@ load_xml(FileName) -> [ObjectXml] = xmerl_xpath:string("/LWM2M/Object", Xml), ObjectXml. +to_list(B) when is_binary(B) -> + binary_to_list(B); +to_list(S) when is_list(S) -> + S. diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 0b241a5c8..4902aacf5 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -70,7 +70,7 @@ set_special_cfg(emqx_gateway) -> #{authentication => #{enable => false}, server => #{bind => 9100}, handler => #{address => "http://127.0.0.1:9001"}, - listener => listener_confs(LisType) + listeners => listener_confs(LisType) }); set_special_cfg(_App) -> ok. diff --git a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl index 2e3e5a040..79664928d 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl @@ -29,26 +29,24 @@ -include_lib("common_test/include/ct.hrl"). -define(CONF_DEFAULT, <<" -gateway: { - lwm2m: { - xml_dir: \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\" - lifetime_min: 1s - lifetime_max: 86400s - qmode_time_windonw: 22 - auto_observe: false - mountpoint: \"lwm2m/%e/\" - update_msg_publish_condition: contains_object_list - translators: { - command: \"dn/#\" - response: \"up/resp\" - notify: \"up/notify\" - register: \"up/resp\" - update: \"up/resp\" - } - listener.udp.1 { - bind: 5783 - } - } +gateway.lwm2m { + xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\" + lifetime_min = 1s + lifetime_max = 86400s + qmode_time_windonw = 22 + auto_observe = false + mountpoint = \"lwm2m/%e/\" + update_msg_publish_condition = contains_object_list + translators { + command = \"dn/#\" + response = \"up/resp\" + notify = \"up/notify\" + register = \"up/resp\" + update = \"up/resp\" + } + listeners.udp.default { + bind = 5783 + } } ">>). diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 2b8f62f58..2fbd031ff 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -52,26 +52,25 @@ integer_to_list(erlang:system_time())])). -define(CONF_DEFAULT, <<" -gateway: { - mqttsn: { - gateway_id: 1 - broadcast: true - enable_stats: true - enable_qos3: true - predefined: [ - {id: 1, topic: \"/predefined/topic/name/hello\"}, - {id: 2, topic: \"/predefined/topic/name/nice\"} - ] - clientinfo_override: { - username: \"user1\" - password: \"pw123\" - } - listener.udp.1: { - bind: 1884 - max_connections: 10240000 - max_conn_rate: 1000 - } +gateway.mqttsn { + gateway_id = 1 + broadcast = true + enable_qos3 = true + predefined = [ + { id = 1, + topic = \"/predefined/topic/name/hello\" + }, + { id = 2, + topic = \"/predefined/topic/name/nice\" } + ] + clientinfo_override { + username = \"user1\" + password = \"pw123\" + } + listeners.udp.default { + bind = 1884 + } } ">>). @@ -98,7 +97,7 @@ end_per_suite(_) -> %% Connect t_connect(_) -> - SockName = {'mqttsn:udp', 1884}, + SockName = {'mqttsn:udp:default', 1884}, ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), {ok, Socket} = gen_udp:open(0, [binary]), diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl index 328e9fa79..75f6dadc3 100644 --- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl @@ -24,16 +24,14 @@ -define(HEARTBEAT, <<$\n>>). -define(CONF_DEFAULT, <<" -gateway: { - stomp: { - clientinfo_override: { - username: \"${Packet.headers.login}\" - password: \"${Packet.headers.passcode}\" - } - listener.tcp.1: { - bind: 61613 - } - } +gateway.stomp { + clientinfo_override { + username = \"${Packet.headers.login}\" + password = \"${Packet.headers.passcode}\" + } + listeners.tcp.default { + bind = 61613 + } } ">>).