chore(bridge-mqtt): sync enterprise code

This commit is contained in:
Zaiming Shi 2021-02-25 00:50:43 +01:00 committed by Shawn
parent 219eeed6d7
commit da6f1104dc
3 changed files with 63 additions and 21 deletions

View File

@ -0,0 +1,10 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<"*.">>, []}
]
}.

View File

@ -195,4 +195,3 @@ feedvar(max_inflight, 0, _) ->
feedvar(max_inflight, Size, _) -> feedvar(max_inflight, Size, _) ->
Size. Size.

View File

@ -185,18 +185,16 @@
}, },
ssl => #{ ssl => #{
order => 14, order => 14,
type => string, type => boolean,
required => false, default => false,
default => <<"off">>, title => #{en => <<"Enable SSL">>,
enum => [<<"on">>, <<"off">>], zh => <<"开启SSL链接"/utf8>>},
title => #{en => <<"Bridge SSL">>, description => #{en => <<"Enable SSL or not">>,
zh => <<"Bridge SSL"/utf8>>}, zh => <<"是否开启 SSL"/utf8>>}
description => #{en => <<"Switch which used to enable ssl connection of the bridge">>,
zh => <<"是否启用 Bridge SSL 连接"/utf8>>}
}, },
cacertfile => #{ cacertfile => #{
order => 15, order => 15,
type => string, type => file,
required => false, required => false,
default => <<"etc/certs/cacert.pem">>, default => <<"etc/certs/cacert.pem">>,
title => #{en => <<"CA certificates">>, title => #{en => <<"CA certificates">>,
@ -206,7 +204,7 @@
}, },
certfile => #{ certfile => #{
order => 16, order => 16,
type => string, type => file,
required => false, required => false,
default => <<"etc/certs/client-cert.pem">>, default => <<"etc/certs/client-cert.pem">>,
title => #{en => <<"SSL Certfile">>, title => #{en => <<"SSL Certfile">>,
@ -216,7 +214,7 @@
}, },
keyfile => #{ keyfile => #{
order => 17, order => 17,
type => string, type => file,
required => false, required => false,
default => <<"etc/certs/client-key.pem">>, default => <<"etc/certs/client-key.pem">>,
title => #{en => <<"SSL Keyfile">>, title => #{en => <<"SSL Keyfile">>,
@ -246,7 +244,6 @@
} }
}). }).
-define(RESOURCE_CONFIG_SPEC_MQTT_SUB, #{ -define(RESOURCE_CONFIG_SPEC_MQTT_SUB, #{
address => #{ address => #{
order => 1, order => 1,
@ -424,7 +421,6 @@
} }
}). }).
-define(RESOURCE_CONFIG_SPEC_RPC, #{ -define(RESOURCE_CONFIG_SPEC_RPC, #{
address => #{ address => #{
order => 1, order => 1,
@ -573,7 +569,7 @@ on_resource_create(ResId, Params) ->
?LOG(info, "Initiating Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]), ?LOG(info, "Initiating Resource ~p, ResId: ~p", [?RESOURCE_TYPE_MQTT, ResId]),
{ok, _} = application:ensure_all_started(ecpool), {ok, _} = application:ensure_all_started(ecpool),
PoolName = pool_name(ResId), PoolName = pool_name(ResId),
Options = options(Params, PoolName), Options = options(Params, PoolName, ResId),
start_resource(ResId, PoolName, Options), start_resource(ResId, PoolName, Options),
case test_resource_status(PoolName) of case test_resource_status(PoolName) of
true -> ok; true -> ok;
@ -719,7 +715,7 @@ name(Pool, Id) ->
pool_name(ResId) -> pool_name(ResId) ->
list_to_atom("bridge_mqtt:" ++ str(ResId)). list_to_atom("bridge_mqtt:" ++ str(ResId)).
options(Options, PoolName) -> options(Options, PoolName, ResId) ->
GetD = fun(Key, Default) -> maps:get(Key, Options, Default) end, GetD = fun(Key, Default) -> maps:get(Key, Options, Default) end,
Get = fun(Key) -> GetD(Key, undefined) end, Get = fun(Key) -> GetD(Key, undefined) end,
Address = Get(<<"address">>), Address = Get(<<"address">>),
@ -757,16 +753,13 @@ options(Options, PoolName) ->
{proto_ver, mqtt_ver(Get(<<"proto_ver">>))}, {proto_ver, mqtt_ver(Get(<<"proto_ver">>))},
{retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)}, {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)},
{ssl, cuttlefish_flag:parse(str(Get(<<"ssl">>)))}, {ssl, cuttlefish_flag:parse(str(Get(<<"ssl">>)))},
{ssl_opts, [ {keyfile, str(Get(<<"keyfile">>))} {ssl_opts, [ {versions, TlsVersions}
, {certfile, str(Get(<<"certfile">>))}
, {cacertfile, str(Get(<<"cacertfile">>))}
, {versions, TlsVersions}
, {ciphers, emqx_tls_lib:integral_ciphers(TlsVersions, Get(<<"ciphers">>))} , {ciphers, emqx_tls_lib:integral_ciphers(TlsVersions, Get(<<"ciphers">>))}
| get_ssl_opts(Options, ResId)
]} ]}
] ++ Subscriptions1 ] ++ Subscriptions1
end. end.
mqtt_ver(ProtoVer) -> mqtt_ver(ProtoVer) ->
case ProtoVer of case ProtoVer of
<<"mqttv3">> -> v3; <<"mqttv3">> -> v3;
@ -779,3 +772,43 @@ format_subscriptions(SubOpts) ->
lists:map(fun(Sub) -> lists:map(fun(Sub) ->
{maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)} {maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)}
end, SubOpts). end, SubOpts).
get_ssl_opts(Opts, ResId) ->
KeyFile = maps:get(<<"keyfile">>, Opts, undefined),
CertFile = maps:get(<<"certfile">>, Opts, undefined),
CAFile = case maps:get(<<"cacertfile">>, Opts, undefined) of
undefined -> maps:get(<<"cafile">>, Opts, undefined);
CAFile0 -> CAFile0
end,
Filter = fun(Opts1) ->
[{K, V} || {K, V} <- Opts1,
V =/= undefined,
V =/= <<>>,
V =/= "" ]
end,
Key = save_upload_file(KeyFile, ResId),
Cert = save_upload_file(CertFile, ResId),
CA = save_upload_file(CAFile, ResId),
Verify = case maps:get(<<"verify">>, Opts, false) of
false -> verify_none;
true -> verify_peer
end,
case Filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}]) of
[] -> [{verify, Verify}];
SslOpts ->
[{verify, Verify} | SslOpts]
end.
save_upload_file(#{<<"file">> := <<>>, <<"filename">> := <<>>}, _ResId) -> "";
save_upload_file(FilePath, _) when is_binary(FilePath) -> binary_to_list(FilePath);
save_upload_file(#{<<"file">> := File, <<"filename">> := FileName}, ResId) ->
FullFilename = filename:join([emqx:get_env(data_dir), rules, ResId, FileName]),
ok = filelib:ensure_dir(FullFilename),
case file:write_file(FullFilename, File) of
ok ->
binary_to_list(FullFilename);
{error, Reason} ->
logger:error("Store file failed, ResId: ~p, ~0p", [ResId, Reason]),
error({ResId, store_file_fail})
end;
save_upload_file(_, _) -> "".