diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 7f696e3ce..7432fe3c7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -121,7 +121,7 @@ open_db(DB, CreateOpts) -> -spec add_generation(emqx_ds:db()) -> ok | {error, _}. add_generation(DB) -> Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), - _ = emqx_ds_proto_v2:add_generation(Nodes, DB), + _ = emqx_ds_proto_v3:add_generation(Nodes, DB), ok. -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. @@ -157,7 +157,7 @@ drop_generation(DB, {Shard, GenId}) -> -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> Nodes = list_nodes(), - _ = emqx_ds_proto_v2:drop_db(Nodes, DB), + _ = emqx_ds_proto_v3:drop_db(Nodes, DB), _ = emqx_ds_replication_layer_meta:drop_db(DB), emqx_ds_builtin_sup:stop_db(DB), ok. @@ -174,7 +174,7 @@ get_streams(DB, TopicFilter, StartTime) -> lists:flatmap( fun(Shard) -> Node = node_of_shard(DB, Shard), - Streams = emqx_ds_proto_v1:get_streams(Node, DB, Shard, TopicFilter, StartTime), + Streams = emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, StartTime), lists:map( fun({RankY, Stream}) -> RankX = Shard, @@ -196,7 +196,7 @@ get_streams(DB, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) -> #{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream, Node = node_of_shard(DB, Shard), - case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of + case emqx_ds_proto_v3:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; Err = {error, _} -> @@ -213,7 +213,7 @@ update_iterator(DB, OldIter, DSKey) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, Node = node_of_shard(DB, Shard), case - emqx_ds_proto_v2:update_iterator( + emqx_ds_proto_v3:update_iterator( Node, DB, Shard, @@ -239,7 +239,7 @@ next(DB, Iter0, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic layer. - case emqx_ds_proto_v1:next(Node, DB, Shard, StorageIter0, BatchSize) of + case emqx_ds_proto_v3:next(Node, DB, Shard, StorageIter0, BatchSize) of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 842e8e5ed..8b37b29cb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -172,4 +172,4 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), - erlang:send_after(Interval, self(), flush). + erlang:send_after(Interval, self(), ?flush).