From aecda09b9abe7a6cce73a4b8ced2f151b5749c90 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 19 Oct 2019 20:18:34 +0800 Subject: [PATCH] Add more test cases --- src/emqx_session.erl | 48 +++++---- test/emqx_session_SUITE.erl | 189 ++++++++++++++++++++++++++++++++---- 2 files changed, 203 insertions(+), 34 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2b99d888b..67ac6e3ce 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -100,27 +100,25 @@ max_subscriptions :: non_neg_integer(), %% Upgrade QoS? upgrade_qos :: boolean(), - %% Client <- Broker: - %% Inflight QoS1, QoS2 messages sent to the client but unacked. + %% Client <- Broker: QoS1/2 messages sent to the client but unacked. inflight :: emqx_inflight:inflight(), - %% All QoS1, QoS2 messages published to when client is disconnected. - %% QoS 1 and QoS 2 messages pending transmission to the Client. + %% All QoS1/2 messages published to when client is disconnected, + %% or QoS1/2 messages pending transmission to the Client. %% - %% Optionally, QoS 0 messages pending transmission to the Client. + %% Optionally, QoS0 messages pending transmission to the Client. mqueue :: emqx_mqueue:mqueue(), %% Next packet id of the session next_pkt_id = 1 :: emqx_types:packet_id(), %% Retry interval for redelivering QoS1/2 messages retry_interval :: timeout(), - %% Client -> Broker: - %% Inflight QoS2 messages received from client and waiting for pubrel. + %% Client -> Broker: QoS2 messages received from client and waiting for pubrel. awaiting_rel :: map(), %% Max Packets Awaiting PUBREL max_awaiting_rel :: non_neg_integer(), %% Awaiting PUBREL Timeout await_rel_timeout :: timeout(), - %% Enqueue Count - enqueue_cnt :: non_neg_integer(), + %% Deliver Stats + deliver_stats :: emqx_types:stats(), %% Created at created_at :: pos_integer() }). @@ -131,7 +129,9 @@ -define(DEFAULT_BATCH_N, 1000). --define(ATTR_KEYS, [inflight_max, +-define(ATTR_KEYS, [inflight_cnt, + inflight_max, + mqueue_len, mqueue_max, retry_interval, awaiting_rel_max, @@ -168,7 +168,7 @@ ]). %%-------------------------------------------------------------------- -%% Init a session +%% Init a Session %%-------------------------------------------------------------------- %% @doc Init a session. @@ -184,10 +184,10 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> awaiting_rel = #{}, max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000), - enqueue_cnt = 0, created_at = erlang:system_time(second) }. +%% @private init mq init_mqueue(Zone) -> emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000), store_qos0 => get_env(Zone, mqueue_store_qos0, true), @@ -220,6 +220,8 @@ info(subscriptions_max, #session{max_subscriptions = MaxSubs}) -> info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) -> UpgradeQoS; info(inflight, #session{inflight = Inflight}) -> + Inflight; +info(inflight_cnt, #session{inflight = Inflight}) -> emqx_inflight:size(Inflight); info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); @@ -234,13 +236,15 @@ info(mqueue_dropped, #session{mqueue = MQueue}) -> info(next_pkt_id, #session{next_pkt_id = PacketId}) -> PacketId; info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) -> - maps:size(AwaitingRel); + maps:values(AwaitingRel); +info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) -> + AwaitingRel; info(awaiting_rel_max, #session{max_awaiting_rel = MaxAwaitingRel}) -> MaxAwaitingRel; info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; -info(enqueue_cnt, #session{enqueue_cnt = Cnt}) -> - Cnt; +info(deliver_stats, #session{deliver_stats = Stats}) -> + Stats; info(created_at, #session{created_at = CreatedAt}) -> CreatedAt. @@ -506,7 +510,7 @@ enqueue(Delivers, Session = #session{subscriptions = Subs}) when is_list(Deliver || {deliver, Topic, Msg} <- Delivers], lists:foldl(fun enqueue/2, Session, Msgs); -enqueue(Msg, Session = #session{mqueue = Q, enqueue_cnt = Cnt}) +enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), if is_record(Dropped, message) -> @@ -514,7 +518,7 @@ enqueue(Msg, Session = #session{mqueue = Q, enqueue_cnt = Cnt}) [emqx_message:format(Dropped)]); true -> ok end, - Session#session{mqueue = NewQ, enqueue_cnt = Cnt+1}. + inc_deliver_stats(enqueue_cnt, Session#session{mqueue = NewQ}). %%-------------------------------------------------------------------- %% Awaiting ACK for QoS1/QoS2 Messages @@ -638,3 +642,13 @@ next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) -> next_pkt_id(Session = #session{next_pkt_id = Id}) -> Session#session{next_pkt_id = Id + 1}. +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +inc_deliver_stats(Key, Session) -> + inc_deliver_stats(Key, 1, Session). +inc_deliver_stats(Key, I, Session = #session{deliver_stats = Stats}) -> + NStats = maps:update_with(Key, fun(V) -> V+I end, I, Stats), + Session#session{deliver_stats = NStats}. + diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index dfa5da614..1fb41576f 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -23,22 +23,73 @@ -include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). +-import(emqx_session, [set_field/3]). + all() -> emqx_ct:all(?MODULE). -t_session_init(_) -> - error('TODO'). +%%-------------------------------------------------------------------- +%% CT callbacks +%%-------------------------------------------------------------------- + +init_per_testcase(_TestCase, Config) -> + %% Meck Broker + ok = meck:new(emqx_broker, [passthrough, no_history]), + ok = meck:new(emqx_hooks, [passthrough, no_history]), + ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), + Config. + +end_per_testcase(_TestCase, Config) -> + ok = meck:unload(emqx_broker), + ok = meck:unload(emqx_hooks), + Config. %%-------------------------------------------------------------------- -%% Test cases for info/stats +%% Test cases for session init +%%-------------------------------------------------------------------- + +t_session_init(_) -> + Session = emqx_session:init(#{zone => external}, #{receive_maximum => 64}), + ?assertEqual(#{}, emqx_session:info(subscriptions, Session)), + ?assertEqual(0, emqx_session:info(subscriptions_cnt, Session)), + ?assertEqual(0, emqx_session:info(subscriptions_max, Session)), + ?assertEqual(false, emqx_session:info(upgrade_qos, Session)), + ?assertEqual(0, emqx_session:info(inflight_cnt, Session)), + ?assertEqual(64, emqx_session:info(inflight_max, Session)), + ?assertEqual(1, emqx_session:info(next_pkt_id, Session)), + ?assertEqual(0, emqx_session:info(retry_interval, Session)), + ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)), + ?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)), + ?assertEqual(3600000, emqx_session:info(awaiting_rel_timeout, Session)), + ?assert(is_integer(emqx_session:info(created_at, Session))). + +%%-------------------------------------------------------------------- +%% Test cases for session info/stats %%-------------------------------------------------------------------- t_session_info(_) -> - error('TODO'). + ?assertMatch(#{subscriptions := #{}, + subscriptions_max := 0, + upgrade_qos := false, + inflight := 0, + inflight_max := 64, + retry_interval := 0, + mqueue_len := 0, + mqueue_max := 1000, + mqueue_dropped := 0, + next_pkt_id := 1, + awaiting_rel := 0, + awaiting_rel_max := 0, + await_rel_timeout := 3600000 + }, emqx_session:info(session())). t_session_attrs(_) -> + Attrs = emqx_session:attrs(session()), + io:format("~p~n", [Attrs]), error('TODO'). t_session_stats(_) -> + Stats = emqx_session:stats(session()), + io:format("~p~n", [Stats]), error('TODO'). %%-------------------------------------------------------------------- @@ -46,36 +97,115 @@ t_session_stats(_) -> %%-------------------------------------------------------------------- t_subscribe(_) -> - error('TODO'). + ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), + ok = meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end), + {ok, Session} = emqx_session:subscribe( + clientinfo(), <<"#">>, subopts(), session()), + ?assertEqual(1, emqx_session:info(subscriptions_cnt, Session)). + +t_is_subscriptions_full_false(_) -> + Session = session(#{max_subscriptions => 0}), + ?assertNot(emqx_session:is_subscriptions_full(Session)). + +t_is_subscriptions_full_true(_) -> + Session = session(#{max_subscriptions => 1}), + ?assertNot(emqx_session:is_subscriptions_full(Session)), + Subs = #{<<"t1">> => subopts(), <<"t2">> => subopts()}, + NSession = set_field(subscriptions, Subs, Session), + ?assert(emqx_session:is_subscriptions_full(NSession)). t_unsubscribe(_) -> - error('TODO'). - -t_publish_qos0(_) -> - error('TODO'). - -t_publish_qos1(_) -> - error('TODO'). + ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end), + Session = session(#{subscriptions => #{<<"#">> => subopts()}}), + {ok, NSession} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session), + Error = emqx_session:unsubscribe(clientinfo(), <<"#">>, NSession), + ?assertEqual({error, ?RC_NO_SUBSCRIPTION_EXISTED}, Error). t_publish_qos2(_) -> - error('TODO'). + ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), + Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<"payload">>), + {ok, [], Session} = emqx_session:publish(1, Msg, session()), + ?assertEqual(awaiting_rel_cnt, emqx_session:info(awaiting_rel_cnt, Session)). + +t_publish_qos1(_) -> + ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), + Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>), + {ok, [], Session} = emqx_session:publish(1, Msg, session()). + +t_publish_qos0(_) -> + ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), + Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>), + {ok, [], Session} = emqx_session:publish(0, Msg, session()). + +t_is_awaiting_full_false(_) -> + ?assertNot(emqx_session:is_awaiting_full(session(#{max_awaiting_rel => 0}))). + +t_is_awaiting_full_true(_) -> + Session = session(#{max_awaiting_rel => 1, + awaiting_rel => #{1 => 1} + }), + ?assert(emqx_session:is_awaiting_full(Session)). t_puback(_) -> - error('TODO'). + Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>), + Inflight = emqx_inflight:insert(1, {Msg, os:timestamp()}, emqx_inflight:new()), + Session = set_field(inflight, Inflight, session()), + {ok, Msg, NSession} = emqx_session:puback(1, Session), + ?assertEqual([], emqx_session:info(inflight, NSession)). + +t_puback_error_packet_id_in_use(_) -> + Inflight = emqx_inflight:insert(1, {pubrel, os:timestamp()}, emqx_inflight:new()), + Session = set_field(inflight, Inflight, session()), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session). + +t_puback_error_packet_id_not_found(_) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()). t_pubrec(_) -> - error('TODO'). + Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), + Inflight = emqx_inflight:insert(2, {Msg, os:timestamp()}, emqx_inflight:new()), + Session = set_field(inflight, Inflight, session()), + {ok, Msg, NSession} = emqx_session:pubrec(2, Session), + ?assertMatch([{pubrel, _}], emqx_session:info(inflight, NSession)). + +t_pubrec_error_packet_id_in_use(_) -> + Inflight = emqx_inflight:insert(1, {pubrel, ts()}, emqx_inflight:new()), + Session = set_field(inflight, Inflight, session()), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, session()). + +t_pubrec_error_packet_id_not_found(_) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()). t_pubrel(_) -> - error('TODO'). + Session = set_field(awaiting_rel, #{1 => os:timestamp()}, session()), + {ok, NSession} = emqx_session:pubrel(1, Session), + ?assertEqual(#{}, emqx_session:info(awaiting_rel, NSession)). + +t_pubrel_id_not_found(_) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()). t_pubcomp(_) -> - error('TODO'). + Inflight = emqx_inflight:insert(2, {pubrel, os:timestamp()}, emqx_inflight:new()), + Session = emqx_session:set_field(inflight, Inflight, session()), + {ok, NSession} = emqx_session:pubcomp(2, Session), + ?assertEqual([], emqx_session:info(inflight, NSession)). + +t_pubcomp_id_not_found(_) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(2, session()). %%-------------------------------------------------------------------- %% Test cases for deliver/retry %%-------------------------------------------------------------------- +t_dequeue(_) -> + {ok, Session} = emqx_session:dequeue(session()). + +t_bach_n(_) -> + error('TODO'). + +t_dequeue_with_msgs(_) -> + error('TODO'). + t_deliver(_) -> error('TODO'). @@ -101,3 +231,28 @@ t_redeliver(_) -> t_expire(_) -> error('TODO'). +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +session() -> session(#{}). +session(InitFields) when is_map(InitFields) -> + maps:fold(fun(Field, Value, Session) -> + emqx_session:set_field(Field, Value, Session) + end, + emqx_session:init(#{zone => zone}, #{receive_maximum => 0}), + InitFields). + + +clientinfo() -> clientinfo(#{}). +clientinfo(Init) -> + maps:merge(#{clientid => <<"clientid">>, + username => <<"username">> + }, Init). + +subopts() -> subopts(#{}). +subopts(Init) -> + maps:merge(?DEFAULT_SUBOPTS, Init). + +ts() -> erlang:system_time(second). +