From 2bfec00d2114d8f43f4cc7c5cad0e3e44de25eaf Mon Sep 17 00:00:00 2001 From: Fabian Reisegger Date: Tue, 16 Jan 2024 10:10:43 +0100 Subject: [PATCH] Added ability to schedule sql jobs --- CHANGES.rst | 2 + croud/__main__.py | 74 +++++++++ croud/config/configuration.py | 31 +++- croud/config/schemas.py | 19 +++ croud/scheduledjobs/__init__.py | 0 croud/scheduledjobs/commands.py | 108 +++++++++++++ croud/util.py | 30 ++++ docs/commands/index.rst | 1 + docs/commands/scheduled-jobs.rst | 108 +++++++++++++ tests/commands/test_scheduled_jobs.py | 215 ++++++++++++++++++++++++++ 10 files changed, 587 insertions(+), 1 deletion(-) create mode 100644 croud/scheduledjobs/__init__.py create mode 100644 croud/scheduledjobs/commands.py create mode 100644 docs/commands/scheduled-jobs.rst create mode 100644 tests/commands/test_scheduled_jobs.py diff --git a/CHANGES.rst b/CHANGES.rst index 2b11293a..9c720bec 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,6 +5,8 @@ Changes for croud Unreleased ========== +- Added support for scheduling sql jobs with the ``scheduled-jobs`` commands. + 1.10.1 - 2024/01/11 =================== diff --git a/croud/__main__.py b/croud/__main__.py index b8e98c7e..ea891a07 100644 --- a/croud/__main__.py +++ b/croud/__main__.py @@ -109,6 +109,12 @@ project_users_remove, ) from croud.regions.commands import regions_create, regions_delete, regions_list +from croud.scheduledjobs.commands import ( + create_scheduled_job, + delete_scheduled_job, + get_scheduled_job_log, + get_scheduled_jobs, +) from croud.subscriptions.commands import ( subscription_delete, subscriptions_create, @@ -1468,6 +1474,74 @@ }, } }, + "scheduled-jobs": { + "help": "Manage your scheduled sql jobs.", + "commands": { + "create": { + "help": "Create a scheduled sql job to run at specific times.", + "extra_args": [ + Argument( + "--name", type=str, required=True, help="Name of the sql job." + ), + Argument( + "--cluster-id", type=str, required=True, + help="Cluster where the job should be run." + ), + Argument( + "--cron", type=str, required=True, + help="Cron schedule of the sql job." + ), + Argument( + "--sql", type=str, required=True, + help="The sql statement the job should run." + ), + Argument( + "--enabled", type=bool, required=True, + help="Enable or disable the job." + ) + ], + "resolver": create_scheduled_job, + }, + "list": { + "help": "Get all scheduled sql jobs.", + "extra_args": [ + Argument( + "--cluster-id", type=str, required=True, + help="The cluster of which jobs should be listed." + ) + ], + "resolver": get_scheduled_jobs, + }, + "logs": { + "help": "Logs of a scheduled sql job.", + "extra_args": [ + Argument( + "--job-id", type=str, required=True, + help="The job id of the job log to be listed." + ), + Argument( + "--cluster-id", type=str, required=True, + help="The cluster of which the job log should be listed." + ) + ], + "resolver": get_scheduled_job_log, + }, + "delete": { + "help": "Delete specified scheduled sql job.", + "extra_args": [ + Argument( + "--job-id", type=str, required=True, + help="The job id of the job to be deleted." + ), + Argument( + "--cluster-id", type=str, required=True, + help="The cluster of which the job should be deleted." + ), + ], + "resolver": delete_scheduled_job, + } + } + }, "subscriptions": { "help": "Manage subscriptions.", "commands": { diff --git a/croud/config/configuration.py b/croud/config/configuration.py index c5a0a8a1..8e69fd60 100644 --- a/croud/config/configuration.py +++ b/croud/config/configuration.py @@ -16,7 +16,6 @@ # However, if you have executed another commercial license agreement # with Crate these terms will supersede the license and you may use the # software solely pursuant to the terms of the relevant commercial agreement. - from pathlib import Path from typing import Any, Dict, Optional @@ -107,6 +106,18 @@ def region(self) -> Optional[str]: def organization(self) -> Optional[str]: return self.profile.get("organization-id") # type: ignore + @property + def gc_jwt_token(self) -> Optional[str]: + return self.profile.get("gc_jwt_token") + + @property + def gc_jwt_token_expiry(self) -> Optional[str]: + return self.profile.get("gc_jwt_token_expiry") + + @property + def gc_cluster_id(self) -> Optional[str]: + return self.profile.get("gc_cluster_id") + @property def profile(self) -> ProfileType: return self.profiles[self.name] # type: ignore @@ -165,6 +176,24 @@ def set_auth_token(self, profile: str, value: str) -> None: def set_current_auth_token(self, value: str) -> None: self.set_auth_token(self.name, value) + def set_gc_jwt_token(self, profile: str, value: str) -> None: + self._set_profile_option(profile, "gc_jwt_token", value) + + def set_current_gc_jwt_token(self, value: str) -> None: + self.set_gc_jwt_token(self.name, value) + + def set_gc_jwt_token_expiry(self, profile: str, value: str) -> None: + self._set_profile_option(profile, "gc_jwt_token_expiry", value) + + def set_current_gc_jwt_token_expiry(self, value: str) -> None: + self.set_gc_jwt_token_expiry(self.name, value) + + def set_gc_cluster_id(self, profile: str, value: str) -> None: + self._set_profile_option(profile, "gc_cluster_id", value) + + def set_current_gc_cluster_id(self, value: str) -> None: + self.set_gc_cluster_id(self.name, value) + def set_format(self, profile: str, value: str) -> None: self._set_profile_option(profile, "format", value) diff --git a/croud/config/schemas.py b/croud/config/schemas.py index 1239755f..f8759dcb 100644 --- a/croud/config/schemas.py +++ b/croud/config/schemas.py @@ -44,6 +44,25 @@ class ProfileSchema(Schema): attribute="organization-id", data_key="organization-id", allow_none=True ) region = fields.String(required=False) + gc_endpoint = fields.String(required=False) + gc_jwt_token = fields.String( + attribute="gc_jwt_token", + data_key="gc_jwt_token", + required=False, + allow_none=True, + ) + gc_jwt_token_expiry = fields.String( + attribute="gc_jwt_token_expiry", + data_key="gc_jwt_token_expiry", + required=False, + allow_none=True, + ) + gc_cluster_id = fields.String( + attribute="gc_cluster_id", + data_key="gc_cluster_id", + required=False, + allow_none=True, + ) class ConfigSchema(Schema): diff --git a/croud/scheduledjobs/__init__.py b/croud/scheduledjobs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/croud/scheduledjobs/commands.py b/croud/scheduledjobs/commands.py new file mode 100644 index 00000000..949a3ee3 --- /dev/null +++ b/croud/scheduledjobs/commands.py @@ -0,0 +1,108 @@ +# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +from argparse import Namespace + +from yarl import URL + +from croud.api import Client +from croud.config import CONFIG, get_output_format +from croud.printer import print_response +from croud.util import grand_central_jwt_token + + +@grand_central_jwt_token +def get_scheduled_jobs(args: Namespace) -> None: + client = _get_gc_client(args) + + data, errors = client.get("/api/scheduled-jobs/") + + print_response( + data=data, + errors=errors, + keys=["name", "id", "cron", "sql", "enabled", "next_run_time"], + output_fmt=get_output_format(args), + ) + + if errors or not data: + return + + +@grand_central_jwt_token +def get_scheduled_job_log(args: Namespace) -> None: + client = _get_gc_client(args) + + data, errors = client.get(f"/api/scheduled-jobs/{args.job_id}/log") + print_response( + data=data, + errors=errors, + keys=["job_id", "start", "end", "error", "statements"], + output_fmt=get_output_format(args), + ) + + if errors or not data: + return + + +@grand_central_jwt_token +def create_scheduled_job(args: Namespace) -> None: + body = { + "name": args.name, + "cron": args.cron, + "sql": args.sql, + "enabled": args.enabled, + } + + client = _get_gc_client(args) + + data, errors = client.post("/api/scheduled-jobs/", body=body) + print_response( + data=data, + errors=errors, + keys=["name", "id", "cron", "sql", "enabled"], + output_fmt=get_output_format(args), + ) + + if errors or not data: + return + + +@grand_central_jwt_token +def delete_scheduled_job(args: Namespace) -> None: + client = _get_gc_client(args) + + data, errors = client.delete(f"/api/scheduled-jobs/{args.job_id}") + print_response( + data=data, + errors=errors, + success_message="Scheduled job deleted.", + output_fmt=get_output_format(args), + ) + + +def _get_gc_client(args: Namespace) -> Client: + client = Client.from_args(args) + cluster, _ = client.get(f"/api/v2/clusters/{args.cluster_id}/") + + url_region_cloud = cluster.get("fqdn").split(".", 1)[1][:-1] # type: ignore + gc_url = f"https://{cluster.get('name')}.gc.{url_region_cloud}" # type: ignore + client.base_url = URL(gc_url) + client.session.cookies.set("cratedb_center_session", CONFIG.gc_jwt_token) + + return client diff --git a/croud/util.py b/croud/util.py index 4792cc11..06d221d7 100644 --- a/croud/util.py +++ b/croud/util.py @@ -23,8 +23,10 @@ import subprocess import webbrowser from argparse import Namespace +from datetime import datetime, timezone from typing import Tuple +from croud.api import Client from croud.config import CONFIG from croud.printer import print_error, print_info from croud.tools.spinner import HALO @@ -126,3 +128,31 @@ def _wrapper(cmd_args: Namespace): # decorator logic cmd(cmd_args) return _wrapper + + +def grand_central_jwt_token(cmd): + @functools.wraps(cmd) + def _wrapper(cmd_args: Namespace): + if CONFIG.gc_jwt_token: + if not CONFIG.gc_cluster_id == cmd_args.cluster_id: + _set_gc_jwt(cmd_args) + elif ( + str(datetime.now(tz=timezone.utc).isoformat()) + > CONFIG.gc_jwt_token_expiry + ): + _set_gc_jwt(cmd_args) + else: + _set_gc_jwt(cmd_args) + + cmd(cmd_args) + + return _wrapper + + +def _set_gc_jwt(cmd_args: Namespace) -> None: + client = Client.from_args(cmd_args) + data, errors = client.get(f"/api/v2/clusters/{cmd_args.cluster_id}/jwt/") + + CONFIG.set_current_gc_jwt_token(data.get("token")) # type: ignore + CONFIG.set_current_gc_cluster_id(cmd_args.cluster_id) # type: ignore + CONFIG.set_current_gc_jwt_token_expiry(data.get("expiry")) # type: ignore diff --git a/docs/commands/index.rst b/docs/commands/index.rst index b66a787c..587f23b4 100644 --- a/docs/commands/index.rst +++ b/docs/commands/index.rst @@ -46,6 +46,7 @@ To get help for a specific command, you can append ``--help``: users api-keys regions + scheduled-jobs .. note:: diff --git a/docs/commands/scheduled-jobs.rst b/docs/commands/scheduled-jobs.rst new file mode 100644 index 00000000..5789691f --- /dev/null +++ b/docs/commands/scheduled-jobs.rst @@ -0,0 +1,108 @@ +================== +``scheduled-jobs`` +================== + +The ``scheduled-jobs`` command lets you manage scheduled sql jobs for your cluster. + +.. tip:: + + Scheduled sql jobs are an easy way to setup sql statements that need + to be run in a certain interval to manage your clusters data. + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: scheduled-jobs + :nosubcommands: + + +``scheduled-jobs create`` +========================= + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: scheduled-jobs create + +Example +------- + +.. code-block:: console + + sh$ croud scheduled-jobs create \ + --name test-job \ + --cluster-id 8d6a7c3c-61d5-11e9-a639-34e12d2331a1 \ + --cron "1 1 * * *" \ + --sql "DELETE * FROM TABLE test" \ + --enabled True + +----------+---------------+-----------+-------------------+-----------+ + | name | id | cron | sql | enabled | + |----------+---------------+-----------+-------------------+-----------| + | test-job | 0EW7SX3ND87DY | 1 1 * * * | DELETE FROM test | TRUE | + +----------+---------------+-----------+-------------------+-----------+ + +``scheduled-jobs list`` +======================= + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: scheduled-jobs list + +Example +------- + +.. code-block:: console + + sh$ croud scheduled-jobs list \ + --cluster-id 8d6a7c3c-61d5-11e9-a639-34e12d2331a1 + +----------+---------------+-----------+-------------------+-----------+---------------------------+ + | name | id | cron | sql | enabled | next_run_time | + |----------+---------------+-----------+-------------------+-----------+---------------------------| + | test-job | 0EW7SX3ND87DY | 1 1 * * * | DELETE FROM test | TRUE | 2024-01-20T01:01:00+00:00 | + +----------+---------------+-----------+-------------------+-----------+---------------------------+ + +``scheduled-jobs logs`` +======================= + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: scheduled-jobs logs + +Example +------- + +.. code-block:: console + + sh$ croud scheduled-jobs logs \ + --job-id 0EW7SX3ND87DY \ + --cluster-id 8d6a7c3c-61d5-11e9-a639-34e12d2331a1 + +---------------+----------------------------+----------------------------+---------+-----------------------------------------------------------------------+ + | job_id | start | end | error | statements | + |---------------+----------------------------+----------------------------+---------+-----------------------------------------------------------------------| + | 0EW7SX3ND87DY | 2024-01-20T08:52:00.008000 | 2024-01-29T08:52:00.014000 | NULL | {"0": {"duration": 0.0021747201681137085, "sql": "DELETE FROM test"}} | + +---------------+----------------------------+----------------------------+---------+-----------------------------------------------------------------------+ + +``scheduled-jobs delete`` +========================= + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: scheduled-jobs delete + +Example +------- + +.. code-block:: console + + sh$ croud scheduled-jobs delete \ + --job-id 0EW7SX3ND87DY + --cluster-id 8d6a7c3c-61d5-11e9-a639-34e12d2331a1 + ==> Success: Scheduled job deleted. diff --git a/tests/commands/test_scheduled_jobs.py b/tests/commands/test_scheduled_jobs.py new file mode 100644 index 00000000..591f5de0 --- /dev/null +++ b/tests/commands/test_scheduled_jobs.py @@ -0,0 +1,215 @@ +from unittest import mock + +from croud.api import Client, RequestMethod +from tests.util import assert_rest, call_command, gen_uuid + + +@mock.patch.object(Client, "request", return_value=({}, None)) +def test_create_job(mock_request): + def mock_call(*args, **kwargs): + if args[0] == RequestMethod.POST: + return { + "name": "test-job", + "id": gen_uuid(), + "cron": "1 1 * * *", + "sql": "CREATE TABLE test (id TEXT)", + "enable": True, + }, None + if args[0] == RequestMethod.GET and "/jwt/" in args[1]: + return { + "token": "xyz", + "expiry": "01.02.2024", + }, None + if args[0] == RequestMethod.GET: + return {"fqdn": "my.cluster.cloud", "name": "mycluster"}, None + return None, None + + mock_request.side_effect = mock_call + + cluster_id = gen_uuid() + call_command( + "croud", + "scheduled-jobs", + "create", + "--name", + "test-job", + "--cluster-id", + cluster_id, + "--cron", + "1 1 * * *", + "--sql", + "CREATE TABLE test (id TEXT)", + "--enabled", + "True", + ) + + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/jwt/", + any_times=True, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/", + any_times=True, + ) + assert_rest( + mock_request, + RequestMethod.POST, + "/api/scheduled-jobs/", + body={ + "name": "test-job", + "cron": "1 1 * * *", + "sql": "CREATE TABLE test (id TEXT)", + "enabled": True, + }, + any_times=True, + ) + + +@mock.patch.object(Client, "request", return_value=({}, None)) +def test_get_scheduled_jobs(mock_request): + def mock_call(*args, **kwargs): + if args[0] == RequestMethod.GET and "/jwt/" in args[1]: + return { + "token": "xyz", + "expiry": "01.02.2024", + }, None + if args[0] == RequestMethod.GET: + return {"fqdn": "my.cluster.cloud", "name": "mycluster"}, None + if args[0] == RequestMethod.GET and "/scheduled-jobs/" in args[1]: + return { + "name": "test-job", + "id": gen_uuid(), + "cron": "1 1 * * *", + "sql": "CREATE TABLE test (id TEXT)", + "enabled": True, + "next_run_time": "02.02.2024", + }, None + return None, None + + mock_request.side_effect = mock_call + + cluster_id = gen_uuid() + call_command("croud", "scheduled-jobs", "list", "--cluster-id", cluster_id) + + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/jwt/", + any_times=True, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/", + any_times=True, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/scheduled-jobs/", + any_times=True, + ) + + +@mock.patch.object(Client, "request", return_value=({}, None)) +def test_get_scheduled_job_log(mock_request): + job_id = gen_uuid() + + def mock_call(*args, **kwargs): + if args[0] == RequestMethod.GET and "/jwt/" in args[1]: + return { + "token": "xyz", + "expiry": "01.02.2024", + }, None + if args[0] == RequestMethod.GET: + return {"fqdn": "my.cluster.cloud", "name": "mycluster"}, None + if args[0] == RequestMethod.GET and "/scheduled-jobs/" in args[1]: + return { + "job_id": job_id, + "start": "02.02.2024", + "end": "03.02.2024", + "error": None, + "statements": "CREATE TABLE test (id TEXT)", + }, None + return None, None + + mock_request.side_effect = mock_call + + cluster_id = gen_uuid() + call_command( + "croud", + "scheduled-jobs", + "logs", + "--job-id", + job_id, + "--cluster-id", + cluster_id, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/jwt/", + any_times=True, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/", + any_times=True, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/scheduled-jobs/{job_id}/log", + any_times=True, + ) + + +@mock.patch.object(Client, "request", return_value=({}, None)) +def test_delete_scheduled_job(mock_request): + job_id = gen_uuid() + + def mock_call(*args, **kwargs): + if args[0] == RequestMethod.GET and "/jwt/" in args[1]: + return { + "token": "xyz", + "expiry": "01.02.2024", + }, None + if args[0] == RequestMethod.GET: + return {"fqdn": "my.cluster.cloud", "name": "mycluster"}, None + return None, None + + mock_request.side_effect = mock_call + + cluster_id = gen_uuid() + call_command( + "croud", + "scheduled-jobs", + "delete", + "--job-id", + job_id, + "--cluster-id", + cluster_id, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/jwt/", + any_times=True, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/", + any_times=True, + ) + assert_rest( + mock_request, + RequestMethod.DELETE, + f"/api/scheduled-jobs/{job_id}", + any_times=True, + )