diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index a8ba91175..f68c1391f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -524,6 +524,29 @@ install_telemetry_handler(TestCase) -> end), Tid. +wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> + Events = receive_all_events(GaugeName, Timeout), + case lists:last(Events) of + #{measurements := #{gauge_set := ExpectedValue}} -> + ok; + #{measurements := #{gauge_set := Value}} -> + ct:fail( + "gauge ~p didn't reach expected value ~p; last value: ~p", + [GaugeName, ExpectedValue, Value] + ) + end. + +receive_all_events(EventName, Timeout) -> + receive_all_events(EventName, Timeout, []). + +receive_all_events(EventName, Timeout, Acc) -> + receive + {telemetry, #{name := [_, _, EventName]} = Event} -> + receive_all_events(EventName, Timeout, [Event | Acc]) + after Timeout -> + lists:reverse(Acc) + end. + wait_telemetry_event(TelemetryTable, EventName, ResourceId) -> wait_telemetry_event(TelemetryTable, EventName, ResourceId, #{timeout => 5_000, n_events => 1}). @@ -803,6 +826,8 @@ t_publish_success_batch(Config) -> ResourceId, #{timeout => 15_000, n_events => NumMessages} ), + wait_until_gauge_is(queuing, 0, _Timeout = 400), + wait_until_gauge_is(inflight, 0, _Timeout = 400), assert_metrics( #{ batching => 0,