Merge pull request #6604 from terry-xiaoyu/bridge_bug_fixes_4
refactor(resource): improve the process starting/stopping resource instances
This commit is contained in:
commit
2259f3ba64
|
|
@ -979,7 +979,7 @@ authenticator_examples() ->
|
||||||
mechanism => <<"password-based">>,
|
mechanism => <<"password-based">>,
|
||||||
backend => <<"http">>,
|
backend => <<"http">>,
|
||||||
method => <<"post">>,
|
method => <<"post">>,
|
||||||
url => <<"http://127.0.0.2:8080">>,
|
url => <<"http://127.0.0.1:18083">>,
|
||||||
headers => #{
|
headers => #{
|
||||||
<<"content-type">> => <<"application/json">>
|
<<"content-type">> => <<"application/json">>
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ load/0
|
-export([ load/0
|
||||||
|
, lookup/1
|
||||||
, lookup/2
|
, lookup/2
|
||||||
, lookup/3
|
, lookup/3
|
||||||
, list/0
|
, list/0
|
||||||
|
|
@ -191,6 +192,10 @@ list_bridges_by_connector(ConnectorId) ->
|
||||||
[B || B = #{raw_config := #{<<"connector">> := Id}} <- list(),
|
[B || B = #{raw_config := #{<<"connector">> := Id}} <- list(),
|
||||||
ConnectorId =:= Id].
|
ConnectorId =:= Id].
|
||||||
|
|
||||||
|
lookup(Id) ->
|
||||||
|
{Type, Name} = parse_bridge_id(Id),
|
||||||
|
lookup(Type, Name).
|
||||||
|
|
||||||
lookup(Type, Name) ->
|
lookup(Type, Name) ->
|
||||||
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
|
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
|
||||||
lookup(Type, Name, RawConf).
|
lookup(Type, Name, RawConf).
|
||||||
|
|
@ -218,7 +223,7 @@ create(Type, Name, Conf) ->
|
||||||
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
|
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
|
||||||
config => Conf}),
|
config => Conf}),
|
||||||
case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
|
case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
|
||||||
parse_confs(Type, Name, Conf), #{force_create => true}) of
|
parse_confs(Type, Name, Conf), #{async_create => true}) of
|
||||||
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
|
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
|
||||||
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
|
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
|
|
@ -247,7 +252,7 @@ update(Type, Name, {OldConf, Conf}) ->
|
||||||
?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one"
|
?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one"
|
||||||
, type => Type, name => Name, config => Conf}),
|
, type => Type, name => Name, config => Conf}),
|
||||||
create(Type, Name, Conf);
|
create(Type, Name, Conf);
|
||||||
{error, Reason} -> {update_bridge_failed, Reason}
|
{error, Reason} -> {error, {update_bridge_failed, Reason}}
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
%% we don't need to recreate the bridge if this config change is only to
|
%% we don't need to recreate the bridge if this config change is only to
|
||||||
|
|
@ -263,7 +268,8 @@ recreate(Type, Name) ->
|
||||||
|
|
||||||
recreate(Type, Name, Conf) ->
|
recreate(Type, Name, Conf) ->
|
||||||
emqx_resource:recreate_local(resource_id(Type, Name),
|
emqx_resource:recreate_local(resource_id(Type, Name),
|
||||||
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
|
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf),
|
||||||
|
#{async_create => true}).
|
||||||
|
|
||||||
create_dry_run(Type, Conf) ->
|
create_dry_run(Type, Conf) ->
|
||||||
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},
|
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},
|
||||||
|
|
|
||||||
|
|
@ -160,6 +160,7 @@ t_http_crud_apis(_) ->
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
|
||||||
%% send an message to emqx and the message should be forwarded to the HTTP server
|
%% send an message to emqx and the message should be forwarded to the HTTP server
|
||||||
|
wait_for_resource_ready(BridgeID, 5),
|
||||||
Body = <<"my msg">>,
|
Body = <<"my msg">>,
|
||||||
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
|
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
|
||||||
?assert(
|
?assert(
|
||||||
|
|
@ -212,6 +213,7 @@ t_http_crud_apis(_) ->
|
||||||
}, jsx:decode(Bridge3Str)),
|
}, jsx:decode(Bridge3Str)),
|
||||||
|
|
||||||
%% send an message to emqx again, check the path has been changed
|
%% send an message to emqx again, check the path has been changed
|
||||||
|
wait_for_resource_ready(BridgeID, 5),
|
||||||
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
|
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
|
||||||
?assert(
|
?assert(
|
||||||
receive
|
receive
|
||||||
|
|
@ -320,3 +322,14 @@ auth_header_() ->
|
||||||
|
|
||||||
operation_path(Oper, BridgeID) ->
|
operation_path(Oper, BridgeID) ->
|
||||||
uri(["bridges", BridgeID, "operation", Oper]).
|
uri(["bridges", BridgeID, "operation", Oper]).
|
||||||
|
|
||||||
|
wait_for_resource_ready(InstId, 0) ->
|
||||||
|
ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
|
||||||
|
ct:fail(wait_resource_timeout);
|
||||||
|
wait_for_resource_ready(InstId, Retry) ->
|
||||||
|
case emqx_bridge:lookup(InstId) of
|
||||||
|
{ok, #{resource_data := #{status := started}}} -> ok;
|
||||||
|
_ ->
|
||||||
|
timer:sleep(100),
|
||||||
|
wait_for_resource_ready(InstId, Retry-1)
|
||||||
|
end.
|
||||||
|
|
|
||||||
|
|
@ -241,6 +241,7 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
emqx:subscribe(LocalTopic),
|
emqx:subscribe(LocalTopic),
|
||||||
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
|
wait_for_resource_ready(BridgeIDIngress, 5),
|
||||||
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
||||||
%% we should receive a message on the local broker, with specified topic
|
%% we should receive a message on the local broker, with specified topic
|
||||||
?assert(
|
?assert(
|
||||||
|
|
@ -309,6 +310,7 @@ t_mqtt_conn_bridge_egress(_) ->
|
||||||
emqx:subscribe(RemoteTopic),
|
emqx:subscribe(RemoteTopic),
|
||||||
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
|
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
|
|
||||||
%% we should receive a message on the "remote" broker, with specified topic
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
|
|
@ -370,6 +372,7 @@ t_mqtt_conn_update(_) ->
|
||||||
, <<"status">> := <<"connected">>
|
, <<"status">> := <<"connected">>
|
||||||
, <<"connector">> := ConnctorID
|
, <<"connector">> := ConnctorID
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
wait_for_resource_ready(BridgeIDEgress, 2),
|
||||||
|
|
||||||
%% then we try to update 'server' of the connector, to an unavailable IP address
|
%% then we try to update 'server' of the connector, to an unavailable IP address
|
||||||
%% the update should fail because of 'unreachable' or 'connrefused'
|
%% the update should fail because of 'unreachable' or 'connrefused'
|
||||||
|
|
@ -412,6 +415,11 @@ t_mqtt_conn_update2(_) ->
|
||||||
, <<"status">> := <<"disconnected">>
|
, <<"status">> := <<"disconnected">>
|
||||||
, <<"connector">> := ConnctorID
|
, <<"connector">> := ConnctorID
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
%% We try to fix the 'server' parameter, to another unavailable server..
|
||||||
|
%% The update should success: we don't check the connectivity of the new config
|
||||||
|
%% if the resource is now disconnected.
|
||||||
|
{ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
|
||||||
|
?MQTT_CONNECOTR2(<<"127.0.0.1:2604">>)),
|
||||||
%% we fix the 'server' parameter to a normal one, it should work
|
%% we fix the 'server' parameter to a normal one, it should work
|
||||||
{ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
|
{ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
|
||||||
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
|
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
|
||||||
|
|
@ -444,9 +452,9 @@ t_mqtt_conn_update3(_) ->
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := BridgeIDEgress
|
#{ <<"id">> := BridgeIDEgress
|
||||||
, <<"status">> := <<"connected">>
|
|
||||||
, <<"connector">> := ConnctorID
|
, <<"connector">> := ConnctorID
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
wait_for_resource_ready(BridgeIDEgress, 2),
|
||||||
|
|
||||||
%% delete the connector should fail because it is in use by a bridge
|
%% delete the connector should fail because it is in use by a bridge
|
||||||
{ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
|
{ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
|
||||||
|
|
@ -499,6 +507,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
||||||
emqx:subscribe(LocalTopic),
|
emqx:subscribe(LocalTopic),
|
||||||
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
|
wait_for_resource_ready(BridgeIDIngress, 5),
|
||||||
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
||||||
%% we should receive a message on the local broker, with specified topic
|
%% we should receive a message on the local broker, with specified topic
|
||||||
?assert(
|
?assert(
|
||||||
|
|
@ -563,6 +572,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
emqx:subscribe(RemoteTopic),
|
emqx:subscribe(RemoteTopic),
|
||||||
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
|
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
%% we should receive a message on the "remote" broker, with specified topic
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
?assert(
|
?assert(
|
||||||
|
|
@ -583,6 +593,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
RuleTopic = <<"t/1">>,
|
RuleTopic = <<"t/1">>,
|
||||||
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
|
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
|
||||||
emqx:subscribe(RemoteTopic2),
|
emqx:subscribe(RemoteTopic2),
|
||||||
|
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||||
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
|
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
|
||||||
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
||||||
#{ <<"id">> := RuleId
|
#{ <<"id">> := RuleId
|
||||||
|
|
@ -646,3 +657,13 @@ auth_header_() ->
|
||||||
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
|
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
|
||||||
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
|
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
|
||||||
|
|
||||||
|
wait_for_resource_ready(InstId, 0) ->
|
||||||
|
ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
|
||||||
|
ct:fail(wait_resource_timeout);
|
||||||
|
wait_for_resource_ready(InstId, Retry) ->
|
||||||
|
case emqx_bridge:lookup(InstId) of
|
||||||
|
{ok, #{resource_data := #{status := started}}} -> ok;
|
||||||
|
_ ->
|
||||||
|
timer:sleep(100),
|
||||||
|
wait_for_resource_ready(InstId, Retry-1)
|
||||||
|
end.
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@
|
||||||
%% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails,
|
%% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails,
|
||||||
%% the 'status' of the resource will be 'stopped' in this case.
|
%% the 'status' of the resource will be 'stopped' in this case.
|
||||||
%% Defaults to 'false'
|
%% Defaults to 'false'
|
||||||
force_create => boolean()
|
async_create => boolean()
|
||||||
}.
|
}.
|
||||||
-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
|
-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
|
||||||
undefined.
|
undefined.
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,7 @@
|
||||||
%% Calls to the callback module with current resource state
|
%% Calls to the callback module with current resource state
|
||||||
%% They also save the state after the call finished (except query/2,3).
|
%% They also save the state after the call finished (except query/2,3).
|
||||||
-export([ restart/1 %% restart the instance.
|
-export([ restart/1 %% restart the instance.
|
||||||
|
, restart/2
|
||||||
, health_check/1 %% verify if the resource is working normally
|
, health_check/1 %% verify if the resource is working normally
|
||||||
, stop/1 %% stop the instance
|
, stop/1 %% stop the instance
|
||||||
, query/2 %% query the instance
|
, query/2 %% query the instance
|
||||||
|
|
@ -68,7 +69,6 @@
|
||||||
-export([ call_start/3 %% start the instance
|
-export([ call_start/3 %% start the instance
|
||||||
, call_health_check/3 %% verify if the resource is working normally
|
, call_health_check/3 %% verify if the resource is working normally
|
||||||
, call_stop/3 %% stop the instance
|
, call_stop/3 %% stop the instance
|
||||||
, call_config_merge/4 %% merge the config when updating
|
|
||||||
, call_jsonify/2
|
, call_jsonify/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
|
@ -86,12 +86,9 @@
|
||||||
|
|
||||||
-optional_callbacks([ on_query/4
|
-optional_callbacks([ on_query/4
|
||||||
, on_health_check/2
|
, on_health_check/2
|
||||||
, on_config_merge/3
|
|
||||||
, on_jsonify/1
|
, on_jsonify/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
|
|
||||||
|
|
||||||
-callback on_jsonify(resource_config()) -> jsx:json_term().
|
-callback on_jsonify(resource_config()) -> jsx:json_term().
|
||||||
|
|
||||||
%% when calling emqx_resource:start/1
|
%% when calling emqx_resource:start/1
|
||||||
|
|
@ -169,18 +166,17 @@ create_dry_run(ResourceType, Config) ->
|
||||||
-spec create_dry_run_local(resource_type(), resource_config()) ->
|
-spec create_dry_run_local(resource_type(), resource_config()) ->
|
||||||
ok | {error, Reason :: term()}.
|
ok | {error, Reason :: term()}.
|
||||||
create_dry_run_local(ResourceType, Config) ->
|
create_dry_run_local(ResourceType, Config) ->
|
||||||
InstId = emqx_resource_instance:make_test_id(),
|
call_instance(<<?TEST_ID_PREFIX>>, {create_dry_run, ResourceType, Config}).
|
||||||
call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
|
|
||||||
|
|
||||||
-spec recreate(instance_id(), resource_type(), resource_config(), term()) ->
|
-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
|
||||||
{ok, resource_data()} | {error, Reason :: term()}.
|
{ok, resource_data()} | {error, Reason :: term()}.
|
||||||
recreate(InstId, ResourceType, Config, Params) ->
|
recreate(InstId, ResourceType, Config, Opts) ->
|
||||||
cluster_call(recreate_local, [InstId, ResourceType, Config, Params]).
|
cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]).
|
||||||
|
|
||||||
-spec recreate_local(instance_id(), resource_type(), resource_config(), term()) ->
|
-spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
|
||||||
{ok, resource_data()} | {error, Reason :: term()}.
|
{ok, resource_data()} | {error, Reason :: term()}.
|
||||||
recreate_local(InstId, ResourceType, Config, Params) ->
|
recreate_local(InstId, ResourceType, Config, Opts) ->
|
||||||
call_instance(InstId, {recreate, InstId, ResourceType, Config, Params}).
|
call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}).
|
||||||
|
|
||||||
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
|
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
|
||||||
remove(InstId) ->
|
remove(InstId) ->
|
||||||
|
|
@ -216,7 +212,11 @@ query(InstId, Request, AfterQuery) ->
|
||||||
|
|
||||||
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
|
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
|
||||||
restart(InstId) ->
|
restart(InstId) ->
|
||||||
call_instance(InstId, {restart, InstId}).
|
restart(InstId, #{}).
|
||||||
|
|
||||||
|
-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
|
||||||
|
restart(InstId, Opts) ->
|
||||||
|
call_instance(InstId, {restart, InstId, Opts}).
|
||||||
|
|
||||||
-spec stop(instance_id()) -> ok | {error, Reason :: term()}.
|
-spec stop(instance_id()) -> ok | {error, Reason :: term()}.
|
||||||
stop(InstId) ->
|
stop(InstId) ->
|
||||||
|
|
@ -276,14 +276,6 @@ call_health_check(InstId, Mod, ResourceState) ->
|
||||||
call_stop(InstId, Mod, ResourceState) ->
|
call_stop(InstId, Mod, ResourceState) ->
|
||||||
?SAFE_CALL(Mod:on_stop(InstId, ResourceState)).
|
?SAFE_CALL(Mod:on_stop(InstId, ResourceState)).
|
||||||
|
|
||||||
-spec call_config_merge(module(), resource_config(), resource_config(), term()) ->
|
|
||||||
resource_config().
|
|
||||||
call_config_merge(Mod, OldConfig, NewConfig, Params) ->
|
|
||||||
case erlang:function_exported(Mod, on_config_merge, 3) of
|
|
||||||
true -> ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params));
|
|
||||||
false -> NewConfig
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec call_jsonify(module(), resource_config()) -> jsx:json_term().
|
-spec call_jsonify(module(), resource_config()) -> jsx:json_term().
|
||||||
call_jsonify(Mod, Config) ->
|
call_jsonify(Mod, Config) ->
|
||||||
case erlang:function_exported(Mod, on_jsonify, 1) of
|
case erlang:function_exported(Mod, on_jsonify, 1) of
|
||||||
|
|
@ -330,17 +322,17 @@ check_and_create_local(InstId, ResourceType, RawConfig, Opts) ->
|
||||||
check_and_do(ResourceType, RawConfig,
|
check_and_do(ResourceType, RawConfig,
|
||||||
fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end).
|
fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end).
|
||||||
|
|
||||||
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) ->
|
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
|
||||||
{ok, resource_data()} | {error, term()}.
|
{ok, resource_data()} | {error, term()}.
|
||||||
check_and_recreate(InstId, ResourceType, RawConfig, Params) ->
|
check_and_recreate(InstId, ResourceType, RawConfig, Opts) ->
|
||||||
check_and_do(ResourceType, RawConfig,
|
check_and_do(ResourceType, RawConfig,
|
||||||
fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Params) end).
|
fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end).
|
||||||
|
|
||||||
-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), term()) ->
|
-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
|
||||||
{ok, resource_data()} | {error, term()}.
|
{ok, resource_data()} | {error, term()}.
|
||||||
check_and_recreate_local(InstId, ResourceType, RawConfig, Params) ->
|
check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) ->
|
||||||
check_and_do(ResourceType, RawConfig,
|
check_and_do(ResourceType, RawConfig,
|
||||||
fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Params) end).
|
fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end).
|
||||||
|
|
||||||
check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
|
check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
|
||||||
case check_config(ResourceType, RawConfig) of
|
case check_config(ResourceType, RawConfig) of
|
||||||
|
|
|
||||||
|
|
@ -15,14 +15,18 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_resource_health_check).
|
-module(emqx_resource_health_check).
|
||||||
|
|
||||||
-export([child_spec/2]).
|
-export([ start_link/2
|
||||||
|
, create_checker/2
|
||||||
-export([start_link/2]).
|
, delete_checker/1
|
||||||
|
]).
|
||||||
|
|
||||||
-export([health_check/2]).
|
-export([health_check/2]).
|
||||||
|
|
||||||
|
-define(SUP, emqx_resource_health_check_sup).
|
||||||
|
-define(ID(NAME), {resource_health_check, NAME}).
|
||||||
|
|
||||||
child_spec(Name, Sleep) ->
|
child_spec(Name, Sleep) ->
|
||||||
#{id => {health_check, Name},
|
#{id => ?ID(Name),
|
||||||
start => {?MODULE, start_link, [Name, Sleep]},
|
start => {?MODULE, start_link, [Name, Sleep]},
|
||||||
restart => transient,
|
restart => transient,
|
||||||
shutdown => 5000, type => worker, modules => [?MODULE]}.
|
shutdown => 5000, type => worker, modules => [?MODULE]}.
|
||||||
|
|
@ -31,8 +35,26 @@ start_link(Name, Sleep) ->
|
||||||
Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]),
|
Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]),
|
||||||
{ok, Pid}.
|
{ok, Pid}.
|
||||||
|
|
||||||
|
create_checker(Name, Sleep) ->
|
||||||
|
create_checker(Name, Sleep, false).
|
||||||
|
|
||||||
|
create_checker(Name, Sleep, Retry) ->
|
||||||
|
case supervisor:start_child(?SUP, child_spec(Name, Sleep)) of
|
||||||
|
{ok, _} -> ok;
|
||||||
|
{error, already_present} -> ok;
|
||||||
|
{error, {already_started, _}} when Retry == false ->
|
||||||
|
ok = delete_checker(Name),
|
||||||
|
create_checker(Name, Sleep, true);
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
delete_checker(Name) ->
|
||||||
|
case supervisor:terminate_child(?SUP, ?ID(Name)) of
|
||||||
|
ok -> supervisor:delete_child(?SUP, ?ID(Name));
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
health_check(Name, SleepTime) ->
|
health_check(Name, SleepTime) ->
|
||||||
timer:sleep(SleepTime),
|
|
||||||
case emqx_resource:health_check(Name) of
|
case emqx_resource:health_check(Name) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_alarm:deactivate(Name);
|
emqx_alarm:deactivate(Name);
|
||||||
|
|
@ -40,4 +62,5 @@ health_check(Name, SleepTime) ->
|
||||||
emqx_alarm:activate(Name, #{name => Name},
|
emqx_alarm:activate(Name, #{name => Name},
|
||||||
<<Name/binary, " health check failed">>)
|
<<Name/binary, " health check failed">>)
|
||||||
end,
|
end,
|
||||||
|
timer:sleep(SleepTime),
|
||||||
health_check(Name, SleepTime).
|
health_check(Name, SleepTime).
|
||||||
|
|
@ -19,9 +19,7 @@
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
-export([init/1,
|
-export([init/1]).
|
||||||
create_health_check_process/2,
|
|
||||||
delete_health_check_process/1]).
|
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
@ -29,12 +27,3 @@ start_link() ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
|
SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
|
||||||
{ok, {SupFlags, []}}.
|
{ok, {SupFlags, []}}.
|
||||||
|
|
||||||
create_health_check_process(Name, Sleep) ->
|
|
||||||
supervisor:start_child(emqx_resource_health_check_sup,
|
|
||||||
emqx_resource_health_check:child_spec(Name, Sleep)).
|
|
||||||
|
|
||||||
delete_health_check_process(Name) ->
|
|
||||||
_ = supervisor:terminate_child(emqx_resource_health_check_sup, {health_check, Name}),
|
|
||||||
_ = supervisor:delete_child(emqx_resource_health_check_sup, {health_check, Name}),
|
|
||||||
ok.
|
|
||||||
|
|
@ -103,17 +103,17 @@ init({Pool, Id}) ->
|
||||||
handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) ->
|
handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) ->
|
||||||
{reply, do_create(InstId, ResourceType, Config, Opts), State};
|
{reply, do_create(InstId, ResourceType, Config, Opts), State};
|
||||||
|
|
||||||
handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) ->
|
handle_call({create_dry_run, ResourceType, Config}, _From, State) ->
|
||||||
{reply, do_create_dry_run(InstId, ResourceType, Config), State};
|
{reply, do_create_dry_run(ResourceType, Config), State};
|
||||||
|
|
||||||
handle_call({recreate, InstId, ResourceType, Config, Params}, _From, State) ->
|
handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) ->
|
||||||
{reply, do_recreate(InstId, ResourceType, Config, Params), State};
|
{reply, do_recreate(InstId, ResourceType, Config, Opts), State};
|
||||||
|
|
||||||
handle_call({remove, InstId}, _From, State) ->
|
handle_call({remove, InstId}, _From, State) ->
|
||||||
{reply, do_remove(InstId), State};
|
{reply, do_remove(InstId), State};
|
||||||
|
|
||||||
handle_call({restart, InstId}, _From, State) ->
|
handle_call({restart, InstId, Opts}, _From, State) ->
|
||||||
{reply, do_restart(InstId), State};
|
{reply, do_restart(InstId, Opts), State};
|
||||||
|
|
||||||
handle_call({stop, InstId}, _From, State) ->
|
handle_call({stop, InstId}, _From, State) ->
|
||||||
{reply, do_stop(InstId), State};
|
{reply, do_stop(InstId), State};
|
||||||
|
|
@ -140,25 +140,30 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% suppress the race condition check, as these functions are protected in gproc workers
|
%% suppress the race condition check, as these functions are protected in gproc workers
|
||||||
-dialyzer({nowarn_function, [do_recreate/4,
|
-dialyzer({nowarn_function, [ do_recreate/4
|
||||||
do_create/4,
|
, do_create/4
|
||||||
do_restart/1,
|
, do_restart/2
|
||||||
do_stop/1,
|
, do_start/4
|
||||||
do_health_check/1]}).
|
, do_stop/1
|
||||||
|
, do_health_check/1
|
||||||
|
, start_and_check/5
|
||||||
|
]}).
|
||||||
|
|
||||||
do_recreate(InstId, ResourceType, NewConfig, Params) ->
|
do_recreate(InstId, ResourceType, NewConfig, Opts) ->
|
||||||
case lookup(InstId) of
|
case lookup(InstId) of
|
||||||
{ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
|
{ok, #{mod := ResourceType, status := started} = Data} ->
|
||||||
Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
|
%% If this resource is in use (status='started'), we should make sure
|
||||||
NewConfig, Params),
|
%% the new config is OK before removing the old one.
|
||||||
TestInstId = make_test_id(),
|
case do_create_dry_run(ResourceType, NewConfig) of
|
||||||
case do_create_dry_run(TestInstId, ResourceType, Config) of
|
|
||||||
ok ->
|
ok ->
|
||||||
do_remove(ResourceType, InstId, ResourceState, false),
|
do_remove(Data, false),
|
||||||
do_create(InstId, ResourceType, Config, #{force_create => true});
|
do_create(InstId, ResourceType, NewConfig, Opts);
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end;
|
end;
|
||||||
|
{ok, #{mod := ResourceType, status := _} = Data} ->
|
||||||
|
do_remove(Data, false),
|
||||||
|
do_create(InstId, ResourceType, NewConfig, Opts);
|
||||||
{ok, #{mod := Mod}} when Mod =/= ResourceType ->
|
{ok, #{mod := Mod}} when Mod =/= ResourceType ->
|
||||||
{error, updating_to_incorrect_resource_type};
|
{error, updating_to_incorrect_resource_type};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
|
|
@ -166,105 +171,96 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_create(InstId, ResourceType, Config, Opts) ->
|
do_create(InstId, ResourceType, Config, Opts) ->
|
||||||
ForceCreate = maps:get(force_create, Opts, false),
|
|
||||||
case lookup(InstId) of
|
case lookup(InstId) of
|
||||||
{ok, _} -> {ok, already_created};
|
{ok, _} ->
|
||||||
_ ->
|
{ok, already_created};
|
||||||
Res0 = #{id => InstId, mod => ResourceType, config => Config,
|
{error, not_found} ->
|
||||||
status => starting, state => undefined},
|
case do_start(InstId, ResourceType, Config, Opts) of
|
||||||
%% The `emqx_resource:call_start/3` need the instance exist beforehand
|
ok ->
|
||||||
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId),
|
||||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
|
||||||
{ok, ResourceState} ->
|
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
|
|
||||||
%% this is the first time we do health check, this will update the
|
|
||||||
%% status and then do ets:insert/2
|
|
||||||
_ = do_health_check(Res0#{state => ResourceState}),
|
|
||||||
HealthCheckInterval = maps:get(health_check_interval, Opts, 15000),
|
|
||||||
emqx_resource_health_check_sup:create_health_check_process(InstId, HealthCheckInterval),
|
|
||||||
{ok, force_lookup(InstId)};
|
{ok, force_lookup(InstId)};
|
||||||
{error, Reason} when ForceCreate == true ->
|
Error ->
|
||||||
logger:error("start ~ts resource ~ts failed: ~p, "
|
Error
|
||||||
"force_create it as a stopped resource",
|
|
||||||
[ResourceType, InstId, Reason]),
|
|
||||||
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
|
||||||
{ok, Res0};
|
|
||||||
{error, Reason} when ForceCreate == false ->
|
|
||||||
ets:delete(emqx_resource_instance, InstId),
|
|
||||||
{error, Reason}
|
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_create_dry_run(InstId, ResourceType, Config) ->
|
do_create_dry_run(ResourceType, Config) ->
|
||||||
|
InstId = make_test_id(),
|
||||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||||
{ok, ResourceState0} ->
|
{ok, ResourceState} ->
|
||||||
Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of
|
case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
|
||||||
{ok, ResourceState1} -> ok;
|
{ok, _} -> ok;
|
||||||
{error, Reason, ResourceState1} ->
|
{error, Reason, _} -> {error, Reason}
|
||||||
{error, Reason}
|
end;
|
||||||
end,
|
|
||||||
_ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1),
|
|
||||||
Return;
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_remove(InstId) ->
|
do_remove(Instance) ->
|
||||||
case lookup(InstId) of
|
do_remove(Instance, true).
|
||||||
{ok, #{mod := Mod, state := ResourceState}} ->
|
|
||||||
do_remove(Mod, InstId, ResourceState);
|
|
||||||
Error ->
|
|
||||||
Error
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_remove(Mod, InstId, ResourceState) ->
|
do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
|
||||||
do_remove(Mod, InstId, ResourceState, true).
|
do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]);
|
||||||
|
do_remove(#{id := InstId} = Data, ClearMetrics) ->
|
||||||
do_remove(Mod, InstId, ResourceState, ClearMetrics) ->
|
_ = do_stop(Data),
|
||||||
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
|
||||||
ets:delete(emqx_resource_instance, InstId),
|
ets:delete(emqx_resource_instance, InstId),
|
||||||
case ClearMetrics of
|
case ClearMetrics of
|
||||||
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
|
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
|
||||||
false -> ok
|
false -> ok
|
||||||
end,
|
end,
|
||||||
_ = emqx_resource_health_check_sup:delete_health_check_process(InstId),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_restart(InstId) ->
|
do_restart(InstId, Opts) ->
|
||||||
case lookup(InstId) of
|
case lookup(InstId) of
|
||||||
{ok, #{mod := Mod, state := ResourceState, config := Config} = Data} ->
|
{ok, #{mod := ResourceType, config := Config} = Data} ->
|
||||||
_ = case ResourceState of
|
ok = do_stop(Data),
|
||||||
undefined -> ok;
|
do_start(InstId, ResourceType, Config, Opts);
|
||||||
_ -> emqx_resource:call_stop(InstId, Mod, ResourceState)
|
Error ->
|
||||||
end,
|
Error
|
||||||
case emqx_resource:call_start(InstId, Mod, Config) of
|
end.
|
||||||
{ok, NewResourceState} ->
|
|
||||||
ets:insert(emqx_resource_instance,
|
do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) ->
|
||||||
{InstId, Data#{state => NewResourceState, status => started}}),
|
InitData = #{id => InstId, mod => ResourceType, config => Config,
|
||||||
ok;
|
status => starting, state => undefined},
|
||||||
|
%% The `emqx_resource:call_start/3` need the instance exist beforehand
|
||||||
|
ets:insert(emqx_resource_instance, {InstId, InitData}),
|
||||||
|
case maps:get(async_create, Opts, false) of
|
||||||
|
false ->
|
||||||
|
start_and_check(InstId, ResourceType, Config, Opts, InitData);
|
||||||
|
true ->
|
||||||
|
spawn(fun() ->
|
||||||
|
start_and_check(InstId, ResourceType, Config, Opts, InitData)
|
||||||
|
end),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
start_and_check(InstId, ResourceType, Config, Opts, Data) ->
|
||||||
|
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||||
|
{ok, ResourceState} ->
|
||||||
|
Data2 = Data#{state => ResourceState},
|
||||||
|
ets:insert(emqx_resource_instance, {InstId, Data2}),
|
||||||
|
case maps:get(async_create, Opts, false) of
|
||||||
|
false -> do_health_check(Data2);
|
||||||
|
true -> emqx_resource_health_check:create_checker(InstId,
|
||||||
|
maps:get(health_check_interval, Opts, 15000))
|
||||||
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end;
|
|
||||||
Error ->
|
|
||||||
Error
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_stop(InstId) ->
|
do_stop(InstId) when is_binary(InstId) ->
|
||||||
case lookup(InstId) of
|
do_with_instance_data(InstId, fun do_stop/1, []);
|
||||||
{ok, #{mod := Mod, state := ResourceState} = Data} ->
|
do_stop(#{state := undefined}) ->
|
||||||
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
|
||||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
|
||||||
ok;
|
ok;
|
||||||
Error ->
|
do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) ->
|
||||||
Error
|
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
||||||
end.
|
_ = emqx_resource_health_check:delete_checker(InstId),
|
||||||
|
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||||
|
ok.
|
||||||
|
|
||||||
do_health_check(InstId) when is_binary(InstId) ->
|
do_health_check(InstId) when is_binary(InstId) ->
|
||||||
case lookup(InstId) of
|
do_with_instance_data(InstId, fun do_health_check/1, []);
|
||||||
{ok, Data} -> do_health_check(Data);
|
|
||||||
Error -> Error
|
|
||||||
end;
|
|
||||||
do_health_check(#{state := undefined}) ->
|
do_health_check(#{state := undefined}) ->
|
||||||
{error, resource_not_initialized};
|
{error, resource_not_initialized};
|
||||||
do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
|
do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
|
||||||
|
|
@ -284,6 +280,12 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
|
||||||
%% internal functions
|
%% internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
do_with_instance_data(InstId, Do, Args) ->
|
||||||
|
case lookup(InstId) of
|
||||||
|
{ok, Data} -> erlang:apply(Do, [Data | Args]);
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
proc_name(Mod, Id) ->
|
proc_name(Mod, Id) ->
|
||||||
list_to_atom(lists:concat([Mod, "_", Id])).
|
list_to_atom(lists:concat([Mod, "_", Id])).
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ init([]) ->
|
||||||
#{id => emqx_resource_health_check_sup,
|
#{id => emqx_resource_health_check_sup,
|
||||||
start => {emqx_resource_health_check_sup, start_link, []},
|
start => {emqx_resource_health_check_sup, start_link, []},
|
||||||
restart => transient,
|
restart => transient,
|
||||||
shutdown => 5000, type => supervisor, modules => [emqx_resource_health_check_sup]},
|
shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]},
|
||||||
{ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}.
|
{ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue