diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
index 195db7c34..ef1600500 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
@@ -232,7 +232,8 @@ shard_replication_spec(DB, Shard, Opts) ->
     #{
         id => {Shard, replication},
         start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]},
-        restart => transient,
+        shutdown => 10_000,
+        restart => permanent,
         type => worker
     }.
 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
index f4c0d3b01..e0e70596a 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
@@ -189,22 +189,9 @@ add_local_server(DB, Shard) ->
 -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     ok | emqx_ds:error(_Reason).
 drop_local_server(DB, Shard) ->
-    ShardServers = shard_servers(DB, Shard),
+    %% NOTE: Timeouts are ignored, it's a best effort attempt.
+    _ = prep_stop_server(DB, Shard),
     LocalServer = local_server(DB, Shard),
-    case lookup_leader(DB, Shard) of
-        LocalServer ->
-            %% NOTE
-            %% Trigger leadership transfer *and* force to wait until the new leader
-            %% is elected and updated in the leaderboard. This should help to avoid
-            %% edge cases where entries appended right before removal are duplicated
-            %% due to client retries.
-            %% Timeouts are ignored, it's a best effort attempt.
-            [Candidate | _] = lists:delete(LocalServer, ShardServers),
-            _ = ra:transfer_leadership(LocalServer, Candidate),
-            _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end);
-        _Another ->
-            ok
-    end,
     case remove_server(DB, Shard, LocalServer) of
         ok ->
             ra:force_delete_server(DB, LocalServer);
@@ -300,7 +287,7 @@ ra_overview(Server) ->
 
 init({DB, Shard, Opts}) ->
     _ = process_flag(trap_exit, true),
-    ok = start_shard(DB, Shard, Opts),
+    ok = start_server(DB, Shard, Opts),
     {ok, {DB, Shard}}.
 
 handle_call(_Call, _From, State) ->
@@ -310,18 +297,18 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 terminate(_Reason, {DB, Shard}) ->
+    %% NOTE: Timeouts are ignored, it's a best effort attempt.
+    catch prep_stop_server(DB, Shard),
     LocalServer = get_local_server(DB, Shard),
     ok = ra:stop_server(DB, LocalServer).
 
 %%
 
-start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
+start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     ClusterName = cluster_name(DB, Shard),
     LocalServer = local_server(DB, Shard),
     Servers = shard_servers(DB, Shard),
     case ra:restart_server(DB, LocalServer) of
-        ok ->
-            Bootstrap = false;
         {error, name_not_registered} ->
             Bootstrap = true,
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
@@ -339,31 +326,27 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
                 initial_members => Servers,
                 machine => Machine,
                 log_init_args => LogOpts
-            })
+            });
+        ok ->
+            Bootstrap = false;
+        {error, {already_started, _}} ->
+            Bootstrap = false
     end,
-    case Servers of
-        [LocalServer | _] ->
-            %% TODO
-            %% Not super robust, but we probably don't expect nodes to be down
-            %% when we bring up a fresh consensus group. Triggering election
-            %% is not really required otherwise.
-            %% TODO
-            %% Ensure that doing that on node restart does not disrupt consensus.
-            %% Edit: looks like it doesn't, this could actually be quite useful
-            %% to "steal" leadership from nodes that have too much leader load.
-            %% TODO
-            %% It doesn't really work that way. There's `ra:transfer_leadership/2`
-            %% for that.
-            try
-                ra:trigger_election(LocalServer, _Timeout = 1_000)
-            catch
-                %% TODO
-                %% Tolerating exceptions because server might be occupied with log
-                %% replay for a while.
-                exit:{timeout, _} when not Bootstrap ->
-                    ok
-            end;
-        _ ->
+    %% NOTE
+    %% Triggering election is necessary when a new consensus group is being brought up.
+    %% TODO
+    %% It's probably a good idea to rebalance leaders across the cluster from time to
+    %% time. There's `ra:transfer_leadership/2` for that.
+    try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of
+        false ->
+            ok;
+        ok ->
+            ok
+    catch
+        %% TODO
+        %% Tolerating exceptions because server might be occupied with log replay for
+        %% a while.
+        exit:{timeout, _} when not Bootstrap ->
             ok
     end.
 
@@ -379,6 +362,29 @@ server_uid(_DB, Shard) ->
 
 %%
 
+prep_stop_server(DB, Shard) ->
+    prep_stop_server(DB, Shard, 5_000).
+
+prep_stop_server(DB, Shard, Timeout) ->
+    LocalServer = get_local_server(DB, Shard),
+    Candidates = lists:delete(LocalServer, shard_servers(DB, Shard)),
+    case lookup_leader(DB, Shard) of
+        LocalServer when Candidates =/= [] ->
+            %% NOTE
+            %% Trigger leadership transfer *and* force to wait until the new leader
+            %% is elected and updated in the leaderboard. This should help to avoid
+            %% edge cases where entries appended right before removal are duplicated
+            %% due to client retries.
+            %% TODO: Candidate may be offline.
+            [Candidate | _] = Candidates,
+            _ = ra:transfer_leadership(LocalServer, Candidate),
+            wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end, Timeout);
+        _Another ->
+            ok
+    end.
+
+%%
+
 memoize(Fun, Args) ->
     %% NOTE: Assuming that the function is pure and never returns `undefined`.
     case persistent_term:get([Fun | Args], undefined) of
@@ -390,8 +396,8 @@ memoize(Fun, Args) ->
             Result
     end.
 
-wait_until(Fun) ->
-    wait_until(Fun, 5_000, 250).
+wait_until(Fun, Timeout) ->
+    wait_until(Fun, Timeout, 100).
 
 wait_until(Fun, Timeout, Sleep) ->
     Deadline = erlang:monotonic_time(millisecond) + Timeout,
diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl
index 9fc55d170..3b0e37c7f 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl
@@ -435,8 +435,8 @@ t_rebalance_offline_restarts(Config) ->
         erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
     ),
     ?retry(
-        500,
-        10,
+        1000,
+        5,
         ?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes])
     ),
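Note on the child spec change: `restart => permanent` makes the supervisor restart the replication worker unconditionally, and `shutdown => 10_000` gives the worker up to 10 s to exit cleanly before it is killed. Because the server traps exits, that budget is what lets `terminate/2` run the new `prep_stop_server/2` hand-off, which is itself bounded to 5 s. A minimal sketch of this OTP interaction, with a hypothetical module name:

```erlang
-module(graceful_shard_worker).
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, terminate/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

init([]) ->
    %% Trapping exits turns the supervisor's shutdown signal into a
    %% terminate/2 callback instead of an immediate process exit.
    _ = process_flag(trap_exit, true),
    {ok, #{}}.

handle_call(_Call, _From, State) ->
    {reply, ignored, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    %% Work done here must fit within the child spec's `shutdown`
    %% budget (10_000 ms in the patched spec), which is why the
    %% leadership hand-off in prep_stop_server/2 is capped at 5_000 ms.
    ok.
```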
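The body of `wait_until/3` falls outside the hunk; for context, here is a hedged sketch of the deadline-based polling loop its head suggests. The return shape is an assumption for illustration; the real implementation may differ:

```erlang
%% Poll Fun() every Sleep ms until it returns true or the deadline
%% passes. The `timeout` return value here is an assumption.
wait_until(Fun, Timeout, Sleep) ->
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    loop_until(Fun, Deadline, Sleep).

loop_until(Fun, Deadline, Sleep) ->
    case Fun() of
        true ->
            ok;
        false ->
            case erlang:monotonic_time(millisecond) < Deadline of
                true ->
                    ok = timer:sleep(Sleep),
                    loop_until(Fun, Deadline, Sleep);
                false ->
                    timeout
            end
    end.
```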
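On the test suite change: both settings keep the total retry budget at roughly 5 s (interval times attempts), so the edit only halves the polling frequency while shards come online. For reference, a `?retry(Interval, Attempts, Expr)` macro of this kind typically expands to a helper along these lines; this is a hypothetical sketch, the actual macro lives in the EMQX test headers and may differ:

```erlang
%% Re-evaluate Fun up to N times, sleeping Interval ms between
%% attempts; the failure from the final attempt propagates.
retry(_Interval, 1, Fun) ->
    Fun();
retry(Interval, N, Fun) when N > 1 ->
    try
        Fun()
    catch
        _Class:_Reason ->
            ok = timer:sleep(Interval),
            retry(Interval, N - 1, Fun)
    end.
```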