diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
index 49e650496..2062982b8 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
@@ -437,8 +437,7 @@ ra_start_shard(DB, Shard) ->
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
         ?tag => ?BATCH,
-        ?batch_messages => Messages,
-        ?timestamp => emqx_ds:timestamp_us()
+        ?batch_messages => Messages
     },
     case ra:process_command(ra_leader_servers(DB, Shard), Command) of
         {ok, Result, _Leader} ->
@@ -534,8 +533,7 @@ apply(
     #{index := RaftIdx},
     #{
         ?tag := ?BATCH,
-        ?batch_messages := MessagesIn,
-        ?timestamp := TimestampLocal
+        ?batch_messages := MessagesIn
     },
     #{latest := Latest} = State
 ) ->
@@ -543,18 +541,30 @@ apply(
     %% Unique timestamp tracking real time closely.
     %% With microsecond granularity it should be nearly impossible for it to run
     %% too far ahead than the real time clock.
-    Timestamp = max(Latest + 1, TimestampLocal),
-    Messages = assign_timestamps(Timestamp, MessagesIn),
+    {NLatest, Messages} = assign_timestamps(Latest, MessagesIn),
+    %% TODO
+    %% Batch is now reversed, but it should not make a lot of difference.
+    %% Even if it would be in order, it's still possible to write messages far away
+    %% in the past, i.e. when replica catches up with the leader. Storage layer
+    %% currently relies on wall clock time to decide if it's safe to iterate over
+    %% next epoch, this is likely wrong. Ideally it should rely on consensus clock
+    %% time instead.
     Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}),
-    %% NOTE: Last assigned timestamp.
-    NLatest = Timestamp + length(Messages) - 1,
     NState = State#{latest := NLatest},
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
     Effect = {release_cursor, RaftIdx, NState},
     {NState, Result, Effect}.
 
-assign_timestamps(Timestamp, [MessageIn | Rest]) ->
-    Message = emqx_message:set_timestamp(Timestamp, MessageIn),
-    [Message | assign_timestamps(Timestamp + 1, Rest)];
-assign_timestamps(_Timestamp, []) ->
-    [].
+assign_timestamps(Latest, Messages) ->
+    assign_timestamps(Latest, Messages, []).
+
+assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
+    case emqx_message:timestamp(MessageIn) of
+        Later when Later > Latest ->
+            assign_timestamps(Later, Rest, [MessageIn | Acc]);
+        _Earlier ->
+            Message = emqx_message:set_timestamp(Latest + 1, MessageIn),
+            assign_timestamps(Latest + 1, Rest, [Message | Acc])
+    end;
+assign_timestamps(Latest, [], Acc) ->
+    {Latest, Acc}.
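Note (illustrative, not part of the patch): the new assign_timestamps/3 keeps a message's own timestamp whenever it is already ahead of the shard's `latest` watermark and otherwise bumps it to `latest + 1`, so timestamps within a shard stay unique and monotonic even when the egress-assigned wall-clock stamps collide or lag. A minimal sketch of the same fold over bare integer timestamps (the module and function names below are hypothetical):

    %% Sketch only: mirrors assign_timestamps/3 from the patch, but over plain
    %% integers instead of #message{} records.
    -module(assign_ts_sketch).
    -export([assign/2]).

    %% assign(Latest, Timestamps) -> {NewLatest, ReversedTimestamps}
    assign(Latest, Timestamps) ->
        lists:foldl(
            fun(Ts, {Lat, Acc}) when Ts > Lat ->
                    %% Already ahead of the watermark: keep as-is.
                    {Ts, [Ts | Acc]};
               (_Ts, {Lat, Acc}) ->
                    %% Stale or duplicate: bump just past the watermark.
                    {Lat + 1, [Lat + 1 | Acc]}
            end,
            {Latest, []},
            Timestamps
        ).

    %% assign(100, [90, 150, 150]) =:= {151, [151, 150, 101]} -- note the reversal.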
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
index a226e5350..9b24f63ae 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
@@ -64,7 +64,8 @@ start_link(DB, Shard) ->
 store_batch(DB, Messages, Opts) ->
     Sync = maps:get(sync, Opts, true),
     lists:foreach(
-        fun(Message) ->
+        fun(MessageIn) ->
+            Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn),
             Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
             gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
         end,
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
index c958e56dc..26a79cbc5 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
@@ -93,8 +93,7 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->
-            Id = erlang:unique_integer([monotonic]),
-            Key = <<Id:64>>,
+            Key = <<(emqx_message:timestamp(Msg)):64>>,
             Val = term_to_binary(Msg),
             rocksdb:put(DB, CF, Key, Val, [])
         end,
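Note (illustrative, not part of the patch): by the time a batch reaches the storage layer every message carries a unique, monotonically assigned timestamp, which is what allows the reference storage to key records by the timestamp alone. A 64-bit big-endian key compares byte-wise in the same order as the integer, so RocksDB's default comparator iterates messages in timestamp order; a duplicate timestamp would silently overwrite an earlier record via rocksdb:put/5, hence the uniqueness guarantee upstream. A small sketch of the key encoding (module name hypothetical):

    %% Sketch only: timestamp-as-key encoding matching the reference storage.
    -module(ts_key_sketch).
    -export([encode/1, decode/1]).

    %% Big-endian 64-bit: byte-wise key order equals numeric order for
    %% non-negative timestamps, so range scans come back time-ordered.
    encode(TsUs) when is_integer(TsUs), TsUs >= 0 ->
        <<TsUs:64>>.

    decode(<<TsUs:64>>) ->
        TsUs.

    %% true = encode(1) < encode(2).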