Skip to content

Commit

Permalink
[HWORKS-791] Add Python API support to schedule jobs (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf committed Oct 16, 2023
1 parent f783386 commit a9a47cf
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 2 deletions.
23 changes: 22 additions & 1 deletion python/hopsworks/core/job_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import json

from hopsworks import client, job, util
from hopsworks import client, job, util, job_schedule
from hopsworks.client.exceptions import RestAPIError


Expand Down Expand Up @@ -192,3 +192,24 @@ def _update_job(self, name: str, config: dict):
self._project_id,
self._project_name,
)

def _schedule_job(self, name, schedule_config):
    """Create or update the schedule of the job called `name`.

    # Arguments
        name: Name of the job the schedule belongs to.
        schedule_config: Dictionary describing the schedule. It must contain
            an `"id"` key; a truthy id results in a `PUT` request, anything
            else in a `POST` request.
    # Returns
        `JobSchedule` built from the response of the REST call.
    """
    rest_client = client.get_instance()
    endpoint = ["project", self._project_id, "jobs", name, "schedule", "v2"]
    request_headers = {"content-type": "application/json"}

    # A schedule that already carries an id is sent with PUT, otherwise POST.
    if schedule_config["id"]:
        http_method = "PUT"
    else:
        http_method = "POST"

    build_schedule = job_schedule.JobSchedule.from_response_json
    response = rest_client._send_request(
        http_method,
        endpoint,
        headers=request_headers,
        data=json.dumps(schedule_config),
    )
    return build_schedule(response)

def _delete_schedule_job(self, name):
    """Send a `DELETE` request for the schedule of the job called `name`.

    # Arguments
        name: Name of the job whose schedule should be removed.
    # Returns
        Whatever the REST client returns for the `DELETE` request.
    """
    rest_client = client.get_instance()
    endpoint = ["project", self._project_id, "jobs", name, "schedule", "v2"]
    return rest_client._send_request("DELETE", endpoint)
60 changes: 59 additions & 1 deletion python/hopsworks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import json
from hopsworks.engine import execution_engine
from hopsworks.core import job_api, execution_api
from hopsworks import util
from hopsworks import util, job_schedule as js

from datetime import datetime, timezone


