Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HWORKS-791] Add Python API support to schedule jobs #176

Merged
merged 4 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion python/hopsworks/core/job_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import json

from hopsworks import client, job, util
from hopsworks import client, job, util, job_schedule
from hopsworks.client.exceptions import RestAPIError


Expand Down Expand Up @@ -192,3 +192,24 @@ def _update_job(self, name: str, config: dict):
self._project_id,
self._project_name,
)

def _schedule_job(self, name, schedule_config):
_client = client.get_instance()
path_params = ["project", self._project_id, "jobs", name, "schedule", "v2"]
headers = {"content-type": "application/json"}
method = "PUT" if schedule_config["id"] else "POST"

return job_schedule.JobSchedule.from_response_json(
_client._send_request(
method, path_params, headers=headers, data=json.dumps(schedule_config)
)
)

def _delete_schedule_job(self, name):
_client = client.get_instance()
path_params = ["project", self._project_id, "jobs", name, "schedule", "v2"]

return _client._send_request(
"DELETE",
path_params,
)
58 changes: 57 additions & 1 deletion python/hopsworks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import json
from hopsworks.engine import execution_engine
from hopsworks.core import job_api, execution_api
from hopsworks import util
from hopsworks import util, job_schedule as js

from datetime import datetime, timezone


class Job:
Expand All @@ -38,6 +40,7 @@ def __init__(
count=None,
project_id=None,
project_name=None,
job_schedule=None,
):
self._id = id
self._name = name
Expand All @@ -47,6 +50,11 @@ def __init__(
self._creator = creator
self._executions = executions
self._project_id = project_id
self._job_schedule = (
js.JobSchedule.from_response_json(job_schedule)
if job_schedule
robzor92 marked this conversation as resolved.
Show resolved Hide resolved
else job_schedule
)

self._execution_engine = execution_engine.ExecutionEngine(project_id)
self._execution_api = execution_api.ExecutionsApi(project_id)
Expand Down Expand Up @@ -117,6 +125,11 @@ def creator(self):
"""Creator of the job"""
return self._creator

@property
def job_schedule(self):
"""Return the Job schedule"""
return self._job_schedule

def run(self, args: str = None, await_termination: bool = None):
"""Run the job, with the option of passing runtime arguments.

Expand Down Expand Up @@ -180,6 +193,49 @@ def delete(self):
"""
self._job_api._delete(self)

def schedule(self, cron_expression, start_time=None, end_time=None):
robzor92 marked this conversation as resolved.
Show resolved Hide resolved
"""Schedule the execution of the job.

If a schedule for this job already exists, the method updates it.

```python
# Schedule the job
job.schedule(
cron_expression="0 */5 * ? * * *",
start_time=datetime.datetime.now(tz=timezone.utc)
)

# Retrieve the next execution time
print(job.job_schedule.next_execution_date_time)
```

# Arguments
cron_expression: str. The quartz cron expression
start_time: datetime, optional. The schedule start time in UTC.
If None, the current time is used. The start_time can be a value in the past.
end_time: datetime, optional. The schedule end time in UTC.
If None, the schedule will continue running indefinitely.
The end_time can be a value in the past.
# Returns
`JobSchedule`. The schedule of the job
"""
job_schedule = js.JobSchedule(
id=self._job_schedule.id if self._job_schedule else None,
start_date_time=start_time if start_time else datetime.now(tz=timezone.utc),
cron_expression=cron_expression,
end_time=end_time,
enabled=True,
)
self._job_schedule = self._job_api._schedule_job(
self._name, job_schedule.to_dict()
)
return self._job_schedule

def unschedule(self):
"""Unschedule the exceution of a Job"""
self._job_api._delete_schedule_job(self._name)
self._job_schedule = None

def json(self):
return json.dumps(self, cls=util.Encoder)

Expand Down
105 changes: 105 additions & 0 deletions python/hopsworks/job_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# Copyright 2023 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import humps
import json
from datetime import datetime, timezone

from hopsworks import util


class JobSchedule:
def __init__(
self,
start_date_time,
enabled,
cron_expression,
next_execution_date_time=None,
id=None,
end_date_time=None,
**kwargs
):
self._id = id
self._start_date_time = (
datetime.fromtimestamp(start_date_time / 1000, tz=timezone.utc)
if isinstance(start_date_time, int)
else start_date_time
)

self._end_date_time = (
datetime.fromtimestamp(end_date_time / 1000, tz=timezone.utc)
if isinstance(end_date_time, int)
else end_date_time
)
self._enabled = enabled
self._cron_expression = cron_expression

self._next_execution_date_time = (
datetime.fromtimestamp(next_execution_date_time / 1000, tz=timezone.utc)
if isinstance(next_execution_date_time, int)
else next_execution_date_time
)

@classmethod
def from_response_json(cls, json_dict):
json_decamelized = humps.decamelize(json_dict)
return cls(**json_decamelized)

def to_dict(self):
return {
"id": self._id,
"startDateTime": int(self._start_date_time.timestamp() * 1000.0)
if self._start_date_time
else None,
"endDateTime": int(self._end_date_time.timestamp() * 1000.0)
if self._end_date_time
else None,
"cronExpression": self._cron_expression,
"enabled": self._enabled,
}

def json(self):
return json.dumps(self, cls=util.Encoder)

@property
def id(self):
"""Return the schedule id"""
return self._id

@property
def start_date_time(self):
"""Return the schedule start time"""
return self._start_date_time

@property
def end_date_time(self):
"""Return the schedule end time"""
return self._end_date_time

@property
def enabled(self):
"""Return whether the schedule is enabled or not"""
return self._enabled

@property
def cron_expression(self):
"""Return the schedule cron expression"""
return self._cron_expression

@property
def next_execution_date_time(self):
"""Return the next execution time"""
return self._next_execution_date_time
Loading