diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index c7fa3552b..4143c9ffd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -68,6 +68,8 @@ make_iterator_result/1, make_iterator_result/0, make_delete_iterator_result/1, make_delete_iterator_result/0, + error/1, + ds_specific_stream/0, ds_specific_iterator/0, ds_specific_generation_rank/0, @@ -118,14 +120,14 @@ -type message_key() :: binary(). --type store_batch_result() :: ok | {error, _}. +-type store_batch_result() :: ok | error(_). --type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}. +-type make_iterator_result(Iterator) :: {ok, Iterator} | error(_). -type make_iterator_result() :: make_iterator_result(iterator()). -type next_result(Iterator) :: - {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | {error, _}. + {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | error(_). -type next_result() :: next_result(iterator()). @@ -142,6 +144,8 @@ -type delete_next_result() :: delete_next_result(delete_iterator()). +-type error(Reason) :: {error, recoverable | unrecoverable, Reason}. + %% Timestamp %% Earliest possible timestamp is 0. %% TODO granularity? Currently, we should always use milliseconds, as that's the unit we diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index ed3a93212..49d7f95a2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -171,7 +171,14 @@ drop_db(DB) -> -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> - emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts). + case emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of + ok -> + ok; + Error = {error, _, _} -> + Error; + RPCError = {badrpc, _} -> + {error, recoverable, RPCError} + end. -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -180,7 +187,14 @@ get_streams(DB, TopicFilter, StartTime) -> lists:flatmap( fun(Shard) -> Node = node_of_shard(DB, Shard), - Streams = emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime), + Streams = + try + emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime) + catch + error:{erpc, _} -> + %% TODO: log? + [] + end, lists:map( fun({RankY, StorageLayerStream}) -> RankX = Shard, @@ -198,35 +212,29 @@ get_streams(DB, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) -> ?stream_v2(Shard, StorageStream) = Stream, Node = node_of_shard(DB, Shard), - case emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of + try emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; - Err = {error, _} -> - Err + Error = {error, _, _} -> + Error + catch + error:RPCError = {erpc, _} -> + {error, recoverable, RPCError} end. --spec update_iterator( - emqx_ds:db(), - iterator(), - emqx_ds:message_key() -) -> +-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) -> emqx_ds:make_iterator_result(iterator()). update_iterator(DB, OldIter, DSKey) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, Node = node_of_shard(DB, Shard), - case - emqx_ds_proto_v4:update_iterator( - Node, - DB, - Shard, - StorageIter, - DSKey - ) - of + try emqx_ds_proto_v4:update_iterator(Node, DB, Shard, StorageIter, DSKey) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; - Err = {error, _} -> - Err + Error = {error, _, _} -> + Error + catch + error:RPCError = {erpc, _} -> + {error, recoverable, RPCError} end. -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). @@ -245,8 +253,12 @@ next(DB, Iter0, BatchSize) -> {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; - Other -> - Other + Ok = {ok, _} -> + Ok; + Error = {error, _, _} -> + Error; + RPCError = {badrpc, _} -> + {error, recoverable, RPCError} end. -spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). @@ -337,7 +349,7 @@ do_get_streams_v2(DB, Shard, TopicFilter, StartTime) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _}. + emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> error(obsolete_api). @@ -348,7 +360,7 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _}. + emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d265d8fec..8de64e313 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -230,7 +230,7 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomic := true}) -> +store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> {ok, Batch} = rocksdb:batch(), lists:foreach( fun(Msg) -> @@ -240,18 +240,17 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomi end, Messages ), - Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), + Result = rocksdb:write_batch(DB, Batch, []), rocksdb:release_batch(Batch), - Res; -store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> - lists:foreach( - fun(Msg) -> - {Key, _} = make_key(S, Msg), - Val = serialize(Msg), - rocksdb:put(DB, Data, Key, Val, []) - end, - Messages - ). + %% NOTE + %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to + %% observe until there's `{no_slowdown, true}` in write options. + case Result of + ok -> + ok; + {error, {error, Reason}} -> + {error, unrecoverable, {rocksdb, Reason}} + end. -spec get_streams( emqx_ds_storage_layer:shard_id(), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e0bf1fa1b..7cb1b67f7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -256,12 +256,10 @@ make_iterator( Err end; {error, not_found} -> - {error, end_of_stream} + {error, unrecoverable, generation_not_found} end. --spec update_iterator( - shard_id(), iterator(), emqx_ds:message_key() -) -> +-spec update_iterator(shard_id(), iterator(), emqx_ds:message_key()) -> emqx_ds:make_iterator_result(iterator()). update_iterator( Shard, @@ -281,7 +279,7 @@ update_iterator( Err end; {error, not_found} -> - {error, end_of_stream} + {error, unrecoverable, generation_not_found} end. -spec next(shard_id(), iterator(), pos_integer()) -> @@ -298,12 +296,12 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch {ok, end_of_stream}; {ok, GenIter, Batch} -> {ok, Iter#{?enc := GenIter}, Batch}; - Error = {error, _} -> + Error = {error, _, _} -> Error end; {error, not_found} -> %% generation was possibly dropped by GC - {ok, end_of_stream} + {error, unrecoverable, generation_not_found} end. -spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok. diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index a0dae0e6f..b491657b0 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -404,7 +404,10 @@ t_drop_generation_with_never_used_iterator(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), - ?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter0, 1)), + ?assertMatch( + {error, unrecoverable, generation_not_found, []}, + iterate(DB, Iter0, 1) + ), %% New iterator for the new stream will only see the later messages. [{_, Stream1}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), @@ -453,9 +456,10 @@ t_drop_generation_with_used_once_iterator(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), - ?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter1, 1)), - - ok. + ?assertMatch( + {error, unrecoverable, generation_not_found, []}, + iterate(DB, Iter1, 1) + ). t_drop_generation_update_iterator(_Config) -> %% This checks the behavior of `emqx_ds:update_iterator' after the generation @@ -481,9 +485,10 @@ t_drop_generation_update_iterator(_Config) -> ok = emqx_ds:add_generation(DB), ok = emqx_ds:drop_generation(DB, GenId0), - ?assertEqual({error, end_of_stream}, emqx_ds:update_iterator(DB, Iter1, Key2)), - - ok. + ?assertEqual( + {error, unrecoverable, generation_not_found}, + emqx_ds:update_iterator(DB, Iter1, Key2) + ). t_make_iterator_stale_stream(_Config) -> %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying @@ -507,7 +512,7 @@ t_make_iterator_stale_stream(_Config) -> ok = emqx_ds:drop_generation(DB, GenId0), ?assertEqual( - {error, end_of_stream}, + {error, unrecoverable, generation_not_found}, emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime) ), @@ -605,8 +610,8 @@ iterate(DB, It0, BatchSize, Acc) -> iterate(DB, It, BatchSize, Acc ++ Msgs); {ok, end_of_stream} -> {ok, end_of_stream, Acc}; - Ret -> - Ret + {error, Class, Reason} -> + {error, Class, Reason, Acc} end. %% CT callbacks