From 0b862f3509f042fac431cf298b88ece12cdaa0a9 Mon Sep 17 00:00:00 2001 From: Ben Perry Date: Tue, 14 May 2024 13:41:25 -0500 Subject: [PATCH] Remove prefect examples, no longer supported by the saturn platform --- .saturn/templates-enterprise.json | 6 - .saturn/templates-hosted.json | 6 - examples/prefect/.saturn/saturn.json | 28 - examples/prefect/01-prefect-singlenode.ipynb | 465 ----------------- .../prefect/02-prefect-daskclusters.ipynb | 493 ------------------ .../prefect/03-prefect-resource-manager.ipynb | 294 ----------- examples/prefect/README.md | 22 - 7 files changed, 1314 deletions(-) delete mode 100644 examples/prefect/.saturn/saturn.json delete mode 100644 examples/prefect/01-prefect-singlenode.ipynb delete mode 100644 examples/prefect/02-prefect-daskclusters.ipynb delete mode 100644 examples/prefect/03-prefect-resource-manager.ipynb delete mode 100644 examples/prefect/README.md diff --git a/.saturn/templates-enterprise.json b/.saturn/templates-enterprise.json index b1c60e9f..ceba4247 100644 --- a/.saturn/templates-enterprise.json +++ b/.saturn/templates-enterprise.json @@ -78,12 +78,6 @@ "weight": 1200, "recipe_path": "examples/rapids/.saturn/saturn.json" }, - { - "title": "Prefect (Python)", - "thumbnail_image_url": "https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-thumbnails/prefect.png", - "weight": 1300, - "recipe_path": "examples/prefect/.saturn/saturn.json" - }, { "title": "Flux (Julia)", "thumbnail_image_url": "https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-thumbnails/flux-logo.png", diff --git a/.saturn/templates-hosted.json b/.saturn/templates-hosted.json index b1c60e9f..ceba4247 100644 --- a/.saturn/templates-hosted.json +++ b/.saturn/templates-hosted.json @@ -78,12 +78,6 @@ "weight": 1200, "recipe_path": "examples/rapids/.saturn/saturn.json" }, - { - "title": "Prefect (Python)", - "thumbnail_image_url": "https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-thumbnails/prefect.png", - "weight": 1300, - "recipe_path": "examples/prefect/.saturn/saturn.json" - }, { "title": "Flux (Julia)", "thumbnail_image_url": "https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-thumbnails/flux-logo.png", diff --git a/examples/prefect/.saturn/saturn.json b/examples/prefect/.saturn/saturn.json deleted file mode 100644 index 99daf74c..00000000 --- a/examples/prefect/.saturn/saturn.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "name": "example-prefect", - "image_uri": "public.ecr.aws/saturncloud/saturn-python:2022.06.01", - "description": "Scheduled jobs with Prefect and Prefect Cloud", - "environment_variables": { - "PREFECT_CLOUD_PROJECT_NAME": "dask-iz-gr8", - "SATURN__JUPYTER_SETUP_DASK_WORKSPACE": "true" - }, - "working_directory": "/home/jovyan/examples/examples/prefect", - "git_repositories": [ - { - "url": "https://github.com/saturncloud/examples", - "path": "/home/jovyan/examples" - } - ], - "jupyter_server": { - "disk_space": "10Gi", - "instance_type": "large" - }, - "credentials": [ - { - "name": "prefect-user-token", - "type": "environment_variable", - "location": "PREFECT_USER_TOKEN" - } - ], - "version": "2022.01.06" -} diff --git a/examples/prefect/01-prefect-singlenode.ipynb b/examples/prefect/01-prefect-singlenode.ipynb deleted file mode 100644 index 5556a302..00000000 --- a/examples/prefect/01-prefect-singlenode.ipynb +++ /dev/null @@ -1,465 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Run a Prefect Cloud Data Pipeline on a Single Machine" - ] - }, - { - "cell_type": 
"markdown", - "metadata": {}, - "source": [ - "![Prefect Logo](https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-resources/prefect.png \"doc-image\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Overview\n", - "[Prefect Cloud](https://www.prefect.io/cloud/) is a hosted, high-availability, fault-tolerant service that handles all the orchestration responsibilities for running data pipelines. It gives you complete oversight of your workflows and makes it easy to manage them.\n", - "\n", - "This example shows how to create a project and execute tasks synchronously in the main thread (without using parallel computing). We will then register this flow with Prefect Cloud so that service can be used for orchestrating when the flow runs.\n", - "\n", - "### Model Details\n", - "\n", - "The data used for this example is the **\"Incident management process enriched event log\"** dataset [from the UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Incident+management+process+enriched+event+log).This dataset contains tickets from an IT support system, including characteristics like the priority of the ticket, the time it was opened, and the time it was closed.\n", - "\n", - "We will use this dataset in our example to solve following regression task:\n", - "\n", - "> Given the characteristics of a ticket, how long will it be until it is closed?\n", - "\n", - "We wil then evaluate the performance of above model, that predicts time-to-close for tickets in an IT support system.\n", - "\n", - "## Modelling Process\n", - "\n", - "### Prerequisites\n", - "* created a Prefect Cloud account\n", - "* set up the appropriate credentials in Saturn\n", - "* set up a Prefect Cloud agent in Saturn Cloud\n", - "\n", - "Details on these prerequisites can be found in [Using Prefect Cloud with Saturn Cloud](https://saturncloud.io/docs/using-saturn-cloud/prefect_cloud/).\n", - "\n", - "### Environment Setup\n", - "\n", - "Prefect Cloud data pipeline execution can be done 3 ways in Saturn Cloud :\n", - "1. On a Single Machine : You can run your code on a single node using local executor, which is default option in Saturn Cloud for running Prefect workflows. This is how we will be executing our pipeline in this example.\n", - "2. Distributing tasks across dasak clusters. Refer [prefect-daskclusters example](./02-prefect-daskclusters.ipynb) for this.\n", - "3. Parallelizng a task over dask clusters. 
See the [prefect-resource-manager example](./03-prefect-resource-manager.ipynb).\n", - "\n", - "![Prefect Execution](https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-resources/prefect_execute.png \"doc-image\")\n", - "\n", - "The code in this example uses prefect for orchestration (figuring out what to do, and in what order) and the local executor for execution (doing the things).\n", - "\n", - "It relies on the following additional non-builtin libraries:\n", - "\n", - "* numpy: data manipulation\n", - "* pandas: data manipulation\n", - "* requests: read in data from the internet\n", - "* scikit-learn: evaluation metric functions\n", - "* prefect-saturn: register Prefect flows with both Prefect Cloud and Saturn Cloud.\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import json\n", - "import os\n", - "import uuid\n", - "from datetime import datetime, timedelta\n", - "from io import BytesIO\n", - "from zipfile import ZipFile\n", - "\n", - "import numpy as np\n", - "import pandas as pd\n", - "import requests\n", - "from prefect.schedules import IntervalSchedule\n", - "from sklearn.metrics import (\n", - " mean_absolute_error,\n", - " mean_squared_error,\n", - " median_absolute_error,\n", - " r2_score,\n", - ")\n", - "\n", - "import prefect\n", - "from prefect import Flow, Parameter, task\n", - "\n", - "PREFECT_CLOUD_PROJECT_NAME = os.environ[\"PREFECT_CLOUD_PROJECT_NAME\"]\n", - "SATURN_USERNAME = os.environ[\"SATURN_USERNAME\"]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Authenticate with Prefect Cloud." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!prefect auth login --key ${PREFECT_USER_TOKEN}" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
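\n", - "\n", - "The shell command above assumes the `PREFECT_USER_TOKEN` environment variable was attached via the Saturn credential described in the prerequisites. A quick, optional sanity check (a minimal sketch; the variable name comes from this example's recipe):\n", - "\n", - "```python\n", - "# Fail fast with a clear message if the credential was not attached\n", - "assert os.environ.get(\"PREFECT_USER_TOKEN\"), \"PREFECT_USER_TOKEN is not set\"\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "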
\n", - "\n", - "### Create a Prefect Cloud Project\n", - "\n", - "Prefect Cloud organizes flows within workspaces called \"projects\". Before you can register a flow with Prefect Cloud, it's necessary to create a project, if you don't have one yet.\n", - "\n", - "The code below will create a new project in whatever Prefect Cloud tenant you're authenticated with. If that project already exists, this code does not have any side effects." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "client = prefect.Client()\n", - "client.create_project(project_name=PREFECT_CLOUD_PROJECT_NAME)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "### Define Tasks\n", - "\n", - "Prefect refers to a workload as a \"flow\", which comprises multiple individual things to do called \"tasks\". From [the Prefect docs](https://docs.prefect.io/core/concepts/tasks.html):\n", - "\n", - "> A task is like a function: it optionally takes inputs, performs an action, and produces an optional result.\n", - "\n", - "The goal of this notebook's flow is to evaluate, on an ongoing basis, the performance of a model that predicts time-to-close for tickets in an IT support system.\n", - "\n", - "That can be broken down into the following tasks\n", - "\n", - "* `get_trial_id()`: assign a unique ID to each run\n", - "* `get_ticket_data_batch()`: get a random set of newly-closed tickets\n", - "* `get_target()`: given a batch of tickets, compute how long it took to close them\n", - "* `predict()`: predict the time-to-close, using the heuristic \"higher-priority tickets close faster\"\n", - "* `evaluate_model()`: compute evaluation metrics comparing predicted and actual time-to-close\n", - "* `get_trial_summary()`: collect all evaluation metrics in one object\n", - "* `write_trial_summary()`: write trial results somewhere" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "@task\n", - "def get_trial_id() -> str:\n", - " \"\"\"\n", - " Generate a unique identifier for this trial.\n", - " \"\"\"\n", - " return str(uuid.uuid4())\n", - "\n", - "\n", - "@task\n", - "def get_ticket_data_batch(batch_size: int) -> pd.DataFrame:\n", - " \"\"\"\n", - " Simulate the experience of getting a random sample of new tickets\n", - " from an IT system, to test the performance of a model.\n", - " \"\"\"\n", - " url = \"https://archive.ics.uci.edu/ml/machine-learning-databases/00498/incident_event_log.zip\"\n", - " resp = requests.get(url)\n", - " zipfile = ZipFile(BytesIO(resp.content))\n", - " data_file = \"incident_event_log.csv\"\n", - " # _date_parser has to be a lambda or pandas won't convert dates correctly\n", - " _date_parser = lambda x: pd.NaT if x == \"?\" else datetime.strptime(x, \"%d/%m/%Y %H:%M\")\n", - " df = pd.read_csv(\n", - " zipfile.open(data_file),\n", - " parse_dates=[\n", - " \"opened_at\",\n", - " \"resolved_at\",\n", - " \"closed_at\",\n", - " \"sys_created_at\",\n", - " \"sys_updated_at\",\n", - " ],\n", - " infer_datetime_format=False,\n", - " converters={\n", - " \"opened_at\": _date_parser,\n", - " \"resolved_at\": _date_parser,\n", - " \"closed_at\": _date_parser,\n", - " \"sys_created_at\": _date_parser,\n", - " \"sys_updated_at\": _date_parser,\n", - " },\n", - " na_values=[\"?\"],\n", - " )\n", - " df[\"sys_updated_at\"] = pd.to_datetime(df[\"sys_updated_at\"])\n", - " rows_to_score = np.random.randint(0, df.shape[0], 100)\n", - " return df.iloc[rows_to_score]\n", - "\n", - "\n", - "@task\n", - "def get_target(df):\n", - " \"\"\"\n", - " Compute time-til-close on a data frame of tickets\n", - " \"\"\"\n", - " time_til_close = (df[\"closed_at\"] - df[\"sys_updated_at\"]) / np.timedelta64(1, \"s\")\n", - " return time_til_close\n", - "\n", - "\n", - "@task\n", - "def predict(df):\n", - " \"\"\"\n", - " Given an input data frame, predict how long it will be until the ticket is closed.\n", - " For simplicity, using a super simple model that just says\n", - " \"high-priority tickets get closed faster\".\n", - " \"\"\"\n", - " seconds_in_an_hour = 60.0 * 60.0\n", - " preds = df[\"priority\"].map(\n", - " {\n", - " \"1 - Critical\": 6.0 * seconds_in_an_hour,\n", - " \"2 - 
High\": 24.0 * seconds_in_an_hour,\n", - " \"3 - Moderate\": 120.0 * seconds_in_an_hour,\n", - " \"4 - Lower\": 240.0 * seconds_in_an_hour,\n", - " }\n", - " )\n", - " default_guess_for_no_priority = 180.0 * seconds_in_an_hour\n", - " preds = preds.fillna(default_guess_for_no_priority)\n", - " return preds\n", - "\n", - "\n", - "@task\n", - "def evaluate_model(y_true, y_pred, metric_name: str) -> float:\n", - " metric_func_lookup = {\n", - " \"mae\": mean_absolute_error,\n", - " \"medae\": median_absolute_error,\n", - " \"mse\": mean_squared_error,\n", - " \"r2\": r2_score,\n", - " }\n", - " metric_func = metric_func_lookup[metric_name]\n", - " return metric_func(y_true, y_pred)\n", - "\n", - "\n", - "@task\n", - "def get_trial_summary(trial_id: str, actuals, input_df: pd.DataFrame, metrics: dict) -> dict:\n", - " out = {\"id\": trial_id}\n", - " out[\"data\"] = {\n", - " \"num_obs\": input_df.shape[0],\n", - " \"metrics\": metrics,\n", - " \"target\": {\n", - " \"mean\": actuals.mean(),\n", - " \"median\": actuals.median(),\n", - " \"min\": actuals.min(),\n", - " \"max\": actuals.max(),\n", - " },\n", - " }\n", - " return out\n", - "\n", - "\n", - "@task(log_stdout=True)\n", - "def write_trial_summary(trial_summary: str):\n", - " \"\"\"\n", - " Write out a summary of the file. Currently just logs back to the\n", - " Prefect logger\n", - " \"\"\"\n", - " logger = prefect.context.get(\"logger\")\n", - " logger.info(json.dumps(trial_summary))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "### Construct a Flow\n", - "\n", - "Now that all of the task logic has been defined, the next step is to compose those tasks into a \"flow\". From [the Prefect docs](https://docs.prefect.io/core/concepts/flows.html):\n", - "\n", - "> A Flow is a container for Tasks. It represents an entire workflow or application by describing the dependencies between tasks.\n", - "\n", - "> Flows are DAGs, or \"directed acyclic graphs.\" This is a mathematical way of describing certain organizational principles:\n", - "\n", - "> * A graph is a data structure that uses \"edges\" to connect \"nodes.\" Prefect models each Flow as a graph in which Task dependencies are modeled by Edges.\n", - "> * A directed graph means that edges have a start and an end: when two tasks are connected, one of them unambiguously runs first and the other one runs second.\n", - "> * An acyclic directed graph has no circular dependencies: if you walk through the graph, you will never revisit a task you've seen before.\n", - "\n", - "If you want to run this job to run on a schedule, include \"schedule\" object as one additional argument to `Flow()`. In this case, the code below says \"run this flow once every 24 hours\"." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "schedule = IntervalSchedule(interval=timedelta(hours=24))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "*NOTE: `prefect` flows do not have to be run on a schedule. If you want to run prefect flows on a schedule add `schedule=schedule` as an additional argument to `Flow()` in code below." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "with Flow(f\"{SATURN_USERNAME}-ticket-model-evaluation\") as flow:\n", - " batch_size = Parameter(\"batch-size\", default=1000)\n", - " trial_id = get_trial_id()\n", - "\n", - " # pull sample data\n", - " sample_ticket_df = get_ticket_data_batch(batch_size)\n", - "\n", - " # compute target\n", - " actuals = get_target(sample_ticket_df)\n", - "\n", - " # get prediction\n", - " preds = predict(sample_ticket_df)\n", - "\n", - " # compute evaluation metrics\n", - " mae = evaluate_model(actuals, preds, \"mae\")\n", - " medae = evaluate_model(actuals, preds, \"medae\")\n", - " mse = evaluate_model(actuals, preds, \"mse\")\n", - " r2 = evaluate_model(actuals, preds, \"r2\")\n", - "\n", - " # get trial summary in a string\n", - " trial_summary = get_trial_summary(\n", - " trial_id=trial_id,\n", - " input_df=sample_ticket_df,\n", - " actuals=actuals,\n", - " metrics={\"MAE\": mae, \"MedAE\": medae, \"MSE\": mse, \"R2\": r2},\n", - " )\n", - "\n", - " # store trial summary\n", - " trial_complete = write_trial_summary(trial_summary)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "### Register with Prefect Cloud\n", - "\n", - "Now that the business logic of the flow is complete, we can add information that Saturn Cloud will need to know to run it." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from prefect.executors import LocalExecutor\n", - "from prefect_saturn import PrefectCloudIntegration\n", - "\n", - "# tell Saturn Cloud about the flow\n", - "integration = PrefectCloudIntegration(prefect_cloud_project_name=PREFECT_CLOUD_PROJECT_NAME)\n", - "flow = integration.register_flow_with_saturn(flow=flow)\n", - "\n", - "# override the executor chosen by prefect-saturn\n", - "flow.executor = LocalExecutor()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next, run `register_flow_with_saturn().\n", - "\n", - "`register_flow_with_saturn()` does a few important things:\n", - "\n", - "* specifies how and where the flow's code is stored so it can be retrieved by a Prefect Cloud agent (see **flow.storage**)\n", - "* specifies the infrastructure needed to run the flow. In this case, it uses a **KubernetesJobEnvironment** \n", - "\n", - "The final step necessary is to \"register\" the flow with Prefect Cloud. If this is the first time you've registered this flow, it will create a new Prefect Cloud project `PREFECT_CLOUD_PROJECT_NAME`. If you already have a flow in this project with this name, it will create a new version of it in Prefect Cloud." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# tell Prefect Cloud about the flow\n", - "flow.register(project_name=PREFECT_CLOUD_PROJECT_NAME)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "### Run the Flow\n", - "\n", - "If you have scheduled your flow, it will be run once every 24 hours. You can confirm this by doing all of the following:\n", - "\n", - "* If you are an admin, go to Prefect Cloud Agent page of Saturn Cloud which is at the side bar and check logs for your agent.\n", - "* Go to Prefect Cloud. If you navigate to this flow and click \"Runs\", you should see task statuses and logs for this flow.\n", - "\n", - "If you have not scheduled your flow or want to run the flow immediately, navigate to the flow in the Prefect Cloud UI and click \"Quick Run\".\n", - "\n", - "\n", - "An alternative way to run the flow immediately is to open a terminal and run the code below.\n", - "\n", - "```shell\n", - "prefect auth login --key ${PREFECT_USER_TOKEN}\n", - "prefect run \\\n", - " --name ${SATURN_USERNAME}-ticket-model-evaluation \\\n", - " --project ${PREFECT_CLOUD_PROJECT_NAME}\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "## Conclusion\n", - "\n", - "Prefect makes your workflows more managable and fault tolerant. \n", - "In this example, you learned how to execute all tasks locally in a single thread with Prefect and register this flow with Prefect Cloud.Often your tasks have complex computations or dataset is huge. For such cases you can distribute work across Dask clusters. Check our examples for distributing single and multiple tasks across Dask Clusters." - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "saturn (Python 3)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.5" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/examples/prefect/02-prefect-daskclusters.ipynb b/examples/prefect/02-prefect-daskclusters.ipynb deleted file mode 100644 index f7bfd70c..00000000 --- a/examples/prefect/02-prefect-daskclusters.ipynb +++ /dev/null @@ -1,493 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Run a Prefect Cloud Data Pipeline on a Dask Cluster" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "![Prefect and Dask logos](https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-resources/prefect_dask.png \"doc-image\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Overview\n", - "[Prefect Cloud](https://www.prefect.io/cloud/) is a hosted, high-availability, fault-tolerant service that handles all the orchestration responsibilities for running data pipelines. It gives you complete oversight of your workflows and makes it easy to manage them.\n", - "\n", - "This example shows how to create a set of tasks for a project and execute these tasks on `dask`. We will then register this flow with Prefect Cloud so that service can be used for orchestrating when the flow runs.\n", - "\n", - "By using a Dask cluster, tasks can be executed parallel to each other across multiple machines which can dramatically\n", - "speed up overall run time. If you only want to use a single machine for the entire run, see\n", - "[the single machine example](./01-prefect-singlenode.ipynb).\n", - "\n", - "The code within each task will only be running on a single machine in the cluster. 
There might be situations where\n", - "you want the tasks to be able to execute on a Dask cluster themselves, such as when each task is parallelizable within itself.\n", - "For an example of that, see [Run a Prefect Cloud Data Pipeline with access to a Dask Cluster](./03-prefect-resource-manager.ipynb).\n", - "\n", - "![Prefect Execution](https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-resources/prefect_execute.png \"doc-image\")\n", - "\n", - "\n", - "\n", - "### Model Details\n", - "\n", - "The data used for this example is the **\"Incident management process enriched event log\"** dataset [from the UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Incident+management+process+enriched+event+log). This dataset contains tickets from an IT support system, including characteristics like the priority of the ticket, the time it was opened, and the time it was closed.\n", - "\n", - "We will use this dataset in our example to solve the following regression task:\n", - "\n", - "> Given the characteristics of a ticket, how long will it be until it is closed?\n", - "\n", - "We will then evaluate the performance of the above model, which predicts time-to-close for tickets in an IT support system.\n", - "\n", - "\n", - "## Modelling Process\n", - "\n", - "### Prerequisites\n", - "* created a Prefect Cloud account\n", - "* set up the appropriate credentials in Saturn\n", - "* set up a Prefect Cloud agent in Saturn Cloud\n", - "\n", - "Details on these prerequisites can be found in [Using Prefect Cloud with Saturn Cloud](https://saturncloud.io/docs/using-saturn-cloud/prefect_cloud/).\n", - "\n", - "### Environment Setup\n", - "\n", - "The code in this example uses prefect for orchestration (figuring out what to do, and in what order) and a Dask cluster for execution (doing the things).\n", - "\n", - "It relies on the following additional non-builtin libraries:\n", - "\n", - "* numpy: data manipulation\n", - "* pandas: data manipulation\n", - "* requests: read in data from the internet\n", - "* scikit-learn: evaluation metric functions\n", - "* [dask-saturn](https://github.com/saturncloud/dask-saturn): create and interact with Saturn Cloud Dask clusters.\n", - "* [prefect-saturn](https://github.com/saturncloud/prefect-saturn): register Prefect flows with Prefect Cloud and have them run on Saturn Cloud Dask clusters." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import json\n", - "import os\n", - "import uuid\n", - "from datetime import datetime, timedelta\n", - "from io import BytesIO\n", - "from zipfile import ZipFile\n", - "\n", - "import numpy as np\n", - "import pandas as pd\n", - "import requests\n", - "from prefect.schedules import IntervalSchedule\n", - "from prefect_saturn import PrefectCloudIntegration\n", - "from sklearn.metrics import (\n", - " mean_absolute_error,\n", - " mean_squared_error,\n", - " median_absolute_error,\n", - " r2_score,\n", - ")\n", - "\n", - "import prefect\n", - "from prefect import Flow, Parameter, task\n", - "\n", - "PREFECT_CLOUD_PROJECT_NAME = os.environ[\"PREFECT_CLOUD_PROJECT_NAME\"]\n", - "SATURN_USERNAME = os.environ[\"SATURN_USERNAME\"]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Authenticate with Prefect Cloud."
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!prefect auth login --key ${PREFECT_USER_TOKEN}" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "### Create a Prefect Cloud Project\n", - "\n", - "Prefect Cloud organizes flows within workspaces called \"projects\". Before you can register a flow with Prefect Cloud, it's necessary to create a project, if you don't have one yet.\n", - "\n", - "The code below will create a new project in whatever Prefect Cloud tenant you're authenticated with. If that project already exists, this code does not have any side effects." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "client = prefect.Client()\n", - "client.create_project(project_name=PREFECT_CLOUD_PROJECT_NAME)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "### Define Tasks\n", - "\n", - "Prefect refers to a workload as a \"flow\", which comprises multiple individual things to do called \"tasks\". From [the Prefect docs](https://docs.prefect.io/core/concepts/tasks.html):\n", - "\n", - "> A task is like a function: it optionally takes inputs, performs an action, and produces an optional result.\n", - "\n", - "The goal of this notebook's flow is to evaluate, on an ongoing basis, the performance of a model that predicts time-to-close for tickets in an IT support system.\n", - "\n", - "That can be broken down into the following tasks\n", - "\n", - "* `get_trial_id()`: assign a unique ID to each run\n", - "* `get_ticket_data_batch()`: get a random set of newly-closed tickets\n", - "* `get_target()`: given a batch of tickets, compute how long it took to close them\n", - "* `predict()`: predict the time-to-close, using the heuristic \"higher-priority tickets close faster\"\n", - "* `evaluate_model()`: compute evaluation metrics comparing predicted and actual time-to-close\n", - "* `get_trial_summary()`: collect all evaluation metrics in one object\n", - "* `write_trial_summary()`: write trial results somewhere" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "@task\n", - "def get_trial_id() -> str:\n", - " \"\"\"\n", - " Generate a unique identifier for this trial.\n", - " \"\"\"\n", - " return str(uuid.uuid4())\n", - "\n", - "\n", - "@task\n", - "def get_ticket_data_batch(batch_size: int) -> pd.DataFrame:\n", - " \"\"\"\n", - " Simulate the experience of getting a random sample of new tickets\n", - " from an IT system, to test the performance of a model.\n", - " \"\"\"\n", - " url = \"https://archive.ics.uci.edu/ml/machine-learning-databases/00498/incident_event_log.zip\"\n", - " resp = requests.get(url)\n", - " zipfile = ZipFile(BytesIO(resp.content))\n", - " data_file = \"incident_event_log.csv\"\n", - " # _date_parser has to be a lambda or pandas won't convert dates correctly\n", - " _date_parser = lambda x: pd.NaT if x == \"?\" else datetime.strptime(x, \"%d/%m/%Y %H:%M\")\n", - " df = pd.read_csv(\n", - " zipfile.open(data_file),\n", - " parse_dates=[\n", - " \"opened_at\",\n", - " \"resolved_at\",\n", - " \"closed_at\",\n", - " \"sys_created_at\",\n", - " \"sys_updated_at\",\n", - " ],\n", - " infer_datetime_format=False,\n", - " converters={\n", - " \"opened_at\": _date_parser,\n", - " \"resolved_at\": _date_parser,\n", - " \"closed_at\": _date_parser,\n", - " \"sys_created_at\": _date_parser,\n", - " \"sys_updated_at\": _date_parser,\n", - " },\n", - " na_values=[\"?\"],\n", - " )\n", - " df[\"sys_updated_at\"] = pd.to_datetime(df[\"sys_updated_at\"])\n", - " rows_to_score = np.random.randint(0, df.shape[0], 100)\n", - " return df.iloc[rows_to_score]\n", - "\n", - "\n", - "@task\n", - "def get_target(df):\n", - " \"\"\"\n", - " Compute time-til-close on a data frame of tickets\n", - " \"\"\"\n", - " time_til_close = (df[\"closed_at\"] - df[\"sys_updated_at\"]) / np.timedelta64(1, \"s\")\n", - " return time_til_close\n", - "\n", - "\n", - "@task\n", - "def predict(df):\n", - " \"\"\"\n", - " Given an input data frame, predict how long it will be until the ticket is closed.\n", - " For simplicity, using a super simple model that just says\n", - " \"high-priority tickets get closed faster\".\n", - " \"\"\"\n", - " seconds_in_an_hour = 60.0 * 60.0\n", - " preds = df[\"priority\"].map(\n", - " {\n", - " \"1 - Critical\": 6.0 * seconds_in_an_hour,\n", - " \"2 - 
High\": 24.0 * seconds_in_an_hour,\n", - " \"3 - Moderate\": 120.0 * seconds_in_an_hour,\n", - " \"4 - Lower\": 240.0 * seconds_in_an_hour,\n", - " }\n", - " )\n", - " default_guess_for_no_priority = 180.0 * seconds_in_an_hour\n", - " preds = preds.fillna(default_guess_for_no_priority)\n", - " return preds\n", - "\n", - "\n", - "@task\n", - "def evaluate_model(y_true, y_pred, metric_name: str) -> float:\n", - " metric_func_lookup = {\n", - " \"mae\": mean_absolute_error,\n", - " \"medae\": median_absolute_error,\n", - " \"mse\": mean_squared_error,\n", - " \"r2\": r2_score,\n", - " }\n", - " metric_func = metric_func_lookup[metric_name]\n", - " return metric_func(y_true, y_pred)\n", - "\n", - "\n", - "@task\n", - "def get_trial_summary(trial_id: str, actuals, input_df: pd.DataFrame, metrics: dict) -> dict:\n", - " out = {\"id\": trial_id}\n", - " out[\"data\"] = {\n", - " \"num_obs\": input_df.shape[0],\n", - " \"metrics\": metrics,\n", - " \"target\": {\n", - " \"mean\": actuals.mean(),\n", - " \"median\": actuals.median(),\n", - " \"min\": actuals.min(),\n", - " \"max\": actuals.max(),\n", - " },\n", - " }\n", - " return out\n", - "\n", - "\n", - "@task(log_stdout=True)\n", - "def write_trial_summary(trial_summary: str):\n", - " \"\"\"\n", - " Write out a summary of the file. Currently just logs back to the\n", - " Prefect logger\n", - " \"\"\"\n", - " logger = prefect.context.get(\"logger\")\n", - " logger.info(json.dumps(trial_summary))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "### Construct a Flow\n", - "\n", - "Now that all of the task logic has been defined, the next step is to compose those tasks into a \"flow\". From [the Prefect docs](https://docs.prefect.io/core/concepts/flows.html):\n", - "\n", - "> A Flow is a container for Tasks. It represents an entire workflow or application by describing the dependencies between tasks.\n", - "\n", - "> Flows are DAGs, or \"directed acyclic graphs.\" This is a mathematical way of describing certain organizational principles:\n", - "\n", - "> * A graph is a data structure that uses \"edges\" to connect \"nodes.\" Prefect models each Flow as a graph in which Task dependencies are modeled by Edges.\n", - "> * A directed graph means that edges have a start and an end: when two tasks are connected, one of them unambiguously runs first and the other one runs second.\n", - "> * An acyclic directed graph has no circular dependencies: if you walk through the graph, you will never revisit a task you've seen before.\n", - "\n", - "If you want to run this job to run on a schedule, include \"schedule\" object as one additional argument to `Flow()`. In this case, the code below says \"run this flow once every 24 hours\"." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "schedule = IntervalSchedule(interval=timedelta(hours=24))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "*NOTE: `prefect` flows do not have to be run on a schedule. If you want to run prefect flows on a schedule add `schedule=schedule` as an additional argument to `Flow()` in code below." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "with Flow(f\"{SATURN_USERNAME}-ticket-model-evaluation-dask\") as flow:\n", - " batch_size = Parameter(\"batch-size\", default=1000)\n", - " trial_id = get_trial_id()\n", - " # pull sample data\n", - " sample_ticket_df = get_ticket_data_batch(batch_size)\n", - " # compute target\n", - " actuals = get_target(sample_ticket_df)\n", - " # get prediction\n", - " preds = predict(sample_ticket_df)\n", - " # compute evaluation metrics\n", - " mae = evaluate_model(actuals, preds, \"mae\")\n", - " medae = evaluate_model(actuals, preds, \"medae\")\n", - " mse = evaluate_model(actuals, preds, \"mse\")\n", - " r2 = evaluate_model(actuals, preds, \"r2\")\n", - " # get trial summary in a string\n", - " trial_summary = get_trial_summary(\n", - " trial_id=trial_id,\n", - " input_df=sample_ticket_df,\n", - " actuals=actuals,\n", - " metrics={\"MAE\": mae, \"MedAE\": medae, \"MSE\": mse, \"R2\": r2},\n", - " )\n", - " # store trial summary\n", - " trial_complete = write_trial_summary(trial_summary)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "At this point, we have all of the work defined in tasks and arranged within a flow, but none of the tasks have run yet. In the next section, we'll do that using `Dask`." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Register with Prefect Cloud\n", - "\n", - "Now that the business logic of the flow is complete, we can add information that Saturn Cloud will need to know to run it." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "integration = PrefectCloudIntegration(prefect_cloud_project_name=PREFECT_CLOUD_PROJECT_NAME)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next, run `register_flow_with_saturn()`.\n", - "\n", - "`register_flow_with_saturn()` does a few important things:\n", - " \n", - "* specifies how and where the flow's code is stored so it can be retrieved by a Prefect Cloud agent (see `flow.storage`)\n", - "* specifies the infrastructure needed to run the flow. In this case, it uses a `KubernetesJobEnvironment` with a Saturn Dask cluster (see `flow.environment`)\n", - " \n", - "The code below also customizes the Dask cluster used when executing the flow.\n", - "\n", - "* `n_workers = 3`: use 3 workers\n", - "* `worker_size =\"xlarge\"`: each worker has 2 CPU cores and 16 GB RAM\n", - " - **NOTE**: you can find the full list of sizes with `prefect_saturn.describe_sizes()`\n", - "* `worker_is_spot = False`: don't use spot instances for workers\n", - "\n", - "**NOTE:** Dask clusters associated with Prefect Cloud flows are automatically shut down when the flow run completes." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "flow = integration.register_flow_with_saturn(\n", - " flow=flow,\n", - " dask_cluster_kwargs={\n", - " \"n_workers\": 3,\n", - " \"worker_size\": \"xlarge\",\n", - " \"scheduler_size\": \"medium\",\n", - " \"worker_is_spot\": False,\n", - " },\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The final step necessary is to \"register\" the flow with Prefect Cloud. If this is the first time you've registered this flow, it will create a new Prefect Cloud project `PREFECT_CLOUD_PROJECT_NAME`. If you already have a flow in this project with this name, it will create a new version of it in Prefect Cloud." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "flow.register(project_name=PREFECT_CLOUD_PROJECT_NAME)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "\n", - "\n", - "### Run the flow\n", - "\n", - "If you have scheduled your flow, it will be run once every 24 hours. You can confirm this by doing all of the following:\n", - "\n", - "* If you are an admin, go to the Prefect Cloud Agent page of Saturn Cloud (in the sidebar) and check the logs for your agent.\n", - "* Go to the \"Dask\" page in Saturn Cloud. You should see that a new Dask cluster has been created to run this flow.\n", - "* Go to Prefect Cloud. If you navigate to this flow and click \"Runs\", you should see task statuses and logs for this flow.\n", - "\n", - "If you have not scheduled your flow or want to run the flow immediately, navigate to the flow in the Prefect Cloud UI and click \"Quick Run\".\n", - "\n", - "An alternative way to run the flow immediately is to open a terminal and run the code below.\n", - "```shell\n", - "prefect auth login --key ${PREFECT_USER_TOKEN}\n", - "prefect run \\\n", - " --name ${SATURN_USERNAME}-ticket-model-evaluation-dask \\\n", - " --project ${PREFECT_CLOUD_PROJECT_NAME}\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "\n", - "## Conclusion\n", - "\n", - "Prefect makes your workflows more manageable and fault-tolerant. \n", - "In this example, you learned how to create a Prefect flow and distribute all tasks across a Dask cluster. 
We then registered this flow with Prefect Cloud. \n", - "\n", - "Try changing the code above and re-running the flow. Add logging, add new tasks, or [customize the Dask cluster](https://github.com/saturncloud/prefect-saturn#customize-dask). If you have existing Prefect flows, try running one of them on Saturn using this notebook as a template.\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "saturn (Python 3)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.9" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/examples/prefect/03-prefect-resource-manager.ipynb b/examples/prefect/03-prefect-resource-manager.ipynb deleted file mode 100644 index 3520acbb..00000000 --- a/examples/prefect/03-prefect-resource-manager.ipynb +++ /dev/null @@ -1,294 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Run a Prefect Cloud Data Pipeline with access to a Dask Cluster" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "![Prefect and Dask logos](https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-resources/prefect_dask.png \"doc-image\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "\n", - "## Overview\n", - "[Prefect Cloud](https://www.prefect.io/cloud/) is a hosted, high-availability, fault-tolerant service that handles all the orchestration responsibilities for running data pipelines. It gives you complete oversight of your workflows and makes it easy to manage them.\n", - "\n", - "This example has the tasks run on a single machine, but allows the tasks to run Dask commands themselves. This is appropriate for situations where each task can be parallelized within itself. If you want to do everything on a single machine without Dask, see [the single machine example](./01-prefect-singlenode.ipynb). If you want to have the set of tasks parallelized on the Dask cluster, see the [prefect-daskclusters example](./02-prefect-daskclusters.ipynb).\n", - "\n", - "![Prefect Execution](https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-resources/prefect_execute.png \"doc-image\")\n", - "\n", - "\n", - "### Model Details\n", - "For this example we will use the famous NYC taxi dataset. This dataset contains information about taxi trips in New York City. For the purposes of this example, we will be looking at the yellow taxi data from January 2019. We distribute a single task across a Dask cluster. The task reads data from the NYC taxi dataset, filters it to trips with more than one passenger, and then calculates the amount of variation in passenger count. 
\n", - "\n", - "## Modelling Process\n", - "\n", - "### Prerequisites\n", - "* created a Prefect Cloud account\n", - "* set up the appropriate credentials in Saturn\n", - "* set up a Prefect Cloud agent in Saturn Cloud\n", - "\n", - "Details on these prerequisites can be found in [Using Prefect Cloud with Saturn Cloud](https://saturncloud.io/docs/using-saturn-cloud/prefect_cloud/).\n", - "\n", - "### Environment Setup\n", - "\n", - "The code in this example uses prefect for orchestration (figuring out what to do, and in what order) and Dask Cluster for execution (doing the things).\n", - "\n", - "It relies on the following additional non-builtin libraries:\n", - "\n", - "* [`dask-saturn`](https://github.com/saturncloud/dask-saturn): create and interact with Saturn Cloud `Dask` clusters\n", - "* [`prefect-saturn`](https://github.com/saturncloud/prefect-saturn): register Prefect flows with both Prefect Cloud and Saturn Cloud.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "\n", - "import dask.dataframe as dd\n", - "from dask.distributed import Client\n", - "from dask_saturn import SaturnCluster\n", - "from prefect.executors import LocalExecutor\n", - "from prefect_saturn import PrefectCloudIntegration\n", - "\n", - "import prefect\n", - "from prefect import Flow, resource_manager, task\n", - "\n", - "PREFECT_CLOUD_PROJECT_NAME = os.environ[\"PREFECT_CLOUD_PROJECT_NAME\"]\n", - "SATURN_USERNAME = os.environ[\"SATURN_USERNAME\"]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Authenticate with Prefect Cloud." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!prefect auth login --key ${PREFECT_USER_TOKEN}" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Create a Prefect Cloud Project\n", - "\n", - "Prefect Cloud organizes flows within workspaces called \"projects\". Before you can register a flow with Prefect Cloud, it's necessary to create a project, if you don't have one yet.\n", - "\n", - "The code below will create a new project in whatever Prefect Cloud tenant you're authenticated with. If that project already exists, this code does not have any side effects." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "client = prefect.Client()\n", - "client.create_project(project_name=PREFECT_CLOUD_PROJECT_NAME)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Using a ResourceManager to setup a temporary Dask cluster\n", - "\n", - "Resource Managers in Prefect are like context managers. When some task needs an exclusive resource, we can use Resource Manager for setting up this temporary resource. This can be later cleaned when task is done.\n", - "Here we are creating a resource manager `_DaskCluster` by using `resource_manager` decorator. The resource manager object has three tasks : `init`, `setup` and `cleanup`. 
" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "@resource_manager\n", - "class _DaskCluster:\n", - " def __init__(self, n_workers):\n", - " self.n_workers = n_workers\n", - "\n", - " def setup(self):\n", - " cluster = SaturnCluster(n_workers=self.n_workers)\n", - " client = Client(cluster) # noqa: F841\n", - "\n", - " def cleanup(self, x=None):\n", - " pass" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "## Define Tasks\n", - "\n", - "Prefect refers to a workload as a \"flow\", which comprises multiple individual things to do called \"tasks\". From [the Prefect docs](https://docs.prefect.io/core/concepts/tasks.html):\n", - "\n", - "> A task is like a function: it optionally takes inputs, performs an action, and produces an optional result.\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "@task\n", - "def read():\n", - " taxi = dd.read_parquet(\"s3://saturn-public-data/nyc-taxi/data/yellow_tripdata_2019-01.parquet\")\n", - " df2 = taxi[taxi.passenger_count > 1]\n", - " df3 = df2.groupby(\"VendorID\").passenger_count.std()\n", - " return df3" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "## Construct a Flow\n", - "\n", - "Now that all of the task logic has been defined, the next step is to compose those tasks into a \"flow\".\n", - "\n", - "Inside our flow we have used Resource Manager `_DaskCluster`. Since task `read` needs Dask directly, we will keep the task inside `_DaskCluster`. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "with Flow(f\"{SATURN_USERNAME}-prefect-resource-manager\") as flow:\n", - " with _DaskCluster(n_workers=3) as client: # noqa: F841\n", - " a = read()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Register with Prefect Cloud\n" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "execution": { - "iopub.execute_input": "2021-12-02T00:45:29.479565Z", - "iopub.status.busy": "2021-12-02T00:45:29.479081Z", - "iopub.status.idle": "2021-12-02T00:45:29.487291Z", - "shell.execute_reply": "2021-12-02T00:45:29.486374Z", - "shell.execute_reply.started": "2021-12-02T00:45:29.479532Z" - } - }, - "source": [ - "Now that the business logic of the flow is complete, we can add information that Saturn Cloud will need to know to run it." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "integration = PrefectCloudIntegration(prefect_cloud_project_name=PREFECT_CLOUD_PROJECT_NAME)\n", - "flow = integration.register_flow_with_saturn(flow=flow)\n", - "# override the executor chosen by prefect-saturn\n", - "flow.executor = LocalExecutor()\n", - "# tell Prefect Cloud about the flow\n", - "flow.register(project_name=PREFECT_CLOUD_PROJECT_NAME)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "
\n", - "\n", - "## Run the flow\n", - "\n", - "If you have scheduled your flow, it will be run once every 24 hours. You can confirm this by doing all of the following:\n", - "\n", - "* If you are an admin, go to Prefect Cloud Agent page of Saturn Cloud which is at the side bar and check logs for your agent.\n", - "* Go to Prefect Cloud. If you navigate to this flow and click \"Runs\", you should see task statuses and and logs for this flow.\n", - "\n", - "If you have not scheduled your flow or want to run the flow immediately, navigate to the flow in the Prefect Cloud UI and click \"Quick Run\".\n", - "\n", - "Alternative way to run the flow immediately is to open a terminal and run the code below.\n", - "```shell\n", - "prefect auth login --key ${PREFECT_USER_TOKEN}\n", - "prefect run \\\n", - " --name prefect-resource-manager \\\n", - " --project ${PREFECT_CLOUD_PROJECT_NAME}\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "\n", - "\n", - "## Conclusion\n", - "\n", - "In this example, you learned how to create a prefect flow and distribute a task across Dask clusters. Register this flow with Prefect Cloud.\n", - "\n", - "Try changing the code above and re-running the flow eg you can train a model across multiple dask clusters\n", - "\n", - "If you have existing prefect flows, try running one of them on Saturn using this notebook as a template.\n", - "\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3.10.2 64-bit", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.2" - }, - "vscode": { - "interpreter": { - "hash": "b7848b2fbd737d4d16c30c2a265d9cb43a8b0508277d828bf32f61f61a6b4e46" - } - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/examples/prefect/README.md b/examples/prefect/README.md deleted file mode 100644 index c515111c..00000000 --- a/examples/prefect/README.md +++ /dev/null @@ -1,22 +0,0 @@ -# Scheduled jobs with Prefect and Prefect Cloud - -Prefect is an open source workflow orchestration tool made for data-intensive workloads. This allows you to schedule and organize jobs to run any time, in your chosen order. It can accommodate dependencies between jobs, and is very useful for data pipelines and machine learning workflows. - -Saturn Cloud supports two different tools in the Prefect ecosystem: -- Prefect Core, which is an open source library users can install, and -- Prefect Cloud, which is a fully hosted cloud service which you can purchase. The same team that maintains the `prefect` core library runs Prefect Cloud. Prefect Cloud is a hosted, high-availability, fault-tolerant service that handles all the orchestration responsibilities for running data pipelines. - -If you would like an introduction to the concepts behind job scheduling with Prefect, you can start with our [Introduction to Prefect Concepts](https://saturncloud.io/docs/examples/prefect/prefect_concepts/) docs page then come back here and run these examples! - -## [Data Pipeline on Single Node](./01-prefect-singlenode.ipynb) - -Execute all tasks locally in a single thread with Prefect and register this flow with Prefect Cloud. - -## [Data Pipeline: Run all tasks on Dask Cluster](./02-prefect-daskclusters.ipynb) - -Create a prefect flow and distribute all tasks across Dask clusters. 
- ## [Data Pipeline: Run a Task on a Dask Cluster](./03-prefect-resource-manager.ipynb) - -Create a Prefect flow and distribute a single task across a Dask cluster. -