@@ -543,7 +543,7 @@ async def _run_python_registry_publish_pipeline(context: PublishConnectorContext
543
543
return results , False
544
544
545
545
546
- async def run_connector_rollback_pipeline (context : PublishConnectorContext ) -> ConnectorReport :
546
+ async def run_connector_rollback_pipeline (context : PublishConnectorContext , semaphore : anyio . Semaphore ) -> ConnectorReport :
547
547
"""Run a rollback pipeline for a single connector.
548
548
549
549
1. Check if the current metadata is a release candidate
@@ -554,19 +554,22 @@ async def run_connector_rollback_pipeline(context: PublishConnectorContext) -> C
554
554
"""
555
555
556
556
results = []
557
- async with context :
558
- assert context .rollout_mode == RolloutMode .ROLLBACK , "This pipeline can only run in rollback mode."
559
- assert context .connector .metadata .get ("releases" , {}).get (
560
- "isReleaseCandidate" , True
561
- ), "This pipeline can only run for release candidates."
562
- results .append (
563
- await MetadataRollbackReleaseCandidate (context , context .metadata_bucket_name , context .metadata_service_gcs_credentials ).run ()
564
- )
557
+ async with semaphore :
558
+ async with context :
559
+ assert context .rollout_mode == RolloutMode .ROLLBACK , "This pipeline can only run in rollback mode."
560
+ assert context .connector .metadata .get ("releases" , {}).get (
561
+ "isReleaseCandidate" , True
562
+ ), "This pipeline can only run for release candidates."
563
+ results .append (
564
+ await MetadataRollbackReleaseCandidate (
565
+ context , context .metadata_bucket_name , context .metadata_service_gcs_credentials
566
+ ).run ()
567
+ )
565
568
566
569
return ConnectorReport (context , results , name = "ROLLBACK RESULTS" )
567
570
568
571
569
- async def run_connector_promote_pipeline (context : PublishConnectorContext ) -> ConnectorReport :
572
+ async def run_connector_promote_pipeline (context : PublishConnectorContext , semaphore : anyio . Semaphore ) -> ConnectorReport :
570
573
"""Run a promote pipeline for a single connector.
571
574
572
575
1. Publish the release candidate version docker image with the latest tag.
@@ -576,19 +579,20 @@ async def run_connector_promote_pipeline(context: PublishConnectorContext) -> Co
576
579
ConnectorReport: The reports holding promote results.
577
580
"""
578
581
results = []
579
- async with context :
580
- assert context .rollout_mode == RolloutMode .PROMOTE , "This pipeline can only run in promote mode."
581
- assert context .connector .metadata .get ("releases" , {}).get (
582
- "isReleaseCandidate" , True
583
- ), "This pipeline can only run for release candidates."
584
- metadata_promote_result = await MetadataPromoteReleaseCandidate (
585
- context , context .metadata_bucket_name , context .metadata_service_gcs_credentials
586
- ).run ()
587
- results .append (metadata_promote_result )
588
- if metadata_promote_result .status is StepStatus .FAILURE :
589
- return ConnectorReport (context , results , name = "PROMOTE RESULTS" )
590
- publish_latest_tag_results = await PushVersionImageAsLatest (context ).run ()
591
- results .append (publish_latest_tag_results )
582
+ async with semaphore :
583
+ async with context :
584
+ assert context .rollout_mode == RolloutMode .PROMOTE , "This pipeline can only run in promote mode."
585
+ assert context .connector .metadata .get ("releases" , {}).get (
586
+ "isReleaseCandidate" , True
587
+ ), "This pipeline can only run for release candidates."
588
+ metadata_promote_result = await MetadataPromoteReleaseCandidate (
589
+ context , context .metadata_bucket_name , context .metadata_service_gcs_credentials
590
+ ).run ()
591
+ results .append (metadata_promote_result )
592
+ if metadata_promote_result .status is StepStatus .FAILURE :
593
+ return ConnectorReport (context , results , name = "PROMOTE RESULTS" )
594
+ publish_latest_tag_results = await PushVersionImageAsLatest (context ).run ()
595
+ results .append (publish_latest_tag_results )
592
596
return ConnectorReport (context , results , name = "PROMOTE RESULTS" )
593
597
594
598
0 commit comments