From 849fe0c2c8f05ee60c1243bb69c08ef96ec8a04d Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Mon, 25 Mar 2024 13:43:46 +0100
Subject: [PATCH] feat(routing): add schema conflict resolution procedure

In the log message printed when a schema conflict in cluster routing is detected.
---
Review notes (placed below the three-dash line, so they are not part of the
commit message):

 * discover_cluster_schema_vsn/0 is now specified to return a pair
   {SchemaVsn, State}, where State is a list of {Node, SchemaVsn, Details}
   tuples; choose_schema_vsn/2 takes that pair as its second argument and
   destructures it in the function head.
 * Both critical "conflicting routing schemas" log events gain an `action`
   field whose value is built by mk_conflict_resolution_action/1.
 * mk_conflict_resolution_action/1 formats the nodes tagged `v1` and, only
   when there are any, the nodes tagged `unknown`, and returns the resulting
   text as a flat character list.
 * Changed in review: the sentence following each "~p" node list now starts
   on its own line (it used to be glued to the printed list), and the
   misspelled variable FormatUnkown is renamed FormatUnknown. Line counts in
   the hunks are unchanged.

 apps/emqx/src/emqx_router.erl | 43 +++++++++++++++++++++++++++++------
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl
index 380dddc55..c8d82dda3 100644
--- a/apps/emqx/src/emqx_router.erl
+++ b/apps/emqx/src/emqx_router.erl
@@ -656,8 +656,8 @@ init_schema() ->
     ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
     ok = emqx_trie:wait_for_tables(),
     ConfSchema = emqx_config:get([broker, routing, storage_schema]),
-    ClusterSchema = discover_cluster_schema_vsn(),
-    Schema = choose_schema_vsn(ConfSchema, ClusterSchema),
+    ClusterState = discover_cluster_schema_vsn(),
+    Schema = choose_schema_vsn(ConfSchema, ClusterState),
     ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
     case Schema of
         ConfSchema ->
@@ -681,7 +681,8 @@ deinit_schema() ->
     _ = persistent_term:erase(?PT_SCHEMA_VSN),
     ok.
 
--spec discover_cluster_schema_vsn() -> schemavsn() | undefined.
+-spec discover_cluster_schema_vsn() ->
+    {schemavsn() | undefined, _State :: [{node(), schemavsn() | undefined, _Details}]}.
 discover_cluster_schema_vsn() ->
     discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]).
 
@@ -723,13 +724,17 @@ discover_cluster_schema_vsn(Nodes) ->
                     "There are nodes in the cluster with different configured routing "
                     "storage schemas. This probably means that some nodes use v1 schema "
                     "and some use v2, independently of each other. The routing is likely "
-                    "broken. Manual intervention required."
+                    "broken. Manual intervention required.",
+                action => mk_conflict_resolution_action(Responses)
             }),
             error(conflicting_routing_schemas_configured_in_cluster)
     end.
 
--spec choose_schema_vsn(schemavsn(), schemavsn() | undefined) -> schemavsn().
-choose_schema_vsn(ConfSchema, ClusterSchema) ->
+-spec choose_schema_vsn(
+    schemavsn(),
+    _ClusterState :: {schemavsn() | undefined, [{node(), schemavsn() | undefined, _Details}]}
+) -> schemavsn().
+choose_schema_vsn(ConfSchema, {ClusterSchema, State}) ->
     case detect_table_schema_vsn() of
         [ClusterSchema] ->
             %% Table contents match configured schema in the cluster.
@@ -753,7 +758,8 @@ choose_schema_vsn(ConfSchema, ClusterSchema) ->
                     "by the cluster. This probably means that some nodes in the cluster "
                     "use v1 schema and some use v2, independently of each other. The "
                     "routing is likely broken. Manual intervention and full cluster "
-                    "restart is required. This node will shut down."
+                    "restart is required. This node will shut down.",
+                action => mk_conflict_resolution_action(State)
             }),
             error(conflicting_routing_schemas_detected_in_cluster)
     end.
@@ -767,6 +773,29 @@ detect_table_schema_vsn() ->
 is_empty(Tab) ->
     ets:first(Tab) =:= '$end_of_table'.
 
+mk_conflict_resolution_action(State) ->
+    NodesV1 = [Node || {Node, v1, _} <- State],
+    NodesUnknown = [Node || {Node, unknown, _} <- State],
+    Format =
+        "Following EMQX nodes are running with conflicting schema:"
+        "\n ~p"
+        "\nPlease take the following steps to resolve the conflict:"
+        "\n 1. Stop listeners on those nodes: `$ emqx eval 'emqx_listener:stop()'`"
+        "\n 2. Wait until they are safe to restart."
+        "\n This could take some time, depending on the number of clients and their subscriptions."
+        "\n Those conditions should be true for each of the nodes in order to proceed:"
+        "\n * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`."
+        "\n * `$ emqx ctl topics list` prints `No topics.`"
+        "\n 3. Upgrade the nodes to the latest version."
+        "\n 4. Restart the nodes.",
+    FormatUnknown =
+        "Additionally, following nodes were unreachable during startup:"
+        "\n ~p"
+        "\nIt's strongly advised to include them in the manual resolution procedure as well.",
+    Message = io_lib:format(Format, [NodesV1]),
+    MessageUnknown = [io_lib:format(FormatUnknown, [NodesUnknown]) || NodesUnknown =/= []],
+    unicode:characters_to_list(Message ++ "\n" ++ MessageUnknown).
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------