diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c8a598ef1..10e3c78b6 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -267,9 +267,9 @@ connected(enter, _PrevSt, State = #state{proto_state = ProtoState}) -> shutdown(Reason, NState) end; -connected(cast, {incoming, Packet = ?PACKET(?CONNECT)}, State) -> - ?LOG(warning, "Unexpected connect: ~p", [Packet]), - shutdown(unexpected_incoming_connect, State); +connected(cast, {incoming, ?PACKET(?CONNECT)}, State) -> + Shutdown = fun(NewSt) -> shutdown(?RC_PROTOCOL_ERROR, NewSt) end, + handle_outgoing(?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Shutdown, State); connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> handle_incoming(Packet, fun keep_state/1, State); diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index 2f65d1222..0b3038523 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -122,6 +122,8 @@ critical(Metadata, Format, Args) when is_map(Metadata) -> logger:critical(Format, Args, Metadata). -spec(set_metadata_client_id(emqx_types:client_id()) -> ok). +set_metadata_client_id(<<>>) -> + ok; set_metadata_client_id(ClientId) -> set_proc_metadata(#{client_id => ClientId}). diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 97f7a9929..bec9dbaa0 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -45,25 +45,21 @@ on_client_connected(#{client_id := ClientId, username := Username, peername := {IpAddr, _} }, ConnAck, - #{session := #{clean_start := CleanStart, - expiry_interval := Interval - }, + #{session := Session, proto_name := ProtoName, proto_ver := ProtoVer, keepalive := Keepalive }, Env) -> - - case emqx_json:safe_encode(#{clientid => ClientId, - username => Username, - ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), - proto_name => ProtoName, - proto_ver => ProtoVer, - keepalive => Keepalive, - clean_start => CleanStart, - expiry_interval => Interval, - connack => ConnAck, - ts => erlang:system_time(millisecond) - }) of + + case emqx_json:safe_encode(maps:merge(#{clientid => ClientId, + username => Username, + ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), + proto_name => ProtoName, + proto_ver => ProtoVer, + keepalive => Keepalive, + connack => ConnAck, + ts => erlang:system_time(millisecond) + }, maps:with([clean_start, expiry_interval], Session))) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 0aa3a2a23..ec7f55330 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -79,10 +79,6 @@ do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS}) {error, ?RC_QOS_NOT_SUPPORTED}; do_check_pub(#{retain := true}, #{retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; -do_check_pub(#{topic_alias := TopicAlias}, - #{max_topic_alias := MaxTopicAlias}) - when 0 == TopicAlias; TopicAlias >= MaxTopicAlias -> - {error, ?RC_TOPIC_ALIAS_INVALID}; do_check_pub(_Flags, _Caps) -> ok. -spec(check_sub(emqx_types:zone(), diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index e0351b8a9..d8205b98a 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -82,8 +82,7 @@ validate_packet_id(_) -> validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) when I =< 0; I >= 16#FFFFFFF -> error(subscription_identifier_invalid); -validate_properties(?PUBLISH, #{'Topic-Alias':= I}) - when I =:= 0 -> +validate_properties(?PUBLISH, #{'Topic-Alias':= 0}) -> error(topic_alias_invalid); validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> error(protocol_error); diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 6e0ac8336..9f792a097 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -137,6 +137,7 @@ init(ConnInfo, Options) -> MountPoint = emqx_zone:get_env(Zone, mountpoint), Client = maps:merge(#{zone => Zone, username => Username, + client_id => <<>>, mountpoint => MountPoint, is_bridge => false, is_superuser => false @@ -175,13 +176,14 @@ handle_in(?CONNECT_PACKET( fun check_connect/2, fun enrich_client/2, fun auth_connect/2], ConnPkt, PState1) of - {ok, NConnPkt, NPState} -> - process_connect(NConnPkt, maybe_assign_clientid(NPState)); + {ok, NConnPkt, NPState = #protocol{client = #{client_id := ClientId1}}} -> + ok = emqx_logger:set_metadata_client_id(ClientId1), + process_connect(NConnPkt, NPState); {error, ReasonCode, NPState} -> - handle_out({disconnect, ReasonCode}, NPState) + handle_out({connack, ReasonCode}, NPState) end; -handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState= #protocol{proto_ver = Ver}) -> +handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), PState= #protocol{proto_ver = Ver}) -> case pipeline([fun validate_in/2, fun process_alias/2, fun check_publish/2], Packet, PState) of @@ -190,7 +192,7 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState= #protocol{prot {error, ReasonCode, NPState} -> ?LOG(warning, "Cannot publish message to ~s due to ~s", [Topic, emqx_reason_codes:text(ReasonCode, Ver)]), - puback(QoS, PacketId, ReasonCode, NPState) + handle_out({disconnect, ReasonCode}, NPState) end; handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), PState = #protocol{session = Session}) -> @@ -380,10 +382,6 @@ handle_out({publish, PacketId, Msg}, PState = #protocol{client = Client}) -> Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)), {ok, Packet, PState}; -%% TODO: How to handle the err? -handle_out({puberr, _ReasonCode}, PState) -> - {ok, PState}; - handle_out({puback, PacketId, ReasonCode}, PState) -> {ok, ?PUBACK_PACKET(PacketId, ReasonCode), PState}; @@ -500,7 +498,7 @@ check_connect(ConnPkt, PState) -> fun check_banned/2, fun check_will_topic/2, fun check_will_retain/2], ConnPkt, PState) of - ok -> {ok, PState}; + {ok, NConnPkt, NPState} -> {ok, NConnPkt, NPState}; Error -> Error end. @@ -508,7 +506,7 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of true -> ok; - false -> {error, ?RC_PROTOCOL_ERROR} + false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} end. %% MQTT3.1 does not allow null clientId @@ -571,29 +569,43 @@ check_will_retain(#mqtt_packet_connect{will_retain = true}, %% Enrich client %%-------------------------------------------------------------------- -enrich_client(#mqtt_packet_connect{client_id = ClientId, - username = Username, - is_bridge = IsBridge - }, - PState = #protocol{client = Client}) -> - Client1 = set_username(Username, Client#{client_id => ClientId, - is_bridge => IsBridge - }), - {ok, PState#protocol{client = maybe_username_as_clientid(Client1)}}. +enrich_client(ConnPkt, PState) -> + case pipeline([fun set_username/2, + fun maybe_use_username_as_clientid/2, + fun maybe_assign_clientid/2, + fun set_rest_client_fields/2], ConnPkt, PState) of + {ok, NConnPkt, NPState} -> {ok, NConnPkt, NPState}; + Error -> Error + end. + +maybe_use_username_as_clientid(_ConnPkt, PState = #protocol{client = #{username := undefined}}) -> + {ok, PState}; +maybe_use_username_as_clientid(_ConnPkt, PState = #protocol{client = Client = #{zone := Zone, + username := Username}}) -> + NClient = + case emqx_zone:get_env(Zone, use_username_as_clientid, false) of + true -> Client#{client_id => Username}; + false -> Client + end, + {ok, PState#protocol{client = NClient}}. + +maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, + PState = #protocol{client = Client, ack_props = AckProps}) -> + ClientId = emqx_guid:to_base62(emqx_guid:gen()), + AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), + {ok, PState#protocol{client = Client#{client_id => ClientId}, ack_props = AckProps1}}; +maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, PState = #protocol{client = Client}) -> + {ok, PState#protocol{client = Client#{client_id => ClientId}}}. %% Username maybe not undefined if peer_cert_as_username -set_username(Username, Client = #{username := undefined}) -> - Client#{username => Username}; -set_username(_Username, Client) -> Client. +set_username(#mqtt_packet_connect{username = Username}, + PState = #protocol{client = Client = #{username := undefined}}) -> + {ok, PState#protocol{client = Client#{username => Username}}}; +set_username(_ConnPkt, PState) -> + {ok, PState}. -maybe_username_as_clientid(Client = #{username := undefined}) -> - Client; -maybe_username_as_clientid(Client = #{zone := Zone, - username := Username}) -> - case emqx_zone:get_env(Zone, use_username_as_clientid, false) of - true -> Client#{client_id => Username}; - false -> Client - end. +set_rest_client_fields(#mqtt_packet_connect{is_bridge = IsBridge}, PState = #protocol{client = Client}) -> + {ok, PState#protocol{client = Client#{is_bridge => IsBridge}}}. %%-------------------------------------------------------------------- %% Auth Connect @@ -612,18 +624,6 @@ auth_connect(#mqtt_packet_connect{client_id = ClientId, {error, emqx_reason_codes:connack_error(Reason)} end. -%%-------------------------------------------------------------------- -%% Assign a random clientId -%%-------------------------------------------------------------------- - -maybe_assign_clientid(PState = #protocol{client = Client = #{client_id := <<>>}, - ack_props = AckProps}) -> - ClientId = emqx_guid:to_base62(emqx_guid:gen()), - Client1 = Client#{client_id => ClientId}, - AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), - PState#protocol{client = Client1, ack_props = AckProps1}; -maybe_assign_clientid(PState) -> PState. - %%-------------------------------------------------------------------- %% Process Connect %%-------------------------------------------------------------------- @@ -671,7 +671,7 @@ process_alias(Packet = #mqtt_packet{ {ok, Packet#mqtt_packet{ variable = Publish#mqtt_packet_publish{ topic_name = Topic}}, PState}; - false -> {error, ?RC_TOPIC_ALIAS_INVALID} + false -> {error, ?RC_PROTOCOL_ERROR} end; process_alias(#mqtt_packet{ @@ -760,17 +760,6 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2}, handle_out({pubrec, PacketId, ReasonCode}, PState) end. -%%-------------------------------------------------------------------- -%% Puback -%%-------------------------------------------------------------------- - -puback(?QOS_0, _PacketId, ReasonCode, PState) -> - handle_out({puberr, ReasonCode}, PState); -puback(?QOS_1, PacketId, ReasonCode, PState) -> - handle_out({puback, PacketId, ReasonCode}, PState); -puback(?QOS_2, PacketId, ReasonCode, PState) -> - handle_out({pubrec, PacketId, ReasonCode}, PState). - %%-------------------------------------------------------------------- %% Process subscribe request %%-------------------------------------------------------------------- diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 116f5756e..81db5305c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -248,6 +248,8 @@ info(created_at, #session{created_at = CreatedAt}) -> %%-------------------------------------------------------------------- -spec(attrs(session()) -> emqx_types:attrs()). +attrs(undefined) -> + #{}; attrs(#session{clean_start = CleanStart, expiry_interval = ExpiryInterval, created_at = CreatedAt}) -> diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index c9cc0b073..961229ed0 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -26,23 +26,17 @@ all() -> emqx_ct:all(?MODULE). t_check_pub(_) -> PubCaps = #{max_qos_allowed => ?QOS_1, - retain_available => false, - max_topic_alias => 4 + retain_available => false }, ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, - retain => false, - topic_alias => 1 - }), + retain => false}), PubFlags1 = #{qos => ?QOS_2, retain => false}, ?assertEqual({error, ?RC_QOS_NOT_SUPPORTED}, emqx_mqtt_caps:check_pub(zone, PubFlags1)), PubFlags2 = #{qos => ?QOS_1, retain => true}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, emqx_mqtt_caps:check_pub(zone, PubFlags2)), - PubFlags3 = #{qos => ?QOS_1, retain => false, topic_alias => 5}, - ?assertEqual({error, ?RC_TOPIC_ALIAS_INVALID}, - emqx_mqtt_caps:check_pub(zone, PubFlags3)), true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). t_check_sub(_) ->