diff --git a/Makefile b/Makefile index b7763e604..c31d7d5c1 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ EUNIT_OPTS = verbose ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ - emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ + emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ diff --git a/src/emqx.erl b/src/emqx.erl index 88be94cca..7531f6b06 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -29,7 +29,7 @@ -export([topics/0, subscriptions/1, subscribers/1, subscribed/2]). %% Hooks API --export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]). +-export([hook/2, hook/3, hook/4, unhook/2, run_hook/2, run_fold_hook/3]). %% Shutdown and reboot -export([shutdown/0, shutdown/1, reboot/0]). @@ -142,13 +142,13 @@ hook(HookPoint, Action, Filter, Priority) -> unhook(HookPoint, Action) -> emqx_hooks:del(HookPoint, Action). --spec(run_hooks(emqx_hooks:hookpoint(), list(any())) -> ok | stop). -run_hooks(HookPoint, Args) -> +-spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop). +run_hook(HookPoint, Args) -> emqx_hooks:run(HookPoint, Args). --spec(run_hooks(emqx_hooks:hookpoint(), list(any()), any()) -> {ok | stop, any()}). -run_hooks(HookPoint, Args, Acc) -> - emqx_hooks:run(HookPoint, Args, Acc). +-spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()). +run_fold_hook(HookPoint, Args, Acc) -> + emqx_hooks:run_fold(HookPoint, Args, Acc). %%------------------------------------------------------------------------------ %% Shutdown and reboot diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 2af522ef0..47e7b65d7 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -14,198 +14,55 @@ -module(emqx_access_control). --behaviour(gen_server). - -include("emqx.hrl"). -include("logger.hrl"). --export([start_link/0]). --export([authenticate/2]). +-export([authenticate/1]). -export([check_acl/3, reload_acl/0]). --export([register_mod/3, register_mod/4, unregister_mod/2]). --export([lookup_mods/1]). --export([stop/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --define(TAB, ?MODULE). --define(SERVER, ?MODULE). %%------------------------------------------------------------------------------ -%% API +%% APIs %%------------------------------------------------------------------------------ - -%% @doc Start access control server. --spec(start_link() -> {ok, pid()} | {error, term()}). -start_link() -> - start_with(fun register_default_acl/0). - -start_with(Fun) -> - case gen_server:start_link({local, ?SERVER}, ?MODULE, [], []) of - {ok, Pid} -> - Fun(), {ok, Pid}; - {error, Reason} -> - {error, Reason} - end. - -register_default_acl() -> - case emqx_config:get_env(acl_file) of - undefined -> ok; - File -> register_mod(acl, emqx_acl_internal, [File]) - end. - --spec(authenticate(emqx_types:credentials(), emqx_types:password()) - -> ok | {ok, map()} | {continue, map()} | {error, term()}). -authenticate(Credentials, Password) -> - authenticate(Credentials, Password, lookup_mods(auth)). - -authenticate(Credentials, _Password, []) -> - Zone = maps:get(zone, Credentials, undefined), - case emqx_zone:get_env(Zone, allow_anonymous, false) of - true -> ok; - false -> {error, auth_modules_not_found} - end; - -authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) -> - try Mod:check(Credentials, Password, State) of - ok -> ok; - {ok, IsSuper} when is_boolean(IsSuper) -> - {ok, #{is_superuser => IsSuper}}; - {ok, Result} when is_map(Result) -> - {ok, Result}; - {continue, Result} when is_map(Result) -> - {continue, Result}; - ignore -> - authenticate(Credentials, Password, Mods); - {error, Reason} -> - {error, Reason} - catch - error:Reason:StackTrace -> - ?LOG(error, "Authenticate failed. StackTrace: ~p", [StackTrace]), - {error, Reason} +-spec(authenticate(emqx_types:credentials()) + -> {ok, emqx_types:credentials()} | {error, term()}). +authenticate(Credentials) -> + case emqx_hooks:run_fold('client.authenticate', [], Credentials#{result => init_result(Credentials)}) of + #{result := success} = NewCredentials -> + {ok, NewCredentials}; + NewCredentials -> + {error, maps:get(result, NewCredentials, unknown_error)} end. %% @doc Check ACL -spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny). -check_acl(Credentials, PubSub, Topic) when PubSub =:= publish; PubSub =:= subscribe -> - check_acl(Credentials, PubSub, Topic, lookup_mods(acl), emqx_acl_cache:is_enabled()). - -check_acl(Credentials, PubSub, Topic, AclMods, false) -> - do_check_acl(Credentials, PubSub, Topic, AclMods); -check_acl(Credentials, PubSub, Topic, AclMods, true) -> - case emqx_acl_cache:get_acl_cache(PubSub, Topic) of - not_found -> - AclResult = do_check_acl(Credentials, PubSub, Topic, AclMods), - emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), - AclResult; - AclResult -> - AclResult +check_acl(Credentials, PubSub, Topic) -> + case emqx_acl_cache:is_enabled() of + false -> + do_check_acl(Credentials, PubSub, Topic); + true -> + case emqx_acl_cache:get_acl_cache(PubSub, Topic) of + not_found -> + AclResult = do_check_acl(Credentials, PubSub, Topic), + emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult), + AclResult; + AclResult -> + AclResult + end end. -do_check_acl(#{zone := Zone}, _PubSub, _Topic, []) -> - emqx_zone:get_env(Zone, acl_nomatch, deny); -do_check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> - case Mod:check_acl({Credentials, PubSub, Topic}, State) of - allow -> allow; - deny -> deny; - ignore -> do_check_acl(Credentials, PubSub, Topic, AclMods) +do_check_acl(#{zone := Zone} = Credentials, PubSub, Topic) -> + case emqx_hooks:run_fold('client.check_acl', [Credentials, PubSub, Topic], + emqx_zone:get_env(Zone, acl_nomatch, deny)) of + allow -> allow; + _ -> deny end. -spec(reload_acl() -> list(ok | {error, term()})). reload_acl() -> - [Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)]. + emqx_mod_acl_internal:reload_acl(). -%% @doc Register an Auth/ACL module. --spec(register_mod(auth | acl, module(), list()) -> ok | {error, term()}). -register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl -> - register_mod(Type, Mod, Opts, 0). - --spec(register_mod(auth | acl, module(), list(), non_neg_integer()) - -> ok | {error, term()}). -register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl-> - gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}). - -%% @doc Unregister an Auth/ACL module. --spec(unregister_mod(auth | acl, module()) -> ok | {error, not_found | term()}). -unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl -> - gen_server:call(?SERVER, {unregister_mod, Type, Mod}). - -%% @doc Lookup all Auth/ACL modules. --spec(lookup_mods(auth | acl) -> list()). -lookup_mods(Type) -> - case ets:lookup(?TAB, tab_key(Type)) of - [] -> []; - [{_, Mods}] -> Mods - end. - -tab_key(auth) -> auth_modules; -tab_key(acl) -> acl_modules. - -stop() -> - gen_server:stop(?SERVER, normal, infinity). - -%%----------------------------------------------------------------------------- -%% gen_server callbacks -%%----------------------------------------------------------------------------- - -init([]) -> - ok = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]), - {ok, #{}}. - -handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> - Mods = lookup_mods(Type), - reply(case lists:keymember(Mod, 1, Mods) of - true -> {error, already_exists}; - false -> - try Mod:init(Opts) of - {ok, ModState} -> - NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) -> - Seq1 >= Seq2 - end, [{Mod, ModState, Seq} | Mods]), - ets:insert(?TAB, {tab_key(Type), NewMods}), - ok - catch - _:Error -> - emqx_logger:error("[AccessControl] Failed to init ~s: ~p", [Mod, Error]), - {error, Error} - end - end, State); - -handle_call({unregister_mod, Type, Mod}, _From, State) -> - Mods = lookup_mods(Type), - reply(case lists:keyfind(Mod, 1, Mods) of - false -> - {error, not_found}; - {Mod, _ModState, _Seq} -> - ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok - end, State); - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}; - -handle_call(Req, _From, State) -> - emqx_logger:error("[AccessControl] unexpected request: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - emqx_logger:error("[AccessControl] unexpected msg: ~p", [Msg]), - {noreply, State}. - -handle_info(Info, State) -> - emqx_logger:error("[AccessControl] unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -reply(Reply, State) -> - {reply, Reply, State}. +init_result(#{zone := Zone}) -> + case emqx_zone:get_env(Zone, allow_anonymous, false) of + true -> success; + false -> not_authorized + end. \ No newline at end of file diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index 27a6ab821..c14cbec7f 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -16,6 +16,8 @@ -include("emqx.hrl"). +-type(acl_result() :: allow | deny). + -type(who() :: all | binary() | {client, binary()} | {user, binary()} | @@ -23,10 +25,8 @@ -type(access() :: subscribe | publish | pubsub). --type(rule() :: {allow, all} | - {allow, who(), access(), list(emqx_topic:topic())} | - {deny, all} | - {deny, who(), access(), list(emqx_topic:topic())}). +-type(rule() :: {acl_result(), all} | + {acl_result(), who(), access(), list(emqx_topic:topic())}). -export_type([rule/0]). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b43fa5cff..c63831ab9 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -167,13 +167,14 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> -spec(publish(emqx_types:message()) -> emqx_types:deliver_results()). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), - case emqx_hooks:run('message.publish', [], Msg) of - {ok, Msg1 = #message{topic = Topic}} -> + Headers = Msg#message.headers, + case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of + #message{headers = #{allow_publish := false}} -> + ?WARN("Publishing interrupted: ~s", [emqx_message:format(Msg)]), + []; + #message{topic = Topic} = Msg1 -> Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), - Delivery#delivery.results; - {stop, _} -> - ?WARN("Stop publishing: ~s", [emqx_message:format(Msg)]), - [] + Delivery#delivery.results end. %% Called internally @@ -443,5 +444,4 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Internal functions -%%------------------------------------------------------------------------------ - +%%------------------------------------------------------------------------------ \ No newline at end of file diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 42989ec0c..b17507275 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -22,7 +22,7 @@ -export([start_link/0, stop/0]). %% Hooks API --export([add/2, add/3, add/4, del/2, run/2, run/3, lookup/1]). +-export([add/2, add/3, add/4, del/2, run/2, run_fold/3, lookup/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -90,39 +90,44 @@ del(HookPoint, Action) -> gen_server:cast(?SERVER, {del, HookPoint, Action}). %% @doc Run hooks. --spec(run(atom(), list(Arg :: any())) -> ok | stop). +-spec(run(atom(), list(Arg::term())) -> ok). run(HookPoint, Args) -> - run_(lookup(HookPoint), Args). + do_run(lookup(HookPoint), Args). %% @doc Run hooks with Accumulator. --spec(run(atom(), list(Arg::any()), Acc::any()) -> {ok, Acc::any()} | {stop, Acc::any()}). -run(HookPoint, Args, Acc) -> - run_(lookup(HookPoint), Args, Acc). +-spec(run_fold(atom(), list(Arg::term()), Acc::term()) -> Acc::term()). +run_fold(HookPoint, Args, Acc) -> + do_run_fold(lookup(HookPoint), Args, Acc). -%% @private -run_([#callback{action = Action, filter = Filter} | Callbacks], Args) -> + +do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) -> case filter_passed(Filter, Args) andalso execute(Action, Args) of - false -> run_(Callbacks, Args); - ok -> run_(Callbacks, Args); - stop -> stop; - _Any -> run_(Callbacks, Args) + %% stop the hook chain and return + stop -> ok; + %% continue the hook chain, in following cases: + %% - the filter validation failed with 'false' + %% - the callback returns any term other than 'stop' + _ -> do_run(Callbacks, Args) end; -run_([], _Args) -> +do_run([], _Args) -> ok. -%% @private -run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> +do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> Args1 = Args ++ [Acc], case filter_passed(Filter, Args1) andalso execute(Action, Args1) of - false -> run_(Callbacks, Args, Acc); - ok -> run_(Callbacks, Args, Acc); - {ok, NewAcc} -> run_(Callbacks, Args, NewAcc); - stop -> {stop, Acc}; - {stop, NewAcc} -> {stop, NewAcc}; - _Any -> run_(Callbacks, Args, Acc) + %% stop the hook chain + stop -> Acc; + %% stop the hook chain with NewAcc + {stop, NewAcc} -> NewAcc; + %% continue the hook chain with NewAcc + {ok, NewAcc} -> do_run_fold(Callbacks, Args, NewAcc); + %% continue the hook chain, in following cases: + %% - the filter validation failed with 'false' + %% - the callback returns any term other than 'stop' or {'stop', NewAcc} + _ -> do_run_fold(Callbacks, Args, Acc) end; -run_([], _Args, Acc) -> - {ok, Acc}. +do_run_fold([], _Args, Acc) -> + Acc. -spec(filter_passed(filter(), Args::term()) -> true | false). filter_passed(undefined, _Args) -> true; diff --git a/src/emqx_acl_internal.erl b/src/emqx_mod_acl_internal.erl similarity index 59% rename from src/emqx_acl_internal.erl rename to src/emqx_mod_acl_internal.erl index 57cc12b73..b829fc9a1 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -12,57 +12,56 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_acl_internal). +-module(emqx_mod_acl_internal). --behaviour(emqx_acl_mod). +-behaviour(emqx_gen_mod). -include("emqx.hrl"). -include("logger.hrl"). +-export([load/1, unload/1]). + -export([all_rules/0]). -%% ACL mod callbacks --export([init/1, check_acl/2, reload_acl/1, description/0]). +-export([check_acl/5, reload_acl/0]). -define(ACL_RULE_TAB, emqx_acl_rule). --type(state() :: #{acl_file := string()}). +-define(FUNC(M, F, A), {M, F, A}). + +-type(acl_rules() :: #{publish => [emqx_access_rule:rule()], + subscribe => [emqx_access_rule:rule()]}). %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ +load(_Env) -> + Rules = load_rules_from_file(acl_file()), + emqx_hooks:add('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules]), -1). + +unload(_Env) -> + Rules = load_rules_from_file(acl_file()), + emqx_hooks:del('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules])). + %% @doc Read all rules -spec(all_rules() -> list(emqx_access_rule:rule())). all_rules() -> - case ets:lookup(?ACL_RULE_TAB, all_rules) of - [] -> []; - [{_, Rules}] -> Rules - end. + load_rules_from_file(acl_file()). %%------------------------------------------------------------------------------ %% ACL callbacks %%------------------------------------------------------------------------------ --spec(init([File :: string()]) -> {ok, #{}}). -init([File]) -> - _ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]), - ok = load_rules_from_file(File), - {ok, #{acl_file => File}}. - load_rules_from_file(AclFile) -> case file:consult(AclFile) of {ok, Terms} -> Rules = [emqx_access_rule:compile(Term) || Term <- Terms], - lists:foreach(fun(PubSub) -> - ets:insert(?ACL_RULE_TAB, {PubSub, - lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)}) - end, [publish, subscribe]), - ets:insert(?ACL_RULE_TAB, {all_rules, Terms}), - ok; + #{publish => lists:filter(fun(Rule) -> filter(publish, Rule) end, Rules), + subscribe => lists:filter(fun(Rule) -> filter(subscribe, Rule) end, Rules)}; {error, Reason} -> - emqx_logger:error("[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), - {error, Reason} + ?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), + [] end. filter(_PubSub, {allow, all}) -> @@ -79,20 +78,18 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> false. %% @doc Check ACL --spec(check_acl({emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic()}, #{}) - -> allow | deny | ignore). -check_acl({Credentials, PubSub, Topic}, _State) -> - case match(Credentials, Topic, lookup(PubSub)) of - {matched, allow} -> allow; - {matched, deny} -> deny; - nomatch -> ignore +-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(), + emqx_access_rule:acl_result(), acl_rules()) + -> {ok, allow} | {ok, deny} | ok). +check_acl(Credentials, PubSub, Topic, _AclResult, Rules) -> + case match(Credentials, Topic, lookup(PubSub, Rules)) of + {matched, allow} -> {ok, allow}; + {matched, deny} -> {ok, deny}; + nomatch -> ok end. -lookup(PubSub) -> - case ets:lookup(?ACL_RULE_TAB, PubSub) of - [] -> []; - [{PubSub, Rules}] -> Rules - end. +lookup(PubSub, Rules) -> + maps:get(PubSub, Rules, []). match(_Credentials, _Topic, []) -> nomatch; @@ -104,11 +101,11 @@ match(Credentials, Topic, [Rule|Rules]) -> {matched, AllowDeny} end. --spec(reload_acl(state()) -> ok | {error, term()}). -reload_acl(#{acl_file := AclFile}) -> - try load_rules_from_file(AclFile) of +-spec(reload_acl() -> ok | {error, term()}). +reload_acl() -> + try load_rules_from_file(acl_file()) of ok -> - emqx_logger:info("Reload acl_file ~s successfully", [AclFile]), + emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]), ok; {error, Error} -> {error, Error} @@ -118,6 +115,5 @@ reload_acl(#{acl_file := AclFile}) -> {error, Reason} end. --spec(description() -> string()). -description() -> - "Internal ACL with etc/acl.conf". +acl_file() -> + emqx_config:get_env(acl_file). diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 6f27256d0..777db54fc 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -18,6 +18,7 @@ -spec(load() -> ok). load() -> + ok = emqx_mod_acl_internal:load([]), lists:foreach( fun({Mod, Env}) -> ok = Mod:load(Env), @@ -26,6 +27,7 @@ load() -> -spec(unload() -> ok). unload() -> + ok = emqx_mod_acl_internal:unload([]), lists:foreach( fun({Mod, Env}) -> Mod:unload(Env) end, diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 96aae9c56..6f15a9f69 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -57,7 +57,6 @@ will_msg, keepalive, mountpoint, - is_super, is_bridge, enable_ban, enable_acl, @@ -68,7 +67,8 @@ connected_at, ignore_loop, topic_alias_maximum, - conn_mod + conn_mod, + credentials }). -opaque(state() :: #pstate{}). @@ -97,7 +97,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF is_assigned = false, conn_pid = self(), username = init_username(Peercert, Options), - is_super = false, clean_start = false, topic_aliases = #{}, packet_size = emqx_zone:get_env(Zone, max_packet_size), @@ -111,7 +110,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF connected = false, ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), topic_alias_maximum = #{to_client => 0, from_client => 0}, - conn_mod = maps:get(conn_mod, SocketOpts, undefined)}. + conn_mod = maps:get(conn_mod, SocketOpts, undefined), + credentials = #{}}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -153,10 +153,10 @@ attrs(#pstate{zone = Zone, proto_name = ProtoName, keepalive = Keepalive, mountpoint = Mountpoint, - is_super = IsSuper, is_bridge = IsBridge, connected_at = ConnectedAt, - conn_mod = ConnMod}) -> + conn_mod = ConnMod, + credentials = Credentials}) -> [{zone, Zone}, {client_id, ClientId}, {username, Username}, @@ -167,10 +167,11 @@ attrs(#pstate{zone = Zone, {clean_start, CleanStart}, {keepalive, Keepalive}, {mountpoint, Mountpoint}, - {is_super, IsSuper}, {is_bridge, IsBridge}, {connected_at, ConnectedAt}, - {conn_mod, ConnMod}]. + {conn_mod, ConnMod}, + {credentials, Credentials} + ]. attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> get_property('Receive-Maximum', ConnProps, 65535); @@ -200,6 +201,8 @@ caps(#pstate{zone = Zone}) -> client_id(#pstate{client_id = ClientId}) -> ClientId. +credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 -> + Credentials; credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, @@ -362,8 +365,7 @@ process(?CONNECT_PACKET( %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) - - PState1 = set_username(Username, + PState0 = set_username(Username, PState#pstate{client_id = NewClientId, proto_ver = ProtoVer, proto_name = ProtoName, @@ -372,20 +374,21 @@ process(?CONNECT_PACKET( conn_props = ConnProps, is_bridge = IsBridge, connected_at = os:timestamp()}), - + Credentials = credentials(PState0), + PState1 = PState0#pstate{credentials = Credentials}, connack( case check_connect(ConnPkt, PState1) of - {ok, PState2} -> - case authenticate(credentials(PState2), Password) of - {ok, IsSuper} -> - %% Maybe assign a clientId - PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}), + ok -> + case emqx_access_control:authenticate(Credentials#{password => Password}) of + {ok, Credentials0} -> + PState3 = maybe_assign_client_id(PState1), emqx_logger:set_metadata_client_id(PState3#pstate.client_id), %% Open session SessAttrs = #{will_msg => make_will_msg(ConnPkt)}, case try_open_session(SessAttrs, PState3) of {ok, SPid, SP} -> - PState4 = PState3#pstate{session = SPid, connected = true}, + PState4 = PState3#pstate{session = SPid, connected = true, + credentials = maps:remove(password, Credentials0)}, ok = emqx_cm:register_connection(client_id(PState4)), true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), %% Start keepalive @@ -394,11 +397,11 @@ process(?CONNECT_PACKET( {?RC_SUCCESS, SP, PState4}; {error, Error} -> ?LOG(error, "Failed to open session: ~p", [Error]), - {?RC_UNSPECIFIED_ERROR, PState1} + {?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}} end; {error, Reason} -> - ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason]), - {?RC_NOT_AUTHORIZED, PState1} + ?LOG(error, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]), + {emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}} end; {error, ReasonCode} -> {ReasonCode, PState1} @@ -406,8 +409,8 @@ process(?CONNECT_PACKET( process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> case check_publish(Packet, PState) of - {ok, PState1} -> - do_publish(Packet, PState1); + ok -> + do_publish(Packet, PState); {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), @@ -416,8 +419,8 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) -> case check_publish(Packet, PState) of - {ok, PState1} -> - do_publish(Packet, PState1); + ok -> + do_publish(Packet, PState); {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), @@ -430,8 +433,8 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) -> process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) -> case check_publish(Packet, PState) of - {ok, PState1} -> - do_publish(Packet, PState1); + ok -> + do_publish(Packet, PState); {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos2 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), @@ -480,16 +483,10 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), case check_subscribe( parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of {ok, TopicFilters} -> - case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of - {ok, TopicFilters1} -> - ok = emqx_session:subscribe(SPid, PacketId, Properties, - emqx_mountpoint:mount(Mountpoint, TopicFilters1)), - {ok, PState}; - {stop, _} -> - ReasonCodes = lists:duplicate(length(TopicFilters), - ?RC_IMPLEMENTATION_SPECIFIC_ERROR), - deliver({suback, PacketId, ReasonCodes}, PState) - end; + TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters), + ok = emqx_session:subscribe(SPid, PacketId, Properties, + emqx_mountpoint:mount(Mountpoint, TopicFilters0)), + {ok, PState}; {error, TopicFilters} -> {SubTopics, ReasonCodes} = lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) -> @@ -509,17 +506,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), PState = #pstate{session = SPid, mountpoint = MountPoint}) -> - case emqx_hooks:run('client.unsubscribe', [credentials(PState)], - parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)) of - {ok, TopicFilters} -> - ok = emqx_session:unsubscribe(SPid, PacketId, Properties, - emqx_mountpoint:mount(MountPoint, TopicFilters)), - {ok, PState}; - {stop, _Acc} -> - ReasonCodes = lists:duplicate(length(RawTopicFilters), - ?RC_IMPLEMENTATION_SPECIFIC_ERROR), - deliver({unsuback, PacketId, ReasonCodes}, PState) - end; + TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)], + parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)), + ok = emqx_session:unsubscribe(SPid, PacketId, Properties, + emqx_mountpoint:mount(MountPoint, TopicFilters)), + {ok, PState}; process(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); @@ -547,11 +538,11 @@ process(?DISCONNECT_PACKET(_), PState) -> %%------------------------------------------------------------------------------ connack({?RC_SUCCESS, SP, PState}) -> - emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), + ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> - emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]), + ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]), [ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer), _ = deliver({connack, ReasonCode1}, PState), {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}. @@ -660,8 +651,8 @@ deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> - _ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg), - Msg1 = emqx_message:update_expiry(Msg), + Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg), + Msg1 = emqx_message:update_expiry(Msg0), Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), send(emqx_packet:from_message(PacketId, emqx_message:remove_topic_alias(Msg2)), PState); @@ -744,17 +735,6 @@ try_open_session(SessAttrs, PState = #pstate{zone = Zone, Other -> Other end. -authenticate(Credentials, Password) -> - case emqx_access_control:authenticate(Credentials, Password) of - ok -> {ok, false}; - {ok, IsSuper} when is_boolean(IsSuper) -> - {ok, IsSuper}; - {ok, Result} when is_map(Result) -> - {ok, maps:get(is_superuser, Result, false)}; - {error, Error} -> - {error, Error} - end. - set_property(Name, Value, ?NO_PROPS) -> #{Name => Value}; set_property(Name, Value, Props) -> @@ -855,25 +835,21 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret #pstate{zone = Zone}) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). -check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl}) - when IsSuper orelse (not EnableAcl) -> +check_pub_acl(_Packet, #pstate{credentials = #{is_super := IsSuper}, enable_acl = EnableAcl}) + when IsSuper orelse (not EnableAcl) -> ok; - check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) -> case emqx_access_control:check_acl(credentials(PState), publish, Topic) of allow -> ok; - deny -> - {error, ?RC_NOT_AUTHORIZED} + deny -> {error, ?RC_NOT_AUTHORIZED} end. -run_check_steps([], _Packet, PState) -> - {ok, PState}; +run_check_steps([], _Packet, _PState) -> + ok; run_check_steps([Check|Steps], Packet, PState) -> case Check(Packet, PState) of ok -> run_check_steps(Steps, Packet, PState); - {ok, PState1} -> - run_check_steps(Steps, Packet, PState1); Error = {error, _RC} -> Error end. @@ -886,15 +862,13 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) -> {error, TopicFilter1} end. -check_sub_acl(TopicFilters, #pstate{is_super = IsSuper, enable_acl = EnableAcl}) - when IsSuper orelse (not EnableAcl) -> +check_sub_acl(TopicFilters, #pstate{credentials = #{is_super := IsSuper}, enable_acl = EnableAcl}) + when IsSuper orelse (not EnableAcl) -> {ok, TopicFilters}; - check_sub_acl(TopicFilters, PState) -> - Credentials = credentials(PState), lists:foldr( fun({Topic, SubOpts}, {Ok, Acc}) -> - case emqx_access_control:check_acl(Credentials, subscribe, Topic) of + case emqx_access_control:check_acl(credentials(PState), publish, Topic) of allow -> {Ok, [{Topic, SubOpts}|Acc]}; deny -> {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} @@ -928,7 +902,7 @@ terminate(discard, _PState) -> ok; terminate(Reason, PState) -> ?LOG(info, "Shutdown for ~p", [Reason]), - emqx_hooks:run('client.disconnected', [credentials(PState), Reason]). + ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]). start_keepalive(0, _PState) -> ignore; @@ -999,4 +973,4 @@ reason_codes_compat(_PktType, ReasonCodes, ?MQTT_PROTO_V5) -> reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) -> undefined; reason_codes_compat(PktType, ReasonCodes, _ProtoVer) -> - [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes]. + [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes]. \ No newline at end of file diff --git a/src/emqx_psk.erl b/src/emqx_psk.erl index 8062274ce..7510366eb 100644 --- a/src/emqx_psk.erl +++ b/src/emqx_psk.erl @@ -25,10 +25,12 @@ -type psk_user_state() :: term(). -spec lookup(psk, psk_identity(), psk_user_state()) -> {ok, SharedSecret :: binary()} | error. -lookup(psk, ClientPSKID, UserState) -> - try emqx_hooks:run('tls_handshake.psk_lookup', [ClientPSKID], UserState) of - {ok, SharedSecret} -> {ok, SharedSecret}; - {stop, SharedSecret} -> {ok, SharedSecret} +lookup(psk, ClientPSKID, _UserState) -> + try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of + SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret}; + Error -> + ?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]), + error catch Except:Error:Stacktrace -> ?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 6797da59d..846e33e2a 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -17,7 +17,7 @@ -include("emqx_mqtt.hrl"). --export([name/2, text/1]). +-export([name/2, text/1, connack_error/1]). -export([compat/2]). name(I, Ver) when Ver >= ?MQTT_PROTO_V5 -> @@ -143,3 +143,12 @@ compat(suback, Code) when Code =< ?QOS_2 -> Code; compat(suback, Code) when Code >= 16#80 -> 16#80; compat(unsuback, _Code) -> undefined. + +connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID; +connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; +connack_error(not_authorized) -> ?RC_NOT_AUTHORIZED; +connack_error(server_unavailable) -> ?RC_SERVER_UNAVAILABLE; +connack_error(server_busy) -> ?RC_SERVER_BUSY; +connack_error(banned) -> ?RC_BANNED; +connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD; +connack_error(_) -> ?RC_NOT_AUTHORIZED. \ No newline at end of file diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 276454cd9..a3023b382 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -369,7 +369,7 @@ init([Parent, #{zone := Zone, ok = emqx_sm:register_session(ClientId, self()), true = emqx_sm:set_session_attrs(ClientId, attrs(State)), true = emqx_sm:set_session_stats(ClientId, stats(State)), - emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), + ok = emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), ok = emqx_misc:init_proc_mng_policy(Zone), ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). @@ -466,22 +466,13 @@ handle_call(Req, _From, State) -> handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = - lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> - {[QoS|RcAcc], case maps:find(Topic, SubMap) of - {ok, SubOpts} -> - emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]), - SubMap; - {ok, _SubOpts} -> - emqx_broker:set_subopts(Topic, SubOpts), - %% Why??? - emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]), - maps:put(Topic, SubOpts, SubMap); - error -> - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]), - maps:put(Topic, SubOpts, SubMap) - end} - end, {[], Subscriptions}, TopicFilters), + lists:foldr( + fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when + RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 -> + {[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)}; + ({_Topic, #{rc := RC}}, {RcAcc, SubMap}) -> + {[RC|RcAcc], SubMap} + end, {[], Subscriptions}, TopicFilters), suback(FromPid, PacketId, ReasonCodes), noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1})); @@ -493,7 +484,7 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, case maps:find(Topic, SubMap) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(Topic), - emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]), + ok = emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]), {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; error -> {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} @@ -568,7 +559,7 @@ handle_cast({resume, #{conn_pid := ConnPid, %% Clean Session: true -> false??? CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), - emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), + ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), %% Replay delivery and Dequeue pending messages noreply(ensure_stats_timer(dequeue(retry_delivery(true, State1)))); @@ -668,7 +659,7 @@ terminate(Reason, #state{will_msg = WillMsg, old_conn_pid = OldConnPid}) -> send_willmsg(WillMsg), [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], - emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). + ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -941,7 +932,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use if Dropped =/= undefined -> SessProps = #{client_id => ClientId, username => Username}, - emqx_hooks:run('message.dropped', [SessProps, Msg]); + ok = emqx_hooks:run('message.dropped', [SessProps, Msg]); true -> ok end, State#state{mqueue = NewQ}. @@ -980,7 +971,7 @@ await(PacketId, Msg, State = #state{inflight = Inflight}) -> acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, {_, Msg}, _Ts}} -> - emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg), + ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]), @@ -990,7 +981,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, {_, Msg}, _Ts}} -> - emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg), + ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]), @@ -1118,3 +1109,18 @@ noreply(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. +do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) -> + case maps:find(Topic, SubMap) of + {ok, SubOpts} -> + ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]), + SubMap; + {ok, _SubOpts} -> + emqx_broker:set_subopts(Topic, SubOpts), + %% Why??? + ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]), + maps:put(Topic, SubOpts, SubMap); + error -> + emqx_broker:subscribe(Topic, ClientId, SubOpts), + ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]), + maps:put(Topic, SubOpts, SubMap) + end. \ No newline at end of file diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index eff33a841..72ec6431f 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -62,8 +62,6 @@ init([]) -> %% Broker Sup BrokerSup = supervisor_spec(emqx_broker_sup), BridgeSup = supervisor_spec(emqx_bridge_sup), - %% AccessControl - AccessControl = worker_spec(emqx_access_control), %% Session Manager SMSup = supervisor_spec(emqx_sm_sup), %% Connection Manager @@ -75,7 +73,6 @@ init([]) -> RouterSup, BrokerSup, BridgeSup, - AccessControl, SMSup, CMSup, SysSup]}}. diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 0b18fdf23..913946a79 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -217,6 +217,8 @@ parse(Topic = <>, Options) -> _ -> error({invalid_topic, Topic}) end end; +parse(Topic, Options = #{qos := QoS}) -> + {Topic, Options#{rc => QoS}}; parse(Topic, Options) -> {Topic, Options}. diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index adec0b117..919418a62 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -39,6 +39,16 @@ end_per_suite(_Config) -> local_path(RelativePath) -> filename:join([get_base_dir(), RelativePath]). +deps_path(App, RelativePath) -> + %% Note: not lib_dir because etc dir is not sym-link-ed to _build dir + %% but priv dir is + Path0 = code:priv_dir(App), + Path = case file:read_link(Path0) of + {ok, Resolved} -> Resolved; + {error, _} -> Path0 + end, + filename:join([Path, "..", RelativePath]). + get_base_dir() -> {file, Here} = code:is_loaded(?MODULE), filename:dirname(filename:dirname(Here)). @@ -56,6 +66,9 @@ read_schema_configs(App, {SchemaFile, ConfigFile}) -> Vals = proplists:get_value(App, NewConfig, []), [application:set_env(App, Par, Value) || {Par, Value} <- Vals]. +set_special_configs(emqx) -> + application:set_env(emqx, acl_file, deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf")); + set_special_configs(_App) -> ok. diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index e479bbaba..250cf51bc 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -72,7 +72,7 @@ publish(_) -> ok = emqx:subscribe(<<"test/+">>), timer:sleep(10), emqx:publish(Msg), - ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). + ?assert(receive {dispatch, <<"test/+">>, #message{payload = <<"hello">>}} -> true after 5 -> false end). dispatch_with_no_sub(_) -> Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>), diff --git a/test/emqx_hooks_SUITE.erl b/test/emqx_hooks_SUITE.erl index 5fb7ed461..b79559122 100644 --- a/test/emqx_hooks_SUITE.erl +++ b/test/emqx_hooks_SUITE.erl @@ -21,7 +21,7 @@ -include_lib("common_test/include/ct.hrl"). all() -> - [add_delete_hook, run_hooks]. + [add_delete_hook, run_hook]. add_delete_hook(_) -> {ok, _} = emqx_hooks:start_link(), @@ -54,57 +54,55 @@ add_delete_hook(_) -> ?assertEqual([], emqx_hooks:lookup(emqx_hook)), ok = emqx_hooks:stop(). -run_hooks(_) -> +run_hook(_) -> {ok, _} = emqx_hooks:start_link(), ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}), ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), - {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []), - {ok, []} = emqx:run_hooks(unknown_hook, [], []), + [r5,r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []), + [] = emqx:run_fold_hook(unknown_hook, [], []), + + ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2), + ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}), + [r9] = emqx:run_fold_hook(foldl_hook2, [arg], []), ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), {error, already_exists} = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), - stop = emqx:run_hooks(foreach_hook, [arg]), + ok = emqx:run_hook(foreach_hook, [arg]), - ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2), - ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}), - {stop, []} = emqx:run_hooks(foldl_hook2, [arg], []), - - %% foreach hook always returns 'ok' or 'stop' ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0), - ?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg])), %% filter passed - ?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg1])), %% filter failed + ?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])), %% filter passed + ?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])), %% filter failed - %% foldl hook always returns {'ok', Acc} or {'stop', Acc} ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}), ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}), - ?assertEqual({ok, 3}, emqx:run_hooks(foldl_filter2_hook, [arg], 1)), - ?assertEqual({ok, 2}, emqx:run_hooks(foldl_filter2_hook, [arg1], 1)), + ?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)), + ?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)), ok = emqx_hooks:stop(). hook_fun1(arg) -> ok; -hook_fun1(_) -> stop. +hook_fun1(_) -> error. hook_fun2(arg) -> ok; -hook_fun2(_) -> stop. +hook_fun2(_) -> error. hook_fun2(_, Acc) -> {ok, Acc + 1}. hook_fun2_1(_, Acc) -> {ok, Acc + 1}. hook_fun3(arg1, arg2, _Acc, init) -> ok. -hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. -hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. +hook_fun4(arg1, arg2, Acc, init) -> {ok, [r4 | Acc]}. +hook_fun5(arg1, arg2, Acc, init) -> {ok, [r5 | Acc]}. hook_fun6(arg, initArg) -> ok. -hook_fun7(arg, initArg) -> any. -hook_fun8(arg, initArg) -> stop. +hook_fun7(arg, initArg) -> ok. +hook_fun8(arg, initArg) -> ok. -hook_fun9(arg, _Acc) -> any. -hook_fun10(arg, _Acc) -> stop. +hook_fun9(arg, Acc) -> {stop, [r9 | Acc]}. +hook_fun10(arg, Acc) -> {stop, [r10 | Acc]}. hook_filter1(arg) -> true; hook_filter1(_) -> false. diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 0297d2615..c447b8e4a 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -546,6 +546,8 @@ acl_deny_do_disconnect(publish, QoS, Topic) -> {ok, _} = emqx_client:connect(Client), emqx_client:publish(Client, Topic, <<"test">>, QoS), receive + {disconnected, shutdown, tcp_closed} -> + ct:pal(info, "[OK] after publish, client got disconnected: tcp_closed", []); {'EXIT', Client, {shutdown,tcp_closed}} -> ct:pal(info, "[OK] after publish, received exit: {shutdown,tcp_closed}"), false = is_process_alive(Client); @@ -560,6 +562,8 @@ acl_deny_do_disconnect(subscribe, QoS, Topic) -> {ok, _} = emqx_client:connect(Client), {ok, _, [128]} = emqx_client:subscribe(Client, Topic, QoS), receive + {disconnected, shutdown, tcp_closed} -> + ct:pal(info, "[OK] after subscribe, client got disconnected: tcp_closed", []); {'EXIT', Client, {shutdown,tcp_closed}} -> ct:pal(info, "[OK] after subscribe, received exit: {shutdown,tcp_closed}"), false = is_process_alive(Client);