diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 30744b300..b48c05af5 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -283,6 +283,10 @@ parse(#share{topic = Topic = <>}, _Options) -> error({invalid_topic_filter, Topic}); parse(#share{topic = Topic = <>}, _Options) -> error({invalid_topic_filter, Topic}); +parse(#share{} = T, #{nl := 1} = _Options) -> + %% Protocol Error and Should Disconnect + %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] + error({invalid_subopts_nl, maybe_format_share(T)}); parse(<>, Options) -> parse(#share{group = <>, topic = Topic}, Options); parse(TopicFilter = <>, Options) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index ef4fd2e63..e6a9b9555 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -758,6 +758,12 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) -> case do_subscribe(ClientID, Topic, Opts) of {error, channel_not_found} -> {404, ?CLIENTID_NOT_FOUND}; + {error, invalid_subopts_nl} -> + {400, #{ + code => <<"INVALID_PARAMETER">>, + message => + <<"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]">> + }}; {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOWN_ERROR">>, message => Message}}; @@ -804,18 +810,25 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) -> %% internal function do_subscribe(ClientID, Topic0, Options) -> - {Topic, Opts} = emqx_topic:parse(Topic0, Options), - TopicTable = [{Topic, Opts}], - case emqx_mgmt:subscribe(ClientID, TopicTable) of - {error, Reason} -> - {error, Reason}; - {subscribe, Subscriptions, Node} -> - case proplists:is_defined(Topic, Subscriptions) of - true -> - {ok, Options#{node => Node, clientid => ClientID, topic => Topic0}}; - false -> - {error, unknow_error} + try emqx_topic:parse(Topic0, Options) of + {Topic, Opts} -> + TopicTable = [{Topic, Opts}], + case emqx_mgmt:subscribe(ClientID, TopicTable) of + {error, Reason} -> + {error, Reason}; + {subscribe, Subscriptions, Node} -> + case proplists:is_defined(Topic, Subscriptions) of + true -> + {ok, Options#{node => Node, clientid => ClientID, topic => Topic0}}; + false -> + {error, unknow_error} + end end + catch + error:{invalid_subopts_nl, _} -> + {error, invalid_subopts_nl}; + _:Reason -> + {error, Reason} end. -spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->