From 3202ed2392acbea90fe8b2bcba6020c5144d1d25 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 20 Sep 2019 14:38:16 +0800 Subject: [PATCH] Improve the 'channel' module and add more test cases - Rename the 'Client' field to 'ClientInfo' - Remove the 'expiry_interval' from session record - Add more test cases for emqx_zone module - Add more test cases for emqx_banned module - Add more test cases for emqx_message module - Remove 'sockname', 'conn_mod' fields from type 'client' --- src/emqx_banned.erl | 11 +- src/emqx_channel.erl | 622 ++++++++++++++++++----------------- src/emqx_session.erl | 13 +- src/emqx_types.erl | 2 - src/emqx_zone.erl | 8 +- test/emqx_banned_SUITE.erl | 13 + test/emqx_cm_SUITE.erl | 60 ++++ test/emqx_inflight_SUITE.erl | 3 +- test/emqx_message_SUITE.erl | 13 +- test/emqx_zone_SUITE.erl | 76 ++++- 10 files changed, 482 insertions(+), 339 deletions(-) create mode 100644 test/emqx_cm_SUITE.erl diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 834e5260e..7e3e959e3 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -30,7 +30,7 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --export([start_link/0]). +-export([start_link/0, stop/0]). -export([ check/1 , add/1 @@ -69,6 +69,10 @@ mnesia(copy) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +%% for tests +-spec(stop() -> ok). +stop() -> gen_server:stop(?MODULE). + -spec(check(emqx_types:client()) -> boolean()). check(#{client_id := ClientId, username := Username, @@ -105,8 +109,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - mnesia:async_dirty(fun expire_banned_items/1, - [erlang:system_time(second)]), + mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> @@ -125,7 +128,7 @@ code_change(_OldVsn, State, _Extra) -> -ifdef(TEST). ensure_expiry_timer(State) -> - State#{expiry_timer := emqx_misc:start_timer(timer:seconds(1), expire)}. + State#{expiry_timer := emqx_misc:start_timer(10, expire)}. -else. ensure_expiry_timer(State) -> State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}. diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 1e780eb08..19d361515 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -24,8 +24,6 @@ -logger_header("[Channel]"). --export([init/2]). - -export([ info/1 , info/2 , attrs/1 @@ -36,12 +34,13 @@ %% Exports for unit tests:( -export([set_field/3]). --export([ handle_in/2 +-export([ init/2 + , handle_in/2 , handle_out/2 , handle_call/2 , handle_cast/2 , handle_info/2 - , timeout/3 + , handle_timeout/3 , terminate/2 ]). @@ -58,19 +57,25 @@ -export_type([channel/0]). -record(channel, { - %% MQTT Client + %% MQTT ConnInfo + conninfo :: emqx_types:conninfo(), + %% MQTT ClientInfo client :: emqx_types:client(), %% MQTT Session session :: emqx_session:session(), - %% MQTT Protocol - protocol :: emqx_protocol:protocol(), %% Keepalive keepalive :: emqx_keepalive:keepalive(), + %% MQTT Will Msg + will_msg :: emqx_types:message(), + %% MQTT Topic Aliases + topic_aliases :: maybe(map()), + %% MQTT Topic Alias Maximum + alias_maximum :: maybe(map()), %% Timers timers :: #{atom() => disabled | maybe(reference())}, %% GC State gc_state :: maybe(emqx_gc:gc_state()), - %% OOM Policy + %% OOM Policy TODO: should be removed from channel. oom_policy :: maybe(emqx_oom:oom_policy()), %% Connected connected :: undefined | boolean(), @@ -97,53 +102,8 @@ will_timer => will_message }). --define(ATTR_KEYS, [client, session, protocol, connected, connected_at, disconnected_at]). - --define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, gc_state, disconnected_at]). - -%%-------------------------------------------------------------------- -%% Init the channel -%%-------------------------------------------------------------------- - --spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()). -init(ConnInfo, Options) -> - Zone = proplists:get_value(zone, Options), - Peercert = maps:get(peercert, ConnInfo, undefined), - Username = case peer_cert_as_username(Options) of - cn -> esockd_peercert:common_name(Peercert); - dn -> esockd_peercert:subject(Peercert); - crt -> Peercert; - _ -> undefined - end, - MountPoint = emqx_zone:get_env(Zone, mountpoint), - Client = maps:merge(#{zone => Zone, - username => Username, - client_id => <<>>, - mountpoint => MountPoint, - is_bridge => false, - is_superuser => false - }, ConnInfo), - EnableStats = emqx_zone:get_env(Zone, enable_stats, true), - StatsTimer = if - EnableStats -> undefined; - ?Otherwise -> disabled - end, - GcState = maybe_apply(fun emqx_gc:init/1, - emqx_zone:get_env(Zone, force_gc_policy)), - OomPolicy = maybe_apply(fun emqx_oom:init/1, - emqx_zone:get_env(Zone, force_shutdown_policy)), - #channel{client = Client, - gc_state = GcState, - oom_policy = OomPolicy, - timers = #{stats_timer => StatsTimer}, - connected = undefined, - takeover = false, - resuming = false, - pendings = [] - }. - -peer_cert_as_username(Options) -> - proplists:get_value(peer_cert_as_username, Options). +-define(ATTR_KEYS, [conninfo, client, session, connected, connected_at, disconnected_at]). +-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, topic_aliases, alias_maximum, gc_state, disconnected_at]). %%-------------------------------------------------------------------- %% Info, Attrs and Caps @@ -157,14 +117,18 @@ info(Channel) -> -spec(info(list(atom())|atom(), channel()) -> term()). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; -info(client, #channel{client = Client}) -> - Client; +info(conninfo, #channel{conninfo = ConnInfo}) -> + ConnInfo; +info(client, #channel{client = ClientInfo}) -> + ClientInfo; info(session, #channel{session = Session}) -> maybe_apply(fun emqx_session:info/1, Session); -info(protocol, #channel{protocol = Protocol}) -> - maybe_apply(fun emqx_protocol:info/1, Protocol); info(keepalive, #channel{keepalive = Keepalive}) -> maybe_apply(fun emqx_keepalive:info/1, Keepalive); +info(topic_aliases, #channel{topic_aliases = Aliases}) -> + Aliases; +info(alias_maximum, #channel{alias_maximum = Limits}) -> + Limits; info(gc_state, #channel{gc_state = GcState}) -> maybe_apply(fun emqx_gc:info/1, GcState); info(oom_policy, #channel{oom_policy = OomPolicy}) -> @@ -181,8 +145,8 @@ info(disconnected_at, #channel{disconnected_at = DisconnectedAt}) -> attrs(Channel) -> maps:from_list([{Key, attr(Key, Channel)} || Key <- ?ATTR_KEYS]). -attr(protocol, #channel{protocol = Proto}) -> - maybe_apply(fun emqx_protocol:attrs/1, Proto); +attr(conninfo, #channel{conninfo = ConnInfo}) -> + ConnInfo; attr(session, #channel{session = Session}) -> maybe_apply(fun emqx_session:attrs/1, Session); attr(Key, Channel) -> info(Key, Channel). @@ -201,6 +165,54 @@ set_field(Name, Val, Channel) -> Pos = emqx_misc:index_of(Name, Fields), setelement(Pos+1, Channel, Val). +%%-------------------------------------------------------------------- +%% Init the channel +%%-------------------------------------------------------------------- + +-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()). +init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> + Zone = proplists:get_value(zone, Options), + Peercert = maps:get(peercert, ConnInfo, undefined), + Username = case peer_cert_as_username(Options) of + cn -> esockd_peercert:common_name(Peercert); + dn -> esockd_peercert:subject(Peercert); + crt -> Peercert; + _ -> undefined + end, + MountPoint = emqx_zone:get_env(Zone, mountpoint), + ClientInfo = #{zone => Zone, + peerhost => PeerHost, + peercert => Peercert, + client_id => undefined, + username => Username, + mountpoint => MountPoint, + is_bridge => false, + is_superuser => false + }, + StatsTimer = case emqx_zone:enable_stats(Zone) of + true -> undefined; + false -> disabled + end, + #channel{conninfo = ConnInfo, + client = ClientInfo, + gc_state = init_gc_state(Zone), + oom_policy = init_oom_policy(Zone), + timers = #{stats_timer => StatsTimer}, + connected = undefined, + takeover = false, + resuming = false, + pendings = [] + }. + +peer_cert_as_username(Options) -> + proplists:get_value(peer_cert_as_username, Options). + +init_gc_state(Zone) -> + maybe_apply(fun emqx_gc:init/1, emqx_zone:force_gc_policy(Zone)). + +init_oom_policy(Zone) -> + maybe_apply(fun emqx_oom:init/1, emqx_zone:force_shutdown_policy(Zone)). + %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- @@ -215,8 +227,8 @@ handle_in(?CONNECT_PACKET(_), Channel = #channel{connected = true}) -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> - case pipeline([fun check_connpkt/2, - fun init_protocol/2, + case pipeline([fun enrich_conninfo/2, + fun check_connect/2, fun enrich_client/2, fun set_logger_meta/2, fun check_banned/2, @@ -225,31 +237,25 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> {ok, NConnPkt, NChannel} -> process_connect(NConnPkt, NChannel); {error, ReasonCode, NChannel} -> - handle_out({connack, ReasonCode}, NChannel) + handle_out({connack, ReasonCode, ConnPkt}, NChannel) end; -handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), - Channel = #channel{protocol = Protocol}) -> - case pipeline([fun emqx_packet:check/1, - fun process_alias/2, - fun check_publish/2], Packet, Channel) of - {ok, NPacket, NChannel} -> - process_publish(NPacket, NChannel); - {error, ReasonCode, NChannel} -> - ProtoVer = emqx_protocol:info(proto_ver, Protocol), - ?LOG(warning, "Cannot publish message to ~s due to ~s", - [Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]), - handle_out({disconnect, ReasonCode}, NChannel) +handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> + case emqx_packet:check(Packet) of + ok -> + handle_publish(Packet, Channel); + {error, ReasonCode} -> + handle_out({disconnect, ReasonCode}, Channel) end; handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), - Channel = #channel{client = Client, session = Session}) -> + Channel = #channel{client = ClientInfo, session = Session}) -> case emqx_session:puback(PacketId, Session) of {ok, Msg, Publishes, NSession} -> - ok = emqx_hooks:run('message.acked', [Client, Msg]), + ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), handle_out({publish, Publishes}, Channel#channel{session = NSession}); {ok, Msg, NSession} -> - ok = emqx_hooks:run('message.acked', [Client, Msg]), + ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), {ok, Channel#channel{session = NSession}}; {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), @@ -262,10 +268,10 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), end; handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), - Channel = #channel{client = Client, session = Session}) -> + Channel = #channel{client = ClientInfo, session = Session}) -> case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> - ok = emqx_hooks:run('message.acked', [Client, Msg]), + ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), NChannel = Channel#channel{session = NSession}, handle_out({pubrel, PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> @@ -301,10 +307,10 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S end; handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{client = Client}) -> + Channel = #channel{client = ClientInfo}) -> case emqx_packet:check(Packet) of ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe', - [Client, Properties], + [ClientInfo, Properties], parse_topic_filters(TopicFilters)), TopicFilters2 = enrich_subid(Properties, TopicFilters1), {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel), @@ -314,10 +320,10 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), end; handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{client = Client}) -> + Channel = #channel{client = ClientInfo}) -> case emqx_packet:check(Packet) of ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', - [Client, Properties], + [ClientInfo, Properties], parse_topic_filters(TopicFilters)), {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), handle_out({unsuback, PacketId, ReasonCodes}, NChannel); @@ -328,18 +334,18 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(?PACKET(?PINGREQ), Channel) -> {ok, ?PACKET(?PINGRESP), Channel}; -handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Session, protocol = Protocol}) -> - OldInterval = emqx_session:info(expiry_interval, Session), - Interval = get_property('Session-Expiry-Interval', Properties, OldInterval), +handle_in(?DISCONNECT_PACKET(RC, Props), + Channel = #channel{conninfo = ConnInfo = #{expiry_interval := OldInterval}}) -> + Interval = emqx_mqtt_props:get('Session-Expiry-Interval', Props, OldInterval), case OldInterval =:= 0 andalso Interval =/= OldInterval of true -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); false -> Channel1 = case RC of - ?RC_SUCCESS -> Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)}; + ?RC_SUCCESS -> Channel#channel{will_msg = undefined}; _ -> Channel end, - Channel2 = Channel1#channel{session = emqx_session:update_expiry_interval(Interval, Session)}, + Channel2 = Channel1#channel{conninfo = ConnInfo#{expiry_interval => Interval}}, case Interval of ?UINT_MAX -> {ok, ensure_timer(will_timer, Channel2)}; @@ -348,9 +354,7 @@ handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Sessi _Other -> Reason = case RC of ?RC_SUCCESS -> normal; - _ -> - Ver = emqx_protocol:info(proto_ver, Protocol), - emqx_reason_codes:name(RC, Ver) + _ -> emqx_reason_codes:name(RC, maps:get(proto_ver, ConnInfo)) end, {stop, {shutdown, Reason}, Channel2} end @@ -368,28 +372,43 @@ handle_in(Packet, Channel) -> %% Process Connect %%-------------------------------------------------------------------- -process_connect(ConnPkt, Channel) -> - case open_session(ConnPkt, Channel) of +process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, + Channel = #channel{conninfo = ConnInfo, client = ClientInfo}) -> + case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, - handle_out({connack, ?RC_SUCCESS, sp(false)}, NChannel); + handle_out({connack, ?RC_SUCCESS, sp(false), ConnPkt}, NChannel); {ok, #{session := Session, present := true, pendings := Pendings}} -> %%TODO: improve later. NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), NChannel = Channel#channel{session = Session, resuming = true, pendings = NPendings}, - handle_out({connack, ?RC_SUCCESS, sp(true)}, NChannel); + handle_out({connack, ?RC_SUCCESS, sp(true), ConnPkt}, NChannel); {error, Reason} -> %% TODO: Unknown error? ?LOG(error, "Failed to open session: ~p", [Reason]), - handle_out({connack, ?RC_UNSPECIFIED_ERROR}, Channel) + handle_out({connack, ?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel) end. %%-------------------------------------------------------------------- %% Process Publish %%-------------------------------------------------------------------- +handle_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), + Channel = #channel{conninfo = #{proto_ver := ProtoVer}}) -> + case pipeline([fun process_alias/2, + fun check_pub_acl/2, + fun check_pub_alias/2, + fun check_pub_caps/2], Packet, Channel) of + {ok, NPacket, NChannel} -> + process_publish(NPacket, NChannel); + {error, ReasonCode, NChannel} -> + ?LOG(warning, "Cannot publish message to ~s due to ~s", + [Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]), + handle_out({disconnect, ReasonCode}, NChannel) + end. + process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), Channel) -> Msg = publish_to_msg(Packet, Channel), process_publish(PacketId, Msg, Channel). @@ -424,11 +443,10 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2}, handle_out({pubrec, PacketId, RC}, Channel) end. -publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint}, - protocol = Protocol}) -> - Msg = emqx_packet:to_message(Client, Packet), +publish_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, + client = ClientInfo = #{mountpoint := MountPoint}}) -> + Msg = emqx_packet:to_message(ClientInfo, Packet), Msg1 = emqx_message:set_flag(dup, false, Msg), - ProtoVer = emqx_protocol:info(proto_ver, Protocol), Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1), emqx_mountpoint:mount(MountPoint, Msg2). @@ -447,13 +465,13 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> process_subscribe(More, [RC|Acc], NChannel). do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = - #channel{client = Client = #{mountpoint := MountPoint}, + #channel{client = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> case check_subscribe(TopicFilter, SubOpts, Channel) of ok -> TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), - case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of + case emqx_session:subscribe(ClientInfo, TopicFilter1, SubOpts1, Session) of {ok, NSession} -> {QoS, Channel#channel{session = NSession}}; {error, RC} -> {RC, Channel} @@ -477,10 +495,10 @@ process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> process_unsubscribe(More, [RC|Acc], NChannel). do_unsubscribe(TopicFilter, _SubOpts, Channel = - #channel{client = Client = #{mountpoint := MountPoint}, + #channel{client = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), - case emqx_session:unsubscribe(Client, TopicFilter1, Session) of + case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of {ok, NSession} -> {?RC_SUCCESS, Channel#channel{session = NSession}}; {error, RC} -> {RC, Channel} @@ -491,35 +509,37 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = %%-------------------------------------------------------------------- %%TODO: RunFold or Pipeline -handle_out({connack, ?RC_SUCCESS, SP}, Channel = #channel{client = Client}) -> +handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, + Channel = #channel{conninfo = ConnInfo, client = ClientInfo}) -> AckProps = run_fold([fun enrich_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2 ], #{}, Channel), - Channel1 = ensure_keepalive(AckProps, ensure_connected(Channel)), - ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(Channel1)]), + Channel1 = Channel#channel{will_msg = emqx_packet:will_msg(ConnPkt), + alias_maximum = init_alias_maximum(ConnPkt, ClientInfo), + connected = true, + connected_at = os:timestamp() + }, + Channel2 = ensure_keepalive(AckProps, Channel1), + ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]), AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), - case maybe_resume_session(Channel1) of - ignore -> {ok, AckPacket, Channel1}; + case maybe_resume_session(Channel2) of + ignore -> + {ok, AckPacket, Channel2}; {ok, Publishes, NSession} -> - Channel2 = Channel1#channel{session = NSession, + Channel3 = Channel2#channel{session = NSession, resuming = false, pendings = []}, - {ok, Packets, _} = handle_out({publish, Publishes}, Channel2), + {ok, Packets, _} = handle_out({publish, Publishes}, Channel3), {ok, [AckPacket|Packets], Channel2} end; -handle_out({connack, ReasonCode}, Channel = #channel{client = Client, - protocol = Protocol - }) -> - ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(Channel)]), - ProtoVer = case Protocol of - undefined -> ?MQTT_PROTO_V5; - _ -> emqx_protocol:info(proto_ver, Protocol) - end, - ReasonCode1 = if - ProtoVer == ?MQTT_PROTO_V5 -> ReasonCode; - true -> emqx_reason_codes:compat(connack, ReasonCode) +handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo, + client = ClientInfo}) -> + ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]), + ReasonCode1 = case ProtoVer = maps:get(proto_ver, ConnInfo) of + ?MQTT_PROTO_V5 -> ReasonCode; + _Ver -> emqx_reason_codes:compat(connack, ReasonCode) end, Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), {stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel}; @@ -563,9 +583,9 @@ handle_out({publish, _PacketId, #message{from = ClientId, {ok, Channel}; handle_out({publish, PacketId, Msg}, Channel = - #channel{client = Client = #{mountpoint := MountPoint}}) -> + #channel{client = ClientInfo = #{mountpoint := MountPoint}}) -> Msg1 = emqx_message:update_expiry(Msg), - Msg2 = emqx_hooks:run_fold('message.delivered', [Client], Msg1), + Msg2 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg1), Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2), {ok, emqx_message:to_packet(PacketId, Msg3), Channel}; @@ -581,24 +601,23 @@ handle_out({pubrec, PacketId, ReasonCode}, Channel) -> handle_out({pubcomp, PacketId, ReasonCode}, Channel) -> {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel}; -handle_out({suback, PacketId, ReasonCodes}, Channel = #channel{protocol = Protocol}) -> - ReasonCodes1 = case emqx_protocol:info(proto_ver, Protocol) of - ?MQTT_PROTO_V5 -> ReasonCodes; - _Ver -> - [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes] - end, +handle_out({suback, PacketId, ReasonCodes}, + Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> + {ok, ?SUBACK_PACKET(PacketId, ReasonCodes), Channel}; + +handle_out({suback, PacketId, ReasonCodes}, Channel) -> + ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes], {ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel}; -handle_out({unsuback, PacketId, ReasonCodes}, Channel = #channel{protocol = Protocol}) -> - Unsuback = case emqx_protocol:info(proto_ver, Protocol) of - ?MQTT_PROTO_V5 -> - ?UNSUBACK_PACKET(PacketId, ReasonCodes); - _Ver -> ?UNSUBACK_PACKET(PacketId) - end, - {ok, Unsuback, Channel}; +handle_out({unsuback, PacketId, ReasonCodes}, + Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> + {ok, ?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel}; -handle_out({disconnect, ReasonCode}, Channel = #channel{protocol = Protocol}) -> - case emqx_protocol:info(proto_ver, Protocol) of +handle_out({unsuback, PacketId, _ReasonCodes}, Channel) -> + {ok, ?UNSUBACK_PACKET(PacketId), Channel}; + +handle_out({disconnect, ReasonCode}, Channel = #channel{conninfo = ConnInfo}) -> + case maps:get(proto_ver, ConnInfo) of ?MQTT_PROTO_V5 -> Reason = emqx_reason_codes:name(ReasonCode), Packet = ?DISCONNECT_PACKET(ReasonCode), @@ -660,16 +679,16 @@ handle_cast(Msg, Channel) -> -spec(handle_info(Info :: term(), channel()) -> {ok, channel()} | {stop, Reason :: term(), channel()}). -handle_info({subscribe, TopicFilters}, Channel = #channel{client = Client}) -> +handle_info({subscribe, TopicFilters}, Channel = #channel{client = ClientInfo}) -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe', - [Client, #{'Internal' => true}], + [ClientInfo, #{'Internal' => true}], parse_topic_filters(TopicFilters)), {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), {ok, NChannel}; -handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) -> +handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = ClientInfo}) -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', - [Client, #{'Internal' => true}], + [ClientInfo, #{'Internal' => true}], parse_topic_filters(TopicFilters)), {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; @@ -677,12 +696,11 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) -> handle_info(disconnected, Channel = #channel{connected = undefined}) -> shutdown(closed, Channel); -handle_info(disconnected, Channel = #channel{protocol = Protocol, - session = Session}) -> +handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := Interval}, + will_msg = WillMsg}) -> %% TODO: Why handle will_msg here? - publish_will_msg(emqx_protocol:info(will_msg, Protocol)), - NChannel = Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)}, - Interval = emqx_session:info(expiry_interval, Session), + publish_will_msg(WillMsg), + NChannel = Channel#channel{will_msg = undefined}, case Interval of ?UINT_MAX -> {ok, ensure_disconnected(NChannel)}; @@ -699,20 +717,19 @@ handle_info(Info, Channel) -> %% Handle timeout %%-------------------------------------------------------------------- --spec(timeout(reference(), Msg :: term(), channel()) +-spec(handle_timeout(reference(), Msg :: term(), channel()) -> {ok, channel()} | {ok, Result :: term(), channel()} | {stop, Reason :: term(), channel()}). -timeout(TRef, {emit_stats, Stats}, - Channel = #channel{client = #{client_id := ClientId}, - timers = #{stats_timer := TRef} - }) -> +handle_timeout(TRef, {emit_stats, Stats}, + Channel = #channel{client = #{client_id := ClientId}, + timers = #{stats_timer := TRef}}) -> ok = emqx_cm:set_chan_stats(ClientId, Stats), {ok, clean_timer(stats_timer, Channel)}; -timeout(TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive, - timers = #{alive_timer := TRef} - }) -> +handle_timeout(TRef, {keepalive, StatVal}, + Channel = #channel{keepalive = Keepalive, + timers = #{alive_timer := TRef}}) -> case emqx_keepalive:check(StatVal, Keepalive) of {ok, NKeepalive} -> NChannel = Channel#channel{keepalive = NKeepalive}, @@ -721,9 +738,9 @@ timeout(TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive, {stop, {shutdown, keepalive_timeout}, Channel} end; -timeout(TRef, retry_delivery, Channel = #channel{session = Session, - timers = #{retry_timer := TRef} - }) -> +handle_timeout(TRef, retry_delivery, + Channel = #channel{session = Session, + timers = #{retry_timer := TRef}}) -> case emqx_session:retry(Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; @@ -735,8 +752,9 @@ timeout(TRef, retry_delivery, Channel = #channel{session = Session, handle_out({publish, Publishes}, reset_timer(retry_timer, Timeout, NChannel)) end; -timeout(TRef, expire_awaiting_rel, Channel = #channel{session = Session, - timers = #{await_timer := TRef}}) -> +handle_timeout(TRef, expire_awaiting_rel, + Channel = #channel{session = Session, + timers = #{await_timer := TRef}}) -> case emqx_session:expire(awaiting_rel, Session) of {ok, Session} -> {ok, clean_timer(await_timer, Channel#channel{session = Session})}; @@ -744,15 +762,15 @@ timeout(TRef, expire_awaiting_rel, Channel = #channel{session = Session, {ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})} end; -timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) -> +handle_timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) -> shutdown(expired, Channel); -timeout(TRef, will_message, Channel = #channel{protocol = Protocol, - timers = #{will_timer := TRef}}) -> - publish_will_msg(emqx_protocol:info(will_msg, Protocol)), - {ok, clean_timer(will_timer, Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)})}; +handle_timeout(TRef, will_message, Channel = #channel{will_msg = WillMsg, + timers = #{will_timer := TRef}}) -> + publish_will_msg(WillMsg), + {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})}; -timeout(_TRef, Msg, Channel) -> +handle_timeout(_TRef, Msg, Channel) -> ?LOG(error, "Unexpected timeout: ~p~n", [Msg]), {ok, Channel}. @@ -796,25 +814,21 @@ interval(retry_timer, #channel{session = Session}) -> emqx_session:info(retry_interval, Session); interval(await_timer, #channel{session = Session}) -> emqx_session:info(await_rel_timeout, Session); -interval(expire_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(expiry_interval, Session)); -interval(will_timer, #channel{protocol = Protocol}) -> - timer:seconds(emqx_protocol:info(will_delay_interval, Protocol)). +interval(expire_timer, #channel{conninfo = ConnInfo}) -> + timer:seconds(maps:get(expiry_interval, ConnInfo)); +interval(will_timer, #channel{will_msg = WillMsg}) -> + %% TODO: Ensure the header exists. + timer:seconds(emqx_message:get_header('Will-Delay-Interval', WillMsg)). %%-------------------------------------------------------------------- %% Terminate %%-------------------------------------------------------------------- -terminate(normal, #channel{client = Client}) -> - ok = emqx_hooks:run('client.disconnected', [Client, normal]); -terminate(Reason, #channel{client = Client, - protocol = Protocol - }) -> - ok = emqx_hooks:run('client.disconnected', [Client, Reason]), - if - Protocol == undefined -> ok; - true -> publish_will_msg(emqx_protocol:info(will_msg, Protocol)) - end. +terminate(normal, #channel{conninfo = ConnInfo, client = ClientInfo}) -> + ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]); +terminate(Reason, #channel{conninfo = ConnInfo, client = ClientInfo, will_msg = WillMsg}) -> + publish_will_msg(WillMsg), + ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]). -spec(received(pos_integer(), channel()) -> channel()). received(Oct, Channel) -> @@ -830,51 +844,78 @@ publish_will_msg(undefined) -> publish_will_msg(Msg) -> emqx_broker:publish(Msg). +%% @doc Enrich MQTT Connect Info. +enrich_conninfo(#mqtt_packet_connect{ + proto_name = ProtoName, + proto_ver = ProtoVer, + clean_start = CleanStart, + keepalive = Keepalive, + properties = ConnProps, + client_id = ClientId, + username = Username}, Channel) -> + #channel{conninfo = ConnInfo, client = #{zone := Zone}} = Channel, + MaxInflight = emqx_mqtt_props:get('Receive-Maximum', + ConnProps, emqx_zone:max_inflight(Zone)), + Interval = if ProtoVer == ?MQTT_PROTO_V5 -> + emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0); + true -> case CleanStart of + true -> 0; + false -> emqx_zone:session_expiry_interval(Zone) + end + end, + NConnInfo = ConnInfo#{proto_name => ProtoName, + proto_ver => ProtoVer, + clean_start => CleanStart, + keepalive => Keepalive, + client_id => ClientId, + username => Username, + conn_props => ConnProps, + receive_maximum => MaxInflight, + expiry_interval => Interval + }, + {ok, Channel#channel{conninfo = NConnInfo}}. + %% @doc Check connect packet. -check_connpkt(ConnPkt, #channel{client = #{zone := Zone}}) -> +check_connect(ConnPkt, #channel{client = #{zone := Zone}}) -> emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)). -%% @doc Init protocol record. -init_protocol(ConnPkt, Channel = #channel{client = #{zone := Zone}}) -> - {ok, Channel#channel{protocol = emqx_protocol:init(ConnPkt, Zone)}}. - %% @doc Enrich client -enrich_client(ConnPkt, Channel = #channel{client = Client}) -> - {ok, NConnPkt, NClient} = pipeline([fun set_username/2, - fun set_bridge_mode/2, - fun maybe_username_as_clientid/2, - fun maybe_assign_clientid/2, - fun fix_mountpoint/2 - ], ConnPkt, Client), - {ok, NConnPkt, Channel#channel{client = NClient}}. +enrich_client(ConnPkt, Channel = #channel{client = ClientInfo}) -> + {ok, NConnPkt, NClientInfo} = + pipeline([fun set_username/2, + fun set_bridge_mode/2, + fun maybe_username_as_clientid/2, + fun maybe_assign_clientid/2, + fun fix_mountpoint/2], ConnPkt, ClientInfo), + {ok, NConnPkt, Channel#channel{client = NClientInfo}}. -set_username(#mqtt_packet_connect{username = Username}, Client = #{username := undefined}) -> - {ok, Client#{username => Username}}; -set_username(_ConnPkt, Client) -> - {ok, Client}. +set_username(#mqtt_packet_connect{username = Username}, + ClientInfo = #{username := undefined}) -> + {ok, ClientInfo#{username => Username}}; +set_username(_ConnPkt, ClientInfo) -> + {ok, ClientInfo}. -set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, Client) -> - {ok, Client#{is_bridge => true}}; -set_bridge_mode(_ConnPkt, _Client) -> ok. +set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, ClientInfo) -> + {ok, ClientInfo#{is_bridge => true}}; +set_bridge_mode(_ConnPkt, _ClientInfo) -> ok. -maybe_username_as_clientid(_ConnPkt, Client = #{username := undefined}) -> - {ok, Client}; -maybe_username_as_clientid(_ConnPkt, Client = #{zone := Zone, username := Username}) -> +maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) -> + {ok, ClientInfo}; +maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, username := Username}) -> case emqx_zone:use_username_as_clientid(Zone) of - true -> {ok, Client#{client_id => Username}}; + true -> {ok, ClientInfo#{client_id => Username}}; false -> ok end. -maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, Client) -> +maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, ClientInfo) -> %% Generate a rand clientId - RandId = emqx_guid:to_base62(emqx_guid:gen()), - {ok, Client#{client_id => RandId}}; -maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, Client) -> - {ok, Client#{client_id => ClientId}}. + {ok, ClientInfo#{client_id => emqx_guid:to_base62(emqx_guid:gen())}}; +maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, ClientInfo) -> + {ok, ClientInfo#{client_id => ClientId}}. fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) -> ok; -fix_mountpoint(_ConnPkt, Client = #{mountpoint := Mountpoint}) -> - {ok, Client#{mountpoint := emqx_mountpoint:replvar(Mountpoint, Client)}}. +fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := Mountpoint}) -> + {ok, ClientInfo#{mountpoint := emqx_mountpoint:replvar(Mountpoint, ClientInfo)}}. %% @doc Set logger metadata. set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) -> @@ -884,15 +925,15 @@ set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) -> %% Check banned/flapping %%-------------------------------------------------------------------- -check_banned(_ConnPkt, #channel{client = Client = #{zone := Zone}}) -> - case emqx_zone:enable_banned(Zone) andalso emqx_banned:check(Client) of +check_banned(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) -> + case emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo) of true -> {error, ?RC_BANNED}; false -> ok end. -check_flapping(_ConnPkt, #channel{client = Client = #{zone := Zone}}) -> +check_flapping(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) -> case emqx_zone:enable_flapping_detect(Zone) - andalso emqx_flapping:check(Client) of + andalso emqx_flapping:check(ClientInfo) of true -> {error, ?RC_CONNECTION_RATE_EXCEEDED}; false -> ok end. @@ -904,38 +945,16 @@ check_flapping(_ConnPkt, #channel{client = Client = #{zone := Zone}}) -> auth_connect(#mqtt_packet_connect{client_id = ClientId, username = Username, password = Password}, - Channel = #channel{client = Client}) -> - case emqx_access_control:authenticate(Client#{password => Password}) of + Channel = #channel{client = ClientInfo}) -> + case emqx_access_control:authenticate(ClientInfo#{password => Password}) of {ok, AuthResult} -> - {ok, Channel#channel{client = maps:merge(Client, AuthResult)}}; + {ok, Channel#channel{client = maps:merge(ClientInfo, AuthResult)}}; {error, Reason} -> ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p", [ClientId, Username, Reason]), {error, emqx_reason_codes:connack_error(Reason)} end. -%%-------------------------------------------------------------------- -%% Open session -%%-------------------------------------------------------------------- - -open_session(#mqtt_packet_connect{clean_start = CleanStart, - properties = ConnProps}, - #channel{client = Client = #{zone := Zone}, protocol = Protocol}) -> - MaxInflight = get_property('Receive-Maximum', ConnProps, - emqx_zone:get_env(Zone, max_inflight, 65535)), - Interval = - case emqx_protocol:info(proto_ver, Protocol) of - ?MQTT_PROTO_V5 -> get_property('Session-Expiry-Interval', ConnProps, 0); - _ -> - case CleanStart of - true -> 0; - false -> emqx_zone:get_env(Zone, session_expiry_interval, 0) - end - end, - emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight, - expiry_interval => Interval - }). - %%-------------------------------------------------------------------- %% Process publish message: Client -> Broker %%-------------------------------------------------------------------- @@ -945,8 +964,8 @@ process_alias(Packet = #mqtt_packet{ properties = #{'Topic-Alias' := AliasId} } = Publish }, - Channel = #channel{protocol = Protocol}) -> - case emqx_protocol:find_alias(AliasId, Protocol) of + Channel = #channel{topic_aliases = Aliases}) -> + case find_alias(AliasId, Aliases) of {ok, Topic} -> {ok, Packet#mqtt_packet{ variable = Publish#mqtt_packet_publish{ @@ -958,23 +977,23 @@ process_alias(#mqtt_packet{ variable = #mqtt_packet_publish{topic_name = Topic, properties = #{'Topic-Alias' := AliasId} } - }, Channel = #channel{protocol = Protocol}) -> - {ok, Channel#channel{protocol = emqx_protocol:save_alias(AliasId, Topic, Protocol)}}; + }, Channel = #channel{topic_aliases = Aliases}) -> + {ok, Channel#channel{topic_aliases = save_alias(AliasId, Topic, Aliases)}}; process_alias(_Packet, Channel) -> {ok, Channel}. -%% Check Publish -check_publish(Packet, Channel) -> - pipeline([fun check_pub_acl/2, - fun check_pub_alias/2, - fun check_pub_caps/2], Packet, Channel). +find_alias(_AliasId, undefined) -> false; +find_alias(AliasId, Aliases) -> maps:find(AliasId, Aliases). + +save_alias(AliasId, Topic, undefined) -> #{AliasId => Topic}; +save_alias(AliasId, Topic, Aliases) -> maps:put(AliasId, Topic, Aliases). %% Check Pub ACL check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, - #channel{client = Client}) -> - case is_acl_enabled(Client) andalso - emqx_access_control:check_acl(Client, publish, Topic) of + #channel{client = ClientInfo}) -> + case is_acl_enabled(ClientInfo) andalso + emqx_access_control:check_acl(ClientInfo, publish, Topic) of false -> ok; allow -> ok; deny -> {error, ?RC_NOT_AUTHORIZED} @@ -986,9 +1005,8 @@ check_pub_alias(#mqtt_packet{ properties = #{'Topic-Alias' := AliasId} } }, - #channel{protocol = Protocol}) -> + #channel{alias_maximum = Limits}) -> %% TODO: Move to Protocol - Limits = emqx_protocol:info(alias_maximum, Protocol), case (Limits == undefined) orelse (Max = maps:get(inbound, Limits, 0)) == 0 orelse (AliasId > Max) of @@ -1013,9 +1031,9 @@ check_subscribe(TopicFilter, SubOpts, Channel) -> end. %% Check Sub ACL -check_sub_acl(TopicFilter, #channel{client = Client}) -> - case is_acl_enabled(Client) andalso - emqx_access_control:check_acl(Client, subscribe, TopicFilter) of +check_sub_acl(TopicFilter, #channel{client = ClientInfo}) -> + case is_acl_enabled(ClientInfo) andalso + emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter) of false -> allow; Result -> Result end. @@ -1029,64 +1047,63 @@ enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) -> enrich_subid(_Properties, TopicFilters) -> TopicFilters. -enrich_subopts(SubOpts, #channel{client = Client, protocol = Proto}) -> - #{zone := Zone, is_bridge := IsBridge} = Client, - case emqx_protocol:info(proto_ver, Proto) of - ?MQTT_PROTO_V5 -> SubOpts; - _Ver -> Rap = flag(IsBridge), - Nl = flag(emqx_zone:get_env(Zone, ignore_loop_deliver, false)), - SubOpts#{rap => Rap, nl => Nl} - end. +enrich_subopts(SubOpts, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> + SubOpts; -enrich_caps(AckProps, #channel{client = #{zone := Zone}, protocol = Protocol}) -> - case emqx_protocol:info(proto_ver, Protocol) of - ?MQTT_PROTO_V5 -> - #{max_packet_size := MaxPktSize, - max_qos_allowed := MaxQoS, - retain_available := Retain, - max_topic_alias := MaxAlias, - shared_subscription := Shared, - wildcard_subscription := Wildcard - } = emqx_mqtt_caps:get_caps(Zone), - AckProps#{'Retain-Available' => flag(Retain), - 'Maximum-Packet-Size' => MaxPktSize, - 'Topic-Alias-Maximum' => MaxAlias, - 'Wildcard-Subscription-Available' => flag(Wildcard), - 'Subscription-Identifier-Available' => 1, - 'Shared-Subscription-Available' => flag(Shared), - 'Maximum-QoS' => MaxQoS - }; - _Ver -> AckProps - end. +enrich_subopts(SubOpts, #channel{client = #{zone := Zone, is_bridge := IsBridge}}) -> + NL = flag(emqx_zone:ignore_loop_deliver(Zone)), + SubOpts#{rap => flag(IsBridge), nl => NL}. + +enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}, + client = #{zone := Zone}}) -> + #{max_packet_size := MaxPktSize, + max_qos_allowed := MaxQoS, + retain_available := Retain, + max_topic_alias := MaxAlias, + shared_subscription := Shared, + wildcard_subscription := Wildcard + } = emqx_mqtt_caps:get_caps(Zone), + AckProps#{'Retain-Available' => flag(Retain), + 'Maximum-Packet-Size' => MaxPktSize, + 'Topic-Alias-Maximum' => MaxAlias, + 'Wildcard-Subscription-Available' => flag(Wildcard), + 'Subscription-Identifier-Available' => 1, + 'Shared-Subscription-Available' => flag(Shared), + 'Maximum-QoS' => MaxQoS + }; +enrich_caps(AckProps, _Channel) -> + AckProps. enrich_server_keepalive(AckProps, #channel{client = #{zone := Zone}}) -> - case emqx_zone:get_env(Zone, server_keepalive) of + case emqx_zone:server_keepalive(Zone) of undefined -> AckProps; Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} end. -enrich_assigned_clientid(AckProps, #channel{client = #{client_id := ClientId}, - protocol = Protocol}) -> - case emqx_protocol:info(client_id, Protocol) of - <<>> -> %% Original ClientId. +enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo, + client = #{client_id := ClientId} + }) -> + case maps:get(client_id, ConnInfo) of + <<>> -> %% Original ClientId is null. AckProps#{'Assigned-Client-Identifier' => ClientId}; _Origin -> AckProps end. -ensure_connected(Channel) -> - Channel#channel{connected = true, connected_at = os:timestamp(), disconnected_at = undefined}. +init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, + properties = Properties}, #{zone := Zone}) -> + #{outbound => emqx_mqtt_props:get('Topic-Alias-Maximum', Properties, 0), + inbound => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)}; +init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined. ensure_disconnected(Channel) -> Channel#channel{connected = false, disconnected_at = os:timestamp()}. ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) -> ensure_keepalive_timer(Interval, Channel); -ensure_keepalive(_AckProp, Channel = #channel{protocol = Protocol}) -> - case emqx_protocol:info(keepalive, Protocol) of - 0 -> Channel; - Interval -> ensure_keepalive_timer(Interval, Channel) - end. +ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> + ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel). +ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{client = #{zone := Zone}}) -> Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), @@ -1137,11 +1154,6 @@ check_oom(OomPolicy) -> %% Helper functions %%-------------------------------------------------------------------- -get_property(_Name, undefined, Default) -> - Default; -get_property(Name, Props, Default) -> - maps:get(Name, Props, Default). - sp(true) -> 1; sp(false) -> 0. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 973193185..67b6608b8 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -114,8 +114,6 @@ max_awaiting_rel :: non_neg_integer(), %% Awaiting PUBREL Timeout await_rel_timeout :: timeout(), - %% Session Expiry Interval - expiry_interval :: timeout(), %% Enqueue Count enqueue_cnt :: non_neg_integer(), %% Created at @@ -127,11 +125,12 @@ -type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}). -define(DEFAULT_BATCH_N, 1000). --define(ATTR_KEYS, [expiry_interval, created_at]). +-define(ATTR_KEYS, [max_inflight, max_mqueue, retry_interval, + max_awaiting_rel, await_rel_timeout, created_at]). -define(INFO_KEYS, [subscriptions, max_subscriptions, upgrade_qos, inflight, max_inflight, retry_interval, mqueue_len, max_mqueue, mqueue_dropped, next_pkt_id, awaiting_rel, max_awaiting_rel, - await_rel_timeout, expiry_interval, created_at]). + await_rel_timeout, created_at]). -define(STATS_KEYS, [subscriptions_cnt, max_subscriptions, inflight, max_inflight, mqueue_len, max_mqueue, mqueue_dropped, awaiting_rel, max_awaiting_rel, enqueue_cnt]). @@ -142,8 +141,7 @@ %% @doc Init a session. -spec(init(emqx_types:client(), Options :: map()) -> session()). -init(#{zone := Zone}, #{receive_maximum := MaxInflight, - expiry_interval := ExpiryInterval}) -> +init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> #session{max_subscriptions = get_env(Zone, max_subscriptions, 0), subscriptions = #{}, upgrade_qos = get_env(Zone, upgrade_qos, false), @@ -154,7 +152,6 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight, awaiting_rel = #{}, max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000), - expiry_interval = ExpiryInterval, enqueue_cnt = 0, created_at = os:timestamp() }. @@ -210,8 +207,6 @@ info(max_awaiting_rel, #session{max_awaiting_rel = MaxAwaitingRel}) -> MaxAwaitingRel; info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; -info(expiry_interval, #session{expiry_interval = Interval}) -> - Interval; info(enqueue_cnt, #session{enqueue_cnt = Cnt}) -> Cnt; info(created_at, #session{created_at = CreatedAt}) -> diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 8dc9785a1..58b438010 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -103,9 +103,7 @@ atom() => term() }). -type(client() :: #{zone := zone(), - conn_mod := maybe(module()), peerhost := peerhost(), - sockname := peername(), client_id := client_id(), username := username(), peercert := esockd_peercert:peercert(), diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 6b927f9bd..9d45c64e1 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -30,7 +30,7 @@ -export([ use_username_as_clientid/1 , enable_stats/1 , enable_acl/1 - , enable_banned/1 + , enable_ban/1 , enable_flapping_detect/1 , ignore_loop_deliver/1 , server_keepalive/1 @@ -88,9 +88,9 @@ enable_stats(Zone) -> enable_acl(Zone) -> get_env(Zone, enable_acl, true). --spec(enable_banned(zone()) -> boolean()). -enable_banned(Zone) -> - get_env(Zone, enable_banned, false). +-spec(enable_ban(zone()) -> boolean()). +enable_ban(Zone) -> + get_env(Zone, enable_ban, false). -spec(enable_flapping_detect(zone()) -> boolean()). enable_flapping_detect(Zone) -> diff --git a/test/emqx_banned_SUITE.erl b/test/emqx_banned_SUITE.erl index ed97e5e96..99c5df3d3 100644 --- a/test/emqx_banned_SUITE.erl +++ b/test/emqx_banned_SUITE.erl @@ -27,6 +27,8 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> application:load(emqx), ok = ekka:start(), + %% for coverage + ok = emqx_banned:mnesia(copy), Config. end_per_suite(_Config) -> @@ -80,3 +82,14 @@ t_check(_) -> ?assertNot(emqx_banned:check(ClientInfo4)), ?assertEqual(0, emqx_banned:info(size)). +t_unused(_) -> + {ok, Banned} = emqx_banned:start_link(), + ok = emqx_banned:add(#banned{who = {client_id, <<"BannedClient">>}, + until = erlang:system_time(second) + }), + ?assertEqual(ignored, gen_server:call(Banned, unexpected_req)), + ?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)), + ?assertEqual(ok, Banned ! ok), + timer:sleep(500), %% expiry timer + ok = emqx_banned:stop(). + diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl new file mode 100644 index 000000000..cd91ed1f3 --- /dev/null +++ b/test/emqx_cm_SUITE.erl @@ -0,0 +1,60 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cm_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_reg_unreg_channel(_) -> + error(not_implemented). + +t_get_set_chan_attrs(_) -> + error(not_implemented). + +t_get_set_chan_stats(_) -> + error(not_implemented). + +t_open_session(_) -> + error(not_implemented). + +t_discard_session(_) -> + error(not_implemented). + +t_takeover_session(_) -> + error(not_implemented). + +t_lookup_channels(_) -> + error(not_implemented). + +t_lock_clientid(_) -> + error(not_implemented). + +t_unlock_clientid(_) -> + error(not_implemented). + diff --git a/test/emqx_inflight_SUITE.erl b/test/emqx_inflight_SUITE.erl index 7a015a443..4d7065486 100644 --- a/test/emqx_inflight_SUITE.erl +++ b/test/emqx_inflight_SUITE.erl @@ -84,8 +84,9 @@ t_is_empty(_) -> ?assert(emqx_inflight:is_empty(Inflight1)). t_window(_) -> + ?assertEqual([], emqx_inflight:window(emqx_inflight:new(0))), Inflight = emqx_inflight:insert( b, 2, emqx_inflight:insert( a, 1, emqx_inflight:new(2))), - [a, b] = emqx_inflight:window(Inflight). + ?assertEqual([a, b], emqx_inflight:window(Inflight)). diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 224739224..0c1cad6c7 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -56,6 +56,7 @@ t_get_set_flag(_) -> ?assertNot(emqx_message:get_flag(retain, Msg3)), Msg4 = emqx_message:unset_flag(dup, Msg3), Msg5 = emqx_message:unset_flag(retain, Msg4), + Msg5 = emqx_message:unset_flag(badflag, Msg5), ?assertEqual(undefined, emqx_message:get_flag(dup, Msg5, undefined)), ?assertEqual(undefined, emqx_message:get_flag(retain, Msg5, undefined)), Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5), @@ -81,7 +82,7 @@ t_get_set_header(_) -> ?assertEqual(1, emqx_message:get_header(a, Msg3)), ?assertEqual(4, emqx_message:get_header(d, Msg2, 4)), Msg4 = emqx_message:remove_header(a, Msg3), - Msg4 = emqx_message:remove_header(a, Msg3), + Msg4 = emqx_message:remove_header(a, Msg4), ?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg4)). t_undefined_headers(_) -> @@ -93,16 +94,24 @@ t_undefined_headers(_) -> t_format(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - io:format("~s", [emqx_message:format(Msg)]). + io:format("~s~n", [emqx_message:format(Msg)]), + Msg1 = #message{id = <<"id">>, + qos = ?QOS_0, + flags = undefined, + headers = undefined + }, + io:format("~s~n", [emqx_message:format(Msg1)]). t_expired(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + ?assertNot(emqx_message:is_expired(Msg)), Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg), timer:sleep(500), ?assertNot(emqx_message:is_expired(Msg1)), timer:sleep(600), ?assert(emqx_message:is_expired(Msg1)), timer:sleep(1000), + Msg = emqx_message:update_expiry(Msg), Msg2 = emqx_message:update_expiry(Msg1), ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index c9502d0a7..c56a68c51 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -21,29 +21,81 @@ -include_lib("eunit/include/eunit.hrl"). --define(OPTS, [{enable_acl, true}, - {enable_banned, false} +-define(ENVS, [{use_username_as_clientid, false}, + {server_keepalive, 60}, + {upgrade_qos, false}, + {session_expiry_interval, 7200}, + {retry_interval, 20000}, + {mqueue_store_qos0, true}, + {mqueue_priorities, none}, + {mqueue_default_priority, highest}, + {max_subscriptions, 0}, + {max_mqueue_len, 1000}, + {max_inflight, 32}, + {max_awaiting_rel, 100}, + {keepalive_backoff, 0.75}, + {ignore_loop_deliver, false}, + {idle_timeout, 15000}, + {force_shutdown_policy, #{max_heap_size => 838860800, + message_queue_len => 8000}}, + {force_gc_policy, #{bytes => 1048576, count => 1000}}, + {enable_stats, true}, + {enable_flapping_detect, false}, + {enable_ban, true}, + {enable_acl, true}, + {await_rel_timeout, 300000}, + {acl_deny_action, ignore} ]). all() -> emqx_ct:all(?MODULE). -t_set_get_env(_) -> +init_per_suite(Config) -> _ = application:load(emqx), - application:set_env(emqx, zones, [{external, ?OPTS}]), - {ok, _} = emqx_zone:start_link(), - ?assert(emqx_zone:get_env(external, enable_acl)), - ?assertNot(emqx_zone:get_env(external, enable_banned)), + application:set_env(emqx, zone_env, val), + application:set_env(emqx, zones, [{zone, ?ENVS}]), + Config. + +end_per_suite(_Config) -> + application:unset_env(emqx, zone_env), + application:unset_env(emqx, zones). + +t_zone_env_func(_) -> + lists:foreach(fun({Env, Val}) -> + case erlang:function_exported(emqx_zone, Env, 1) of + true -> + ?assertEqual(Val, erlang:apply(emqx_zone, Env, [zone])); + false -> ok + end + end, ?ENVS). + +t_get_env(_) -> + ?assertEqual(val, emqx_zone:get_env(undefined, zone_env)), + ?assertEqual(val, emqx_zone:get_env(undefined, zone_env, def)), + ?assert(emqx_zone:get_env(zone, enable_acl)), + ?assert(emqx_zone:get_env(zone, enable_ban)), ?assertEqual(defval, emqx_zone:get_env(extenal, key, defval)), ?assertEqual(undefined, emqx_zone:get_env(external, key)), ?assertEqual(undefined, emqx_zone:get_env(internal, key)), - ?assertEqual(def, emqx_zone:get_env(internal, key, def)), - emqx_zone:stop(). + ?assertEqual(def, emqx_zone:get_env(internal, key, def)). + +t_get_set_env(_) -> + ok = emqx_zone:set_env(zone, key, val), + ?assertEqual(val, emqx_zone:get_env(zone, key)), + true = emqx_zone:unset_env(zone, key), + ?assertEqual(undefined, emqx_zone:get_env(zone, key)). t_force_reload(_) -> {ok, _} = emqx_zone:start_link(), - application:set_env(emqx, zones, [{zone, [{key, val}]}]), - ?assertEqual(undefined, emqx_zone:get_env(zone, key)), + ?assertEqual(undefined, emqx_zone:get_env(xzone, key)), + application:set_env(emqx, zones, [{xzone, [{key, val}]}]), ok = emqx_zone:force_reload(), - ?assertEqual(val, emqx_zone:get_env(zone, key)), + ?assertEqual(val, emqx_zone:get_env(xzone, key)), + emqx_zone:stop(). + +t_uncovered_func(_) -> + {ok, Pid} = emqx_zone:start_link(), + ignored = gen_server:call(Pid, unexpected_call), + ok = gen_server:cast(Pid, unexpected_cast), + ok = Pid ! ok, emqx_zone:stop().