From b28898a2b80d3fc58b1ec26e4c199e1d820514bd Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 13 Oct 2025 14:56:34 +0545 Subject: [PATCH 01/18] Add SGLang Router Min Support --- src/dstack/_internal/core/backends/base/compute.py | 7 ++++++- src/dstack/_internal/core/models/gateways.py | 2 ++ .../_internal/proxy/gateway/resources/nginx/service.jinja2 | 2 +- src/dstack/_internal/proxy/gateway/routers/registry.py | 4 ++++ src/dstack/_internal/proxy/gateway/schemas/registry.py | 1 + src/dstack/_internal/proxy/gateway/services/nginx.py | 1 + src/dstack/_internal/proxy/gateway/services/registry.py | 3 +++ src/dstack/_internal/proxy/lib/models.py | 1 + src/dstack/_internal/server/services/gateways/client.py | 2 ++ src/dstack/_internal/server/services/services/__init__.py | 7 +++++++ 10 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 6a1f6af4a..931b097aa 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -979,7 +979,12 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + # For testing + logger.debug( + "Using test gateway wheel: https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" + ) + return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/core/models/gateways.py b/src/dstack/_internal/core/models/gateways.py index 6a480b580..d7a7fdd2c 100644 --- a/src/dstack/_internal/core/models/gateways.py +++ b/src/dstack/_internal/core/models/gateways.py @@ -44,12 +44,14 @@ class GatewayCertificate(CoreModel): ] +# https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 class GatewayConfiguration(CoreModel): type: Literal["gateway"] = "gateway" name: Annotated[Optional[str], Field(description="The gateway name")] = None default: Annotated[bool, Field(description="Make the gateway default")] = False backend: Annotated[BackendType, Field(description="The gateway backend")] region: Annotated[str, Field(description="The gateway region")] + router: Annotated[Optional[str], Field(description="The router type, e.g. `sglang`")] = None domain: Annotated[ Optional[str], Field(description="The gateway domain, e.g. `example.com`") ] = None diff --git a/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 b/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 index b096fa80e..5da4b6282 100644 --- a/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 +++ b/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 @@ -5,7 +5,7 @@ limit_req_zone {{ zone.key }} zone={{ zone.name }}:10m rate={{ zone.rpm }}r/m; {% if replicas %} upstream {{ domain }}.upstream { {% for replica in replicas %} - server unix:{{ replica.socket }}; # replica {{ replica.id }} + server unix:{{ replica.socket }}; # replica {{ replica.id }} router={{ router }} {% endfor %} } {% else %} diff --git a/src/dstack/_internal/proxy/gateway/routers/registry.py b/src/dstack/_internal/proxy/gateway/routers/registry.py index e1bfa4ff2..5d0aa2953 100644 --- a/src/dstack/_internal/proxy/gateway/routers/registry.py +++ b/src/dstack/_internal/proxy/gateway/routers/registry.py @@ -13,8 +13,10 @@ from dstack._internal.proxy.gateway.services.nginx import Nginx from dstack._internal.proxy.lib.deps import get_service_connection_pool from dstack._internal.proxy.lib.services.service_connection import ServiceConnectionPool +from dstack._internal.utils.logging import get_logger router = APIRouter(prefix="/{project_name}") +logger = get_logger(__name__) @router.post("/services/register") @@ -25,6 +27,7 @@ async def register_service( nginx: Annotated[Nginx, Depends(get_nginx)], service_conn_pool: Annotated[ServiceConnectionPool, Depends(get_service_connection_pool)], ) -> OkResponse: + logger.debug(f"[SglangRouterTesting] Gateway API Reception Router: {body.router}") await registry_services.register_service( project_name=project_name.lower(), run_name=body.run_name.lower(), @@ -36,6 +39,7 @@ async def register_service( model=body.options.openai.model if body.options.openai is not None else None, ssh_private_key=body.ssh_private_key, repo=repo, + router=body.router, nginx=nginx, service_conn_pool=service_conn_pool, ) diff --git a/src/dstack/_internal/proxy/gateway/schemas/registry.py b/src/dstack/_internal/proxy/gateway/schemas/registry.py index 8ab69b6af..117152a95 100644 --- a/src/dstack/_internal/proxy/gateway/schemas/registry.py +++ b/src/dstack/_internal/proxy/gateway/schemas/registry.py @@ -44,6 +44,7 @@ class RegisterServiceRequest(BaseModel): options: Options ssh_private_key: str rate_limits: tuple[RateLimit, ...] = () + router: Optional[str] = None class RegisterReplicaRequest(BaseModel): diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 2d3e755ac..f51c94a02 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -64,6 +64,7 @@ class ServiceConfig(SiteConfig): limit_req_zones: list[LimitReqZoneConfig] locations: list[LocationConfig] replicas: list[ReplicaConfig] + router: Optional[str] = None class ModelEntrypointConfig(SiteConfig): diff --git a/src/dstack/_internal/proxy/gateway/services/registry.py b/src/dstack/_internal/proxy/gateway/services/registry.py index 3ea412d79..ce7cbca2e 100644 --- a/src/dstack/_internal/proxy/gateway/services/registry.py +++ b/src/dstack/_internal/proxy/gateway/services/registry.py @@ -44,6 +44,7 @@ async def register_service( repo: GatewayProxyRepo, nginx: Nginx, service_conn_pool: ServiceConnectionPool, + router: Optional[str] = None, ) -> None: service = models.Service( project_name=project_name, @@ -54,6 +55,7 @@ async def register_service( auth=auth, client_max_body_size=client_max_body_size, replicas=(), + router=router, ) async with lock: @@ -335,6 +337,7 @@ async def get_nginx_service_config( limit_req_zones=limit_req_zones, locations=locations, replicas=sorted(replicas, key=lambda r: r.id), # sort for reproducible configs + router=service.router, ) diff --git a/src/dstack/_internal/proxy/lib/models.py b/src/dstack/_internal/proxy/lib/models.py index 5cb5471d8..4e7046167 100644 --- a/src/dstack/_internal/proxy/lib/models.py +++ b/src/dstack/_internal/proxy/lib/models.py @@ -57,6 +57,7 @@ class Service(ImmutableModel): client_max_body_size: int # only enforced on gateways strip_prefix: bool = True # only used in-server replicas: tuple[Replica, ...] + router: Optional[str] = None @property def domain_safe(self) -> str: diff --git a/src/dstack/_internal/server/services/gateways/client.py b/src/dstack/_internal/server/services/gateways/client.py index f8c090079..33bfee3f5 100644 --- a/src/dstack/_internal/server/services/gateways/client.py +++ b/src/dstack/_internal/server/services/gateways/client.py @@ -45,6 +45,7 @@ async def register_service( options: dict, rate_limits: list[RateLimit], ssh_private_key: str, + router: Optional[str] = None, ): if "openai" in options: entrypoint = f"gateway.{domain.split('.', maxsplit=1)[1]}" @@ -59,6 +60,7 @@ async def register_service( "options": options, "rate_limits": [limit.dict() for limit in rate_limits], "ssh_private_key": ssh_private_key, + "router": router, } resp = await self._client.post( self._url(f"/api/registry/{project}/services/register"), json=payload diff --git a/src/dstack/_internal/server/services/services/__init__.py b/src/dstack/_internal/server/services/services/__init__.py index a8089a93a..e6f2b19aa 100644 --- a/src/dstack/_internal/server/services/services/__init__.py +++ b/src/dstack/_internal/server/services/services/__init__.py @@ -82,6 +82,11 @@ async def _register_service_in_gateway( gateway_configuration = get_gateway_configuration(gateway) service_https = _get_service_https(run_spec, gateway_configuration) + router = gateway_configuration.router + logger.debug(f"[SglangRouterTesting] Configuration parsing: {router}") + logger.debug( + f"[SglangRouterTesting] Configuration parsing dict: {gateway_configuration.dict()}" + ) service_protocol = "https" if service_https else "http" if service_https and gateway_configuration.certificate is None: @@ -107,6 +112,7 @@ async def _register_service_in_gateway( conn = await get_or_add_gateway_connection(session, gateway.id) try: logger.debug("%s: registering service as %s", fmt(run_model), service_spec.url) + logger.debug(f"[SglangRouterTesting] Service Registration Router: {router}") async with conn.client() as client: await client.register_service( project=run_model.project.name, @@ -119,6 +125,7 @@ async def _register_service_in_gateway( options=service_spec.options, rate_limits=run_spec.configuration.rate_limits, ssh_private_key=run_model.project.ssh_private_key, + router=router, ) logger.info("%s: service is registered as %s", fmt(run_model), service_spec.url) except SSHError: From 82bae8ed8f407c1466911adf7f1505fb7a72c58e Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 06:53:47 +0545 Subject: [PATCH 02/18] Add Test Log to check Registration conf --- gateway/pyproject.toml | 2 +- .../_internal/core/backends/base/compute.py | 3 ++ .../_internal/proxy/gateway/services/nginx.py | 40 ++++++++++++++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/gateway/pyproject.toml b/gateway/pyproject.toml index a67171c25..ee7659955 100644 --- a/gateway/pyproject.toml +++ b/gateway/pyproject.toml @@ -11,7 +11,7 @@ requires-python = ">=3.10" dynamic = ["version"] dependencies = [ # release builds of dstack-gateway depend on a PyPI version of dstack instead - "dstack[gateway] @ git+https://github.com/dstackai/dstack.git@master", + "dstack[gateway] @ git+https://github.com/Bihan/dstack.git@add_sglang_router_minimal_support", ] [tool.setuptools.package-data] diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 931b097aa..1fd94e04a 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -840,6 +840,7 @@ def get_gateway_user_data(authorized_key: str) -> str: packages=[ "nginx", "python3.10-venv", + "python3-pip", # Add pip for sglang-router installation ], snap={"commands": [["install", "--classic", "certbot"]]}, runcmd=[ @@ -850,6 +851,8 @@ def get_gateway_user_data(authorized_key: str) -> str: "s/# server_names_hash_bucket_size 64;/server_names_hash_bucket_size 128;/", "/etc/nginx/nginx.conf", ], + # Install sglang-router system-wide. Can be conditionally installed in the future. + ["pip", "install", "sglang-router"], ["su", "ubuntu", "-c", " && ".join(get_dstack_gateway_commands())], ], ssh_authorized_keys=[authorized_key], diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index f51c94a02..30f5c232a 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -82,7 +82,7 @@ def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None: async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: logger.debug("Registering %s domain %s", conf.type, conf.domain) conf_name = self.get_config_name(conf.domain) - + logger.debug(f"[SglangRouterTesting] Register Conf object dict: {conf.dict()}") async with self._lock: if conf.https: await run_async(self.run_certbot, conf.domain, acme) @@ -107,6 +107,44 @@ def reload() -> None: if r.returncode != 0: raise UnexpectedProxyError("Failed to reload nginx") + @staticmethod + def start_sglang_router(replicas: int) -> None: + """Start sglang-router service, killing existing one if running.""" + try: + # Kill existing sglang-router if running + result = subprocess.run( + ["pgrep", "-f", "sglang_router.launch_router"], capture_output=True, timeout=5 + ) + if result.returncode == 0: + logger.info("Killing existing sglang-router...") + subprocess.run(["pkill", "-f", "sglang_router.launch_router"], timeout=5) + # Wait a moment for the process to terminate + import time + + time.sleep(1) + + # Generate worker URLs based on replica count + worker_urls = [] + for i in range(1, replicas + 1): + worker_urls.append(f"http://127.0.0.1:{10000 + i}") + + # Start sglang-router with system-wide installation + logger.info(f"Starting sglang-router with {replicas} replicas...") + cmd = ( + [ + "python3", + "-m", + "sglang_router.launch_router", # Use system python3 + "--worker-urls", + ] + + worker_urls + + ["--host", "0.0.0.0", "--port", "3000"] + ) + subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + except Exception as e: + logger.error(f"Failed to start sglang-router: {e}") + def write_conf(self, conf: str, conf_name: str) -> None: """Update config and reload nginx. Rollback changes on error.""" conf_path = self._conf_dir / conf_name From 27c5204af9b502c14ccf01de4463807c13bd745c Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 08:24:12 +0545 Subject: [PATCH 03/18] Add start sglang-router --- src/dstack/_internal/proxy/gateway/services/nginx.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 30f5c232a..0b1fa1144 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -87,6 +87,13 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: if conf.https: await run_async(self.run_certbot, conf.domain, acme) await run_async(self.write_conf, conf.render(), conf_name) + # Start sglang-router if router is sglang + if hasattr(conf, "router") and conf.router == "sglang": + replicas = len(conf.replicas) if hasattr(conf, "replicas") and conf.replicas else 1 + logger.debug( + f"[SglangRouterTesting] Starting sglang-router with {replicas} replicas" + ) + await run_async(self.start_sglang_router, replicas) logger.info("Registered %s domain %s", conf.type, conf.domain) From b2f10936ebeacb904c7a479f4c5832bb2cec5dbc Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 13:48:49 +0545 Subject: [PATCH 04/18] Add sglang_workers jinga template --- .../resources/nginx/sglang_workers.jinja2 | 23 +++++++++++++++++++ .../_internal/proxy/gateway/services/nginx.py | 16 +++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 diff --git a/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 b/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 new file mode 100644 index 000000000..a6d612d36 --- /dev/null +++ b/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 @@ -0,0 +1,23 @@ +{% for replica in replicas %} +# Worker {{ loop.index }} +upstream sglang_worker_{{ loop.index }}_upstream { + server unix:{{ replica.socket }}; +} + +server { + listen 127.0.0.1:{{ 10000 + loop.index }}; + access_log off; # disable access logs for this internal endpoint + + proxy_read_timeout 300s; + proxy_send_timeout 300s; + + location / { + proxy_pass http://sglang_worker_{{ loop.index }}_upstream; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header Connection ""; + proxy_set_header Upgrade $http_upgrade; + } +} +{% endfor %} diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 0b1fa1144..78b2b832e 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -93,6 +93,7 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: logger.debug( f"[SglangRouterTesting] Starting sglang-router with {replicas} replicas" ) + await run_async(self.write_sglang_workers_conf, conf) await run_async(self.start_sglang_router, replicas) logger.info("Registered %s domain %s", conf.type, conf.domain) @@ -214,6 +215,21 @@ def write_global_conf(self) -> None: conf = read_package_resource("00-log-format.conf") self.write_conf(conf, "00-log-format.conf") + def write_sglang_workers_conf(self, conf: SiteConfig) -> None: + workers_config = generate_sglang_workers_config(conf) + workers_conf_name = f"sglang-workers.{conf.domain}.conf" + workers_conf_path = self._conf_dir / workers_conf_name + sudo_write(workers_conf_path, workers_config) + self.reload() + + +def generate_sglang_workers_config(conf: SiteConfig) -> str: + template = read_package_resource("sglang_workers.jinja2") + return jinja2.Template(template).render( + replicas=conf.replicas, + proxy_port=PROXY_PORT_ON_GATEWAY, + ) + def read_package_resource(file: str) -> str: return ( From 3871f05ce3ee43e3f52a531541deee8d7a33abf0 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 15:09:34 +0545 Subject: [PATCH 05/18] Modify service.jinja2 upstream block --- .../_internal/proxy/gateway/resources/nginx/service.jinja2 | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 b/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 index 5da4b6282..8c50db11e 100644 --- a/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 +++ b/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 @@ -4,9 +4,13 @@ limit_req_zone {{ zone.key }} zone={{ zone.name }}:10m rate={{ zone.rpm }}r/m; {% if replicas %} upstream {{ domain }}.upstream { + {% if router == "sglang" %} + server 127.0.0.1:3000; # SGLang router on the gateway + {% else %} {% for replica in replicas %} - server unix:{{ replica.socket }}; # replica {{ replica.id }} router={{ router }} + server unix:{{ replica.socket }}; # replica {{ replica.id }} {% endfor %} + {% endif %} } {% else %} From fa6d992af694ce0c6c7ee58fefd7cc10522d134d Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 15:41:39 +0545 Subject: [PATCH 06/18] Add sglang log file --- src/dstack/_internal/proxy/gateway/services/nginx.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 78b2b832e..66fecdae8 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -146,7 +146,16 @@ def start_sglang_router(replicas: int) -> None: "--worker-urls", ] + worker_urls - + ["--host", "0.0.0.0", "--port", "3000"] + + [ + "--host", + "0.0.0.0", + "--port", + "3000", + "--log-level", + "debug", + "--log-dir", + "./router_logs", + ] ) subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) From 4a47d86fee72cfef8b2b58c12d787d07d6388a95 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 19:36:04 +0545 Subject: [PATCH 07/18] Add sglang router clean up in unregister method --- .../_internal/proxy/gateway/services/nginx.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 66fecdae8..1f0b015fe 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -105,6 +105,10 @@ async def unregister(self, domain: str) -> None: return async with self._lock: await run_async(sudo_rm, conf_path) + workers_conf_path = self._conf_dir / f"sglang-workers.{domain}.conf" + if workers_conf_path.exists(): + await run_async(sudo_rm, workers_conf_path) + await run_async(self.stop_sglang_router) await run_async(self.reload) logger.info("Unregistered domain %s", domain) @@ -162,6 +166,30 @@ def start_sglang_router(replicas: int) -> None: except Exception as e: logger.error(f"Failed to start sglang-router: {e}") + @staticmethod + def stop_sglang_router() -> None: + try: + result = subprocess.run( + ["pgrep", "-f", "sglang_router.launch_router"], capture_output=True, timeout=5 + ) + if result.returncode == 0: + logger.info("Stopping sglang-router process...") + subprocess.run(["pkill", "-f", "sglang_router.launch_router"], timeout=5) + else: + logger.debug("No sglang-router process found to stop") + + log_dir = Path("./router_logs") + if log_dir.exists(): + logger.debug("Cleaning up router logs...") + import shutil + + shutil.rmtree(log_dir, ignore_errors=True) + else: + logger.debug("No router logs directory found to clean up") + + except Exception as e: + logger.error(f"Failed to stop sglang-router: {e}") + def write_conf(self, conf: str, conf_name: str) -> None: """Update config and reload nginx. Rollback changes on error.""" conf_path = self._conf_dir / conf_name From ccae80eb02f6d1fa1c9d1f767fce1062d575468b Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 20:21:36 +0545 Subject: [PATCH 08/18] Add test log to check unregister --- src/dstack/_internal/proxy/gateway/services/nginx.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 1f0b015fe..bf41c1f4b 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -106,6 +106,7 @@ async def unregister(self, domain: str) -> None: async with self._lock: await run_async(sudo_rm, conf_path) workers_conf_path = self._conf_dir / f"sglang-workers.{domain}.conf" + logger.debug(f"[SglangRouterTesting] Workers conf path: {workers_conf_path}") if workers_conf_path.exists(): await run_async(sudo_rm, workers_conf_path) await run_async(self.stop_sglang_router) From 0b7a6a1aab64e1cf1d709eb5679173d6ad96c579 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 15 Oct 2025 13:12:17 +0545 Subject: [PATCH 09/18] Increase sglang router-request-timeout --- src/dstack/_internal/proxy/gateway/services/nginx.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index bf41c1f4b..385e34165 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -147,7 +147,7 @@ def start_sglang_router(replicas: int) -> None: [ "python3", "-m", - "sglang_router.launch_router", # Use system python3 + "sglang_router.launch_router", "--worker-urls", ] + worker_urls @@ -160,6 +160,8 @@ def start_sglang_router(replicas: int) -> None: "debug", "--log-dir", "./router_logs", + "--request-timeout-secs", + "1800", ] ) subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) From b32c8ddff3b5aa941028b20321d65df7761e34a1 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 15 Oct 2025 14:58:03 +0545 Subject: [PATCH 10/18] Change sglang process to sglang::router --- src/dstack/_internal/proxy/gateway/services/nginx.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 385e34165..30b45cf02 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -126,11 +126,11 @@ def start_sglang_router(replicas: int) -> None: try: # Kill existing sglang-router if running result = subprocess.run( - ["pgrep", "-f", "sglang_router.launch_router"], capture_output=True, timeout=5 + ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 ) if result.returncode == 0: logger.info("Killing existing sglang-router...") - subprocess.run(["pkill", "-f", "sglang_router.launch_router"], timeout=5) + subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) # Wait a moment for the process to terminate import time @@ -173,11 +173,11 @@ def start_sglang_router(replicas: int) -> None: def stop_sglang_router() -> None: try: result = subprocess.run( - ["pgrep", "-f", "sglang_router.launch_router"], capture_output=True, timeout=5 + ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 ) if result.returncode == 0: logger.info("Stopping sglang-router process...") - subprocess.run(["pkill", "-f", "sglang_router.launch_router"], timeout=5) + subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) else: logger.debug("No sglang-router process found to stop") From 36e84e66cea9241518f03b850221bbf03c32be07 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 15 Oct 2025 18:33:56 +0545 Subject: [PATCH 11/18] Clean development code --- gateway/pyproject.toml | 2 +- src/dstack/_internal/core/backends/base/compute.py | 7 +------ src/dstack/_internal/core/models/gateways.py | 1 - .../_internal/proxy/gateway/routers/registry.py | 3 --- .../_internal/proxy/gateway/services/nginx.py | 14 +------------- .../_internal/server/services/services/__init__.py | 5 ----- 6 files changed, 3 insertions(+), 29 deletions(-) diff --git a/gateway/pyproject.toml b/gateway/pyproject.toml index ee7659955..a67171c25 100644 --- a/gateway/pyproject.toml +++ b/gateway/pyproject.toml @@ -11,7 +11,7 @@ requires-python = ">=3.10" dynamic = ["version"] dependencies = [ # release builds of dstack-gateway depend on a PyPI version of dstack instead - "dstack[gateway] @ git+https://github.com/Bihan/dstack.git@add_sglang_router_minimal_support", + "dstack[gateway] @ git+https://github.com/dstackai/dstack.git@master", ] [tool.setuptools.package-data] diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 1fd94e04a..ee3e7e10c 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -982,12 +982,7 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" - # For testing - logger.debug( - "Using test gateway wheel: https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" - ) - return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" + return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/core/models/gateways.py b/src/dstack/_internal/core/models/gateways.py index d7a7fdd2c..159ada65c 100644 --- a/src/dstack/_internal/core/models/gateways.py +++ b/src/dstack/_internal/core/models/gateways.py @@ -44,7 +44,6 @@ class GatewayCertificate(CoreModel): ] -# https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 class GatewayConfiguration(CoreModel): type: Literal["gateway"] = "gateway" name: Annotated[Optional[str], Field(description="The gateway name")] = None diff --git a/src/dstack/_internal/proxy/gateway/routers/registry.py b/src/dstack/_internal/proxy/gateway/routers/registry.py index 5d0aa2953..dd4f63f32 100644 --- a/src/dstack/_internal/proxy/gateway/routers/registry.py +++ b/src/dstack/_internal/proxy/gateway/routers/registry.py @@ -13,10 +13,8 @@ from dstack._internal.proxy.gateway.services.nginx import Nginx from dstack._internal.proxy.lib.deps import get_service_connection_pool from dstack._internal.proxy.lib.services.service_connection import ServiceConnectionPool -from dstack._internal.utils.logging import get_logger router = APIRouter(prefix="/{project_name}") -logger = get_logger(__name__) @router.post("/services/register") @@ -27,7 +25,6 @@ async def register_service( nginx: Annotated[Nginx, Depends(get_nginx)], service_conn_pool: Annotated[ServiceConnectionPool, Depends(get_service_connection_pool)], ) -> OkResponse: - logger.debug(f"[SglangRouterTesting] Gateway API Reception Router: {body.router}") await registry_services.register_service( project_name=project_name.lower(), run_name=body.run_name.lower(), diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 30b45cf02..83cf22dc1 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -82,17 +82,12 @@ def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None: async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: logger.debug("Registering %s domain %s", conf.type, conf.domain) conf_name = self.get_config_name(conf.domain) - logger.debug(f"[SglangRouterTesting] Register Conf object dict: {conf.dict()}") async with self._lock: if conf.https: await run_async(self.run_certbot, conf.domain, acme) await run_async(self.write_conf, conf.render(), conf_name) - # Start sglang-router if router is sglang if hasattr(conf, "router") and conf.router == "sglang": replicas = len(conf.replicas) if hasattr(conf, "replicas") and conf.replicas else 1 - logger.debug( - f"[SglangRouterTesting] Starting sglang-router with {replicas} replicas" - ) await run_async(self.write_sglang_workers_conf, conf) await run_async(self.start_sglang_router, replicas) @@ -106,7 +101,6 @@ async def unregister(self, domain: str) -> None: async with self._lock: await run_async(sudo_rm, conf_path) workers_conf_path = self._conf_dir / f"sglang-workers.{domain}.conf" - logger.debug(f"[SglangRouterTesting] Workers conf path: {workers_conf_path}") if workers_conf_path.exists(): await run_async(sudo_rm, workers_conf_path) await run_async(self.stop_sglang_router) @@ -122,26 +116,20 @@ def reload() -> None: @staticmethod def start_sglang_router(replicas: int) -> None: - """Start sglang-router service, killing existing one if running.""" try: - # Kill existing sglang-router if running result = subprocess.run( ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 ) if result.returncode == 0: - logger.info("Killing existing sglang-router...") + logger.info("Stopping existing sglang-router...") subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) - # Wait a moment for the process to terminate import time time.sleep(1) - - # Generate worker URLs based on replica count worker_urls = [] for i in range(1, replicas + 1): worker_urls.append(f"http://127.0.0.1:{10000 + i}") - # Start sglang-router with system-wide installation logger.info(f"Starting sglang-router with {replicas} replicas...") cmd = ( [ diff --git a/src/dstack/_internal/server/services/services/__init__.py b/src/dstack/_internal/server/services/services/__init__.py index e6f2b19aa..05c1fa909 100644 --- a/src/dstack/_internal/server/services/services/__init__.py +++ b/src/dstack/_internal/server/services/services/__init__.py @@ -83,10 +83,6 @@ async def _register_service_in_gateway( gateway_configuration = get_gateway_configuration(gateway) service_https = _get_service_https(run_spec, gateway_configuration) router = gateway_configuration.router - logger.debug(f"[SglangRouterTesting] Configuration parsing: {router}") - logger.debug( - f"[SglangRouterTesting] Configuration parsing dict: {gateway_configuration.dict()}" - ) service_protocol = "https" if service_https else "http" if service_https and gateway_configuration.certificate is None: @@ -112,7 +108,6 @@ async def _register_service_in_gateway( conn = await get_or_add_gateway_connection(session, gateway.id) try: logger.debug("%s: registering service as %s", fmt(run_model), service_spec.url) - logger.debug(f"[SglangRouterTesting] Service Registration Router: {router}") async with conn.client() as client: await client.register_service( project=run_model.project.name, From 25aacca923595186591d9d14ac9d999d5db6ad28 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 20 Oct 2025 18:21:20 +0545 Subject: [PATCH 12/18] Test is_sglang_router_running --- .../_internal/core/backends/base/compute.py | 3 +- .../_internal/proxy/gateway/services/nginx.py | 35 ++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index ee3e7e10c..c670b980e 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -982,7 +982,8 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 83cf22dc1..a38fef06b 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -89,7 +89,7 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: if hasattr(conf, "router") and conf.router == "sglang": replicas = len(conf.replicas) if hasattr(conf, "replicas") and conf.replicas else 1 await run_async(self.write_sglang_workers_conf, conf) - await run_async(self.start_sglang_router, replicas) + await run_async(self.start_or_update_sglang_router, replicas) logger.info("Registered %s domain %s", conf.type, conf.domain) @@ -114,18 +114,31 @@ def reload() -> None: if r.returncode != 0: raise UnexpectedProxyError("Failed to reload nginx") + @staticmethod + def start_or_update_sglang_router(replicas: int) -> None: + """Start the sglang router if not running; otherwise update workers via HTTP API.""" + # Start router (without workers) if it's not running, then sync workers + if not Nginx.is_sglang_router_running(): + logger.info("Sglang router not running, starting with %d replicas", replicas) + Nginx.start_sglang_router(replicas) + + @staticmethod + def is_sglang_router_running() -> bool: + result = subprocess.run(["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5) + return result.returncode == 0 + @staticmethod def start_sglang_router(replicas: int) -> None: try: - result = subprocess.run( - ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 - ) - if result.returncode == 0: - logger.info("Stopping existing sglang-router...") - subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) - import time - - time.sleep(1) + # result = subprocess.run( + # ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 + # ) + # if result.returncode == 0: + # logger.info("Stopping existing sglang-router...") + # subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) + # import time + + # time.sleep(1) worker_urls = [] for i in range(1, replicas + 1): worker_urls.append(f"http://127.0.0.1:{10000 + i}") @@ -148,8 +161,6 @@ def start_sglang_router(replicas: int) -> None: "debug", "--log-dir", "./router_logs", - "--request-timeout-secs", - "1800", ] ) subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) From f08f0fad5169d0f90426962663c1ec338e2b93fd Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 20 Oct 2025 18:54:12 +0545 Subject: [PATCH 13/18] Add HTTP add worker endpoint --- .../_internal/core/backends/base/compute.py | 1 + .../_internal/proxy/gateway/services/nginx.py | 132 +++++++++++++++++- 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index c670b980e..bf37a494b 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -983,6 +983,7 @@ def get_dstack_gateway_wheel(build: str) -> str: build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + logger.debug("Using temp wheel") return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index a38fef06b..3b9ba6e93 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -1,4 +1,5 @@ import importlib.resources +import json import subprocess import tempfile from asyncio import Lock @@ -120,13 +121,142 @@ def start_or_update_sglang_router(replicas: int) -> None: # Start router (without workers) if it's not running, then sync workers if not Nginx.is_sglang_router_running(): logger.info("Sglang router not running, starting with %d replicas", replicas) - Nginx.start_sglang_router(replicas) + Nginx.start_only_sglang_router() + Nginx.update_sglang_router_workers(replicas) @staticmethod def is_sglang_router_running() -> bool: result = subprocess.run(["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5) return result.returncode == 0 + @staticmethod + def start_only_sglang_router() -> None: + """Start sglang router""" + try: + # Start sglang router + logger.info("Starting sglang-router...") + cmd = [ + "python3", + "-m", + "sglang_router.launch_router", + "--host", + "0.0.0.0", + "--port", + "3000", + "--log-level", + "debug", + "--log-dir", + "./router_logs", + ] + subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # Wait for router to start + import time + + time.sleep(2) + + # Verify router is running + if not Nginx.is_sglang_router_running(): + raise Exception("Failed to start sglang router") + + logger.info("Sglang router started successfully") + + except Exception as e: + logger.error(f"Failed to start sglang-router: {e}") + raise + + @staticmethod + def get_sglang_router_workers() -> list[dict]: + try: + result = subprocess.run( + ["curl", "-s", "http://localhost:3000/workers"], capture_output=True, timeout=5 + ) + if result.returncode == 0: + response = json.loads(result.stdout.decode()) + return response.get("workers", []) + return [] + except Exception as e: + logger.error(f"Error getting sglang router workers: {e}") + return [] + + @staticmethod + def update_sglang_router_workers(replicas: int) -> None: + """Update sglang router workers via HTTP API""" + try: + # Get current workers + current_workers = Nginx.get_sglang_router_workers() + current_worker_urls = {worker["url"] for worker in current_workers} + current_count = len(current_worker_urls) + + if current_count == replicas: + logger.info("Sglang router already has %d workers, no update needed", replicas) + return + + # Calculate target worker URLs + target_worker_urls = {f"http://127.0.0.1:{10000 + i}" for i in range(1, replicas + 1)} + + # Workers to add + workers_to_add = target_worker_urls - current_worker_urls + # Workers to remove + workers_to_remove = current_worker_urls - target_worker_urls + + logger.info( + "Sglang router update: adding %d workers, removing %d workers", + len(workers_to_add), + len(workers_to_remove), + ) + + # Add new workers + for worker_url in sorted(workers_to_add): + success = Nginx.add_sglang_router_worker(worker_url) + if not success: + logger.warning("Failed to add worker %s, continuing with others", worker_url) + + # Remove old workers + # for worker_url in sorted(workers_to_remove): + # success = Nginx.remove_sglang_router_worker(worker_url) + # if not success: + # logger.warning( + # "Failed to remove worker %s, continuing with others", worker_url + # ) + + except Exception as e: + logger.error(f"Error updating sglang router workers: {e}") + + @staticmethod + def add_sglang_router_worker(worker_url: str) -> bool: + try: + payload = {"url": worker_url, "worker_type": "regular"} + result = subprocess.run( + [ + "curl", + "-X", + "POST", + "http://localhost:3000/workers", + "-H", + "Content-Type: application/json", + "-d", + json.dumps(payload), + ], + capture_output=True, + timeout=5, + ) + + if result.returncode == 0: + response = json.loads(result.stdout.decode()) + if response.get("status") == "accepted": + logger.info("Added worker %s to sglang router (queued)", worker_url) + return True + else: + logger.error("Failed to add worker %s: %s", worker_url, response) + return False + else: + logger.error("Failed to add worker %s: %s", worker_url, result.stderr.decode()) + return False + except Exception as e: + logger.error(f"Error adding worker {worker_url}: {e}") + return False + @staticmethod def start_sglang_router(replicas: int) -> None: try: From 875353ddc06fd8f8c61b812ffdd32c813f048d5a Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 20 Oct 2025 19:40:50 +0545 Subject: [PATCH 14/18] Update start or update router --- src/dstack/_internal/proxy/gateway/services/nginx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 3b9ba6e93..f2a041be8 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -122,7 +122,7 @@ def start_or_update_sglang_router(replicas: int) -> None: if not Nginx.is_sglang_router_running(): logger.info("Sglang router not running, starting with %d replicas", replicas) Nginx.start_only_sglang_router() - Nginx.update_sglang_router_workers(replicas) + Nginx.update_sglang_router_workers(replicas) @staticmethod def is_sglang_router_running() -> bool: From 907f71ffc1f275c705e7c0c2466b33ef65d59f69 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 20 Oct 2025 20:12:34 +0545 Subject: [PATCH 15/18] Add remove worker endpoint --- .../_internal/proxy/gateway/services/nginx.py | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index f2a041be8..342c2d2c7 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -2,6 +2,7 @@ import json import subprocess import tempfile +import urllib.parse from asyncio import Lock from pathlib import Path from typing import Optional @@ -213,12 +214,12 @@ def update_sglang_router_workers(replicas: int) -> None: logger.warning("Failed to add worker %s, continuing with others", worker_url) # Remove old workers - # for worker_url in sorted(workers_to_remove): - # success = Nginx.remove_sglang_router_worker(worker_url) - # if not success: - # logger.warning( - # "Failed to remove worker %s, continuing with others", worker_url - # ) + for worker_url in sorted(workers_to_remove): + success = Nginx.remove_sglang_router_worker(worker_url) + if not success: + logger.warning( + "Failed to remove worker %s, continuing with others", worker_url + ) except Exception as e: logger.error(f"Error updating sglang router workers: {e}") @@ -257,6 +258,34 @@ def add_sglang_router_worker(worker_url: str) -> bool: logger.error(f"Error adding worker {worker_url}: {e}") return False + @staticmethod + def remove_sglang_router_worker(worker_url: str) -> bool: + """Remove a single worker from sglang router""" + try: + # URL encode the worker URL for the DELETE request + encoded_url = urllib.parse.quote(worker_url, safe="") + + result = subprocess.run( + ["curl", "-X", "DELETE", f"http://localhost:3000/workers/{encoded_url}"], + capture_output=True, + timeout=5, + ) + + if result.returncode == 0: + response = json.loads(result.stdout.decode()) + if response.get("status") == "accepted": + logger.info("Removed worker %s from sglang router (queued)", worker_url) + return True + else: + logger.error("Failed to remove worker %s: %s", worker_url, response) + return False + else: + logger.error("Failed to remove worker %s: %s", worker_url, result.stderr.decode()) + return False + except Exception as e: + logger.error(f"Error removing worker {worker_url}: {e}") + return False + @staticmethod def start_sglang_router(replicas: int) -> None: try: From c9d77222e08002e23f8074a64bd161a52c9b86d4 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 21 Oct 2025 07:29:07 +0545 Subject: [PATCH 16/18] Clean sglang autoscaling --- .../_internal/core/backends/base/compute.py | 4 +- .../_internal/proxy/gateway/services/nginx.py | 86 +++++-------------- 2 files changed, 22 insertions(+), 68 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index bf37a494b..ee3e7e10c 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -982,9 +982,7 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" - logger.debug("Using temp wheel") - return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" + return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 342c2d2c7..be3dbcd6b 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -118,23 +118,25 @@ def reload() -> None: @staticmethod def start_or_update_sglang_router(replicas: int) -> None: - """Start the sglang router if not running; otherwise update workers via HTTP API.""" - # Start router (without workers) if it's not running, then sync workers if not Nginx.is_sglang_router_running(): - logger.info("Sglang router not running, starting with %d replicas", replicas) - Nginx.start_only_sglang_router() + Nginx.start_sglang_router() Nginx.update_sglang_router_workers(replicas) @staticmethod def is_sglang_router_running() -> bool: - result = subprocess.run(["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5) - return result.returncode == 0 + """Check if sglang router is running and responding to HTTP requests.""" + try: + result = subprocess.run( + ["curl", "-s", "http://localhost:3000/workers"], capture_output=True, timeout=5 + ) + return result.returncode == 0 + except Exception as e: + logger.error(f"Error checking sglang router status: {e}") + return False @staticmethod - def start_only_sglang_router() -> None: - """Start sglang router""" + def start_sglang_router() -> None: try: - # Start sglang router logger.info("Starting sglang-router...") cmd = [ "python3", @@ -151,7 +153,6 @@ def start_only_sglang_router() -> None: ] subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - # Wait for router to start import time time.sleep(2) @@ -187,11 +188,6 @@ def update_sglang_router_workers(replicas: int) -> None: # Get current workers current_workers = Nginx.get_sglang_router_workers() current_worker_urls = {worker["url"] for worker in current_workers} - current_count = len(current_worker_urls) - - if current_count == replicas: - logger.info("Sglang router already has %d workers, no update needed", replicas) - return # Calculate target worker URLs target_worker_urls = {f"http://127.0.0.1:{10000 + i}" for i in range(1, replicas + 1)} @@ -201,19 +197,18 @@ def update_sglang_router_workers(replicas: int) -> None: # Workers to remove workers_to_remove = current_worker_urls - target_worker_urls - logger.info( - "Sglang router update: adding %d workers, removing %d workers", - len(workers_to_add), - len(workers_to_remove), - ) + if workers_to_add: + logger.info("Sglang router update: adding %d workers", len(workers_to_add)) + if workers_to_remove: + logger.info("Sglang router update: removing %d workers", len(workers_to_remove)) - # Add new workers + # Add workers for worker_url in sorted(workers_to_add): success = Nginx.add_sglang_router_worker(worker_url) if not success: logger.warning("Failed to add worker %s, continuing with others", worker_url) - # Remove old workers + # Remove workers for worker_url in sorted(workers_to_remove): success = Nginx.remove_sglang_router_worker(worker_url) if not success: @@ -223,6 +218,7 @@ def update_sglang_router_workers(replicas: int) -> None: except Exception as e: logger.error(f"Error updating sglang router workers: {e}") + raise @staticmethod def add_sglang_router_worker(worker_url: str) -> bool: @@ -246,7 +242,7 @@ def add_sglang_router_worker(worker_url: str) -> bool: if result.returncode == 0: response = json.loads(result.stdout.decode()) if response.get("status") == "accepted": - logger.info("Added worker %s to sglang router (queued)", worker_url) + logger.info("Added worker %s to sglang router", worker_url) return True else: logger.error("Failed to add worker %s: %s", worker_url, response) @@ -274,7 +270,7 @@ def remove_sglang_router_worker(worker_url: str) -> bool: if result.returncode == 0: response = json.loads(result.stdout.decode()) if response.get("status") == "accepted": - logger.info("Removed worker %s from sglang router (queued)", worker_url) + logger.info("Removed worker %s from sglang router", worker_url) return True else: logger.error("Failed to remove worker %s: %s", worker_url, response) @@ -286,47 +282,6 @@ def remove_sglang_router_worker(worker_url: str) -> bool: logger.error(f"Error removing worker {worker_url}: {e}") return False - @staticmethod - def start_sglang_router(replicas: int) -> None: - try: - # result = subprocess.run( - # ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 - # ) - # if result.returncode == 0: - # logger.info("Stopping existing sglang-router...") - # subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) - # import time - - # time.sleep(1) - worker_urls = [] - for i in range(1, replicas + 1): - worker_urls.append(f"http://127.0.0.1:{10000 + i}") - - logger.info(f"Starting sglang-router with {replicas} replicas...") - cmd = ( - [ - "python3", - "-m", - "sglang_router.launch_router", - "--worker-urls", - ] - + worker_urls - + [ - "--host", - "0.0.0.0", - "--port", - "3000", - "--log-level", - "debug", - "--log-dir", - "./router_logs", - ] - ) - subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - - except Exception as e: - logger.error(f"Failed to start sglang-router: {e}") - @staticmethod def stop_sglang_router() -> None: try: @@ -350,6 +305,7 @@ def stop_sglang_router() -> None: except Exception as e: logger.error(f"Failed to stop sglang-router: {e}") + raise def write_conf(self, conf: str, conf_name: str) -> None: """Update config and reload nginx. Rollback changes on error.""" From e061049043939bac695e792da9e60a23fad66ec8 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 21 Oct 2025 09:07:40 +0545 Subject: [PATCH 17/18] Include router field in gateway tests --- src/tests/_internal/server/routers/test_gateways.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/tests/_internal/server/routers/test_gateways.py b/src/tests/_internal/server/routers/test_gateways.py index 996157350..6d06e6969 100644 --- a/src/tests/_internal/server/routers/test_gateways.py +++ b/src/tests/_internal/server/routers/test_gateways.py @@ -70,6 +70,7 @@ async def test_list(self, test_db, session: AsyncSession, client: AsyncClient): "name": gateway.name, "backend": backend.type.value, "region": gateway.region, + "router": None, "domain": gateway.wildcard_domain, "default": False, "public_ip": True, @@ -121,6 +122,7 @@ async def test_get(self, test_db, session: AsyncSession, client: AsyncClient): "name": gateway.name, "backend": backend.type.value, "region": gateway.region, + "router": None, "domain": gateway.wildcard_domain, "default": False, "public_ip": True, @@ -201,6 +203,7 @@ async def test_create_gateway(self, test_db, session: AsyncSession, client: Asyn "name": "test", "backend": backend.type.value, "region": "us", + "router": None, "domain": None, "default": True, "public_ip": True, @@ -253,6 +256,7 @@ async def test_create_gateway_without_name( "name": "random-name", "backend": backend.type.value, "region": "us", + "router": None, "domain": None, "default": True, "public_ip": True, @@ -355,6 +359,7 @@ async def test_set_default_gateway(self, test_db, session: AsyncSession, client: "name": gateway.name, "backend": backend.type.value, "region": gateway.region, + "router": None, "domain": gateway.wildcard_domain, "default": True, "public_ip": True, @@ -477,6 +482,7 @@ def get_backend(project, backend_type): "name": gateway_gcp.name, "backend": backend_gcp.type.value, "region": gateway_gcp.region, + "router": None, "domain": gateway_gcp.wildcard_domain, "default": False, "public_ip": True, @@ -546,6 +552,7 @@ async def test_set_wildcard_domain(self, test_db, session: AsyncSession, client: "name": gateway.name, "backend": backend.type.value, "region": gateway.region, + "router": None, "domain": "test.com", "default": False, "public_ip": True, From f82ca1090dbb9359e2ec56669254d9d6ae79dca6 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 21 Oct 2025 17:36:54 +0545 Subject: [PATCH 18/18] Minor Update --- src/dstack/_internal/proxy/gateway/services/nginx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index be3dbcd6b..015091ac7 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -89,7 +89,7 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: await run_async(self.run_certbot, conf.domain, acme) await run_async(self.write_conf, conf.render(), conf_name) if hasattr(conf, "router") and conf.router == "sglang": - replicas = len(conf.replicas) if hasattr(conf, "replicas") and conf.replicas else 1 + replicas = len(conf.replicas) await run_async(self.write_sglang_workers_conf, conf) await run_async(self.start_or_update_sglang_router, replicas)