Increase max active resources supported by server (#2189)
* Increase runner submitWaitDuration to 5m

* Fix potentially long write DB transactions

* Set PRAGMA synchronous=NORMAL

* Fix _wait_to_lock_many()

* Implement batch background processing

* Add Server limits to Server deployment

* Fix reuse instances lock

* Use AsyncAdaptedQueuePool for sqlite

Due to aiosqlite's default, NullPool was used for sqlite.
This is a suboptimal setting when spawning many sessions as we do.
(See bluesky/tiled#663)
Also increase busy_timeout to 30s.
This gets rid of "database is locked" errors when stopping many runs (e.g. 20 at a time).
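
A minimal sketch of the engine setup this message describes, assuming SQLAlchemy 2.x with aiosqlite; the database URL is illustrative, while the pool class and PRAGMAs mirror the `db.py` diff below:

```python
from sqlalchemy import AsyncAdaptedQueuePool, event
from sqlalchemy.ext.asyncio import create_async_engine

# Queue-backed pool instead of aiosqlite's NullPool default, so the many
# sessions the server spawns reuse connections rather than reopening the file.
engine = create_async_engine(
    "sqlite+aiosqlite:///server.db",  # illustrative path
    poolclass=AsyncAdaptedQueuePool,
)


@event.listens_for(engine.sync_engine, "connect")
def set_sqlite_pragma(dbapi_connection, _):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL;")    # readers don't block the writer
    cursor.execute("PRAGMA synchronous=NORMAL;")  # fewer fsyncs; safe under WAL
    # Wait up to 30s for the write lock instead of failing with "database is locked"
    cursor.execute("PRAGMA busy_timeout=30000;")
    cursor.close()
```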
r4victor authored Jan 16, 2025
1 parent 049b9e9 commit 5c6891a
Showing 15 changed files with 157 additions and 49 deletions.
36 changes: 25 additions & 11 deletions docs/docs/guides/server-deployment.md
@@ -20,6 +20,7 @@ You can run the server either through `pip` or using Docker.

> The server can be set up via `pip` on Linux, macOS, and Windows (via WSL 2).
> It requires Git and OpenSSH.
+> The minimum hardware requirements are 1 CPU and 1GB of RAM.

=== "Docker"

@@ -67,10 +68,14 @@ Alternatively, you can configure backends on the [project settings page](../guid
## State persistence

-By default, the `dstack` server stores its state locally in `~/.dstack/server` using SQLite.
+The `dstack` server can store its internal state in SQLite or Postgres.
+By default, it stores the state locally in `~/.dstack/server` using SQLite.
+With SQLite, you can run at most one server replica.
+Postgres has no such limitation and is recommended for production deployment.

-??? info "Replicate state to cloud storage"
-    If you’d like, you can configure automatic replication of your SQLite state to cloud object storage using Litestream.
+??? info "Replicate SQLite to cloud storage"
+    You can configure automatic replication of your SQLite state to cloud object storage using Litestream.
+    This allows persisting the server state across re-deployments when using SQLite.

    To enable Litestream replication, set the following environment variables:

@@ -103,7 +108,7 @@

### PostgreSQL

-To store the state externally, set the `DSTACK_DATABASE_URL` and `DSTACK_SERVER_CLOUDWATCH_LOG_GROUP` environment variables.
+To store the server state in Postgres, set the `DSTACK_DATABASE_URL` environment variable.

??? info "Migrate from SQLite to PostgreSQL"
    You can migrate the existing state from SQLite to PostgreSQL using `pgloader`:
@@ -132,7 +137,8 @@ To store the state externally, set the `DSTACK_DATABASE_URL` and `DSTACK_SERVER_

## Logs storage

-By default, `dstack` stores workload logs in `~/.dstack/server/projects/<project_name>/logs`.
+By default, `dstack` stores workload logs locally in `~/.dstack/server/projects/<project_name>/logs`.
+For multi-replica server deployments, logs must be stored externally, e.g. in AWS CloudWatch.

### AWS CloudWatch

@@ -264,14 +270,22 @@ default_permissions:
## Backward compatibility
!!! info "Versioning scheme"
-    `dstack` follows the `{major}.{minor}.{patch}` versioning scheme based on these principles:
+    `dstack` follows the `{major}.{minor}.{patch}` versioning scheme.
+    Backward compatibility is maintained based on these principles:

-=== "Server"
-    The server backward compatibility is maintained across all minor and patch releases. The specific features can be removed but the removal is preceded with deprecation warnings for several minor releases. This means you can use older client versions with newer server versions.
+    * The server backward compatibility is maintained across all minor and patch releases. Specific features can be removed, but removal is preceded by deprecation warnings for several minor releases. This means you can use older client versions with newer server versions.
+    * The client backward compatibility is maintained across patch releases. A new minor release indicates that the release breaks client backward compatibility. This means you don't need to update the server when you update the client to a new patch release. Still, upgrading the client to a new minor version requires upgrading the server too.

-=== "Client"
-    The client backward compatibility is maintained across patch releases. A new minor release indicates that the release breaks client backward compatibility. This means you don't need to update the server when you update the client to a new patch release. Still, upgrading a client to a new minor version requires upgrading the server too.
+## Server limits
+
+A single `dstack` server replica can support:
+
+* Up to 150 active runs.
+* Up to 150 active jobs.
+* Up to 150 active instances.
+
+Having more active resources can affect server performance.
+If you hit these limits, consider using Postgres with multiple server replicas.

## FAQs

2 changes: 1 addition & 1 deletion runner/internal/runner/api/server.go
@@ -53,7 +53,7 @@ func NewServer(tempDir string, homeDir string, workingDir string, address string
        pullDoneCh: make(chan interface{}),
        wsDoneCh:   make(chan interface{}),

-       submitWaitDuration: 2 * time.Minute,
+       submitWaitDuration: 5 * time.Minute,
        logsWaitDuration: 30 * time.Second,

        executor: ex,
4 changes: 4 additions & 0 deletions src/dstack/_internal/server/app.py
@@ -1,3 +1,4 @@
+import asyncio
import importlib.resources
import os
import time
@@ -141,6 +142,9 @@ async def lifespan(app: FastAPI):
    scheduler.shutdown()
    await gateway_connections_pool.remove_all()
    await service_replica_connection_pool.remove_all()
+    await get_db().engine.dispose()
+    # Let checked-out DB connections close as dispose() only closes checked-in connections
+    await asyncio.sleep(3)


_ON_STARTUP_HOOKS = []
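For context, a self-contained sketch of the shutdown pattern added above, assuming FastAPI's lifespan API and an async SQLAlchemy engine; the engine URL and app wiring are illustrative:

```python
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("sqlite+aiosqlite:///server.db")  # illustrative URL


@asynccontextmanager
async def lifespan(app: FastAPI):
    yield  # the application serves requests here
    # dispose() closes only checked-in connections; sleep briefly so
    # checked-out connections held by in-flight tasks get returned and closed.
    await engine.dispose()
    await asyncio.sleep(3)


app = FastAPI(lifespan=lifespan)
```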
44 changes: 37 additions & 7 deletions src/dstack/_internal/server/background/__init__.py
@@ -35,25 +35,55 @@ def start_background_tasks() -> AsyncIOScheduler:
    # In-memory locking via locksets does not guarantee
    # that the first waiting for the lock will acquire it.
    # The jitter is needed to give all tasks a chance to acquire locks.
+
+    # The batch_size and interval determine background tasks processing rates.
+    # Currently one server replica can handle:
+    # * 150 active jobs with up to 2 minutes processing latency
+    # * 150 active runs with up to 2 minutes processing latency
+    # * 150 active instances with up to 2 minutes processing latency
    _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1)
    _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1)
+    # process_submitted_jobs and process_instances processing rate is 75 jobs(instances) per minute.
+    # Currently limited by cloud rate limits such as AWS ListServiceQuotas requests.
+    # TODO: Fix unnecessary requests to clouds and increase this.
+    _scheduler.add_job(
+        process_submitted_jobs,
+        IntervalTrigger(seconds=4, jitter=2),
+        kwargs={"batch_size": 5},
+        max_instances=5,
+    )
+    _scheduler.add_job(
+        process_running_jobs,
+        IntervalTrigger(seconds=4, jitter=2),
+        kwargs={"batch_size": 5},
+        max_instances=5,
+    )
+    _scheduler.add_job(
+        process_terminating_jobs,
+        IntervalTrigger(seconds=4, jitter=2),
+        kwargs={"batch_size": 5},
+        max_instances=5,
+    )
    _scheduler.add_job(
-        process_submitted_jobs, IntervalTrigger(seconds=4, jitter=2), max_instances=5
+        process_runs,
+        IntervalTrigger(seconds=2, jitter=1),
+        kwargs={"batch_size": 5},
+        max_instances=5,
    )
-    _scheduler.add_job(process_running_jobs, IntervalTrigger(seconds=4, jitter=2), max_instances=5)
    _scheduler.add_job(
-        process_terminating_jobs, IntervalTrigger(seconds=4, jitter=2), max_instances=5
+        process_instances,
+        IntervalTrigger(seconds=4, jitter=2),
+        kwargs={"batch_size": 5},
+        max_instances=5,
    )
-    _scheduler.add_job(process_instances, IntervalTrigger(seconds=4, jitter=2), max_instances=5)
-    _scheduler.add_job(process_runs, IntervalTrigger(seconds=2), max_instances=5)
+    _scheduler.add_job(process_fleets, IntervalTrigger(seconds=10, jitter=2))
    _scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15))
    _scheduler.add_job(
        process_submitted_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5
    )
    _scheduler.add_job(
        process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5
    )
-    _scheduler.add_job(process_fleets, IntervalTrigger(seconds=15))
-    _scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30))
+    _scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
    _scheduler.start()
    return _scheduler
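
As a rough sanity check of the rates encoded above, a worked sketch of the arithmetic behind the "75 jobs(instances) per minute" and "2 minutes processing latency" comments, assuming the 150-resource figure from those comments:

```python
batch_size = 5        # kwargs={"batch_size": 5}
interval_seconds = 4  # IntervalTrigger(seconds=4) for process_submitted_jobs

# Each tick processes up to batch_size items, so steady-state throughput is:
items_per_minute = batch_size * (60 / interval_seconds)  # 5 * 15 = 75

# With 150 active resources, a full pass over all of them takes:
active_resources = 150
latency_minutes = active_resources / items_per_minute  # 150 / 75 = 2

print(items_per_minute, latency_minutes)  # 75.0 2.0
```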
src/dstack/_internal/server/background/tasks/process_instances.py
@@ -107,7 +107,14 @@
logger = get_logger(__name__)


-async def process_instances() -> None:
+async def process_instances(batch_size: int = 1):
+    tasks = []
+    for _ in range(batch_size):
+        tasks.append(_process_next_instance())
+    await asyncio.gather(*tasks)
+
+
+async def _process_next_instance():
    lock, lockset = get_locker().get_lockset(InstanceModel.__tablename__)
    async with get_session_ctx() as session:
        async with lock:
src/dstack/_internal/server/background/tasks/process_running_jobs.py
@@ -1,3 +1,4 @@
+import asyncio
from datetime import timedelta
from typing import Dict, List, Optional

@@ -54,7 +55,14 @@
logger = get_logger(__name__)


-async def process_running_jobs():
+async def process_running_jobs(batch_size: int = 1):
+    tasks = []
+    for _ in range(batch_size):
+        tasks.append(_process_next_running_job())
+    await asyncio.gather(*tasks)
+
+
+async def _process_next_running_job():
    lock, lockset = get_locker().get_lockset(JobModel.__tablename__)
    async with get_session_ctx() as session:
        async with lock:
12 changes: 11 additions & 1 deletion src/dstack/_internal/server/background/tasks/process_runs.py
@@ -1,3 +1,4 @@
+import asyncio
import datetime
import itertools
from typing import List, Optional, Set, Tuple
@@ -42,7 +43,14 @@
RETRY_DELAY = datetime.timedelta(seconds=15)


-async def process_runs():
+async def process_runs(batch_size: int = 1):
+    tasks = []
+    for _ in range(batch_size):
+        tasks.append(_process_next_run())
+    await asyncio.gather(*tasks)
+
+
+async def _process_next_run():
    run_lock, run_lockset = get_locker().get_lockset(RunModel.__tablename__)
    job_lock, job_lockset = get_locker().get_lockset(JobModel.__tablename__)
    async with get_session_ctx() as session:
@@ -322,6 +330,8 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel):
    # use replicas_info from before retrying
    replicas_diff = scaler.scale(replicas_info, stats)
    if replicas_diff != 0:
+        # FIXME: potentially long write transaction
+        # Why do we flush here?
        await session.flush()
        await session.refresh(run_model)
        await scale_run_replicas(session, run_model, replicas_diff)
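Regarding the FIXME above, a small self-contained sketch of what `flush()` and `refresh()` do in an async SQLAlchemy session — flushing and then doing slow work inside an open transaction is what makes a write transaction long; the `Run` model here is an illustrative stand-in for `RunModel`:

```python
import asyncio

from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Run(Base):  # illustrative stand-in for RunModel
    __tablename__ = "runs"
    id: Mapped[int] = mapped_column(primary_key=True)
    status: Mapped[str] = mapped_column(String(20))


async def main() -> None:
    engine = create_async_engine("sqlite+aiosqlite://")  # in-memory DB
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session:
        run = Run(status="submitted")
        session.add(run)
        # flush() emits the pending INSERT inside the open transaction,
        # taking SQLite's write lock until commit -- flushing early and then
        # doing slow work is exactly what keeps a write transaction long.
        await session.flush()
        # refresh() re-SELECTs the row, e.g. to read server-generated values.
        await session.refresh(run)
        print(run.id, run.status)  # 1 submitted
        await session.commit()

    await engine.dispose()


asyncio.run(main())
```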
src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
@@ -1,3 +1,4 @@
+import asyncio
import uuid
from typing import List, Optional, Tuple

@@ -74,7 +75,14 @@
logger = get_logger(__name__)


-async def process_submitted_jobs():
+async def process_submitted_jobs(batch_size: int = 1):
+    tasks = []
+    for _ in range(batch_size):
+        tasks.append(_process_next_submitted_job())
+    await asyncio.gather(*tasks)
+
+
+async def _process_next_submitted_job():
    lock, lockset = get_locker().get_lockset(JobModel.__tablename__)
    async with get_session_ctx() as session:
        async with lock:
@@ -196,7 +204,11 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
    async with get_locker().lock_ctx(InstanceModel.__tablename__, instances_ids):
        # Refetch after lock
        res = await session.execute(
-            select(InstanceModel).where(InstanceModel.id.in_(instances_ids))
+            select(InstanceModel).where(
+                InstanceModel.id.in_(instances_ids),
+                InstanceModel.deleted == False,
+                InstanceModel.job_id.is_(None),
+            )
        )
        pool_instances = list(res.scalars().all())
        instance = await _assign_job_to_pool_instance(
@@ -289,7 +301,6 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
    )
    session.add(instance)
    session.add(fleet_model)
-    await session.flush()  # to get im.id
    job_model.used_instance_id = instance.id

    volumes_ids = sorted([v.id for vs in volume_models for v in vs])
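The refetch-after-lock change above narrows a race: between the initial SELECT and acquiring the in-memory lock, another task may have deleted an instance or assigned it a job. A toy sketch of the pattern, with a dict standing in for the database (all names are illustrative):

```python
import asyncio
from typing import List, Optional

# Toy stand-ins: instance id -> row; one asyncio.Lock per row.
db = {1: {"deleted": False, "job_id": None}}
locks = {1: asyncio.Lock()}


async def assign_job(job_id: str, candidate_ids: List[int]) -> Optional[int]:
    for instance_id in candidate_ids:
        async with locks[instance_id]:
            # Refetch after lock: the row may have changed while we waited,
            # so re-apply the filters (not deleted, no job assigned).
            row = db[instance_id]
            if row["deleted"] or row["job_id"] is not None:
                continue
            row["job_id"] = job_id
            return instance_id
    return None


async def main() -> None:
    results = await asyncio.gather(
        assign_job("job-a", [1]),
        assign_job("job-b", [1]),
    )
    print(results)  # one task gets instance 1, the other gets None


asyncio.run(main())
```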
src/dstack/_internal/server/background/tasks/process_terminating_jobs.py
@@ -1,3 +1,5 @@
+import asyncio
+
from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession

@@ -14,7 +16,14 @@
logger = get_logger(__name__)


-async def process_terminating_jobs():
+async def process_terminating_jobs(batch_size: int = 1):
+    tasks = []
+    for _ in range(batch_size):
+        tasks.append(_process_next_terminating_job())
+    await asyncio.gather(*tasks)
+
+
+async def _process_next_terminating_job():
    lock, lockset = get_locker().get_lockset(JobModel.__tablename__)
    async with get_session_ctx() as session:
        async with lock:
11 changes: 8 additions & 3 deletions src/dstack/_internal/server/db.py
@@ -2,7 +2,7 @@
from typing import Optional

from alembic import command, config
-from sqlalchemy import event
+from sqlalchemy import AsyncAdaptedQueuePool, event
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
@@ -19,7 +19,11 @@ def __init__(self, url: str, engine: Optional[AsyncEngine] = None):
        if engine is not None:
            self.engine = engine
        else:
-            self.engine = create_async_engine(self.url, echo=settings.SQL_ECHO_ENABLED)
+            self.engine = create_async_engine(
+                self.url,
+                echo=settings.SQL_ECHO_ENABLED,
+                poolclass=AsyncAdaptedQueuePool,
+            )
        self.session_maker = sessionmaker(
            bind=self.engine,
            expire_on_commit=False,
@@ -33,7 +37,8 @@ def set_sqlite_pragma(dbapi_connection: DBAPIConnection, _: ConnectionPoolEntry)
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL;")
    cursor.execute("PRAGMA foreign_keys=ON;")
-    cursor.execute("PRAGMA busy_timeout=10000;")
+    cursor.execute("PRAGMA synchronous=NORMAL;")
+    cursor.execute("PRAGMA busy_timeout=30000;")
    cursor.close()

@property
3 changes: 3 additions & 0 deletions src/dstack/_internal/server/services/backends/__init__.py
@@ -200,6 +200,7 @@ async def update_backend(
        raise ServerClientError("Backend does not exist")
    await run_async(configurator.get_config_values, config)
    backend = await run_async(configurator.create_backend, project=project, config=config)
+    # FIXME: potentially long write transaction
    await session.execute(
        update(BackendModel)
        .where(
@@ -244,6 +245,8 @@ async def delete_backends(
    deleted_backends_types = current_backends_types.intersection(backends_types)
    if len(deleted_backends_types) == 0:
        return
+    # FIXME: potentially long write transaction
+    # Not urgent since backend deletion is a rare operation
    await session.execute(
        delete(BackendModel).where(
            BackendModel.type.in_(deleted_backends_types),
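The FIXMEs flag a general single-writer pitfall: slow non-DB work done while a write transaction is open holds SQLite's write lock and stalls every other writer. A minimal sketch of the usual remedy, with an `asyncio.Lock` standing in for the write lock (all names are illustrative):

```python
import asyncio

write_lock = asyncio.Lock()  # stands in for the database's single-writer lock


async def create_backend_in_cloud() -> dict:
    await asyncio.sleep(2)  # e.g. validating credentials with the cloud API
    return {"ok": True}


async def update_backend_slow() -> None:
    # Anti-pattern: the slow cloud call runs while the write lock is held,
    # stalling every other writer for its duration.
    async with write_lock:
        config = await create_backend_in_cloud()
        print("wrote", config)


async def update_backend_fast() -> None:
    # Remedy: do the slow work first, hold the lock only for the quick write.
    config = await create_backend_in_cloud()
    async with write_lock:
        print("wrote", config)


asyncio.run(update_backend_fast())
```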
10 changes: 7 additions & 3 deletions src/dstack/_internal/server/services/locking.py
@@ -70,8 +70,12 @@ async def _wait_to_lock_many(
    left_to_lock = keys.copy()
    while len(left_to_lock) > 0:
        async with lock:
+            locked_now_num = 0
            for key in left_to_lock:
-                if key not in locked:
-                    locked.add(key)
-                    left_to_lock.remove(key)
+                if key in locked:
+                    # Someone already acquired the lock, wait
+                    break
+                locked.add(key)
+                locked_now_num += 1
+            left_to_lock = left_to_lock[locked_now_num:]
        await asyncio.sleep(delay)
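
The old loop mutated `left_to_lock` while iterating over it, which can skip keys; the fixed version takes keys strictly in order and backs off as soon as one is taken. A self-contained sketch of the fixed algorithm — module-level `lock`/`locked` stand in for the locker's internals, and keys are sorted so concurrent tasks acquire in a consistent order and cannot deadlock:

```python
import asyncio
from typing import List

lock = asyncio.Lock()  # guards the lockset
locked: set = set()    # the in-memory lockset shared by all tasks


async def wait_to_lock_many(keys: List[str], delay: float = 0.05) -> None:
    """Lock keys one by one in order, blocking until all are locked."""
    left_to_lock = keys.copy()
    while len(left_to_lock) > 0:
        async with lock:
            locked_now_num = 0
            for key in left_to_lock:
                if key in locked:
                    # Someone already acquired the lock, wait
                    break
                locked.add(key)
                locked_now_num += 1
            # Keep what we locked; retry only the remainder (the old version
            # removed items from the list it was iterating, skipping keys).
            left_to_lock = left_to_lock[locked_now_num:]
        await asyncio.sleep(delay)


def unlock_many(keys: List[str]) -> None:
    for key in keys:
        locked.discard(key)


async def worker(name: str, keys: List[str]) -> None:
    await wait_to_lock_many(sorted(keys))  # sorted => consistent global order
    print(name, "locked", keys)
    await asyncio.sleep(0.1)
    unlock_many(keys)


async def main() -> None:
    await asyncio.gather(worker("a", ["j1", "j2"]), worker("b", ["j2", "j3"]))


asyncio.run(main())
```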