@@ -183,7 +183,7 @@ def test_fallback_on_startup(self):
183183 self .start_server (name = "server2" ) as server2 ,
184184 self .start_client () as client ,
185185 ):
186- self .assert_ads_connections (
186+ self .check_ads_connections_statuses (
187187 client = client ,
188188 primary_status = channelz_pb2 .ChannelConnectivityState .TRANSIENT_FAILURE ,
189189 fallback_status = channelz_pb2 .ChannelConnectivityState .TRANSIENT_FAILURE ,
@@ -227,24 +227,20 @@ def test_fallback_on_startup(self):
227227 and stats .rpcs_by_peer ["server1" ] > 0 ,
228228 )
229229 retryer (client .get_stats , 10 )
230- self .assert_ads_connections (
230+ self .check_ads_connections_statuses (
231231 client = client ,
232232 primary_status = channelz_pb2 .ChannelConnectivityState .TRANSIENT_FAILURE ,
233233 fallback_status = None ,
234234 )
235235 # Primary control plane down, cached value is used
236- stats = client .get_stats (5 )
237- self .assertEqual (stats .num_failures , 0 )
238- self .assertEqual (stats .rpcs_by_peer ["server1" ], 5 )
239- self .assert_ads_connections (
236+ self .wait_for_given_server_to_receive_rpcs (client , "server1" )
237+ self .check_ads_connections_statuses (
240238 client = client ,
241239 primary_status = channelz_pb2 .ChannelConnectivityState .TRANSIENT_FAILURE ,
242240 fallback_status = None ,
243241 )
244242 # Fallback control plane down, cached value is used
245- stats = client .get_stats (5 )
246- self .assertEqual (stats .num_failures , 0 )
247- self .assertEqual (stats .rpcs_by_peer ["server1" ], 5 )
243+ self .wait_for_given_server_to_receive_rpcs (client , "server1" )
248244
249245 def test_fallback_mid_startup (self ):
250246 # Run the mesh, excluding the client
@@ -269,31 +265,27 @@ def test_fallback_mid_startup(self):
269265 )
270266 # Run client
271267 with self .start_client () as client :
272- self .assert_ads_connections (
268+ self .check_ads_connections_statuses (
273269 client ,
274270 primary_status = channelz_pb2 .ChannelConnectivityState .TRANSIENT_FAILURE ,
275271 fallback_status = channelz_pb2 .ChannelConnectivityState .READY ,
276272 )
277273 # Secondary xDS config start, send traffic to server2
278- stats = client .get_stats (5 )
279- self .assertEqual (stats .num_failures , 0 )
280- self .assertGreater (stats .rpcs_by_peer ["server2" ], 0 )
281- self .assertNotIn ("server1" , stats .rpcs_by_peer )
274+ self .wait_for_given_server_to_receive_rpcs (client , "server2" )
282275 # Rerun primary control plane
283276 with self .start_control_plane (
284277 "primary_xds_config_run_2" ,
285278 port = self .primary_port ,
286279 upstream_port = server1 .port ,
287280 ):
288- self .assert_ads_connections (
281+ self .check_ads_connections_statuses (
289282 client ,
290283 primary_status = channelz_pb2 .ChannelConnectivityState .READY ,
291284 fallback_status = None ,
292285 )
293- stats = client .get_stats (10 )
294- self .assertEqual (stats .num_failures , 0 )
295- self .assertIn ("server1" , stats .rpcs_by_peer )
296- self .assertGreater (stats .rpcs_by_peer ["server1" ], 0 )
286+ self .wait_for_given_server_to_receive_rpcs (
287+ client , "server1"
288+ )
297289
298290 def test_fallback_mid_update (self ):
299291 with (
@@ -318,8 +310,7 @@ def test_fallback_mid_update(self):
318310 fallback_status = None ,
319311 )
320312 # Secondary xDS config start, send traffic to server2
321- stats = client .get_stats (5 )
322- self .assertGreater (stats .rpcs_by_peer ["server1" ], 0 )
313+ self .wait_for_given_server_to_receive_rpcs (client , "server1" )
323314 primary .stop_on_resource_request (
324315 "type.googleapis.com/envoy.config.cluster.v3.Cluster" ,
325316 "test_cluster_2" ,
@@ -337,13 +328,7 @@ def test_fallback_mid_update(self):
337328 primary_status = channelz_pb2 .ChannelConnectivityState .TRANSIENT_FAILURE ,
338329 fallback_status = channelz_pb2 .ChannelConnectivityState .READY ,
339330 )
340- retryer = retryers .constant_retryer (
341- wait_fixed = datetime .timedelta (seconds = 1 ),
342- timeout = datetime .timedelta (seconds = 20 ),
343- check_result = lambda stats : stats .num_failures == 0
344- and "server2" in stats .rpcs_by_peer ,
345- )
346- retryer (client .get_stats , 10 )
331+ self .wait_for_given_server_to_receive_rpcs (client , "server2" )
347332 # Check that post-recovery uses a new config
348333 with self .start_control_plane (
349334 name = "primary_xds_config_run_2" ,
@@ -355,13 +340,17 @@ def test_fallback_mid_update(self):
355340 primary_status = channelz_pb2 .ChannelConnectivityState .READY ,
356341 fallback_status = None ,
357342 )
358- retryer = retryers .constant_retryer (
359- wait_fixed = datetime .timedelta (seconds = 1 ),
360- timeout = datetime .timedelta (seconds = 20 ),
361- check_result = lambda stats : stats .num_failures == 0
362- and "server3" in stats .rpcs_by_peer ,
363- )
364- retryer (client .get_stats , 10 )
343+ self .wait_for_given_server_to_receive_rpcs (client , "server3" )
344+
345+ def wait_for_given_server_to_receive_rpcs (self , client , server_name ):
346+ retryer = retryers .constant_retryer (
347+ wait_fixed = datetime .timedelta (seconds = 1 ),
348+ timeout = datetime .timedelta (seconds = 60 ),
349+ check_result = lambda stats : stats .num_failures == 0
350+ and server_name in stats .rpcs_by_peer
351+ and len (stats .rpcs_by_peer ) == 1 ,
352+ )
353+ retryer (client .get_stats , 10 )
365354
366355 def check_ads_connections_statuses (
367356 self , client , primary_status , fallback_status
0 commit comments