diff --git a/.gitignore b/.gitignore index 1259471d2..d1b8a289e 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ _build rebar3.crashdump .DS_Store rebar.config +emqx.iml diff --git a/Makefile b/Makefile index 4da68accf..7084cadaa 100644 --- a/Makefile +++ b/Makefile @@ -1,17 +1,19 @@ +.PHONY: plugins tests + PROJECT = emqx PROJECT_DESCRIPTION = EMQ X Broker PROJECT_VERSION = 3.0 -NO_AUTOPATCH = cuttlefish +NO_AUTOPATCH = gen_rpc cuttlefish DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc +dep_jsx = git https://github.com/talentdeficit/jsx dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master dep_lager_syslog = git https://github.com/basho/lager_syslog -dep_jsx = git https://github.com/talentdeficit/jsx dep_esockd = git https://github.com/emqtt/esockd v5.2.1 dep_ekka = git https://github.com/emqtt/ekka v0.2.2 dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.2 @@ -25,16 +27,15 @@ ERLC_OPTS += +'{parse_transform, lager_transform}' BUILD_DEPS = cuttlefish dep_cuttlefish = git https://github.com/emqtt/cuttlefish -TEST_DEPS = emqttc emq_dashboard +TEST_DEPS = emqttc dep_emqttc = git https://github.com/emqtt/emqttc -dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard develop TEST_ERLC_OPTS += +debug_info TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose -CT_SUITES = emqx emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \ +CT_SUITES = emqx emqx_broker emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \ emqx_vm emqx_net emqx_protocol emqx_access emqx_router CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 diff --git a/README.md b/README.md index ecb5c569f..eceeee5de 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# *EMQ* - Erlang MQTT Broker +# *EMQ X* - EMQ X Broker [![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd) diff --git a/etc/emqx.conf b/etc/emqx.conf index b15206df9..fe69bec20 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -400,6 +400,11 @@ mqtt.max_packet_size = 64KB ## Value: on | off mqtt.websocket_protocol_header = on +## Check Websocket Upgrade Header. +## +## Value: on | off +mqtt.websocket_check_upgrade_header = on + ## The backoff for MQTT keepalive timeout. ## EMQ will kick a MQTT connection out until 'Keepalive * backoff * 2' timeout. ## @@ -578,6 +583,9 @@ mqtt.plugins.etc_dir ={{ platform_etc_dir }}/plugins/ ## Value: File mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins +## File to store loaded plugin names. +mqtt.plugins.expand_plugins_dir = {{ platform_plugins_dir }}/ + ##-------------------------------------------------------------------- ## MQTT Listeners ##-------------------------------------------------------------------- @@ -611,6 +619,7 @@ listener.tcp.external.max_clients = 102400 ## Mountpoint of the MQTT/TCP Listener. All the topics of this ## listener will be prefixed with the mount point if this option ## is enabled. +## Notice that EMQ X supports wildcard mount:%c clientid, %u username ## ## Value: String ## listener.tcp.external.mountpoint = external/ @@ -830,7 +839,7 @@ listener.ssl.external.acceptors = 16 ## Maximum number of concurrent MQTT/SSL connections. ## ## Value: Number -listener.ssl.external.max_clients = 1024 +listener.ssl.external.max_clients = 102400 ## TODO: Zone of the external MQTT/SSL listener belonged to. ## @@ -1314,7 +1323,7 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## TCP backlog for the WebSocket/SSL connection. ## -## See listener.tcp..backlog +## See: listener.tcp..backlog ## ## Value: Number >= 0 listener.wss.external.backlog = 1024 diff --git a/include/emqx.hrl b/include/emqx.hrl index 4752adc86..7476446f1 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -85,7 +85,9 @@ will_topic :: undefined | binary(), ws_initial_headers :: list({ws_header_key(), ws_header_val()}), mountpoint :: undefined | binary(), - connected_at :: erlang:timestamp() + connected_at :: erlang:timestamp(), + %%TODO: Headers + headers = [] :: list() }). -type(mqtt_client() :: #mqtt_client{}). diff --git a/priv/emqx.schema b/priv/emqx.schema index 27ccabba1..00f4735f8 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -538,6 +538,11 @@ end}. {datatype, flag} ]}. +{mapping, "mqtt.websocket_check_upgrade_header", "emqx.websocket_check_upgrade_header", [ + {default, on}, + {datatype, flag} +]}. + %%-------------------------------------------------------------------- %% MQTT Connection %%-------------------------------------------------------------------- @@ -760,6 +765,10 @@ end}. {datatype, string} ]}. +{mapping, "mqtt.plugins.expand_plugins_dir", "emqx.expand_plugins_dir", [ + {datatype, string} +]}. + %%-------------------------------------------------------------------- %% MQTT Listeners %%-------------------------------------------------------------------- diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index ba3ca569b..a9b3c5c25 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -28,6 +28,8 @@ -export([list/0]). +-export([load_expand_plugin/1]). + %% @doc Init plugins' config -spec(init() -> ok). init() -> @@ -49,6 +51,7 @@ init_config(CfgFile) -> %% @doc Load all plugins when the broker started. -spec(load() -> list() | {error, term()}). load() -> + load_expand_plugins(), case emqx:env(plugins_loaded_file) of {ok, File} -> ensure_file(File), @@ -58,6 +61,66 @@ load() -> ignore end. +load_expand_plugins() -> + case emqx:env(expand_plugins_dir) of + {ok, Dir} -> + PluginsDir = filelib:wildcard("*", Dir), + lists:foreach(fun(PluginDir) -> + case filelib:is_dir(Dir ++ PluginDir) of + true -> load_expand_plugin(Dir ++ PluginDir); + false -> ok + end + end, PluginsDir); + _ -> ok + end. + +load_expand_plugin(PluginDir) -> + init_expand_plugin_config(PluginDir), + Ebin = PluginDir ++ "/ebin", + code:add_patha(Ebin), + Modules = filelib:wildcard(Ebin ++ "/*.beam"), + lists:foreach(fun(Mod) -> + Module = list_to_atom(filename:basename(Mod, ".beam")), + code:load_file(Module) + end, Modules), + case filelib:wildcard(Ebin ++ "/*.app") of + [App|_] -> application:load(list_to_atom(filename:basename(App, ".app"))); + _ -> lager:error("load application fail"), {error, load_app_fail} + end. + +init_expand_plugin_config(PluginDir) -> + Priv = PluginDir ++ "/priv", + Etc = PluginDir ++ "/etc", + Schema = filelib:wildcard(Priv ++ "/*.schema"), + Conf = case filelib:wildcard(Etc ++ "/*.conf") of + [] -> []; + [Conf1] -> cuttlefish_conf:file(Conf1) + end, + AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf), + lists:foreach(fun({AppName, Envs}) -> + [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs] + end, AppsEnv). + +get_expand_plugin_config() -> + case emqx:env(expand_plugins_dir) of + {ok, Dir} -> + PluginsDir = filelib:wildcard("*", Dir), + lists:foldl(fun(PluginDir, Acc) -> + case filelib:is_dir(Dir ++ PluginDir) of + true -> + Etc = Dir ++ PluginDir ++ "/etc", + case filelib:wildcard("*.{conf,config}", Etc) of + [] -> Acc; + [Conf] -> [Conf | Acc] + end; + false -> + Acc + end + end, [], PluginsDir); + _ -> ok + end. + + ensure_file(File) -> case filelib:is_file(File) of false -> write_loaded([]); true -> ok end. @@ -98,7 +161,7 @@ stop_plugins(Names) -> list() -> case emqx:env(plugins_etc_dir) of {ok, PluginsEtc} -> - CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc), + CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(), Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles], StartedApps = names(started_app), lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9856ec744..7e5cd2402 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -35,6 +35,10 @@ -export([process/2]). +-ifdef(TEST). +-compile(export_all). +-endif. + -record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0}). @@ -289,7 +293,7 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), false -> case emqx_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of {ok, TopicTable1} -> - emqx_session:subscribe(Session, PacketId, mount(MountPoint, TopicTable1)), + emqx_session:subscribe(Session, PacketId, mount(replvar(MountPoint, State), TopicTable1)), {ok, State}; {stop, _} -> {ok, State} @@ -307,7 +311,7 @@ process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics), session = Session}) -> case emqx_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of {ok, TopicTable} -> - emqx_session:unsubscribe(Session, mount(MountPoint, TopicTable)); + emqx_session:unsubscribe(Session, mount(replvar(MountPoint, State), TopicTable)); {stop, _} -> ok end, @@ -321,12 +325,12 @@ process(?PACKET(?DISCONNECT), State) -> {stop, normal, State#proto_state{will_msg = undefined}}. publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), - #proto_state{client_id = ClientId, - username = Username, - mountpoint = MountPoint, - session = Session}) -> + State = #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint, + session = Session}) -> Msg = emqx_message:from_packet(Username, ClientId, Packet), - emqx_session:publish(Session, mount(MountPoint, Msg)); + emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg)); publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> with_puback(?PUBACK, Packet, State); @@ -340,7 +344,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), mountpoint = MountPoint, session = Session}) -> Msg = emqx_message:from_packet(Username, ClientId, Packet), - case emqx_session:publish(Session, mount(MountPoint, Msg)) of + case emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg)) of ok -> send(?PUBACK_PACKET(Type, PacketId), State); {error, Error} -> @@ -415,10 +419,10 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> %% emqx_cm:unreg(ClientId). ok. -willmsg(Packet, #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) -> +willmsg(Packet, State = #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) -> case emqx_message:from_packet(Packet) of undefined -> undefined; - Msg -> mount(MountPoint, Msg) + Msg -> mount(replvar(MountPoint, State), Msg) end. %% Generate a client if if nulll @@ -577,6 +581,18 @@ clean_retain(_IsBridge, Msg) -> %% Mount Point %%-------------------------------------------------------------------- +replvar(undefined, _State) -> + undefined; +replvar(MountPoint, #proto_state{client_id = ClientId, username = Username}) -> + lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]). + +feed_var({<<"%c">>, ClientId}, MountPoint) -> + emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint); +feed_var({<<"%u">>, undefined}, MountPoint) -> + MountPoint; +feed_var({<<"%u">>, Username}, MountPoint) -> + emqx_topic:feed_var(<<"%u">>, Username, MountPoint). + mount(undefined, Any) -> Any; mount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 1c7ae1f52..8c72cd499 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -667,7 +667,7 @@ expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs], case (timer:now_diff(Now, TS) div 1000) of Diff when Diff >= Timeout -> ?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State), - emqttd_metrics:inc('messages/qos2/dropped'), + emqx_metrics:inc('messages/qos2/dropped'), expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Diff -> State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)} diff --git a/src/emqx_ws.erl b/src/emqx_ws.erl index 355ea1b92..ff9ea3a5b 100644 --- a/src/emqx_ws.erl +++ b/src/emqx_ws.erl @@ -74,7 +74,8 @@ handle_request(Method, Path, Req) -> Req:not_found(). is_websocket(Upgrade) -> - Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". + (not emqx:env(websocket_check_upgrade_header, true)) orelse + (Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket"). check_protocol_header(Req) -> case emqx:env(websocket_protocol_header, false) of diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 71d6ae234..b47b23942 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -18,9 +18,7 @@ -compile(export_all). --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). +-include_lib("emqttc/include/emqttc_packet.hrl"). -define(APP, emqx). @@ -28,115 +26,37 @@ -include_lib("common_test/include/ct.hrl"). --define(CONTENT_TYPE, "application/json"). - --define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"}, - {verify, verify_peer}, - {fail_if_no_peer_cert, true}]). - --define(MQTT_SSL_CLIENT, [{keyfile, "certs/client-key.pem"}, - {cacertfile, "certs/cacert.pem"}, - {certfile, "certs/client-cert.pem"}]). - --define(URL, "http://localhost:8080/api/v2/"). - --define(APPL_JSON, "application/json"). - --define(PRINT(PATH), lists:flatten(io_lib:format(PATH, [atom_to_list(node())]))). - --define(GET_API, ["management/nodes", - ?PRINT("management/nodes/~s"), - "monitoring/nodes", - ?PRINT("monitoring/nodes/~s"), - "monitoring/listeners", - ?PRINT("monitoring/listeners/~s"), - "monitoring/metrics", - ?PRINT("monitoring/metrics/~s"), - "monitoring/stats", - ?PRINT("monitoring/stats/~s"), - ?PRINT("nodes/~s/clients"), - "routes"]). - +-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + username = <<"admin">>, + password = <<"public">>})). all() -> - [{group, protocol}, - {group, pubsub}, - {group, session}, - {group, broker}, - {group, metrics}, - {group, stats}, - {group, hook}, - {group, http}, - {group, alarms}, - {group, cli}, + [{group, connect}, {group, cleanSession}]. groups() -> - [{protocol, [sequence], + [{connect, [non_parallel_tests], [mqtt_connect, - mqtt_ssl_oneway, - mqtt_ssl_twoway]}, - {pubsub, [sequence], - [subscribe_unsubscribe, - publish, pubsub, - t_local_subscribe, - t_shared_subscribe, - 'pubsub#', 'pubsub+']}, - {session, [sequence], - [start_session]}, - {broker, [sequence], - [hook_unhook]}, - {metrics, [sequence], - [inc_dec_metric]}, - {stats, [sequence], - [set_get_stat]}, - {hook, [sequence], - [add_delete_hook, - run_hooks]}, - {http, [sequence], - [request_status, - request_publish, - get_api_lists - % websocket_test - ]}, - {alarms, [sequence], - [set_alarms] - }, - {cli, [sequence], - [ctl_register_cmd, - cli_status, - cli_broker, - cli_clients, - cli_sessions, - cli_routes, - cli_topics, - cli_subscriptions, - cli_bridges, - cli_plugins, - {listeners, [sequence], - [cli_listeners, - conflict_listeners - ]}, - cli_vm]}, + mqtt_connect_with_tcp, + mqtt_connect_with_ssl_oneway, + mqtt_connect_with_ssl_twoway, + mqtt_connect_with_ws]}, {cleanSession, [sequence], - [cleanSession_validate, - cleanSession_validate1] + [cleanSession_validate] } ]. init_per_suite(Config) -> - NewConfig = generate_config(), - lists:foreach(fun set_app_env/1, NewConfig), - Apps = application:ensure_all_started(?APP), - ct:log("Apps:~p", [Apps]), + emqx_ct_broker_helpers:run_setup_steps(), + % ct:log("Apps:~p", [Apps]), Config. end_per_suite(_Config) -> - emqx:shutdown(). + emqx_ct_broker_helpers:run_teardown_steps(). %%-------------------------------------------------------------------- %% Protocol Test %%-------------------------------------------------------------------- - mqtt_connect(_) -> %% Issue #599 %% Empty clientId and clean_session = false @@ -151,10 +71,21 @@ connect_broker_(Packet, RecvSize) -> gen_tcp:close(Sock), Data. -mqtt_ssl_oneway(_) -> +mqtt_connect_with_tcp(_) -> + %% Issue #599 + %% Empty clientId and clean_session = false + {ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]), + Packet = raw_send_serialise(?CLIENT), + gen_tcp:send(Sock, Packet), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), + gen_tcp:close(Sock). + +mqtt_connect_with_ssl_oneway(_) -> emqx:stop(), - change_opts(ssl_oneway), + emqx_ct_broker_helpers:change_opts(ssl_oneway), emqx:start(), + timer:sleep(5000), {ok, SslOneWay} = emqttc:start_link([{host, "localhost"}, {port, 8883}, {logger, debug}, @@ -173,12 +104,12 @@ mqtt_ssl_oneway(_) -> emqttc:disconnect(SslOneWay), emqttc:disconnect(Pub). -mqtt_ssl_twoway(_Config) -> +mqtt_connect_with_ssl_twoway(_Config) -> emqx:stop(), - change_opts(ssl_twoway), + emqx_ct_broker_helpers:change_opts(ssl_twoway), emqx:start(), timer:sleep(3000), - ClientSSl = [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT], + ClientSSl = emqx_ct_broker_helpers:client_ssl(), {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"}, {port, 8883}, {client_id, <<"ssltwoway">>}, @@ -195,369 +126,16 @@ mqtt_ssl_twoway(_Config) -> emqttc:disconnect(SslTwoWay), emqttc:disconnect(Sub). -%%-------------------------------------------------------------------- -%% PubSub Test -%%-------------------------------------------------------------------- - -subscribe_unsubscribe(_) -> - ok = emqx:subscribe(<<"topic">>, <<"clientId">>), - ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]), - ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]), - ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), - ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>), - ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>). - -publish(_) -> - Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>), - ok = emqx:subscribe(<<"test/+">>), - timer:sleep(10), - emqx:publish(Msg), - ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). - -pubsub(_) -> - Self = self(), - ok = emqx:subscribe(<<"a/b/c">>, Self, [{qos, 1}]), - ?assertMatch({error, _}, emqx:subscribe(<<"a/b/c">>, Self, [{qos, 2}])), - timer:sleep(10), - [{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self), - [{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>), - emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), - ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), - spawn(fun() -> - emqx:subscribe(<<"a/b/c">>), - emqx:subscribe(<<"c/d/e">>), - timer:sleep(10), - emqx:unsubscribe(<<"a/b/c">>) - end), - timer:sleep(20), - emqx:unsubscribe(<<"a/b/c">>). - -t_local_subscribe(_) -> - ok = emqx:subscribe("$local/topic0"), - ok = emqx:subscribe("$local/topic1", <<"x">>), - ok = emqx:subscribe("$local/topic2", <<"x">>, [{qos, 2}]), - timer:sleep(10), - ?assertEqual([self()], emqx:subscribers("$local/topic0")), - ?assertEqual([{<<"x">>, self()}], emqx:subscribers("$local/topic1")), - ?assertEqual([{{<<"x">>, self()}, <<"$local/topic1">>, []}, - {{<<"x">>, self()}, <<"$local/topic2">>, [{qos,2}]}], - emqx:subscriptions(<<"x">>)), - ?assertEqual(ok, emqx:unsubscribe("$local/topic0")), - ?assertMatch({error, {subscription_not_found, _}}, emqx:unsubscribe("$local/topic0")), - ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"x">>)), - ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"x">>)), - ?assertEqual([], emqx:subscribers("topic1")), - ?assertEqual([], emqx:subscriptions(<<"x">>)). - -t_shared_subscribe(_) -> - emqx:subscribe("$local/$share/group1/topic1"), - emqx:subscribe("$share/group2/topic2"), - emqx:subscribe("$queue/topic3"), - timer:sleep(10), - ?assertEqual([self()], emqx:subscribers(<<"$local/$share/group1/topic1">>)), - ?assertEqual([{self(), <<"$local/$share/group1/topic1">>, []}, - {self(), <<"$queue/topic3">>, []}, - {self(), <<"$share/group2/topic2">>, []}], - lists:sort(emqx:subscriptions(self()))), - emqx:unsubscribe("$local/$share/group1/topic1"), - emqx:unsubscribe("$share/group2/topic2"), - emqx:unsubscribe("$queue/topic3"), - ?assertEqual([], lists:sort(emqx:subscriptions(self()))). - -'pubsub#'(_) -> - emqx:subscribe(<<"a/#">>), - timer:sleep(10), - emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), - ?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end), - emqx:unsubscribe(<<"a/#">>). - -'pubsub+'(_) -> - emqx:subscribe(<<"a/+/+">>), - timer:sleep(10), - emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), - ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end), - emqx:unsubscribe(<<"a/+/+">>). - -loop_recv(Topic, Timeout) -> - loop_recv(Topic, Timeout, []). - -loop_recv(Topic, Timeout, Acc) -> - receive - {dispatch, Topic, Msg} -> - loop_recv(Topic, Timeout, [Msg|Acc]) - after - Timeout -> {ok, Acc} - end. - -recv_loop(Msgs) -> - receive - {dispatch, _Topic, Msg} -> - recv_loop([Msg|Msgs]) - after - 100 -> lists:reverse(Msgs) - end. - -%%-------------------------------------------------------------------- -%% Session Group -%%-------------------------------------------------------------------- - -start_session(_) -> - {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>), - {ok, SessPid} = emqx_mock_client:start_session(ClientPid), - Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), - Message1 = Message#mqtt_message{pktid = 1}, - emqx_session:publish(SessPid, Message1), - emqx_session:pubrel(SessPid, 1), - emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), - Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>), - emqx_session:publish(SessPid, Message2), - emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]), - emqx_mock_client:stop(ClientPid). - -%%-------------------------------------------------------------------- -%% Broker Group -%%-------------------------------------------------------------------- -hook_unhook(_) -> +mqtt_connect_with_ws(_Config) -> + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + Packet = raw_send_serialise(?CLIENT), + ok = rfc6455_client:send_binary(WS, Packet), + {binary, P} = rfc6455_client:recv(WS), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(P), + {close, _} = rfc6455_client:close(WS), ok. -%%-------------------------------------------------------------------- -%% Metric Group -%%-------------------------------------------------------------------- -inc_dec_metric(_) -> - emqx_metrics:inc(gauge, 'messages/retained', 10), - emqx_metrics:dec(gauge, 'messages/retained', 10). - -%%-------------------------------------------------------------------- -%% Stats Group -%%-------------------------------------------------------------------- -set_get_stat(_) -> - emqx_stats:setstat('retained/max', 99), - 99 = emqx_stats:getstat('retained/max'). - -%%-------------------------------------------------------------------- -%% Hook Test -%%-------------------------------------------------------------------- - -add_delete_hook(_) -> - ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []), - ok = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []), - {error, already_hooked} = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []), - Callbacks = [{callback, undefined, fun ?MODULE:hook_fun1/1, [], 0}, - {callback, tag, fun ?MODULE:hook_fun2/1, [], 0}], - Callbacks = emqx_hooks:lookup(test_hook), - ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1), - ct:print("Callbacks: ~p~n", [emqx_hooks:lookup(test_hook)]), - ok = emqx:unhook(test_hook, {tag, fun ?MODULE:hook_fun2/1}), - {error, not_found} = emqx:unhook(test_hook1, {tag, fun ?MODULE:hook_fun2/1}), - [] = emqx_hooks:lookup(test_hook), - - ok = emqx:hook(emqx_hook, fun ?MODULE:hook_fun1/1, [], 9), - ok = emqx:hook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}, [], 8), - Callbacks2 = [{callback, "tag", fun ?MODULE:hook_fun2/1, [], 8}, - {callback, undefined, fun ?MODULE:hook_fun1/1, [], 9}], - Callbacks2 = emqx_hooks:lookup(emqx_hook), - ok = emqx:unhook(emqx_hook, fun ?MODULE:hook_fun1/1), - ok = emqx:unhook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}), - [] = emqx_hooks:lookup(emqx_hook). - -run_hooks(_) -> - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), - ok = emqx:hook(foldl_hook, {tag, fun ?MODULE:hook_fun3/4}, [init]), - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), - {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []), - {ok, []} = emqx:run_hooks(unknown_hook, [], []), - - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), - ok = emqx:hook(foreach_hook, {tag, fun ?MODULE:hook_fun6/2}, [initArg]), - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), - stop = emqx:run_hooks(foreach_hook, [arg]). - -hook_fun1([]) -> ok. -hook_fun2([]) -> {ok, []}. - -hook_fun3(arg1, arg2, _Acc, init) -> ok. -hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. -hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. - -hook_fun6(arg, initArg) -> ok. -hook_fun7(arg, initArg) -> any. -hook_fun8(arg, initArg) -> stop. - -%%-------------------------------------------------------------------- -%% HTTP Request Test -%%-------------------------------------------------------------------- - -request_status(_) -> - {InternalStatus, _ProvidedStatus} = init:get_status(), - AppStatus = - case lists:keysearch(?APP, 1, application:which_applications()) of - false -> not_running; - {value, _Val} -> running - end, - Status = iolist_to_binary(io_lib:format("Node ~s is ~s~nemqx is ~s", - [node(), InternalStatus, AppStatus])), - Url = "http://127.0.0.1:8080/status", - {ok, {{"HTTP/1.1", 200, "OK"}, _, Return}} = - httpc:request(get, {Url, []}, [], []), - ?assertEqual(binary_to_list(Status), Return). - -request_publish(_) -> - emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"random">>}, - {clean_sess, false}]), - SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", - ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("", ""))), - ok = emqx:subscribe(<<"a/b/c">>, self(), [{qos, 1}]), - Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}", - ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("", ""))), - ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), - - UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", - ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("", ""))). - -connect_emqx_publish_(Method, Api, Params, Auth) -> - Url = "http://127.0.0.1:8080/" ++ Api, - case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of - {error, socket_closed_remotely} -> - false; - {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> - true; - {ok, {{"HTTP/1.1", 400, _}, _, []}} -> - false; - {ok, {{"HTTP/1.1", 404, _}, _, []}} -> - false - end. - -auth_header_(User, Pass) -> - Encoded = base64:encode_to_string(lists:append([User,":",Pass])), - {"Authorization","Basic " ++ Encoded}. - -get_api_lists(_Config) -> - lists:foreach(fun request/1, ?GET_API). - -websocket_test(_) -> - Conn = esockd_connection:new(esockd_transport, nil, []), - Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1}, - mochiweb_headers:make([{"Sec-WebSocket-Key","Xn3fdKyc3qEXPuj2A3O+ZA=="}])), - - ct:log("Req:~p", [Req]). - %%emqx_http:handle_request(Req). - -set_alarms(_) -> - AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, - emqx_alarm:set_alarm(AlarmTest), - Alarms = emqx_alarm:get_alarms(), - ?assertEqual(1, length(Alarms)), - emqx_alarm:clear_alarm(<<"1">>), - [] = emqx_alarm:get_alarms(). - -%%-------------------------------------------------------------------- -%% Cli group -%%-------------------------------------------------------------------- - -ctl_register_cmd(_) -> - emqx_ctl:register_cmd(test_cmd, {?MODULE, test_cmd}), - erlang:yield(), - timer:sleep(5), - [{?MODULE, test_cmd}] = emqx_ctl:lookup(test_cmd), - emqx_ctl:run(["test_cmd", "arg1", "arg2"]), - emqx_ctl:unregister_cmd(test_cmd). - -test_cmd(["arg1", "arg2"]) -> - ct:print("test_cmd is called"); - -test_cmd([]) -> - io:format("test command"). - -cli_status(_) -> - emqx_cli:status([]). - -cli_broker(_) -> - emqx_cli:broker([]), - emqx_cli:broker(["stats"]), - emqx_cli:broker(["metrics"]), - emqx_cli:broker(["pubsub"]). - -cli_clients(_) -> - emqx_cli:clients(["list"]), - emqx_cli:clients(["show", "clientId"]), - emqx_cli:clients(["kick", "clientId"]). - -cli_sessions(_) -> - emqx_cli:sessions(["list"]), - emqx_cli:sessions(["list", "persistent"]), - emqx_cli:sessions(["list", "transient"]), - emqx_cli:sessions(["show", "clientId"]). - -cli_routes(_) -> - emqx:subscribe(<<"topic/route">>), - emqx_cli:routes(["list"]), - emqx_cli:routes(["show", "topic/route"]), - emqx:unsubscribe(<<"topic/route">>). - -cli_topics(_) -> - emqx:subscribe(<<"topic">>), - emqx_cli:topics(["list"]), - emqx_cli:topics(["show", "topic"]), - emqx:unsubscribe(<<"topic">>). - -cli_subscriptions(_) -> - emqx_cli:subscriptions(["list"]), - emqx_cli:subscriptions(["show", "clientId"]), - emqx_cli:subscriptions(["add", "clientId", "topic", "2"]), - emqx_cli:subscriptions(["del", "clientId", "topic"]). - -cli_plugins(_) -> - emqx_cli:plugins(["list"]), - emqx_cli:plugins(["load", "emqx_plugin_template"]), - emqx_cli:plugins(["unload", "emqx_plugin_template"]). - -cli_bridges(_) -> - emqx_cli:bridges(["list"]), - emqx_cli:bridges(["start", "a@127.0.0.1", "topic"]), - emqx_cli:bridges(["stop", "a@127.0.0.1", "topic"]). - -cli_listeners(_) -> - emqx_cli:listeners([]). - -conflict_listeners(_) -> - F = - fun() -> - process_flag(trap_exit, true), - emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]) - end, - spawn_link(F), - - {ok, C2} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), - timer:sleep(100), - - Listeners = - lists:map(fun({{Protocol, ListenOn}, Pid}) -> - Key = atom_to_list(Protocol) ++ ":" ++ esockd:to_string(ListenOn), - {Key, [{acceptors, esockd:get_acceptors(Pid)}, - {max_clients, esockd:get_max_clients(Pid)}, - {current_clients, esockd:get_current_clients(Pid)}, - {shutdown_count, esockd:get_shutdown_count(Pid)}]} - end, esockd:listeners()), - L = proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners), - ?assertEqual(1, proplists:get_value(current_clients, L)), - ?assertEqual(1, proplists:get_value(conflict, proplists:get_value(shutdown_count, L))), - timer:sleep(100), - emqttc:disconnect(C2). - -cli_vm(_) -> - emqx_cli:vm([]), - emqx_cli:vm(["ports"]). - cleanSession_validate(_) -> {ok, C1} = emqttc:start_link([{host, "localhost"}, {port, 1883}, @@ -565,7 +143,6 @@ cleanSession_validate(_) -> {clean_sess, false}]), timer:sleep(10), emqttc:subscribe(C1, <<"topic">>, qos0), - ok = emqx_cli:sessions(["list", "persistent"]), emqttc:disconnect(C1), {ok, Pub} = emqttc:start_link([{host, "localhost"}, {port, 1883}, @@ -578,169 +155,16 @@ cleanSession_validate(_) -> {client_id, <<"c1">>}, {clean_sess, false}]), timer:sleep(100), - Metrics = emqx_metrics:all(), - ct:log("Metrics:~p~n", [Metrics]), - ?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)), - ?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)), + receive {publish, _Topic, M1} -> + ?assertEqual(<<"m1">>, M1) + after 1000 -> false + end, emqttc:disconnect(Pub), emqttc:disconnect(C11). -cleanSession_validate1(_) -> - {ok, C1} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, true}]), - timer:sleep(10), - emqttc:subscribe(C1, <<"topic">>, qos1), - ok = emqx_cli:sessions(["list", "transient"]), - emqttc:disconnect(C1), - {ok, Pub} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"pub">>}]), +raw_send_serialise(Packet) -> + emqttc_serialiser:serialise(Packet). - emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 1}]), - timer:sleep(10), - {ok, C11} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), - timer:sleep(100), - Metrics = emqx_metrics:all(), - ?assertEqual(0, proplists:get_value('messages/qos1/sent', Metrics)), - ?assertEqual(1, proplists:get_value('messages/qos1/received', Metrics)), - emqttc:disconnect(Pub), - emqttc:disconnect(C11). - -connect_emqx_pubsub_(Method, Api, Params, Auth) -> - Url = "http://127.0.0.1:8080/" ++ Api, - case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of - {error, socket_closed_remotely} -> - false; - {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> - true; - {ok, {{"HTTP/1.1", 400, _}, _, []}} -> - false; - {ok, {{"HTTP/1.1", 404, _}, _, []}} -> - false - end. - -request(Path) -> - http_get(get, Path). - -http_get(Method, Path) -> - req(Method, Path, []). - -http_put(Method, Path, Params) -> - req(Method, Path, format_for_upload(Params)). - -http_post(Method, Path, Params) -> - req(Method, Path, format_for_upload(Params)). - -req(Method, Path, Body) -> - Url = ?URL ++ Path, - Headers = auth_header_("", ""), - case httpc:request(Method, {Url, [Headers]}, [], []) of - {error, R} -> - ct:log("R:~p~n", [R]), - false; - {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> - true; - {ok, {{"HTTP/1.1", 400, _}, _, []}} -> - false; - {ok, {{"HTTP/1.1", 404, _}, _, []}} -> - false - end. - -format_for_upload(none) -> - <<"">>; -format_for_upload(List) -> - iolist_to_binary(mochijson2:encode(List)). - -ensure_ok(ok) -> ok; -ensure_ok({error, {already_started, _}}) -> ok. - -host() -> ct:print("!!!! Node: ~p~n", [node()]), [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. - -wait_running(Node) -> - wait_running(Node, 30000). - -wait_running(Node, Timeout) when Timeout < 0 -> - throw({wait_timeout, Node}); - -wait_running(Node, Timeout) -> - case rpc:call(Node, emqx, is_running, [Node]) of - true -> ok; - false -> timer:sleep(100), - wait_running(Node, Timeout - 100) - end. - -slave(emqx, Node) -> - {ok, Slave} = slave:start(host(), Node, "-config ../../test/emqx_SUITE_data/slave.config " ++ ensure_slave()), - ct:log("Slave:~p~n", [Slave]), - rpc:call(Slave, application, ensure_all_started, [emqx]), - Slave; - -slave(node, Node) -> - {ok, N} = slave:start(host(), Node, ensure_slave()), - N. - -ensure_slave() -> - EbinDir = local_path(["ebin"]), - DepsDir = local_path(["deps", "*", "ebin"]), - RpcDir = local_path(["deps", "gen_rpc", "_build", "dev", "lib", "*", "ebin"]), - "-pa " ++ EbinDir ++ " -pa " ++ DepsDir ++ " -pa " ++ RpcDir. - -change_opts(SslType) -> - {ok, Listeners} = application:get_env(?APP, listeners), - NewListeners = - lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> - case Protocol of - ssl -> - SslOpts = proplists:get_value(sslopts, Opts), - Keyfile = local_path(["etc/certs", "key.pem"]), - Certfile = local_path(["etc/certs", "cert.pem"]), - TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), - TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), - TupleList3 = - case SslType of - ssl_twoway-> - CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), - MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), - lists:merge(TupleList2, MutSslList); - _ -> - lists:filter(fun ({cacertfile, _}) -> false; - ({verify, _}) -> false; - ({fail_if_no_peer_cert, _}) -> false; - (_) -> true - end, TupleList2) - end, - [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc]; - _ -> - [Listener | Acc] - end - end, [], Listeners), - application:set_env(?APP, listeners, NewListeners). - -generate_config() -> - Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), - Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]), - cuttlefish_generator:map(Schema, Conf). - -get_base_dir(Module) -> - {file, Here} = code:is_loaded(Module), - filename:dirname(filename:dirname(Here)). - -get_base_dir() -> - get_base_dir(?MODULE). - -local_path(Components, Module) -> - filename:join([get_base_dir(Module) | Components]). - -local_path(Components) -> - local_path(Components, ?MODULE). - -set_app_env({App, Lists}) -> - lists:foreach(fun({Par, Var}) -> - application:set_env(App, Par, Var) - end, Lists). +raw_recv_pase(P) -> + emqttc_parser:parse(P, emqttc_parser:new()). diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index c311d4061..ac79daa3c 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -20,6 +20,8 @@ -include("emqx.hrl"). +-include_lib("common_test/include/ct.hrl"). + -define(AC, emqx_access_control). -import(emqx_access_rule, [compile/1, match/3]). diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl new file mode 100644 index 000000000..e0ec205f7 --- /dev/null +++ b/test/emqx_broker_SUITE.erl @@ -0,0 +1,238 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_broker_SUITE). + +-compile(export_all). + +-define(APP, emqx). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). + +-include("emqx.hrl"). + +all() -> + [ + {group, pubsub}, + {group, session}, + {group, broker}, + {group, metrics}, + {group, stats}, + {group, hook}, + {group, alarms}]. + +groups() -> + [ + {pubsub, [sequence], [subscribe_unsubscribe, + publish, pubsub, + t_local_subscribe, + t_shared_subscribe, + 'pubsub#', 'pubsub+']}, + {session, [sequence], [start_session]}, + {broker, [sequence], [hook_unhook]}, + {metrics, [sequence], [inc_dec_metric]}, + {stats, [sequence], [set_get_stat]}, + {hook, [sequence], [add_delete_hook, run_hooks]}, + {alarms, [sequence], [set_alarms]} + ]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +%%-------------------------------------------------------------------- +%% PubSub Test +%%-------------------------------------------------------------------- + +subscribe_unsubscribe(_) -> + ok = emqx:subscribe(<<"topic">>, <<"clientId">>), + ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]), + ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]), + ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), + ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>), + ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>). + +publish(_) -> + Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>), + ok = emqx:subscribe(<<"test/+">>), + timer:sleep(10), + emqx:publish(Msg), + ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). + +pubsub(_) -> + Self = self(), + ok = emqx:subscribe(<<"a/b/c">>, Self, [{qos, 1}]), + ?assertMatch({error, _}, emqx:subscribe(<<"a/b/c">>, Self, [{qos, 2}])), + timer:sleep(10), + [{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self), + [{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>), + emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), + ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), + spawn(fun() -> + emqx:subscribe(<<"a/b/c">>), + emqx:subscribe(<<"c/d/e">>), + timer:sleep(10), + emqx:unsubscribe(<<"a/b/c">>) + end), + timer:sleep(20), + emqx:unsubscribe(<<"a/b/c">>). + +t_local_subscribe(_) -> + ok = emqx:subscribe("$local/topic0"), + ok = emqx:subscribe("$local/topic1", <<"x">>), + ok = emqx:subscribe("$local/topic2", <<"x">>, [{qos, 2}]), + timer:sleep(10), + ?assertEqual([self()], emqx:subscribers("$local/topic0")), + ?assertEqual([{<<"x">>, self()}], emqx:subscribers("$local/topic1")), + ?assertEqual([{{<<"x">>, self()}, <<"$local/topic1">>, []}, + {{<<"x">>, self()}, <<"$local/topic2">>, [{qos,2}]}], + emqx:subscriptions(<<"x">>)), + ?assertEqual(ok, emqx:unsubscribe("$local/topic0")), + ?assertMatch({error, {subscription_not_found, _}}, emqx:unsubscribe("$local/topic0")), + ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"x">>)), + ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"x">>)), + ?assertEqual([], emqx:subscribers("topic1")), + ?assertEqual([], emqx:subscriptions(<<"x">>)). + +t_shared_subscribe(_) -> + emqx:subscribe("$local/$share/group1/topic1"), + emqx:subscribe("$share/group2/topic2"), + emqx:subscribe("$queue/topic3"), + timer:sleep(10), + ?assertEqual([self()], emqx:subscribers(<<"$local/$share/group1/topic1">>)), + ?assertEqual([{self(), <<"$local/$share/group1/topic1">>, []}, + {self(), <<"$queue/topic3">>, []}, + {self(), <<"$share/group2/topic2">>, []}], + lists:sort(emqx:subscriptions(self()))), + emqx:unsubscribe("$local/$share/group1/topic1"), + emqx:unsubscribe("$share/group2/topic2"), + emqx:unsubscribe("$queue/topic3"), + ?assertEqual([], lists:sort(emqx:subscriptions(self()))). + +'pubsub#'(_) -> + emqx:subscribe(<<"a/#">>), + timer:sleep(10), + emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), + ?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end), + emqx:unsubscribe(<<"a/#">>). + +'pubsub+'(_) -> + emqx:subscribe(<<"a/+/+">>), + timer:sleep(10), + emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), + ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end), + emqx:unsubscribe(<<"a/+/+">>). + +%%-------------------------------------------------------------------- +%% Session Group +%%-------------------------------------------------------------------- +start_session(_) -> + {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>), + {ok, SessPid} = emqx_mock_client:start_session(ClientPid), + Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), + Message1 = Message#mqtt_message{pktid = 1}, + emqx_session:publish(SessPid, Message1), + emqx_session:pubrel(SessPid, 1), + emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), + Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>), + emqx_session:publish(SessPid, Message2), + emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]), + emqx_mock_client:stop(ClientPid). + +%%-------------------------------------------------------------------- +%% Broker Group +%%-------------------------------------------------------------------- +hook_unhook(_) -> + ok. + +%%-------------------------------------------------------------------- +%% Metric Group +%%-------------------------------------------------------------------- +inc_dec_metric(_) -> + emqx_metrics:inc(gauge, 'messages/retained', 10), + emqx_metrics:dec(gauge, 'messages/retained', 10). + +%%-------------------------------------------------------------------- +%% Stats Group +%%-------------------------------------------------------------------- +set_get_stat(_) -> + emqx_stats:setstat('retained/max', 99), + 99 = emqx_stats:getstat('retained/max'). + +%%-------------------------------------------------------------------- +%% Hook Test +%%-------------------------------------------------------------------- + +add_delete_hook(_) -> + ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []), + ok = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []), + {error, already_hooked} = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []), + Callbacks = [{callback, undefined, fun ?MODULE:hook_fun1/1, [], 0}, + {callback, tag, fun ?MODULE:hook_fun2/1, [], 0}], + Callbacks = emqx_hooks:lookup(test_hook), + ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1), + ct:print("Callbacks: ~p~n", [emqx_hooks:lookup(test_hook)]), + ok = emqx:unhook(test_hook, {tag, fun ?MODULE:hook_fun2/1}), + {error, not_found} = emqx:unhook(test_hook1, {tag, fun ?MODULE:hook_fun2/1}), + [] = emqx_hooks:lookup(test_hook), + + ok = emqx:hook(emqx_hook, fun ?MODULE:hook_fun1/1, [], 9), + ok = emqx:hook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}, [], 8), + Callbacks2 = [{callback, "tag", fun ?MODULE:hook_fun2/1, [], 8}, + {callback, undefined, fun ?MODULE:hook_fun1/1, [], 9}], + Callbacks2 = emqx_hooks:lookup(emqx_hook), + ok = emqx:unhook(emqx_hook, fun ?MODULE:hook_fun1/1), + ok = emqx:unhook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}), + [] = emqx_hooks:lookup(emqx_hook). + +run_hooks(_) -> + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), + ok = emqx:hook(foldl_hook, {tag, fun ?MODULE:hook_fun3/4}, [init]), + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), + {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []), + {ok, []} = emqx:run_hooks(unknown_hook, [], []), + + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), + ok = emqx:hook(foreach_hook, {tag, fun ?MODULE:hook_fun6/2}, [initArg]), + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), + stop = emqx:run_hooks(foreach_hook, [arg]). + +hook_fun1([]) -> ok. +hook_fun2([]) -> {ok, []}. + +hook_fun3(arg1, arg2, _Acc, init) -> ok. +hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. +hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. + +hook_fun6(arg, initArg) -> ok. +hook_fun7(arg, initArg) -> any. +hook_fun8(arg, initArg) -> stop. + +set_alarms(_) -> + AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, + emqx_alarm:set_alarm(AlarmTest), + Alarms = emqx_alarm:get_alarms(), + ?assertEqual(1, length(Alarms)), + emqx_alarm:clear_alarm(<<"1">>), + [] = emqx_alarm:get_alarms(). + + diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl new file mode 100644 index 000000000..3dfe84fcc --- /dev/null +++ b/test/emqx_ct_broker_helpers.erl @@ -0,0 +1,102 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ct_broker_helpers). + +-compile(export_all). + +-define(APP, emqx). + +-define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"}, + {verify, verify_peer}, + {fail_if_no_peer_cert, true}]). + +-define(MQTT_SSL_CLIENT, [{keyfile, "certs/client-key.pem"}, + {cacertfile, "certs/cacert.pem"}, + {certfile, "certs/client-cert.pem"}]). + + +run_setup_steps() -> + NewConfig = generate_config(), + lists:foreach(fun set_app_env/1, NewConfig), + application:ensure_all_started(?APP). + +run_teardown_steps() -> + ?APP:shutdown(). + +generate_config() -> + Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), + Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]), + cuttlefish_generator:map(Schema, Conf). + +get_base_dir(Module) -> + {file, Here} = code:is_loaded(Module), + filename:dirname(filename:dirname(Here)). + +get_base_dir() -> + get_base_dir(?MODULE). + +local_path(Components, Module) -> + filename:join([get_base_dir(Module) | Components]). + +local_path(Components) -> + local_path(Components, ?MODULE). + +set_app_env({App, Lists}) -> + lists:foreach(fun({acl_file, _Var}) -> + application:set_env(App, acl_file, local_path(["etc", "acl.conf"])); + ({license_file, _Var}) -> + application:set_env(App, license_file, local_path(["etc", "emqx.lic"])); + ({plugins_loaded_file, _Var}) -> + application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"])); + ({Par, Var}) -> + application:set_env(App, Par, Var) + end, Lists). + +change_opts(SslType) -> + {ok, Listeners} = application:get_env(?APP, listeners), + NewListeners = + lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> + case Protocol of + ssl -> + SslOpts = proplists:get_value(sslopts, Opts), + Keyfile = local_path(["etc/certs", "key.pem"]), + Certfile = local_path(["etc/certs", "cert.pem"]), + TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), + TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}), + TupleList3 = + case SslType of + ssl_twoway-> + CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]), + MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}), + lists:merge(TupleList2, MutSslList); + _ -> + lists:filter(fun ({cacertfile, _}) -> false; + ({verify, _}) -> false; + ({fail_if_no_peer_cert, _}) -> false; + (_) -> true + end, TupleList2) + end, + [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc]; + _ -> + [Listener | Acc] + end + end, [], Listeners), + application:set_env(?APP, listeners, NewListeners). + +client_ssl() -> + [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT]. + diff --git a/test/emqx_mod_SUITE.erl b/test/emqx_mod_SUITE.erl index d1c340a0b..e87bddecc 100644 --- a/test/emqx_mod_SUITE.erl +++ b/test/emqx_mod_SUITE.erl @@ -23,13 +23,4 @@ all() -> [mod_subscription_rep]. mod_subscription_rep(_) -> ok. -%% <<"topic/clientId">> = emqttd_mod_subscription:rep( -%% <<"$c">>, <<"clientId">>, <<"topic/$c">>), -%% <<"topic/username">> = emqttd_mod_subscription:rep( -%% <<"$u">>, <<"username">>, <<"topic/$u">>), -%% <<"topic/username/clientId">> = emqttd_mod_subscription:rep( -%% <<"$c">>, <<"clientId">>, emqttd_mod_subscription:rep( -%% <<"$u">>, <<"username">>, <<"topic/$u/$c">>)). - - diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl new file mode 100644 index 000000000..4696f7ab3 --- /dev/null +++ b/test/rfc6455_client.erl @@ -0,0 +1,252 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ Management Console. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2012-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rfc6455_client). + +-export([new/2, open/1, recv/1, send/2, send_binary/2, close/1, close/2]). + +-record(state, {host, port, addr, path, ppid, socket, data, phase}). + +%% -------------------------------------------------------------------------- + +new(WsUrl, PPid) -> + crypto:start(), + "ws://" ++ Rest = WsUrl, + [Addr, Path] = split("/", Rest, 1), + [Host, MaybePort] = split(":", Addr, 1, empty), + Port = case MaybePort of + empty -> 80; + V -> {I, ""} = string:to_integer(V), I + end, + State = #state{host = Host, + port = Port, + addr = Addr, + path = "/" ++ Path, + ppid = PPid}, + spawn(fun () -> + start_conn(State) + end). + +open(WS) -> + receive + {rfc6455, open, WS, Opts} -> + {ok, Opts}; + {rfc6455, close, WS, R} -> + {close, R} + end. + +recv(WS) -> + receive + {rfc6455, recv, WS, Payload} -> + {ok, Payload}; + {rfc6455, recv_binary, WS, Payload} -> + {binary, Payload}; + {rfc6455, close, WS, R} -> + {close, R} + end. + +send(WS, IoData) -> + WS ! {send, IoData}, + ok. + +send_binary(WS, IoData) -> + WS ! {send_binary, IoData}, + ok. + +close(WS) -> + close(WS, {1000, ""}). + +close(WS, WsReason) -> + WS ! {close, WsReason}, + receive + {rfc6455, close, WS, R} -> + {close, R} + end. + + +%% -------------------------------------------------------------------------- + +start_conn(State) -> + {ok, Socket} = gen_tcp:connect(State#state.host, State#state.port, + [binary, + {packet, 0}]), + Key = base64:encode_to_string(crypto:strong_rand_bytes(16)), + gen_tcp:send(Socket, + "GET " ++ State#state.path ++ " HTTP/1.1\r\n" ++ + "Host: " ++ State#state.addr ++ "\r\n" ++ + "Upgrade: websocket\r\n" ++ + "Connection: Upgrade\r\n" ++ + "Sec-WebSocket-Key: " ++ Key ++ "\r\n" ++ + "Origin: null\r\n" ++ + "Sec-WebSocket-Protocol: mqtt\r\n" ++ + "Sec-WebSocket-Version: 13\r\n\r\n"), + + loop(State#state{socket = Socket, + data = <<>>, + phase = opening}). + +do_recv(State = #state{phase = opening, ppid = PPid, data = Data}) -> + case split("\r\n\r\n", binary_to_list(Data), 1, empty) of + [_Http, empty] -> State; + [Http, Data1] -> + %% TODO: don't ignore http response data, verify key + PPid ! {rfc6455, open, self(), [{http_response, Http}]}, + State#state{phase = open, + data = Data1} + end; +do_recv(State = #state{phase = Phase, data = Data, socket = Socket, ppid = PPid}) + when Phase =:= open orelse Phase =:= closing -> + R = case Data of + <> + when L < 126 -> + {F, O, Payload, Rest}; + + <> -> + {F, O, Payload, Rest}; + + <> -> + {F, O, Payload, Rest}; + + <<_:1, _:3, _:4, 1:1, _/binary>> -> + %% According o rfc6455 5.1 the server must not mask any frames. + die(Socket, PPid, {1006, "Protocol error"}, normal); + _ -> + moredata + end, + case R of + moredata -> + State; + _ -> do_recv2(State, R) + end. + +do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid}, R) -> + case R of + {1, 1, Payload, Rest} -> + PPid ! {rfc6455, recv, self(), Payload}, + State#state{data = Rest}; + {1, 2, Payload, Rest} -> + PPid ! {rfc6455, recv_binary, self(), Payload}, + State#state{data = Rest}; + {1, 8, Payload, _Rest} -> + WsReason = case Payload of + <> -> {WC, WR}; + <<>> -> {1005, "No status received"} + end, + case Phase of + open -> %% echo + do_close(State, WsReason), + gen_tcp:close(Socket); + closing -> + ok + end, + die(Socket, PPid, WsReason, normal); + {_, _, _, Rest2} -> + io:format("Unknown frame type~n"), + die(Socket, PPid, {1006, "Unknown frame type"}, normal) + end. + +encode_frame(F, O, Payload) -> + Mask = crypto:strong_rand_bytes(4), + MaskedPayload = apply_mask(Mask, iolist_to_binary(Payload)), + + L = byte_size(MaskedPayload), + IoData = case L of + _ when L < 126 -> + [<>, Mask, MaskedPayload]; + _ when L < 65536 -> + [<>, Mask, MaskedPayload]; + _ -> + [<>, Mask, MaskedPayload] + end, + iolist_to_binary(IoData). + +do_send(State = #state{socket = Socket}, Payload) -> + gen_tcp:send(Socket, encode_frame(1, 1, Payload)), + State. + +do_send_binary(State = #state{socket = Socket}, Payload) -> + gen_tcp:send(Socket, encode_frame(1, 2, Payload)), + State. + +do_close(State = #state{socket = Socket}, {Code, Reason}) -> + Payload = iolist_to_binary([<>, Reason]), + gen_tcp:send(Socket, encode_frame(1, 8, Payload)), + State#state{phase = closing}. + + +loop(State = #state{socket = Socket, ppid = PPid, data = Data, + phase = Phase}) -> + receive + {tcp, Socket, Bin} -> + State1 = State#state{data = iolist_to_binary([Data, Bin])}, + loop(do_recv(State1)); + {send, Payload} when Phase == open -> + loop(do_send(State, Payload)); + {send_binary, Payload} when Phase == open -> + loop(do_send_binary(State, Payload)); + {tcp_closed, Socket} -> + die(Socket, PPid, {1006, "Connection closed abnormally"}, normal); + {close, WsReason} when Phase == open -> + loop(do_close(State, WsReason)) + end. + + +die(Socket, PPid, WsReason, Reason) -> + gen_tcp:shutdown(Socket, read_write), + PPid ! {rfc6455, close, self(), WsReason}, + exit(Reason). + + +%% -------------------------------------------------------------------------- + +split(SubStr, Str, Limit) -> + split(SubStr, Str, Limit, ""). + +split(SubStr, Str, Limit, Default) -> + Acc = split(SubStr, Str, Limit, [], Default), + lists:reverse(Acc). +split(_SubStr, Str, 0, Acc, _Default) -> [Str | Acc]; +split(SubStr, Str, Limit, Acc, Default) -> + {L, R} = case string:str(Str, SubStr) of + 0 -> {Str, Default}; + I -> {string:substr(Str, 1, I-1), + string:substr(Str, I+length(SubStr))} + end, + split(SubStr, R, Limit-1, [L | Acc], Default). + + +apply_mask(Mask, Data) when is_number(Mask) -> + apply_mask(<>, Data); + +apply_mask(<<0:32>>, Data) -> + Data; +apply_mask(Mask, Data) -> + iolist_to_binary(lists:reverse(apply_mask2(Mask, Data, []))). + +apply_mask2(M = <>, <>, Acc) -> + T = Data bxor Mask, + apply_mask2(M, Rest, [<> | Acc]); +apply_mask2(<>, <>, Acc) -> + T = Data bxor Mask, + [<> | Acc]; +apply_mask2(<>, <>, Acc) -> + T = Data bxor Mask, + [<> | Acc]; +apply_mask2(<>, <>, Acc) -> + T = Data bxor Mask, + [<> | Acc]; +apply_mask2(_, <<>>, Acc) -> + Acc.