class Job:
Expand All @@ -38,6 +40,7 @@ def __init__(
count=None,
project_id=None,
project_name=None,
job_schedule=None,
):
self._id = id
self._name = name
Expand All @@ -47,6 +50,11 @@ def __init__(
self._creator = creator
self._executions = executions
self._project_id = project_id
self._job_schedule = (
js.JobSchedule.from_response_json(job_schedule)
if job_schedule
else job_schedule
)

self._execution_engine = execution_engine.ExecutionEngine(project_id)
self._execution_api = execution_api.ExecutionsApi(project_id)
Expand Down Expand Up @@ -117,6 +125,11 @@ def creator(self):
"""Creator of the job"""
return self._creator

@property
def job_schedule(self):
"""Return the Job schedule"""
return self._job_schedule

def run(self, args: str = None, await_termination: bool = None):
"""Run the job, with the option of passing runtime arguments.
Expand Down Expand Up @@ -180,6 +193,51 @@ def delete(self):
"""
self._job_api._delete(self)

def schedule(
    self,
    cron_expression: str,
    start_time: datetime = None,
    end_time: datetime = None,
):
    """Schedule the execution of the job.

    If a schedule for this job already exists, the method updates it.

    ```python
    from datetime import datetime, timezone

    # Schedule the job
    job.schedule(
        cron_expression="0 */5 * ? * * *",
        start_time=datetime.now(tz=timezone.utc)
    )

    # Retrieve the next execution time
    print(job.job_schedule.next_execution_date_time)
    ```

    # Arguments
        cron_expression: The quartz cron expression
        start_time: The schedule start time in UTC. If None, the current time is used. The start_time can be a value in the past.
        end_time: The schedule end time in UTC. If None, the schedule will continue running indefinitely. The end_time can be a value in the past.
    # Returns
        `JobSchedule`. The schedule of the job
    """
    job_schedule = js.JobSchedule(
        # Carry over the id of an existing schedule, if there is one.
        id=self._job_schedule.id if self._job_schedule else None,
        start_date_time=start_time if start_time else datetime.now(tz=timezone.utc),
        cron_expression=cron_expression,
        # JobSchedule names this parameter `end_date_time`. Passing it as
        # `end_time` would be absorbed by its **kwargs and silently dropped,
        # so the end time would never reach the backend.
        end_date_time=end_time,
        enabled=True,
    )
    # Keep the schedule returned by the API as the locally cached one.
    self._job_schedule = self._job_api._schedule_job(
        self._name, job_schedule.to_dict()
    )
    return self._job_schedule

def unschedule(self):
"""Unschedule the exceution of a Job"""
self._job_api._delete_schedule_job(self._name)
self._job_schedule = None

def json(self):
    """Serialize this object to a JSON string using the project's `util.Encoder`."""
    serialized = json.dumps(self, cls=util.Encoder)
    return serialized

Expand Down
105 changes: 105 additions & 0 deletions python/hopsworks/job_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# Copyright 2023 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import humps
import json
from datetime import datetime, timezone

from hopsworks import util


class JobSchedule:
    """Schedule of a job: a cron expression with optional start and end times.

    Time values given as `int` are read as epoch milliseconds and converted to
    timezone-aware UTC datetimes; values of any other type are stored as they
    are.
    """

    def __init__(
        self,
        start_date_time,
        enabled,
        cron_expression,
        next_execution_date_time=None,
        id=None,
        end_date_time=None,
        **kwargs
    ):
        # Any additional keyword arguments are accepted and ignored.
        self._id = id
        self._start_date_time = self._parse_time(start_date_time)
        self._end_date_time = self._parse_time(end_date_time)
        self._enabled = enabled
        self._cron_expression = cron_expression
        self._next_execution_date_time = self._parse_time(next_execution_date_time)

    @staticmethod
    def _parse_time(value):
        """Turn integer epoch milliseconds into a UTC datetime; pass other values through."""
        if isinstance(value, int):
            return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
        return value

    @staticmethod
    def _epoch_millis(moment):
        """Return `moment` as integer epoch milliseconds, or None when it is falsy."""
        if not moment:
            return None
        return int(moment.timestamp() * 1000.0)

    @classmethod
    def from_response_json(cls, json_dict):
        """Build a schedule from a response dictionary with camelCase keys."""
        snake_cased = humps.decamelize(json_dict)
        return cls(**snake_cased)

    def to_dict(self):
        """Return the schedule as a dictionary with camelCase keys and millisecond timestamps.

        The next execution time is not part of the result.
        """
        payload = {"id": self._id}
        payload["startDateTime"] = self._epoch_millis(self._start_date_time)
        payload["endDateTime"] = self._epoch_millis(self._end_date_time)
        payload["cronExpression"] = self._cron_expression
        payload["enabled"] = self._enabled
        return payload

    def json(self):
        """Serialize this schedule to a JSON string using the project's `util.Encoder`."""
        return json.dumps(self, cls=util.Encoder)

    @property
    def id(self):
        """Identifier of the schedule"""
        return self._id

    @property
    def start_date_time(self):
        """Time at which the schedule starts"""
        return self._start_date_time

    @property
    def end_date_time(self):
        """Time at which the schedule ends"""
        return self._end_date_time

    @property
    def enabled(self):
        """Whether the schedule is enabled"""
        return self._enabled

    @property
    def cron_expression(self):
        """Cron expression of the schedule"""
        return self._cron_expression

    @property
    def next_execution_date_time(self):
        """Time of the next execution"""
        return self._next_execution_date_time

0 comments on commit a9a47cf

Please sign in to comment.