From 5f851058015e7b783e49f4978f1146ab63e2436d Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 15 Feb 2024 11:54:35 +0100 Subject: [PATCH] feat(sessds): Specialize the interval queue for positive numbers --- .../emqx_persistent_session_ds_inflight.erl | 49 +++++++++---------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl index 349713bf6..21194c8c2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl @@ -211,17 +211,17 @@ pubrec(SeqNo, Rec = #inflight{pubrec_queue = Q0}) -> %%%% Interval queue: %% "Interval queue": a data structure that represents a queue of -%% monotonically increasing integers in a compact manner. It is -%% functionally equivalent to a `queue:queue(integer())'. +%% monotonically increasing non-negative integers in a compact manner. +%% It is functionally equivalent to a `queue:queue(integer())'. -record(iqueue, { %% Head interval: - head :: integer() | undefined, - head_end :: integer() | undefined, + head = 0 :: integer(), + head_end = 0 :: integer(), %% Intermediate ranges: queue :: queue:queue({integer(), integer()}), %% End interval: - tail :: integer() | undefined, - tail_end :: integer() | undefined + tail = 0 :: integer(), + tail_end = 0 :: integer() }). -type iqueue() :: #iqueue{}. @@ -233,17 +233,20 @@ iqueue_new() -> %% @doc Push a value into the interval queue: -spec ipush(integer(), iqueue()) -> iqueue(). -ipush(Val, Q = #iqueue{tail = undefined, tail_end = undefined}) -> +ipush(Val, Q = #iqueue{tail_end = Val, head_end = Val}) -> + %% Optimization: head and tail intervals overlap, and the newly + %% inserted value extends both. Attach it to both intervals, to + %% avoid `queue:out' in `ipop': Q#iqueue{ - tail = Val, - tail_end = Val + 1 + tail_end = Val + 1, + head_end = Val + 1 }; ipush(Val, Q = #iqueue{tail_end = Val}) -> %% Extend tail interval: Q#iqueue{ tail_end = Val + 1 }; -ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when Val > End -> +ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when is_number(Val), Val > End -> IQ = queue:in({Tl, End}, IQ0), %% Begin a new interval: Q#iqueue{ @@ -253,28 +256,24 @@ ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when Val > End - }. -spec ipop(iqueue()) -> {{value, integer()}, iqueue()} | {empty, iqueue()}. -ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when is_number(HdEnd), Hd < HdEnd -> +ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when Hd < HdEnd -> + %% Head interval is not empty. Consume a value from it: {{value, Hd}, Q#iqueue{head = Hd + 1}}; +ipop(Q = #iqueue{head_end = End, tail_end = End}) -> + %% Head interval is fully consumed, and it's overlaps with the + %% tail interval. It means the queue is empty: + {empty, Q}; ipop(Q = #iqueue{head = Hd0, tail = Tl, tail_end = TlEnd, queue = IQ0}) -> + %% Head interval is fully consumed, and it doesn't overlap with + %% the tail interval. Replace the head interval with the next + %% interval from the queue or with the tail interval: case queue:out(IQ0) of {{value, {Hd, HdEnd}}, IQ} -> - ipop(Q#iqueue{head = nmax(Hd0, Hd), head_end = HdEnd, queue = IQ}); + ipop(Q#iqueue{head = max(Hd0, Hd), head_end = HdEnd, queue = IQ}); {empty, _} -> - do_ipop(Q#iqueue{head = nmax(Hd0, Tl), head_end = TlEnd}) + ipop(Q#iqueue{head = max(Hd0, Tl), head_end = TlEnd}) end. -do_ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when is_number(HdEnd), Hd < HdEnd -> - {{value, Hd}, Q#iqueue{head = Hd + 1}}; -do_ipop(Q) -> - {empty, Q}. - -nmax(undefined, N) -> - N; -nmax(N, undefined) -> - N; -nmax(N, M) -> - max(N, M). - -ifdef(TEST). %% Test that behavior of iqueue is identical to that of a regular queue of integers: