diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index d681c82c3..0cdd02f35 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -23,6 +23,7 @@ -include("emqx_resource_utils.hrl"). -include("emqx_resource_errors.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(gen_statem). @@ -65,6 +66,10 @@ ?REPLY(FROM, REQUEST, SENT, RESULT) || ?QUERY(FROM, REQUEST, SENT) <- BATCH ]). +-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerPid), + {Ref, BatchOrQuery, IsRetriable, WorkerPid} +). +-define(RETRY_IDX, 3). -type id() :: binary(). -type index() :: pos_integer(). @@ -282,7 +287,7 @@ pick_cast(Id, Key, Query) -> resume_from_blocked(Data) -> #{inflight_tid := InflightTID} = Data, - case inflight_get_first(InflightTID) of + case inflight_get_first_retriable(InflightTID) of empty -> {next_state, running, Data}; {Ref, FirstQuery} -> @@ -298,6 +303,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> index := Index, resume_interval := ResumeT } = Data0, + ?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), QueryOpts = #{}, %% if we are retrying an inflight query, it has been sent HasBeenSent = true, @@ -306,7 +312,13 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> %% Send failed because resource is down {nack, PostFn} -> PostFn(), - ?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}), + ?tp( + resource_worker_retry_inflight_failed, + #{ + ref => Ref, + query_or_batch => QueryOrBatch + } + ), {keep_state, Data0, {state_timeout, ResumeT, unblock}}; %% Send ok or failed but the resource is working {ack, PostFn} -> @@ -318,7 +330,13 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> %% requests (repeated and original) have the safe `Ref', %% we bump the counter when removing it from the table. IsAcked andalso PostFn(), - ?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}), + ?tp( + resource_worker_retry_inflight_succeeded, + #{ + ref => Ref, + query_or_batch => QueryOrBatch + } + ), resume_from_blocked(Data0) end. @@ -431,17 +449,37 @@ do_flush( %% And only in that case. nack -> ok = replayq:ack(Q1, QAckRef), - ShouldPreserveInInflight = - is_inflight_full_result(Result) orelse + %% We might get a retriable response without having added + %% the request to the inflight table (e.g.: sync request, + %% but resource health check failed prior to calling and + %% so we didn't even call it). In that case, we must then + %% add it to the inflight table. + IsRetriable = + is_recoverable_error_result(Result) orelse is_not_connected_result(Result), - ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Request, Id, Index), + ShouldPreserveInInflight = is_not_connected_result(Result), + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerPid), + ShouldPreserveInInflight andalso + inflight_append(InflightTID, InflightItem, Id, Index), + IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp( + resource_worker_flush_nack, + #{ + ref => Ref, + is_retriable => IsRetriable, + batch_or_query => Request, + result => Result + } + ), {next_state, blocked, Data0}; %% Success; just ack. ack -> ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp(resource_worker_flush_ack, #{batch_or_query => Request}), case queue_count(Q1) > 0 of true -> {keep_state, Data0, [{next_event, internal, flush}]}; @@ -471,17 +509,37 @@ do_flush(Data0, #{ %% And only in that case. nack -> ok = replayq:ack(Q1, QAckRef), - ShouldPreserveInInflight = - is_inflight_full_result(Result) orelse + %% We might get a retriable response without having added + %% the request to the inflight table (e.g.: sync request, + %% but resource health check failed prior to calling and + %% so we didn't even call it). In that case, we must then + %% add it to the inflight table. + IsRetriable = + is_recoverable_error_result(Result) orelse is_not_connected_result(Result), - ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Batch, Id, Index), + ShouldPreserveInInflight = is_not_connected_result(Result), + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + ShouldPreserveInInflight andalso + inflight_append(InflightTID, InflightItem, Id, Index), + IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp( + resource_worker_flush_nack, + #{ + ref => Ref, + is_retriable => IsRetriable, + batch_or_query => Batch, + result => Result + } + ), {next_state, blocked, Data0}; %% Success; just ack. ack -> ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp(resource_worker_flush_ack, #{batch_or_query => Batch}), CurrentCount = queue_count(Q1), case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> @@ -603,11 +661,6 @@ handle_query_result_pure(Id, Result, HasBeenSent) -> end, {ack, PostFn}. -is_inflight_full_result({async_return, inflight_full}) -> - true; -is_inflight_full_result(_) -> - false. - is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when Error =:= not_connected; Error =:= blocked -> @@ -615,6 +668,11 @@ is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when is_not_connected_result(_) -> false. +is_recoverable_error_result({error, {recoverable_error, _Reason}}) -> + true; +is_recoverable_error_result(_) -> + false. + call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of @@ -653,7 +711,7 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ). apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> - ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), + ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => sync}), InflightTID = maps:get(inflight_name, QueryOpts, undefined), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), ?APPLY_RESOURCE( @@ -664,13 +722,18 @@ apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, %% when resuming. {async_return, inflight_full}; false -> - ok = inflight_append(InflightTID, Ref, Query, Id, Index), + IsRetriable = false, + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + ok = inflight_append(InflightTID, InflightItem, Id, Index), Mod:on_query(Id, Request, ResSt) end, Request ); apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> - ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), + ?tp(call_query_async, #{ + id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async + }), InflightTID = maps:get(inflight_name, QueryOpts, undefined), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), ?APPLY_RESOURCE( @@ -681,14 +744,19 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt false -> ReplyFun = fun ?MODULE:reply_after_query/7, Args = [self(), Id, Index, InflightTID, Ref, Query], - ok = inflight_append(InflightTID, Ref, Query, Id, Index), + IsRetriable = false, + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + ok = inflight_append(InflightTID, InflightItem, Id, Index), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), {async_return, Result} end, Request ); apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> - ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), + ?tp(call_batch_query, #{ + id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync + }), InflightTID = maps:get(inflight_name, QueryOpts, undefined), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), Requests = [Request || ?QUERY(_From, Request, _) <- Batch], @@ -700,13 +768,18 @@ apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, %% when resuming. {async_return, inflight_full}; false -> - ok = inflight_append(InflightTID, Ref, Batch, Id, Index), + IsRetriable = false, + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + ok = inflight_append(InflightTID, InflightItem, Id, Index), Mod:on_batch_query(Id, Requests, ResSt) end, Batch ); apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> - ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), + ?tp(call_batch_query_async, #{ + id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async + }), InflightTID = maps:get(inflight_name, QueryOpts, undefined), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), ?APPLY_RESOURCE( @@ -718,7 +791,10 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt ReplyFun = fun ?MODULE:batch_reply_after_query/7, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - ok = inflight_append(InflightTID, Ref, Batch, Id, Index), + IsRetriable = false, + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + ok = inflight_append(InflightTID, InflightItem, Id, Index), Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), {async_return, Result} end, @@ -738,8 +814,18 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee IsAcked andalso PostFn(), case Action of nack -> + ?tp(resource_worker_reply_after_query, #{ + action => nack, + batch_or_query => ?QUERY(From, Request, HasBeenSent), + result => Result + }), ?MODULE:block(Pid); ack -> + ?tp(resource_worker_reply_after_query, #{ + action => ack, + batch_or_query => ?QUERY(From, Request, HasBeenSent), + result => Result + }), ok end. @@ -756,8 +842,14 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), case Action of nack -> + ?tp(resource_worker_reply_after_query, #{ + action => nack, batch_or_query => Batch, result => Result + }), ?MODULE:block(Pid); ack -> + ?tp(resource_worker_reply_after_query, #{ + action => ack, batch_or_query => Batch, result => Result + }), ok end. @@ -802,24 +894,28 @@ inflight_new(InfltWinSZ, Id, Index) -> emqx_resource_worker_inflight_tab, [ordered_set, public, {write_concurrency, true}] ), - inflight_append(TableId, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index), + inflight_append(TableId, {?MAX_SIZE_REF, {max_size, InfltWinSZ}}, Id, Index), %% we use this counter because we might deal with batches as %% elements. - inflight_append(TableId, ?SIZE_REF, 0, Id, Index), + inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), TableId. -inflight_get_first(InflightTID) -> - case ets:next(InflightTID, ?MAX_SIZE_REF) of +-spec inflight_get_first_retriable(ets:tid()) -> + empty | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}. +inflight_get_first_retriable(InflightTID) -> + MatchSpec = + ets:fun2ms( + fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerPid)) when + IsRetriable =:= true + -> + {Ref, BatchOrQuery} + end + ), + case ets:select(InflightTID, MatchSpec, _Limit = 1) of '$end_of_table' -> empty; - Ref -> - case ets:lookup(InflightTID, Ref) of - [Object] -> - Object; - [] -> - %% it might have been dropped - inflight_get_first(InflightTID) - end + {[{Ref, BatchOrQuery}], _Continuation} -> + {Ref, BatchOrQuery} end. is_inflight_full(undefined) -> @@ -844,37 +940,60 @@ inflight_num_msgs(InflightTID) -> [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF), Size. -inflight_append(undefined, _Ref, _Query, _Id, _Index) -> +inflight_append(undefined, _InflightItem, _Id, _Index) -> ok; -inflight_append(InflightTID, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) -> +inflight_append( + InflightTID, + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerPid), + Id, + Index +) -> Batch = mark_as_sent(Batch0), - IsNew = ets:insert_new(InflightTID, {Ref, Batch}), + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - ?tp(resource_worker_appended_to_inflight, #{batch => Batch, is_new => IsNew}), + ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; -inflight_append(InflightTID, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) -> +inflight_append( + InflightTID, + ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerPid), + Id, + Index +) -> Query = mark_as_sent(Query0), - IsNew = ets:insert_new(InflightTID, {Ref, Query}), + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + IsNew = ets:insert_new(InflightTID, InflightItem), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - ?tp(resource_worker_appended_to_inflight, #{query => Query, is_new => IsNew}), + ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; -inflight_append(InflightTID, Ref, Data, _Id, _Index) -> +inflight_append(InflightTID, {Ref, Data}, _Id, _Index) -> ets:insert(InflightTID, {Ref, Data}), %% this is a metadata row being inserted; therefore, we don't bump %% the inflight metric. ok. +%% a request was already appended and originally not retriable, but an +%% error occurred and it is now retriable. +mark_inflight_as_retriable(undefined, _Ref) -> + ok; +mark_inflight_as_retriable(InflightTID, Ref) -> + _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}), + ok. + ack_inflight(undefined, _Ref, _Id, _Index) -> false; ack_inflight(InflightTID, Ref, Id, Index) -> Count = case ets:take(InflightTID, Ref) of - [{Ref, ?QUERY(_, _, _)}] -> 1; - [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); - _ -> 0 + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerPid)] -> + 1; + [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerPid)] -> + length(Batch); + _ -> + 0 end, IsAcked = Count > 0, IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 6c03e93cc..bbd2ba058 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -85,6 +85,9 @@ on_query(_InstId, get_state_failed, State) -> on_query(_InstId, block, #{pid := Pid}) -> Pid ! block, ok; +on_query(_InstId, block_now, #{pid := Pid}) -> + Pid ! block, + {error, {resource_error, #{reason => blocked, msg => blocked}}}; on_query(_InstId, resume, #{pid := Pid}) -> Pid ! resume, ok; @@ -138,7 +141,13 @@ on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) -> ok; on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) -> Pid ! {get, ReplyFun}, - ok. + ok; +on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) -> + Pid ! {block_now, ReplyFun}, + {ok, Pid}; +on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) -> + Pid ! {big_payload, Payload, ReplyFun}, + {ok, Pid}. on_batch_query(InstId, BatchReq, State) -> %% Requests can be either 'get_counter' or 'inc_counter', but @@ -147,17 +156,22 @@ on_batch_query(InstId, BatchReq, State) -> {inc_counter, _} -> batch_inc_counter(sync, InstId, BatchReq, State); get_counter -> - batch_get_counter(sync, InstId, State) + batch_get_counter(sync, InstId, State); + {big_payload, _Payload} -> + batch_big_payload(sync, InstId, BatchReq, State) end. on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> - %% Requests can be either 'get_counter' or 'inc_counter', but - %% cannot be mixed. + %% Requests can be of multiple types, but cannot be mixed. case hd(BatchReq) of {inc_counter, _} -> batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State); get_counter -> - batch_get_counter({async, ReplyFunAndArgs}, InstId, State) + batch_get_counter({async, ReplyFunAndArgs}, InstId, State); + block_now -> + on_query_async(InstId, block_now, ReplyFunAndArgs, State); + {big_payload, _Payload} -> + batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State) end. batch_inc_counter(CallMode, InstId, BatchReq, State) -> @@ -184,6 +198,19 @@ batch_get_counter(sync, InstId, State) -> batch_get_counter({async, ReplyFunAndArgs}, InstId, State) -> on_query_async(InstId, get_counter, ReplyFunAndArgs, State). +batch_big_payload(sync, InstId, Batch, State) -> + [Res | _] = lists:map( + fun(Req = {big_payload, _}) -> on_query(InstId, Req, State) end, + Batch + ), + Res; +batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}) -> + lists:foreach( + fun(Req = {big_payload, _}) -> on_query_async(InstId, Req, ReplyFunAndArgs, State) end, + Batch + ), + {ok, Pid}. + on_get_status(_InstId, #{health_check_error := true}) -> disconnected; on_get_status(_InstId, #{pid := Pid}) -> @@ -199,7 +226,11 @@ spawn_counter_process(Name, Register) -> Pid. counter_loop() -> - counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}). + counter_loop(#{ + counter => 0, + status => running, + incorrect_status_count => 0 + }). counter_loop( #{ @@ -213,6 +244,12 @@ counter_loop( block -> ct:pal("counter recv: ~p", [block]), State#{status => blocked}; + {block_now, ReplyFun} -> + ct:pal("counter recv: ~p", [block_now]), + apply_reply( + ReplyFun, {error, {resource_error, #{reason => blocked, msg => blocked}}} + ), + State#{status => blocked}; resume -> {messages, Msgs} = erlang:process_info(self(), messages), ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index d837cbc8c..2d6856acf 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1254,6 +1254,174 @@ t_always_overflow(_Config) -> ), ok. +t_retry_sync_inflight(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 1, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + QueryOpts = #{}, + ?check_trace( + begin + %% now really make the resource go into `blocked' state. + %% this results in a retriable error when sync. + ok = emqx_resource:simple_sync_query(?ID, block), + {{error, {recoverable_error, incorrect_status}}, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), + #{?snk_kind := resource_worker_retry_inflight_failed}, + ResumeInterval * 2 + ), + {ok, {ok, _}} = + ?wait_async_action( + ok = emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := resource_worker_retry_inflight_succeeded}, + ResumeInterval * 3 + ), + ok + end, + [fun ?MODULE:assert_retry_fail_then_succeed_inflight/1] + ), + ok. + +t_retry_sync_inflight_batch(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 200, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + QueryOpts = #{}, + ?check_trace( + begin + %% now really make the resource go into `blocked' state. + %% this results in a retriable error when sync. + ok = emqx_resource:simple_sync_query(?ID, block), + {{error, {recoverable_error, incorrect_status}}, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), + #{?snk_kind := resource_worker_retry_inflight_failed}, + ResumeInterval * 2 + ), + {ok, {ok, _}} = + ?wait_async_action( + ok = emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := resource_worker_retry_inflight_succeeded}, + ResumeInterval * 3 + ), + ok + end, + [fun ?MODULE:assert_retry_fail_then_succeed_inflight/1] + ), + ok. + +t_dont_retry_async_inflight(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + QueryOpts = #{}, + ?check_trace( + begin + %% block, + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, block_now), + #{?snk_kind := resource_worker_enter_blocked}, + ResumeInterval * 2 + ), + + %% then send an async request; that shouldn't be retriable. + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), + #{?snk_kind := resource_worker_flush_ack}, + ResumeInterval * 2 + ), + + %% will re-enter running because the single request is not retriable + {ok, _} = ?block_until( + #{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2 + ), + ok + end, + [fun ?MODULE:assert_no_retry_inflight/1] + ), + ok. + +t_dont_retry_async_inflight_batch(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 200, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + QueryOpts = #{}, + ?check_trace( + begin + %% block, + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, block_now), + #{?snk_kind := resource_worker_enter_blocked}, + ResumeInterval * 2 + ), + + %% then send an async request; that shouldn't be retriable. + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), + #{?snk_kind := resource_worker_flush_ack}, + ResumeInterval * 2 + ), + + %% will re-enter running because the single request is not retriable + {ok, _} = ?block_until( + #{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2 + ), + ok + end, + [fun ?MODULE:assert_no_retry_inflight/1] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -1317,3 +1485,27 @@ tap_metrics(Line) -> {ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID), ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]), #{counters => C, gauges => G}. + +assert_no_retry_inflight(Trace) -> + ?assertEqual([], ?of_kind(resource_worker_retry_inflight_failed, Trace)), + ?assertEqual([], ?of_kind(resource_worker_retry_inflight_succeeded, Trace)), + ok. + +assert_retry_fail_then_succeed_inflight(Trace) -> + ?assert( + ?strict_causality( + #{?snk_kind := resource_worker_flush_nack, ref := _Ref}, + #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + Trace + ) + ), + %% not strict causality because it might retry more than once + %% before restoring the resource health. + ?assert( + ?causality( + #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, + Trace + ) + ), + ok.