diff --git a/apps/emqx_stomp/src/emqx_stomp_frame.erl b/apps/emqx_stomp/src/emqx_stomp_frame.erl index 212242969..fa37c2f64 100644 --- a/apps/emqx_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_stomp/src/emqx_stomp_frame.erl @@ -123,6 +123,8 @@ parse(<<>>, Parser) -> parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); +parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none -> + parse(Phase, Bytes, State); parse(Bytes, Parser = #{pre := Pre}) -> parse(<
>, maps:without([pre], Parser));
@@ -153,6 +155,8 @@ parse(command, <>, State = #parser_state{acc = Acc}) ->
parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
parse(command, <>, State) ->
parse(command, Rest, acc(Ch, State));
+parse(command, <<>>, State) ->
+ {more, #{phase => command, state => State}};
parse(headers, <>, State) ->
parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
@@ -165,11 +169,15 @@ parse(hdname, <>, State = #parser_state{acc = Acc}) ->
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
parse(hdname, <>, State) ->
parse(hdname, Rest, acc(Ch, State));
+parse(hdname, <<>>, State) ->
+ {more, #{phase => hdname, state => State}};
parse(hdvalue, <>, State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
parse(headers, Rest, State#parser_state{headers = add_header(Name, Acc, Headers), hdname = undefined, acc = <<>>});
parse(hdvalue, <>, State) ->
- parse(hdvalue, Rest, acc(Ch, State)).
+ parse(hdvalue, Rest, acc(Ch, State));
+parse(hdvalue, <<>>, State) ->
+ {more, #{phase => hdvalue, state => State}}.
%% @private
parse(body, <<>>, State, Length) ->
diff --git a/apps/emqx_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_stomp/test/emqx_stomp_SUITE.erl
index c8ab88311..e2599ab51 100644
--- a/apps/emqx_stomp/test/emqx_stomp_SUITE.erl
+++ b/apps/emqx_stomp/test/emqx_stomp_SUITE.erl
@@ -17,6 +17,7 @@
-module(emqx_stomp_SUITE).
-include_lib("emqx_stomp/include/emqx_stomp.hrl").
+-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compile(nowarn_export_all).
@@ -324,6 +325,40 @@ t_ack(_) ->
body = _}, _} = parse(Data4)
end).
+t_1000_msg_send(_) ->
+ with_connection(fun(Sock) ->
+ gen_tcp:send(Sock, serialize(<<"CONNECT">>,
+ [{<<"accept-version">>, ?STOMP_VER},
+ {<<"host">>, <<"127.0.0.1:61613">>},
+ {<<"login">>, <<"guest">>},
+ {<<"passcode">>, <<"guest">>},
+ {<<"heart-beat">>, <<"0,0">>}])),
+ {ok, Data} = gen_tcp:recv(Sock, 0),
+ {ok, #stomp_frame{command = <<"CONNECTED">>,
+ headers = _,
+ body = _}, _} = parse(Data),
+
+ Topic = <<"/queue/foo">>,
+ SendFun = fun() ->
+ gen_tcp:send(Sock, serialize(<<"SEND">>,
+ [{<<"destination">>, Topic}],
+ <<"msgtest">>))
+ end,
+
+ RecvFun = fun() ->
+ receive
+ {deliver, Topic, _Msg}->
+ ok
+ after 100 ->
+ ?assert(false, "waiting message timeout")
+ end
+ end,
+
+ emqx:subscribe(Topic),
+ lists:foreach(fun(_) -> SendFun() end, lists:seq(1, 1000)),
+ lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
+ end).
+
with_connection(DoFun) ->
{ok, Sock} = gen_tcp:connect({127, 0, 0, 1},
61613,