diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index d24999972..c2dfccad6 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.9"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index f117fda05..5238328f0 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.15"}, + {vsn, "5.0.16"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 0152c240e..73c86fe04 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -153,6 +153,14 @@ store_retained(_, Msg = #message{topic = Topic}) -> end. clear_expired(_) -> + case mria_rlog:role() of + core -> + clear_expired(); + _ -> + ok + end. + +clear_expired() -> NowMs = erlang:system_time(millisecond), QH = qlc:q([ RetainedMsg @@ -263,12 +271,22 @@ reindex_status() -> do_store_retained(Msg, TopicTokens, ExpiryTime) -> %% Retained message is stored syncronously on all core nodes + %% + %% No transaction, meaning that concurrent writes in the cluster may + %% lead to inconsistent replicas. This could manifest in two clients + %% getting different retained messages for the same topic, depending + %% on which node they are connected to. We tolerate that. ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime), %% Since retained message was stored syncronously on all core nodes, %% now we are sure that %% * either we will write correct indices %% * or if we a replicant with outdated write indices due to reindexing, %% the correct indices will be added by reindexing + %% + %% No transacation as well, meaning that concurrent writes in the cluster + %% may lead to inconsistent index replicas. This essentially allows for + %% inconsistent query results, where index entry has different expiry time + %% than the message it points to. ok = do_store_retained_indices(TopicTokens, ExpiryTime). do_store_retained_message(Msg, TopicTokens, ExpiryTime) -> @@ -281,18 +299,20 @@ do_store_retained_message(Msg, TopicTokens, ExpiryTime) -> do_store_retained_indices(TopicTokens, ExpiryTime) -> Indices = dirty_indices(write), - ok = emqx_retainer_index:foreach_index_key( - fun(Key) -> do_store_retained_index(Key, ExpiryTime) end, - Indices, - TopicTokens - ). + ok = mria:async_dirty(?RETAINER_SHARD, fun() -> + emqx_retainer_index:foreach_index_key( + fun(Key) -> do_store_retained_index(Key, ExpiryTime) end, + Indices, + TopicTokens + ) + end). do_store_retained_index(Key, ExpiryTime) -> RetainedIndex = #retained_index{ key = Key, expiry_time = ExpiryTime }, - mria:dirty_write(?TAB_INDEX, RetainedIndex). + mnesia:write(?TAB_INDEX, RetainedIndex, write). msg_table(SearchTable) -> qlc:q([ diff --git a/changes/ce/perf-11389.en.md b/changes/ce/perf-11389.en.md new file mode 100644 index 000000000..053f7f58f --- /dev/null +++ b/changes/ce/perf-11389.en.md @@ -0,0 +1 @@ +Improved retained message publishing latency by consolidating multiple index update operations into a single mnesia activity, leveraging the new APIs introduced in mria 0.6.0. diff --git a/mix.exs b/mix.exs index c145c4a66..a39288409 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.15.9", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.15.10", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.11", override: true}, diff --git a/rebar.config b/rebar.config index 7a8e0bb02..131149f47 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.9"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}}