diff --git a/CHANGELOG.md b/CHANGELOG.md index b6faf9368..384dea81b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,30 @@ eMQTTD ChangeLog ================== +0.5.4-alpha (2015-03-22) +------------------------- + +Benchmark this release on a ubuntu/14.04 server with 8 cores, 32G memory from QingCloud.com: + +``` +200K Connections, +30K Messages/Sec, +20Mbps In/Out Traffic, +200K Topics, +200K Subscribers, + +Consumed 7G memory, 40% CPU/core +``` + +Benchmark code: https://github.com/emqtt/emqttd_benchmark + +Change: rewrite emqttd_pubsub to handle more concurrent subscribe requests. + +Change: ./bin/emqttd_ctl add 'stats', 'metrics' commands. + +Bugfix: issue #71, #72 + + 0.5.3-alpha (2015-03-19) ------------------------- diff --git a/README.md b/README.md index b86f26547..1637dff9a 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,24 @@ -# eMQTT [![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd) +# eMQTTD [![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd) -eMQTT is a clusterable, massively scalable, fault-tolerant and extensible MQTT V3.1/V3.1.1 broker written in Erlang/OTP. +eMQTTD is a clusterable, massively scalable, fault-tolerant and extensible MQTT V3.1/V3.1.1 broker written in Erlang/OTP. -eMQTT support MQTT V3.1/V3.1.1 Protocol Specification. +eMQTTD support MQTT V3.1/V3.1.1 Protocol Specification. + +eMQTTD requires Erlang R17+. + + +## Benchmark + +Benchmark 0.5.4-alpha on a ubuntu/14.04 server with 8 cores, 32G memory from QingCloud: + +200K Connections, 200K Topics, 20K Messages/sec, 20Mbps In/Out with 7G Memory, 40%CPU/core -eMQTT requires Erlang R17+. ## NOTICE eMQTTD still cannot handle massive retained messages. + ## Featues Full MQTT V3.1.1 Support @@ -41,15 +50,15 @@ Bridge brokers locally or remotelly ## Startup in Five Minutes ``` -$ git clone git://github.com/emqtt/emqtt.git +$ git clone git://github.com/emqtt/emqttd.git -$ cd emqtt +$ cd emqttd $ make && make dist -$ cd rel/emqtt +$ cd rel/emqttd -$ ./bin/emqtt console +$ ./bin/emqttd console ``` ## Deploy and Start @@ -57,18 +66,18 @@ $ ./bin/emqtt console ### start ``` -cp -R rel/emqtt $INSTALL_DIR +cp -R rel/emqttd $INSTALL_DIR -cd $INSTALL_DIR/emqtt +cd $INSTALL_DIR/emqttd -./bin/emqtt start +./bin/emqttd start ``` ### stop ``` -./bin/emqtt stop +./bin/emqttd stop ``` @@ -77,7 +86,7 @@ cd $INSTALL_DIR/emqtt ### etc/app.config ``` - {emqtt, [ + {emqttd, [ {auth, {anonymous, []}}, %internal, anonymous {listen, [ {mqtt, 1883, [ @@ -104,7 +113,7 @@ cd $INSTALL_DIR/emqtt ``` --name emqtt@127.0.0.1 +-name emqttd@127.0.0.1 -setcookie emqtt @@ -113,7 +122,7 @@ cd $INSTALL_DIR/emqtt When nodes clustered, vm.args should be configured as below: ``` --name emqtt@host1 +-name emqttd@host1 ``` ## Cluster @@ -123,22 +132,22 @@ Suppose we cluster two nodes on 'host1', 'host2', Steps: on 'host1': ``` -./bin/emqtt start +./bin/emqttd start ``` on 'host2': ``` -./bin/emqtt start +./bin/emqttd start -./bin/emqtt_ctl cluster emqtt@host1 +./bin/emqttd_ctl cluster emqttd@host1 ``` -Run './bin/emqtt_ctl cluster' on 'host1' or 'host2' to check cluster nodes. +Run './bin/emqttd_ctl cluster' on 'host1' or 'host2' to check cluster nodes. ## HTTP API -eMQTT support http to publish message. +eMQTTD support http to publish message. Example: @@ -163,7 +172,7 @@ message | Message ## Design -[Design Wiki](https://github.com/emqtt/emqtt/wiki) +[Design Wiki](https://github.com/emqtt/emqttd/wiki) ## License diff --git a/TODO b/TODO index c456a1161..3b2531023 100644 --- a/TODO +++ b/TODO @@ -1,4 +1,9 @@ +v0.9.0-alpha (2015-03-20) +------------------------- + +emqtt_sm, emqtt_cm, emqtt_pubsub performance issue... + v0.8.0-alpha (2015-03-20) ------------------------- diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index a61daf48e..3c0bbd24c 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.5.0"}, + {vsn, "0.5.4"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index d80979516..765f988e1 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -81,7 +81,7 @@ init([Node, SubTopic, Options]) -> true -> true = erlang:monitor_node(Node, true), State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}), - emqttd_pubsub:subscribe({SubTopic, ?QOS_0}, self()), + emqttd_pubsub:subscribe({SubTopic, ?QOS_0}), {ok, State}; false -> {stop, {cannot_connect, Node}} diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index ea73efa3e..b5f23d156 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -150,8 +150,8 @@ init([Options]) -> Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, [ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics], % Create $SYS Topics - [{atomic, _} = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], - [{atomic, _} = create(systop(Topic)) || Topic <- Topics], + [ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], + [ok = create(systop(Topic)) || Topic <- Topics], SysInterval = proplists:get_value(sys_interval, Options, 60), State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, Delay = if diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 90091ee14..b0ec49ed5 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -86,7 +86,8 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec register(ClientId :: binary(), Pid :: pid()) -> ok. register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> - gen_server:call(?SERVER, {register, ClientId, Pid}). + %%TODO: infinify to block requests when too many clients, this will be redesinged in 0.9.x... + gen_server:call(?SERVER, {register, ClientId, Pid}, infinity). %%------------------------------------------------------------------------------ %% @doc diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index 4fa2433a2..da8d2cf60 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -38,6 +38,8 @@ -export([status/1, broker/1, + stats/1, + metrics/1, cluster/1, listeners/1, bridges/1, @@ -84,12 +86,12 @@ userdel([Username]) -> broker([]) -> Funs = [sysdescr, version, uptime, datetime], - [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]; + [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]. -broker(["stats"]) -> - [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_broker:getstats()]; +stats([]) -> + [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_broker:getstats()]. -broker(["metrics"]) -> +metrics([]) -> [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. listeners([]) -> diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index ad1b4b85d..3858ec441 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -182,7 +182,7 @@ init([Options]) -> % Init metrics [new_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics - [{atomic, _} = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], + [ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], PubInterval = proplists:get_value(pub_interval, Options, 60), Delay = if PubInterval == 0 -> 0; diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 67b841dd8..e77c181e0 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -22,6 +22,8 @@ %%% @doc %%% emqttd core pubsub. %%% +%%% TODO: should not use gen_server:call to create, subscribe topics... +%%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_pubsub). @@ -46,8 +48,8 @@ -export([topics/0, create/1, - subscribe/2, - unsubscribe/2, + subscribe/1, + unsubscribe/1, publish/1, publish/2, %local node @@ -106,8 +108,8 @@ topics() -> %% @end %%------------------------------------------------------------------------------ -spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}. -create(Topic) -> - gen_server:call(?SERVER, {create, Topic}). +create(Topic) when is_binary(Topic) -> + {atomic, ok} = mnesia:transaction(fun trie_add/1, [Topic]), ok. %%------------------------------------------------------------------------------ %% @doc @@ -115,12 +117,25 @@ create(Topic) -> %% %% @end %%------------------------------------------------------------------------------ --spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}. -subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> - subscribe([{Topic, Qos}], SubPid); +-spec subscribe({binary(), mqtt_qos()} | list()) -> {ok, list(mqtt_qos())}. +subscribe({Topic, Qos}) when is_binary(Topic) -> + case subscribe([{Topic, Qos}]) of + {ok, [GrantedQos]} -> {ok, GrantedQos}; + {error, Error} -> {error, Error} + end; +subscribe(Topics = [{_Topic, _Qos}|_]) -> + subscribe(Topics, self(), []). -subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> - gen_server:call(?SERVER, {subscribe, Topics, SubPid}). +subscribe([], _SubPid, Acc) -> + {ok, lists:reverse(Acc)}; +%%TODO: check this function later. +subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> + Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}, + F = fun() -> trie_add(Topic), mnesia:write(Subscriber) end, + case mnesia:transaction(F) of + {atomic, ok} -> subscribe(Topics, SubPid, [Qos|Acc]); + Error -> {error, Error} + end. %%------------------------------------------------------------------------------ %% @doc @@ -128,12 +143,27 @@ subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> %% %% @end %%------------------------------------------------------------------------------ --spec unsubscribe(binary() | list(binary()), pid()) -> ok. -unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) -> - unsubscribe([Topic], SubPid); +-spec unsubscribe(binary() | list(binary())) -> ok. +unsubscribe(Topic) when is_binary(Topic) -> + unsubscribe([Topic]); -unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> - gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}). +unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) -> + unsubscribe(Topics, self()). + +%%TODO: check this function later. +unsubscribe(Topics, SubPid) -> + F = fun() -> + Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid), + lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) -> + case lists:member(Topic, Topics) of + true -> mneisa:delete_object(Sub); + false -> ok + end + end, Subscribers) + %TODO: try to remove topic??? if topic is dynamic... + %%try_remove_topic(Topic) + end, + {atomic, _} = mneisa:transaction(F), ok. %%------------------------------------------------------------------------------ %% @doc @@ -164,7 +194,7 @@ publish(Topic, Msg) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - Subscribers = ets:lookup(topic_subscriber, Topic), + Subscribers = mnesia:dirty_read(topic_subscriber, Topic), lists:foreach( fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> Msg1 = if @@ -193,72 +223,79 @@ match(Topic) when is_binary(Topic) -> %% ------------------------------------------------------------------ init([]) -> - mnesia:create_table(topic_trie, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, topic_trie)}]), + %% trie and topic tables, will be copied by all nodes. mnesia:create_table(topic_trie_node, [ {ram_copies, [node()]}, {attributes, record_info(fields, topic_trie_node)}]), + mnesia:add_table_copy(topic_trie_node, node(), ram_copies), + mnesia:create_table(topic_trie, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_trie)}]), + mnesia:add_table_copy(topic_trie, node(), ram_copies), mnesia:create_table(topic, [ {type, bag}, {record_name, topic}, {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), - mnesia:add_table_copy(topic_trie, node(), ram_copies), - mnesia:add_table_copy(topic_trie_node, node(), ram_copies), mnesia:add_table_copy(topic, node(), ram_copies), - ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]), + mnesia:subscribe({table, topic, simple}), + %% local table, not shared with other table + mnesia:create_table(topic_subscriber, [ + {type, bag}, + {record_name, topic_subscriber}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_subscriber)}, + {index, [subpid]}, + {local_content, true}]), + mnesia:subscribe({table, topic_subscriber, simple}), {ok, #state{}}. handle_call(getstats, _From, State = #state{max_subs = Max}) -> Stats = [{'topics/count', mnesia:table_info(topic, size)}, - {'subscribers/count', ets:info(topic_subscriber, size)}, + {'subscribers/count', mnesia:info(topic_subscriber, size)}, {'subscribers/max', Max}], {reply, Stats, State}; -handle_call({create, Topic}, _From, State) -> - Result = mnesia:transaction(fun trie_add/1, [Topic]), - {reply, Result, setstats(State)}; - -handle_call({subscribe, Topics, SubPid}, _From, State) -> - Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics], - Reply = - case [Err || Err = {error, _} <- Result] of - [] -> {ok, [Qos || {ok, Qos} <- Result]}; - Errors -> hd(Errors) - end, - {reply, Reply, setstats(State)}; - handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. - -handle_cast({unsubscribe, Topics, SubPid}, State) -> - lists:foreach(fun(Topic) -> - ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}), - try_remove_topic(Topic) - end, Topics), - {noreply, setstats(State)}; + lager:error("Bad Req: ~p", [Req]), + {reply, error, State}. handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. + lager:error("Bad Msg: ~p", [Msg]), + {noreply, State}. -handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) -> - case get({submon, Mon}) of - undefined -> - lager:error("unexpected 'DOWN': ~p", [Mon]); - SubPid -> - erase({submon, Mon}), - erase({subscriber, SubPid}), - Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}), - [ets:delete_object(topic_subscriber, Sub) || Sub <- Subs], - [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] - end, +%% a new record has been written. +handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) -> + erlang:monitor(process, Pid), + {noreply, setstats(State)}; + +%% {write, #topic{}, _ActivityId} +%% {delete_object, _OldRecord, _ActivityId} +%% {delete, {Tab, Key}, ActivityId} +handle_info({mnesia_table_event, _Event}, State) -> + {noreply, setstats(State)}; + +handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> + F = fun() -> + %%TODO: how to read with write lock? + [mnesia:delete_object(Sub) || Sub <- mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid)] + %%TODO: try to remove dynamic topics without subscribers + %% [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] + end, + case catch mnesia:transaction(F) of + {atomic, _} -> ok; + {aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]) + end, {noreply, setstats(State)}; handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. + lager:error("Bad Info: ~p", [Info]), + {noreply, State}. terminate(_Reason, _State) -> + mnesia:unsubscribe({table, topic, simple}), + mnesia:unsubscribe({table, topic_subscriber, simple}), + %%TODO: clear topics belongs to this node??? ok. code_change(_OldVsn, State, _Extra) -> @@ -267,57 +304,22 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= -subscribe_topic({Topic, Qos}, SubPid) -> - case mnesia:transaction(fun trie_add/1, [Topic]) of - {atomic, _} -> - case get({subscriber, SubPid}) of - undefined -> - %%TODO: refactor later... - MonRef = erlang:monitor(process, SubPid), - put({subcriber, SubPid}, MonRef), - put({submon, MonRef}, SubPid); - _ -> - already_monitored - end, - %% remove duplicated subscribers - try_remove_subscriber({Topic, Qos}, SubPid), - ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}), - %TODO: GrantedQos?? - {ok, Qos}; - {aborted, Reason} -> - {error, Reason} - end. -try_remove_subscriber({Topic, Qos}, SubPid) -> - case ets:lookup(topic_subscriber, Topic) of - [] -> - not_found; - Subs -> - DupSubs = [Sub || Sub = #topic_subscriber{qos = OldQos, subpid = OldPid} - <- Subs, Qos =/= OldQos, OldPid =:= SubPid], - case DupSubs of - [] -> ok; - [DupSub] -> - lager:warning("PubSub: remove duplicated subscriber ~p", [DupSub]), - ets:delete(topic_subscriber, DupSub) - end - end. - -try_remove_topic(Name) when is_binary(Name) -> - case ets:member(topic_subscriber, Name) of - false -> - Topic = emqttd_topic:new(Name), - Fun = fun() -> - mnesia:delete_object(Topic), - case mnesia:read(topic, Name) of - [] -> trie_delete(Name); - _ -> ignore - end - end, - mnesia:transaction(Fun); - true -> - ok - end. +%%try_remove_topic(Name) when is_binary(Name) -> +%% case ets:member(topic_subscriber, Name) of +%% false -> +%% Topic = emqttd_topic:new(Name), +%% Fun = fun() -> +%% mnesia:delete_object(Topic), +%% case mnesia:read(topic, Name) of +%% [] -> trie_delete(Name); +%% _ -> ignore +%% end +%% end, +%% mnesia:transaction(Fun); +%% true -> +%% ok +%% end. trie_add(Topic) when is_binary(Topic) -> mnesia:write(emqttd_topic:new(Topic)), @@ -325,7 +327,7 @@ trie_add(Topic) when is_binary(Topic) -> [TrieNode=#topic_trie_node{topic=undefined}] -> mnesia:write(TrieNode#topic_trie_node{topic=Topic}); [#topic_trie_node{topic=Topic}] -> - {atomic, already_exist}; + ok; [] -> %add trie path [trie_add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], @@ -401,7 +403,7 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) -> setstats(State = #state{max_subs = Max}) -> emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)), - SubCount = ets:info(topic_subscriber, size), + SubCount = mnesia:table_info(topic_subscriber, size), emqttd_broker:setstat('subscribers/count', SubCount), if SubCount > Max -> diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 9754f8bd7..bf9831135 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -185,7 +185,7 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top _ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs]) end, SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), - {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics, self()), + {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), %%TODO: should be gen_event and notification... emqttd_server:subscribe([ Name || {Name, _} <- Topics ], self()), {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; @@ -208,7 +208,7 @@ unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, T BadUnsubs -> lager:warning("~s should not unsubscribe ~p", [ClientId, BadUnsubs]) end, %%unsubscribe from topic tree - ok = emqttd_pubsub:unsubscribe(Topics, self()), + ok = emqttd_pubsub:unsubscribe(Topics), SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics), {ok, SessState#session_state{submap = SubMap1}}; diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index 35c550ca0..2e751b9d4 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -149,8 +149,8 @@ case "$1" in ;; broker) - if [ $# -gt 2 ]; then - echo "Usage: $SCRIPT broker [status | stats | metrics]" + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT broker" exit 1 fi @@ -165,6 +165,41 @@ case "$1" in $NODETOOL rpc emqttd_ctl broker $@ ;; + stats) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT stats" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl stats $@ + ;; + + metrics) + if [ $# -ne 1 ]; then + echo "Usage: $SCRIPT metrics" + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqttd_ctl metrics $@ + ;; + + bridges) # Make sure the local node IS running RES=`$NODETOOL ping` @@ -223,8 +258,10 @@ case "$1" in *) echo "Usage: $SCRIPT" - echo " status #query status" - echo " broker [stats | metrics] #query broker stats or metrics" + echo " status #query broker status" + echo " broker #query broker version, uptime and description" + echo " stats #query broker statistics of clients, topics, subscribers" + echo " metrics #query broker metrics" echo " cluster [] #query or cluster nodes" echo " plugins list #query loaded plugins" echo " plugins load #load plugin"