diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 3517d6b73..790e2d477 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -533,7 +533,7 @@ replay_streams(Session0 = #{replay := []}, _ClientInfo) -> %% mechanisms to replay them: pull_now(Session). --spec replay_batch(stream_state(), session(), clientinfo()) -> session(). +-spec replay_batch(stream_state(), session(), clientinfo()) -> session() | emqx_ds:error(_). replay_batch(Srs0, Session0, ClientInfo) -> #srs{batch_size = BatchSize} = Srs0, case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index e6ce5002a..61aa2a8ca 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -35,6 +35,7 @@ -export_type([ badrpc/0, + call_result/1, call_result/0, cast_result/0, multicall_result/1, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 49d7f95a2..1b5f21a11 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -171,13 +171,12 @@ drop_db(DB) -> -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> - case emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of + try emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of ok -> - ok; - Error = {error, _, _} -> - Error; - RPCError = {badrpc, _} -> - {error, recoverable, RPCError} + ok + catch + error:{Reason, _Call} when Reason == timeout; Reason == noproc -> + {error, recoverable, Reason} end. -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl index fcab12507..36a983ce9 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl @@ -64,7 +64,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _}. + emqx_ds:make_iterator_result(). make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v2, [ DB, Shard, Stream, TopicFilter, StartTime @@ -77,9 +77,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:iterator(), pos_integer() ) -> - {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]} - | {ok, end_of_stream} - | {error, _}. + emqx_rpc:call_result(emqx_ds:next_result()). next(Node, DB, Shard, Iter, BatchSize) -> emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). @@ -103,7 +101,7 @@ store_batch(Node, DB, Shard, Batch, Options) -> emqx_ds_storage_layer:iterator(), emqx_ds:message_key() ) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _}. + emqx_ds:make_iterator_result(). update_iterator(Node, DB, Shard, OldIter, DSKey) -> erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [ DB, Shard, OldIter, DSKey