diff --git a/python/hopsworks/core/job_api.py b/python/hopsworks/core/job_api.py index bd1544ce3..e40afe8c0 100644 --- a/python/hopsworks/core/job_api.py +++ b/python/hopsworks/core/job_api.py @@ -16,7 +16,7 @@ import json -from hopsworks import client, job, util +from hopsworks import client, job, util, job_schedule from hopsworks.client.exceptions import RestAPIError @@ -192,3 +192,24 @@ def _update_job(self, name: str, config: dict): self._project_id, self._project_name, ) + + def _schedule_job(self, name, schedule_config): + _client = client.get_instance() + path_params = ["project", self._project_id, "jobs", name, "schedule", "v2"] + headers = {"content-type": "application/json"} + method = "PUT" if schedule_config["id"] else "POST" + + return job_schedule.JobSchedule.from_response_json( + _client._send_request( + method, path_params, headers=headers, data=json.dumps(schedule_config) + ) + ) + + def _delete_schedule_job(self, name): + _client = client.get_instance() + path_params = ["project", self._project_id, "jobs", name, "schedule", "v2"] + + return _client._send_request( + "DELETE", + path_params, + ) diff --git a/python/hopsworks/job.py b/python/hopsworks/job.py index 8fd581893..168ee9ef9 100644 --- a/python/hopsworks/job.py +++ b/python/hopsworks/job.py @@ -18,7 +18,9 @@ import json from hopsworks.engine import execution_engine from hopsworks.core import job_api, execution_api -from hopsworks import util +from hopsworks import util, job_schedule as js + +from datetime import datetime, timezone class Job: @@ -38,6 +40,7 @@ def __init__( count=None, project_id=None, project_name=None, + job_schedule=None, ): self._id = id self._name = name @@ -47,6 +50,11 @@ def __init__( self._creator = creator self._executions = executions self._project_id = project_id + self._job_schedule = ( + js.JobSchedule.from_response_json(job_schedule) + if job_schedule + else job_schedule + ) self._execution_engine = execution_engine.ExecutionEngine(project_id) self._execution_api = execution_api.ExecutionsApi(project_id) @@ -117,6 +125,11 @@ def creator(self): """Creator of the job""" return self._creator + @property + def job_schedule(self): + """Return the Job schedule""" + return self._job_schedule + def run(self, args: str = None, await_termination: bool = None): """Run the job, with the option of passing runtime arguments. @@ -180,6 +193,51 @@ def delete(self): """ self._job_api._delete(self) + def schedule( + self, + cron_expression: str, + start_time: datetime = None, + end_time: datetime = None, + ): + """Schedule the execution of the job. + + If a schedule for this job already exists, the method updates it. + + ```python + # Schedule the job + job.schedule( + cron_expression="0 */5 * ? * * *", + start_time=datetime.datetime.now(tz=timezone.utc) + ) + + # Retrieve the next execution time + print(job.job_schedule.next_execution_date_time) + ``` + + # Arguments + cron_expression: The quartz cron expression + start_time: The schedule start time in UTC. If None, the current time is used. The start_time can be a value in the past. + end_time: The schedule end time in UTC. If None, the schedule will continue running indefinitely. The end_time can be a value in the past. + # Returns + `JobSchedule`. The schedule of the job + """ + job_schedule = js.JobSchedule( + id=self._job_schedule.id if self._job_schedule else None, + start_date_time=start_time if start_time else datetime.now(tz=timezone.utc), + cron_expression=cron_expression, + end_time=end_time, + enabled=True, + ) + self._job_schedule = self._job_api._schedule_job( + self._name, job_schedule.to_dict() + ) + return self._job_schedule + + def unschedule(self): + """Unschedule the exceution of a Job""" + self._job_api._delete_schedule_job(self._name) + self._job_schedule = None + def json(self): return json.dumps(self, cls=util.Encoder) diff --git a/python/hopsworks/job_schedule.py b/python/hopsworks/job_schedule.py new file mode 100644 index 000000000..48e022572 --- /dev/null +++ b/python/hopsworks/job_schedule.py @@ -0,0 +1,105 @@ +# +# Copyright 2023 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import humps +import json +from datetime import datetime, timezone + +from hopsworks import util + + +class JobSchedule: + def __init__( + self, + start_date_time, + enabled, + cron_expression, + next_execution_date_time=None, + id=None, + end_date_time=None, + **kwargs + ): + self._id = id + self._start_date_time = ( + datetime.fromtimestamp(start_date_time / 1000, tz=timezone.utc) + if isinstance(start_date_time, int) + else start_date_time + ) + + self._end_date_time = ( + datetime.fromtimestamp(end_date_time / 1000, tz=timezone.utc) + if isinstance(end_date_time, int) + else end_date_time + ) + self._enabled = enabled + self._cron_expression = cron_expression + + self._next_execution_date_time = ( + datetime.fromtimestamp(next_execution_date_time / 1000, tz=timezone.utc) + if isinstance(next_execution_date_time, int) + else next_execution_date_time + ) + + @classmethod + def from_response_json(cls, json_dict): + json_decamelized = humps.decamelize(json_dict) + return cls(**json_decamelized) + + def to_dict(self): + return { + "id": self._id, + "startDateTime": int(self._start_date_time.timestamp() * 1000.0) + if self._start_date_time + else None, + "endDateTime": int(self._end_date_time.timestamp() * 1000.0) + if self._end_date_time + else None, + "cronExpression": self._cron_expression, + "enabled": self._enabled, + } + + def json(self): + return json.dumps(self, cls=util.Encoder) + + @property + def id(self): + """Return the schedule id""" + return self._id + + @property + def start_date_time(self): + """Return the schedule start time""" + return self._start_date_time + + @property + def end_date_time(self): + """Return the schedule end time""" + return self._end_date_time + + @property + def enabled(self): + """Return whether the schedule is enabled or not""" + return self._enabled + + @property + def cron_expression(self): + """Return the schedule cron expression""" + return self._cron_expression + + @property + def next_execution_date_time(self): + """Return the next execution time""" + return self._next_execution_date_time