diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 00c03fd3a..fdc6d255b 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -146,29 +146,22 @@ on_stop(InstId, _State = #{pool_name := PoolName}) -> on_get_status(_InstId, State = #{client_config := Config}) -> case emqx_s3_client:aws_config(Config) of {error, Reason} -> - {?status_disconnected, State, Reason}; + {?status_disconnected, State, map_error_details(Reason)}; AWSConfig -> try erlcloud_s3:list_buckets(AWSConfig) of Props when is_list(Props) -> ?status_connected catch - error:{aws_error, {http_error, _Code, _, Reason}} -> - {?status_disconnected, State, Reason}; - error:{aws_error, {socket_error, Reason}} -> - {?status_disconnected, State, Reason} + error:Error -> + {?status_disconnected, State, map_error_details(Error)} end end. -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> {ok, state()} | {error, _Reason}. on_add_channel(_InstId, State = #{channels := Channels}, ChannelId, Config) -> - try - ChannelState = start_channel(State, Config), - {ok, State#{channels => Channels#{ChannelId => ChannelState}}} - catch - throw:Reason -> - {error, Reason} - end. + ChannelState = start_channel(State, Config), + {ok, State#{channels => Channels#{ChannelId => ChannelState}}}. -spec on_remove_channel(_InstanceId :: resource_id(), state(), channel_id()) -> {ok, state()}. @@ -217,7 +210,8 @@ start_channel(State, #{ max_records := MaxRecords }, container := Container, - bucket := Bucket + bucket := Bucket, + key := Key } }) -> AggregId = {Type, Name}, @@ -226,7 +220,7 @@ start_channel(State, #{ max_records => MaxRecords, work_dir => work_dir(Type, Name) }, - Template = ensure_ok(emqx_bridge_s3_upload:mk_key_template(Parameters)), + Template = emqx_bridge_s3_upload:mk_key_template(Key), DeliveryOpts = #{ bucket => Bucket, key => Template, @@ -253,11 +247,6 @@ start_channel(State, #{ on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end }. -ensure_ok({ok, V}) -> - V; -ensure_ok({error, Reason}) -> - throw(Reason). - upload_options(Parameters) -> #{acl => maps:get(acl, Parameters, undefined)}. @@ -285,7 +274,7 @@ channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, S check_bucket_accessible(Bucket, #{client_config := Config}) -> case emqx_s3_client:aws_config(Config) of {error, Reason} -> - throw({unhealthy_target, Reason}); + throw({unhealthy_target, map_error_details(Reason)}); AWSConfig -> try erlcloud_s3:list_objects(Bucket, [{max_keys, 1}], AWSConfig) of Props when is_list(Props) -> @@ -293,8 +282,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) -> catch error:{aws_error, {http_error, 404, _, _Reason}} -> throw({unhealthy_target, "Bucket does not exist"}); - error:{aws_error, {socket_error, Reason}} -> - throw({unhealthy_target, emqx_utils:format(Reason)}) + error:Error -> + throw({unhealthy_target, map_error_details(Error)}) end end. @@ -304,8 +293,7 @@ check_aggreg_upload_errors(AggregId) -> %% TODO %% This approach means that, for example, 3 upload failures will cause %% the channel to be marked as unhealthy for 3 consecutive health checks. - ErrorMessage = emqx_utils:format(Error), - throw({unhealthy_target, ErrorMessage}); + throw({unhealthy_target, map_error_details(Error)}); [] -> ok end. @@ -384,16 +372,38 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), ok; {error, Reason} -> - {error, {unrecoverable_error, Reason}} + {error, {unrecoverable_error, emqx_utils:explain_posix(Reason)}} end. -map_error({socket_error, _} = Reason) -> - {recoverable_error, Reason}; -map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 -> +map_error(Error) -> + {map_error_class(Error), map_error_details(Error)}. + +map_error_class({s3_error, _, _}) -> + unrecoverable_error; +map_error_class({aws_error, Error}) -> + map_error_class(Error); +map_error_class({socket_error, _}) -> + recoverable_error; +map_error_class({http_error, Status, _, _}) when Status >= 500 -> %% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList - {recoverable_error, Reason}; -map_error(Reason) -> - {unrecoverable_error, Reason}. + recoverable_error; +map_error_class(_Error) -> + unrecoverable_error. + +map_error_details({s3_error, Code, Message}) -> + emqx_utils:format("S3 error: ~s ~s", [Code, Message]); +map_error_details({aws_error, Error}) -> + map_error_details(Error); +map_error_details({socket_error, Reason}) -> + emqx_utils:format("Socket error: ~s", [emqx_utils:readable_error_msg(Reason)]); +map_error_details({http_error, _, _, _} = Error) -> + emqx_utils:format("AWS error: ~s", [map_aws_error_details(Error)]); +map_error_details({failed_to_obtain_credentials, Error}) -> + emqx_utils:format("Unable to obtain AWS credentials: ~s", [map_error_details(Error)]); +map_error_details({upload_failed, Error}) -> + map_error_details(Error); +map_error_details(Error) -> + Error. render_bucket(Template, Data) -> case emqx_template:render(Template, {emqx_jsonish, Data}) of @@ -416,6 +426,32 @@ render_content(Template, Data) -> iolist_to_string(IOList) -> unicode:characters_to_list(IOList). +%% + +-include_lib("xmerl/include/xmerl.hrl"). + +-spec map_aws_error_details(_AWSError) -> + unicode:chardata(). +map_aws_error_details({http_error, _Status, _, Body}) -> + try xmerl_scan:string(unicode:characters_to_list(Body), [{quiet, true}]) of + {Error = #xmlElement{name = 'Error'}, _} -> + map_aws_error_details(Error); + _ -> + Body + catch + exit:_ -> + Body + end; +map_aws_error_details(#xmlElement{content = Content}) -> + Code = extract_xml_text(lists:keyfind('Code', #xmlElement.name, Content)), + Message = extract_xml_text(lists:keyfind('Message', #xmlElement.name, Content)), + [Code, $:, $\s | Message]. + +extract_xml_text(#xmlElement{content = Content}) -> + [Fragment || #xmlText{value = Fragment} <- Content]; +extract_xml_text(false) -> + []. + %% `emqx_connector_aggreg_delivery` APIs -spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t(). diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl index 2bf12f24b..bedefc7c5 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_upload.erl @@ -29,7 +29,10 @@ ]). %% Internal exports --export([convert_actions/2]). +-export([ + convert_actions/2, + validate_key_template/1 +]). -define(DEFAULT_AGGREG_BATCH_SIZE, 100). -define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>). @@ -137,7 +140,10 @@ fields(s3_aggregated_upload_parameters) -> )} ], emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [ - {key, #{desc => ?DESC(s3_aggregated_upload_key)}} + {key, #{ + desc => ?DESC(s3_aggregated_upload_key), + validator => fun ?MODULE:validate_key_template/1 + }} ]), emqx_s3_schema:fields(s3_uploader) ]); @@ -246,23 +252,13 @@ convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := Resou Conf#{<<"resource_opts">> := NResourceOpts} end. -%% Interpreting options - --spec mk_key_template(_Parameters :: map()) -> - {ok, emqx_template:str()} | {error, _Reason}. -mk_key_template(#{key := Key}) -> - Template = emqx_template:parse(Key), +validate_key_template(Conf) -> + Template = emqx_template:parse(Conf), case validate_bindings(emqx_template:placeholders(Template)) of - UsedBindings when is_list(UsedBindings) -> - SuffixTemplate = mk_suffix_template(UsedBindings), - case emqx_template:is_const(SuffixTemplate) of - true -> - {ok, Template}; - false -> - {ok, Template ++ SuffixTemplate} - end; - Error = {error, _} -> - Error + Bindings when is_list(Bindings) -> + ok; + {error, {disallowed_placeholders, Disallowed}} -> + {error, emqx_utils:format("Template placeholders are disallowed: ~p", [Disallowed])} end. validate_bindings(Bindings) -> @@ -276,7 +272,22 @@ validate_bindings(Bindings) -> [] -> Bindings; Disallowed -> - {error, {invalid_key_template, {disallowed_placeholders, Disallowed}}} + {error, {disallowed_placeholders, Disallowed}} + end. + +%% Interpreting options + +-spec mk_key_template(unicode:chardata()) -> + emqx_template:str(). +mk_key_template(Key) -> + Template = emqx_template:parse(Key), + UsedBindings = emqx_template:placeholders(Template), + SuffixTemplate = mk_suffix_template(UsedBindings), + case emqx_template:is_const(SuffixTemplate) of + true -> + Template; + false -> + Template ++ SuffixTemplate end. mk_suffix_template(UsedBindings) -> diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl index f8eaa1b3a..ea69a346f 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl @@ -134,6 +134,22 @@ action_config(Name, ConnectorId) -> t_start_stop(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped). +t_create_unavailable_credentials(Config) -> + ConnectorName = ?config(connector_name, Config), + ConnectorType = ?config(connector_type, Config), + ConnectorConfig = maps:without( + [<<"access_key_id">>, <<"secret_access_key">>], + ?config(connector_config, Config) + ), + ?assertMatch( + {ok, + {{_HTTP, 201, _}, _, #{ + <<"status_reason">> := + <<"Unable to obtain AWS credentials:", _/bytes>> + }}}, + emqx_bridge_v2_testlib:create_connector_api(ConnectorName, ConnectorType, ConnectorConfig) + ). + t_ignore_batch_opts(Config) -> {ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config), ?assertMatch( @@ -159,6 +175,13 @@ t_start_broken_update_restart(Config) -> _Attempts = 20, ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ConnectorId)) ), + ?assertMatch( + {ok, + {{_HTTP, 200, _}, _, #{ + <<"status_reason">> := <<"AWS error: SignatureDoesNotMatch:", _/bytes>> + }}}, + emqx_bridge_v2_testlib:get_connector_api(Type, Name) + ), ?assertMatch( {ok, {{_HTTP, 200, _}, _, _}}, emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConf) diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index b7c17bbaa..345c2e9aa 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -177,6 +177,27 @@ t_create_invalid_config(Config) -> ) ). +t_create_invalid_config_key_template(Config) -> + ?assertMatch( + {error, + {_Status, _, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"Template placeholders are disallowed:", _/bytes>>, + <<"path">> := <<"root.parameters.key">> + } + }}}, + emqx_bridge_v2_testlib:create_bridge_api( + Config, + _Overrides = #{ + <<"parameters">> => #{ + <<"key">> => <<"${action}/${foo}:${bar.rfc3339}">> + } + } + ) + ). + t_update_invalid_config(Config) -> ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), ?assertMatch( diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 536b427b3..e4f0d91d1 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -65,6 +65,7 @@ flattermap/2, tcp_keepalive_opts/4, format/1, + format/2, call_first_defined/1, ntoa/1, foldl_while/3, @@ -565,6 +566,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) -> format(Term) -> iolist_to_binary(io_lib:format("~0p", [Term])). +format(Fmt, Args) -> + unicode:characters_to_binary(io_lib:format(Fmt, Args)). + -spec call_first_defined(list({module(), atom(), list()})) -> term() | no_return(). call_first_defined([{Module, Function, Args} | Rest]) -> try diff --git a/changes/ee/breaking-13332.en.md b/changes/ee/breaking-13332.en.md new file mode 100644 index 000000000..0b5bf5896 --- /dev/null +++ b/changes/ee/breaking-13332.en.md @@ -0,0 +1,4 @@ +When an S3 Bridge is improperly configured, error messages now contain more informative and easy to read details. + +## Breaking changes +* S3 Bridge configuration with invalid aggregated upload key template will no longer work. Before this change, such configuration was considered valid but the bridge would never work anyway.