From 82f1fc1d6e64a8794711e9d5ea973aafd00b8c1b Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Fri, 7 Jul 2023 11:09:32 +0200 Subject: [PATCH] Add support for cluster export jobs --- CHANGES.rst | 6 + croud/__main__.py | 92 ++++++++++++ croud/clusters/commands.py | 214 ++++++++++++++++++++++++--- croud/organizations/commands.py | 23 +++ docs/commands/clusters.rst | 91 ++++++++++++ docs/commands/organizations.rst | 79 ++++++++++ tests/commands/test_clusters.py | 182 +++++++++++++++++++++++ tests/commands/test_organizations.py | 21 +++ 8 files changed, 686 insertions(+), 22 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index c00c5615..48f97c32 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -9,6 +9,12 @@ Unreleased This is done by specifying a ``--org-id`` and ``--region``, at which point the project will be created on-the-fly. +- Added new resource ``clusters export-jobs`` and subcommands that allow + organization admins to manage export jobs. + +- Added new subcommand ``get`` for the resource ``organizations files`` that + returns a single file by its ID. + 1.4.0 - 2023/06/22 ================== diff --git a/croud/__main__.py b/croud/__main__.py index ad2eba4f..469b8cfa 100644 --- a/croud/__main__.py +++ b/croud/__main__.py @@ -47,6 +47,9 @@ clusters_snapshots_list, clusters_snapshots_restore, clusters_upgrade, + export_jobs_create, + export_jobs_delete, + export_jobs_list, import_jobs_create_from_file, import_jobs_create_from_url, import_jobs_delete, @@ -67,6 +70,7 @@ from croud.organizations.commands import ( org_files_create, org_files_delete, + org_files_get, org_files_list, organizations_create, organizations_delete, @@ -762,6 +766,78 @@ }, }, }, + "export-jobs": { + "help": "Manage data export jobs.", + "commands": { + "delete": { + "help": "Cancels an already running data export job that has " + "not finished yet. " + "If the job has already finished it deletes it from " + "the job history.", + "extra_args": [ + Argument( + "--cluster-id", type=str, required=True, + help="The cluster the job belongs to." + ), + Argument( + "--export-job-id", type=str, + required=True, + help="The ID of the export job." + ), + ], + "resolver": export_jobs_delete, + }, + "list": { + "help": "Lists data export jobs that belong to a specific " + "cluster.", + "extra_args": [ + Argument( + "--cluster-id", type=str, required=True, + help="The cluster the export jobs belong to." + ), + ], + "resolver": export_jobs_list, + }, + "create": { + "help": "Create a data export job for the specified cluster.", + "extra_args": [ + Argument( + "--cluster-id", type=str, required=True, + help="The cluster the data will be exported from." + ), + Argument( + "--table", + type=str, + required=True, + help="The table the data will be exported from.", + ), + Argument( + "--file-format", + type=str, + required=True, + choices=["csv", "json", "parquet"], + help="The format of the data in the file.", + ), + Argument( + "--compression", + type=str, + required=False, + choices=["gzip"], + help="The compression method of the exported file.", + ), + Argument( + "--save-as", + type=str, + required=False, + help="The file on your local filesystem the data will " + "be exported to. If not specified, you will " + "receive the URL to download the file.", + ), + ], + "resolver": export_jobs_create, + }, + }, + }, }, }, "products": { @@ -924,6 +1000,22 @@ "files": { "help": "Manage organization's files.", "commands": { + "get": { + "help": ( + "Get a file by its ID." + ), + "extra_args": [ + Argument( + "--org-id", type=str, required=True, + help="The organization ID to use.", + ), + Argument( + "--file-id", type=str, required=True, + help="The ID of the file.", + ), + ], + "resolver": org_files_get, + }, "list": { "help": "List all files uploaded to this organization.", "extra_args": [ diff --git a/croud/clusters/commands.py b/croud/clusters/commands.py index d4cad47f..23d9ed16 100644 --- a/croud/clusters/commands.py +++ b/croud/clusters/commands.py @@ -16,12 +16,17 @@ # However, if you have executed another commercial license agreement # with Crate these terms will supersede the license and you may use the # software solely pursuant to the terms of the relevant commercial agreement. +import functools +import pathlib import time from argparse import Namespace from datetime import datetime, timedelta, timezone -from typing import Any, Dict, Optional +from shutil import copyfileobj +from typing import Any, Dict, Optional, cast import bitmath +import requests +from tqdm.auto import tqdm from croud.api import Client from croud.clusters.exceptions import AsyncOperationNotFound @@ -253,25 +258,6 @@ def import_jobs_create(args: Namespace, extra_payload: Dict[str, Any]) -> None: output_fmt=get_output_format(args), ) - def import_job_feedback_func(status: str, feedback: dict): - num_records = feedback.get("progress", {}).get("records") - num_bytes = feedback.get("progress", {}).get("bytes") - - records_normalized = num_records - - if num_records > 1_000_000: - records_normalized = f"{records_normalized / 1_000_000:.2f} M" - elif num_records > 1_000: - records_normalized = f"{records_normalized / 1_000:.2f} K" - - size = bitmath.Byte(num_bytes).best_prefix().format("{value:.2f} {unit}") - if status == "SUCCEEDED": - print_info(f"Done importing {records_normalized} records and {size}.") - else: - print_info( - f"Importing... {records_normalized} records and {size} imported so far." - ) - if data: import_job_id = data["id"] @@ -280,7 +266,10 @@ def import_job_feedback_func(status: str, feedback: dict): cluster_id=args.cluster_id, request_params={"import_job_id": import_job_id}, operation_status_func=_get_import_job_operation_status, - feedback_func=import_job_feedback_func, + feedback_func=( + _data_job_feedback_func, + ("import",), + ), ) @@ -663,6 +652,83 @@ def clusters_snapshots_restore(args: Namespace) -> None: ) +def export_jobs_create(args: Namespace) -> None: + body = { + "source": { + "table": args.table, + }, + "destination": {"format": args.file_format}, + } + + if args.compression: + body["compression"] = args.compression + + client = Client.from_args(args) + data, errors = client.post( + f"/api/v2/clusters/{args.cluster_id}/export-jobs/", body=body + ) + print_response( + data=data, + errors=errors, + keys=["id", "cluster_id", "status"], + output_fmt=get_output_format(args), + ) + + if data: + export_job_id = data["id"] + + _wait_for_completed_operation( + client=client, + cluster_id=args.cluster_id, + request_params={"export_job_id": export_job_id}, + operation_status_func=_get_export_job_operation_status, + feedback_func=( + _data_job_feedback_func, + ("export",), + ), + post_success_func=( + _download_exported_file, + (client, args.cluster_id, args.save_as, export_job_id), + ), + ) + + +def export_jobs_delete(args: Namespace) -> None: + client = Client.from_args(args) + data, errors = client.delete( + f"/api/v2/clusters/{args.cluster_id}/export-jobs/{args.export_job_id}/" + ) + print_response( + data=data, + errors=errors, + keys=["id", "cluster_id", "status"], + output_fmt=get_output_format(args), + ) + + +def export_jobs_list(args: Namespace) -> None: + client = Client.from_args(args) + data, errors = client.get(f"/api/v2/clusters/{args.cluster_id}/export-jobs/") + print_response( + data=data, + errors=errors, + keys=["id", "cluster_id", "status", "source", "destination"], + output_fmt=get_output_format(args), + transforms={ + "source": _transform_export_job_source, + "destination": _transform_export_job_destination, + }, + ) + + +def _transform_export_job_source(field): + return field["table"] + + +def _transform_export_job_destination(field): + return f"Format: {field['format']}\nFile ID: {field.get('file', {}).get('id')}" + + # We want to map the custom hardware specs to slightly nicer params in croud, # hence this mapping here def _handle_edge_params(body, args): @@ -717,6 +783,105 @@ def _get_import_job_operation_status( return status, msg, feedback_data +def _get_formatted_size(feedback: dict) -> str: + num_bytes = feedback.get("progress", {}).get("bytes") + return bitmath.Byte(num_bytes).best_prefix().format("{value:.2f} {unit}") + + +def _get_formatted_records_normalized(feedback: dict) -> str: + num_records = feedback.get("progress", {}).get("records") + + records_normalized = num_records + + if num_records > 1_000_000: + records_normalized = f"{records_normalized / 1_000_000:.2f} M" + elif num_records > 1_000: + records_normalized = f"{records_normalized / 1_000:.2f} K" + + return records_normalized + + +def _data_job_feedback_func(status: str, feedback: dict, job_type: str): + records_normalized = _get_formatted_records_normalized(feedback) + size = _get_formatted_size(feedback) + + if status == "SUCCEEDED": + print_info(f"Done {job_type}ing {records_normalized} records and {size}.") + else: + print_info( + f"{job_type}ing... {records_normalized} records and {size} {job_type}ed " + "so far." + ) + + +def _download_exported_file( + client: Client, cluster_id: str, save_as: str, export_job_id: str +): + data, errors = client.get( + f"/api/v2/clusters/{cluster_id}/export-jobs/{export_job_id}/" + ) + + if not data or not data.get("progress"): + raise AsyncOperationNotFound("Failed retrieving operation status.") + + status = data["status"] + if status == "SUCCEEDED": + file_id = data.get("destination", {}).get("file", {}).get("id") + if file_id: + org_id = _get_org_id_from_cluster_id(client, cluster_id) + data, errors = client.get( + f"/api/v2/organizations/{org_id}/files/{file_id}/" + ) + file_data: dict = cast(dict, data) + if not (file_data and file_data.get("download_url")): + print_error("File could not be fetched.") + if not save_as: + print_success(f"Download URL: {file_data['download_url']}") + return + HALO.stop() + print_info("Downloading file...") + + r = requests.get( + file_data["download_url"], stream=True, allow_redirects=True + ) + if r.status_code != 200: + r.raise_for_status() + print_error( + f"Request to {file_data['download_url']} returned status code " + f"{r.status_code}" + ) + file_size = int(r.headers.get("Content-Length", 0)) + + path = pathlib.Path(save_as).expanduser().resolve() + path.parent.mkdir(parents=True, exist_ok=True) + + desc = "(Unknown total file size)" if file_size == 0 else "" + r.raw.read = functools.partial(r.raw.read, decode_content=True) + with tqdm.wrapattr(r.raw, "read", total=file_size, desc=desc) as r_raw: + with path.open("wb") as f: + copyfileobj(r_raw, f) + + print_success(f"Successfully downloaded file to {path}") + + +def _get_export_job_operation_status( + client: Client, cluster_id: str, request_params: Dict +): + export_job_id = request_params["export_job_id"] + data, errors = client.get( + f"/api/v2/clusters/{cluster_id}/export-jobs/{export_job_id}/" + ) + + if not data or not data.get("progress"): + raise AsyncOperationNotFound("Failed retrieving export operation status.") + + status = data["status"] + feedback_data = {"progress": data["progress"]} + msg = data["progress"]["message"] + + return status, msg, feedback_data + + def _wait_for_completed_operation( *, client: Client, @@ -724,6 +889,7 @@ def _wait_for_completed_operation( request_params: Dict, operation_status_func=_get_operation_status, feedback_func=None, + post_success_func=None, ): last_status = None last_msg = None @@ -747,10 +913,14 @@ def _wait_for_completed_operation( # Call for custom feedback if function available and there is status to report. if status in ["IN_PROGRESS", "SUCCEEDED"] and feedback_func: - feedback_func(status, feedback) + (feedback_func, feedback_args) = feedback_func + feedback_func(status, feedback, *feedback_args) # Final statuses if status == "SUCCEEDED": + if post_success_func: + (func, call_args) = post_success_func + func(*call_args) print_success("Operation completed.") break if status == "FAILED": diff --git a/croud/organizations/commands.py b/croud/organizations/commands.py index 4e7e6bd6..9ca404bd 100644 --- a/croud/organizations/commands.py +++ b/croud/organizations/commands.py @@ -20,6 +20,7 @@ from argparse import Namespace from typing import Any, Tuple +import bitmath import requests from tqdm import tqdm from tqdm.utils import CallbackIOWrapper @@ -193,3 +194,25 @@ def org_files_delete(args: Namespace) -> None: success_message="File upload deleted.", output_fmt=get_output_format(args), ) + + +def org_files_get(args: Namespace) -> None: + client = Client.from_args(args) + data, errors = client.get( + f"/api/v2/organizations/{args.org_id}/files/{args.file_id}/" + ) + print_response( + data=data, + errors=errors, + keys=["id", "name", "status", "file_size", "download_url"], + output_fmt=get_output_format(args), + transforms={ + "file_size": _transform_file_size, + }, + ) + + +def _transform_file_size(size_bytes): + if not size_bytes: + return None + return bitmath.Byte(size_bytes).best_prefix().format("{value:.2f} {unit}") diff --git a/docs/commands/clusters.rst b/docs/commands/clusters.rst index ede29368..9014ba28 100644 --- a/docs/commands/clusters.rst +++ b/docs/commands/clusters.rst @@ -585,4 +585,95 @@ Example This command will wait for the operation to finish or fail. It is only available to organization and project admins. + +``clusters export-jobs`` +======================== + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: clusters export-jobs + :nosubcommands: + + +``clusters export-jobs create`` +=============================== + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: clusters export-jobs create + +Example +------- + +.. code-block:: console + + sh$ ❯ croud clusters export-jobs create --cluster-id f6c39580-5719-431d-a508-0cee4f9e8209 \ + --table nyc_taxi --file-format csv + +--------------------------------------+--------------------------------------+------------+ + | id | cluster_id | status | + |--------------------------------------+--------------------------------------+------------| + | 85dc0024-b049-4b9d-b100-4bf850881692 | f6c39580-5719-431d-a508-0cee4f9e8209 | REGISTERED | + +--------------------------------------+--------------------------------------+------------+ + ==> Info: Status: SENT (Your creation request was sent to the region.) + ==> Info: Status: IN_PROGRESS (Export in progress) + ==> Info: Exporting... 2.00 K records and 19.53 KiB exported so far. + ==> Info: Exporting... 4.00 K records and 39.06 KiB exported so far. + ==> Info: Done exporting 6.00 K records and 58.59 KiB. + ==> Success: Download URL: https://cratedb-file-uploads.s3.amazonaws.com/some/download + ==> Success: Operation completed. + + +.. NOTE:: + + This command will wait for the operation to finish or fail. It is only available + to organization admins. + + +``clusters export-jobs list`` +============================= + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: clusters export-jobs list + +Example +------- + +.. code-block:: console + + sh$ ❯ croud clusters export-jobs list \ + --cluster-id f6c39580-5719-431d-a508-0cee4f9e8209 + +--------------------------------------+--------------------------------------+-----------+---------------------+-----------------------------------------------+ + | id | cluster_id | status | source | destination | + |--------------------------------------+--------------------------------------+-----------+---------------------+-----------------------------------------------| + | b311ba9d-9cb4-404a-b58d-c442ae251dbf | f6c39580-5719-431d-a508-0cee4f9e8209 | SUCCEEDED | nyc_taxi | Format: csv | + | | | | | File ID: 327ad0e6-607f-4f99-a4cc-c1e98bf28e4d | + +--------------------------------------+--------------------------------------+-----------+---------------------+-----------------------------------------------+ + + +``clusters export-jobs delete`` +=============================== + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: clusters export-jobs delete + +Example +------- + +.. code-block:: console + + sh$ ❯ croud clusters export-jobs delete \ + --cluster-id f6c39580-5719-431d-a508-0cee4f9e8209 \ + --export-job-id 3b311ba9d-9cb4-404a-b58d-c442ae251dbf + ==> Success: Success. + .. _here: https://hub.docker.com/r/crate/crate/tags diff --git a/docs/commands/organizations.rst b/docs/commands/organizations.rst index bc089550..35508bbc 100644 --- a/docs/commands/organizations.rst +++ b/docs/commands/organizations.rst @@ -234,3 +234,82 @@ Example --org-id f6c39580-5719-431d-a508-0cee4f9e8209 \ --user john.doe@example.io ==> Success: User removed from organization. + + +``organizations files`` +======================= + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: organizations files + :nosubcommands: + + +``organizations files list`` +---------------------------- + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: organizations files list + +Example +....... + +.. code-block:: console + + sh$ croud organizations files list \ + --org-id f6c39580-5719-431d-a508-0cee4f9e8209 + +--------------------------------------+---------------------+----------+ + | id | name | status | + |--------------------------------------+---------------------+----------| + | 9b5d438f-036c-410f-b6f4-9adfb1feb252 | nyc_taxi | UPLOADED | + +--------------------------------------+---------------------+----------+ + + +``organizations files delete`` +------------------------------ + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: organizations files delete + +Example +....... + +.. code-block:: console + + sh$ croud organizations files delete \ + --org-id f6c39580-5719-431d-a508-0cee4f9e8209 \ + --file-id 327ad0e6-607f-4f99-a4cc-c1e98bf28e4d + ==> Success: File upload deleted. + + +``organizations files get`` +------------------------------ + +.. argparse:: + :module: croud.__main__ + :func: get_parser + :prog: croud + :path: organizations files get + +Example +....... + +.. code-block:: console + + sh$ croud organizations files get \ + --org-id f6c39580-5719-431d-a508-0cee4f9e8209 \ + --file-id 327ad0e6-607f-4f99-a4cc-c1e98bf28e4d + +--------------------------------------+----------+----------+-------------+-------------------------------------------------------------+ + | id | name | status | file_size | download_url | + |--------------------------------------+----------+----------+-------------+-------------------------------------------------------------| + | 327ad0e6-607f-4f99-a4cc-c1e98bf28e4d | nyc_taxi | UPLOADED | 107.56 MiB | https://cratedb-file-uploads.s3.amazonaws.com/some/download | + +--------------------------------------+----------+----------+-------------+-------------------------------------------------------------+ + diff --git a/tests/commands/test_clusters.py b/tests/commands/test_clusters.py index a99812ca..c7106be2 100644 --- a/tests/commands/test_clusters.py +++ b/tests/commands/test_clusters.py @@ -1619,3 +1619,185 @@ def test_import_job_list(mock_request): "/api/v2/clusters/123/import-jobs/", params=None, ) + + +@mock.patch.object( + Client, + "request", + return_value=( + [ + { + "cluster_id": "12345", + "dc": { + "created": "2023-07-04T10:12:29.763000+00:00", + "modified": "2023-07-04T10:12:29.763000+00:00", + }, + "source": {"table": "my_table"}, + "destination": {"format": "csv", "file": {"id": ""}}, + "id": "45678", + "progress": { + "bytes": 0, + "message": "Failed", + "records": 0, + }, + "status": "FAILED", + } + ], + None, + ), +) +def test_export_job_list(mock_request): + cluster_id = gen_uuid() + call_command("croud", "clusters", "export-jobs", "list", "--cluster-id", cluster_id) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/export-jobs/", + params=None, + ) + + +@mock.patch.object(Client, "request", return_value=({}, None)) +def test_export_job_delete(mock_request): + cluster_id = gen_uuid() + export_job_id = gen_uuid() + call_command( + "croud", + "clusters", + "export-jobs", + "delete", + "--export-job-id", + export_job_id, + "--cluster-id", + cluster_id, + ) + assert_rest( + mock_request, + RequestMethod.DELETE, + f"/api/v2/clusters/{cluster_id}/export-jobs/{export_job_id}/", + ) + + +@pytest.mark.parametrize("save_file", [True, False]) +@mock.patch("croud.clusters.commands.copyfileobj") +@mock.patch.object(Client, "request") +def test_export_job_create(mock_client_request, mock_copy, save_file): + cluster_id = gen_uuid() + project_id = gen_uuid() + org_id = gen_uuid() + file_id = gen_uuid() + export_job_id = gen_uuid() + mock_client_request.side_effect = [ + ( + { + "id": export_job_id, + "status": "IN_PROGRESS", + "progress": {"message": "Export in progress."}, + }, + None, + ), + ( + { + "id": export_job_id, + "status": "SUCCEEDED", + "destination": {"file": {"id": file_id}}, + "progress": { + "message": "Export finished successfully.", + "records": 123, + "bytes": 456, + }, + }, + None, + ), + ( + { + "id": export_job_id, + "status": "SUCCEEDED", + "destination": {"file": {"id": file_id}}, + "progress": { + "message": "Export finished successfully.", + "records": 123, + "bytes": 456, + }, + }, + None, + ), + ({"project_id": project_id}, None), + ({"organization_id": org_id}, None), + ( + {"download_url": "https://s3-presigned-url.s3.amazonaws.com/some/bla"}, + None, + ), + ] + + cluster_id = gen_uuid() + + mock_response = mock.MagicMock() + mock_response.headers = {"Content-Length": 123} + mock_response.status_code = 200 + + with mock.patch("requests.get", return_value=mock_response): + with mock.patch("builtins.open", mock.mock_open(read_data="id,name,path")): + cmd = ( + "croud", + "clusters", + "export-jobs", + "create", + "--cluster-id", + cluster_id, + "--file-format", + "csv", + "--table", + "my-table", + "--compression", + "gzip", + ) + if save_file: + cmd = cmd + ( + "--save-as", + "./my_table.csv", + ) + call_command(*cmd) + + body = { + "source": { + "table": "my-table", + }, + "destination": {"format": "csv"}, + "compression": "gzip", + } + assert_rest( + mock_client_request, + RequestMethod.POST, + f"/api/v2/clusters/{cluster_id}/export-jobs/", + body=body, + any_times=True, + ) + assert_rest( + mock_client_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/export-jobs/{export_job_id}/", + any_times=True, + ) + assert_rest( + mock_client_request, + RequestMethod.GET, + f"/api/v2/clusters/{cluster_id}/", + any_times=True, + ) + assert_rest( + mock_client_request, + RequestMethod.GET, + f"/api/v2/projects/{project_id}/", + any_times=True, + ) + assert_rest( + mock_client_request, + RequestMethod.GET, + f"/api/v2/organizations/{org_id}/files/{file_id}/", + any_times=True, + ) + if save_file: + mock_copy.assert_called_once() + else: + mock_copy.assert_not_called() diff --git a/tests/commands/test_organizations.py b/tests/commands/test_organizations.py index df6f6228..84c0ba96 100644 --- a/tests/commands/test_organizations.py +++ b/tests/commands/test_organizations.py @@ -373,6 +373,27 @@ def test_role_fqn_transform(): assert response == "organization_admin" +@mock.patch.object(Client, "request", return_value=({}, None)) +def test_organizations_files_get(mock_request): + org_id = gen_uuid() + file_id = gen_uuid() + call_command( + "croud", + "organizations", + "files", + "get", + "--org-id", + org_id, + "--file-id", + file_id, + ) + assert_rest( + mock_request, + RequestMethod.GET, + f"/api/v2/organizations/{org_id}/files/{file_id}/", + ) + + @mock.patch.object(Client, "request", return_value=({}, None)) def test_organizations_files_list(mock_request): org_id = gen_uuid()