From 21fbac7a66e081b2cb0d109eab14c97e2ed11679 Mon Sep 17 00:00:00 2001 From: Dmitry Balabka Date: Wed, 9 Jul 2025 13:55:52 +0300 Subject: [PATCH 1/2] Use internal IP addresses for worker-to-scheduler communication --- dask_cloudprovider/aws/ec2.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dask_cloudprovider/aws/ec2.py b/dask_cloudprovider/aws/ec2.py index 8413865e..b249f800 100644 --- a/dask_cloudprovider/aws/ec2.py +++ b/dask_cloudprovider/aws/ec2.py @@ -80,6 +80,8 @@ def __init__( self.iam_instance_profile = iam_instance_profile self.instance_tags = instance_tags self.volume_tags = volume_tags + # TODO: use self.config.get("public_ingress", True) here instead, + # for consistency with the other cloud providers (e.g. Azure) self.use_private_ip = use_private_ip self.enable_detailed_monitoring = enable_detailed_monitoring self.spot = spot @@ -223,7 +225,11 @@ async def configure_vm(self, client): await asyncio.sleep(min(backoff, 10) + backoff % 1) # Exponential backoff with a cap of 10 seconds and some jitter backoff = backoff * 2 - return self.instance[ip_address_key], None + + if self.use_private_ip: + return self.instance["PrivateIpAddress"], None + + return self.instance["PrivateIpAddress"], self.instance["PublicIpAddress"] async def destroy_vm(self): boto_config = botocore.config.Config(retries=dict(max_attempts=10)) From 3d13138a10c723c9c008e68c8d5ffa589bafac3c Mon Sep 17 00:00:00 2001 From: Dmitry Balabka Date: Wed, 9 Jul 2025 17:40:21 +0300 Subject: [PATCH 2/2] Use internal IP for scheduler contact address --- dask_cloudprovider/generic/vmcluster.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dask_cloudprovider/generic/vmcluster.py b/dask_cloudprovider/generic/vmcluster.py index d2f1eb65..84fafeb2 100644 --- a/dask_cloudprovider/generic/vmcluster.py +++ b/dask_cloudprovider/generic/vmcluster.py @@ -99,6 +99,8 @@ async def start(self): f"{self.cluster.protocol}://{external_ip}:{self.port}" ) + self.contact_address = 
f"{self.cluster.protocol}://{internal_ip}:{self.port}" + await self.wait_for_scheduler() await super().start()