Skip to content

Commit

Permalink
Add more cluster functionality (#266)
Browse files Browse the repository at this point in the history
* Add more cluster functionality

* Update beaker/services/cluster.py
  • Loading branch information
epwalsh authored Feb 28, 2024
1 parent 09ffcb1 commit 6cd612a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ use patch releases for compatibility fixes instead.

## Unreleased

### Added

- Added cluster method `Beaker.cluster.preempt_jobs()`.
- Added argument `allow_preemptible` to `Beaker.cluster.update()`.

## [v1.25.1](https://github.com/allenai/beaker-py/releases/tag/v1.25.1) - 2024-02-26

### Fixed
Expand Down
3 changes: 2 additions & 1 deletion beaker/data_model/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,5 @@ class ClusterSpec(BaseModel):


class ClusterPatch(BaseModel):
capacity: int
capacity: Optional[int] = None
allow_preemptible_restriction_exceptions: Optional[bool] = None
39 changes: 37 additions & 2 deletions beaker/services/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,18 @@ def create(
).json()
)

def update(self, cluster: Union[str, Cluster], max_size: int) -> Cluster:
def update(
self,
cluster: Union[str, Cluster],
max_size: Optional[int] = None,
allow_preemptible: Optional[bool] = None,
) -> Cluster:
"""
Modify a cluster.
:param cluster: The cluster ID, full name, or object.
:param max_size: The maximum number of nodes.
:param allow_preemptible: Allow or disallow preemptible jobs.
:raises ClusterNotFound: If the cluster doesn't exist.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
Expand All @@ -115,7 +121,9 @@ def update(self, cluster: Union[str, Cluster], max_size: int) -> Cluster:
self.request(
f"clusters/{cluster_name}",
method="PATCH",
data=ClusterPatch(capacity=max_size),
data=ClusterPatch(
capacity=max_size, allow_preemptible_restriction_exceptions=allow_preemptible
),
exceptions_for_status={404: ClusterNotFound(self._not_found_err_msg(cluster))},
).json()
)
Expand Down Expand Up @@ -355,6 +363,33 @@ def url(self, cluster: Union[str, Cluster]) -> str:
cluster_name = self.resolve_cluster(cluster).full_name
return f"{self.config.agent_address}/cl/{cluster_name}/details"

def preempt_jobs(self, cluster: Union[str, Cluster]) -> List[Job]:
"""
Preempt all preemptible jobs on the cluster.
:param cluster: The cluster ID, full name, or object.
:raises ClusterNotFound: If the cluster doesn't exist.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
cluster = self.resolve_cluster(cluster)
nodes = set(n.id for n in self.nodes(cluster))
current_jobs = self.beaker.job.list(cluster=cluster, finalized=False)
preempted_jobs = []
for job in current_jobs:
if job.node not in nodes:
continue
if job.execution is None:
continue
if job.status.current not in {CurrentJobStatus.running, CurrentJobStatus.idle}:
continue
if job.execution.spec.context.priority != Priority.preemptible:
continue
preempted_jobs.append(self.beaker.job.preempt(job))
return preempted_jobs

def _not_found_err_msg(self, cluster: Union[str, Cluster]) -> str:
cluster = cluster if isinstance(cluster, str) else cluster.id
return (
Expand Down

0 comments on commit 6cd612a

Please sign in to comment.