diff --git a/README.md b/README.md index d02906a..78d0201 100644 --- a/README.md +++ b/README.md @@ -5,12 +5,12 @@ A simple way to distribute rendering tasks across multiple machines. [![Lint and Test](https://github.com/DeepAI-Research/Distributask/actions/workflows/test.yml/badge.svg)](https://github.com/DeepAI-Research/Distributask/actions/workflows/test.yml) [![PyPI version](https://badge.fury.io/py/distributask.svg)](https://badge.fury.io/py/distributask) -[![License](https://img.shields.io/badge/License-MIT-blue)](https://github.com/RaccoonResearch/distributask/blob/main/LICENSE) -[![forks - distributask](https://img.shields.io/github/forks/RaccoonResearch/distributask?style=social)](https://github.com/RaccoonResearch/distributask) +[![License](https://img.shields.io/badge/License-MIT-blue)](https://github.com/DeepAI-Research/Distributask/blob/main/LICENSE) + # Description -Distributask is a package that automatically queues, executes, and uploads the result of any task you want using Vast.ai, a decentralized network of GPUs. It works by first creating a Celery queue of the tasks, which is code that you want to be ran on a GPU. The tasks are then passed to the Vast.ai GPU workers using Redis as a message broker. Once the worker has completed the task, the result is uploaded to Huggingface. +Distributask is a package that automatically queues, executes, and uploads the result of any task you want using Vast.ai, a decentralized network of GPUs. It works by first creating a Celery queue of the tasks, which contain the code that you want to be run on a GPU. The tasks are then passed to the Vast.ai GPU workers using Redis as a message broker. Once a worker has completed a task, the result is uploaded to Hugging Face. 
# Installation @@ -35,10 +35,10 @@ Install the required packages: pip install -r requirements.txt ``` -Install the distributask package: +Or install Distributask as a package: ```bash -python setup.py install +pip install distributask ``` ### Configuration @@ -46,14 +46,14 @@ python setup.py install Create a `.env` file in the root directory of your project or set environment variables to create your desired setup: ```plaintext -REDIS_HOST=redis_host -REDIS_PORT=redis_port -REDIS_USER=redis_user -REDIS_PASSWORD=redis_password -VAST_API_KEY=your_vastai_api_key -HF_TOKEN=your_huggingface_token -HF_REPO_ID=your_huggingface_repo -BROKER_POOL_LIMIT=broker_pool_limit +REDIS_HOST="name of your redis server" +REDIS_PORT="port of your redis server" +REDIS_USER="username to login to redis server" +REDIS_PASSWORD="password to login to redis server" +VAST_API_KEY="your Vast.ai API key" +HF_TOKEN="your Hugging Face token" +HF_REPO_ID="name of your Hugging Face repository" +BROKER_POOL_LIMIT="your broker pool limit setting" ``` ## Getting Started @@ -83,11 +83,11 @@ This script configures the environment, registers a sample function, creates a q ## Documentation -For more info checkout our in-depth [documentation]! +For more info check out our in-depth [documentation](https://deepai-research.github.io/Distributask)! ## Contributing -Contributions are welcome! For major changes, please open an issue first to discuss what you would like to change. +Contributions are welcome! For any changes you would like to see, please open an issue to discuss what you would like to see changed or to change yourself. ## License diff --git a/distributask/distributask.py b/distributask/distributask.py index 402867a..704b4c7 100644 --- a/distributask/distributask.py +++ b/distributask/distributask.py @@ -519,7 +519,7 @@ def create_instance( self, offer_id: str, image: str, module_name: str, env_settings: Dict, command: str ) -> Dict: """ - Create an instance on the Vast.ai platform. 
+ Create an instance on the Vast.ai platform. Passes in some useful Celery settings by default. Args: offer_id (str): The ID of the offer to create the instance from. diff --git a/distributask/example/distributed.py b/distributask/example/distributed.py index 065f597..576ac10 100644 --- a/distributask/example/distributed.py +++ b/distributask/example/distributed.py @@ -38,25 +38,24 @@ "--number_of_tasks", type=int, default=10, help="Number of tasks (default: 10)" ) - # Parse the arguments args = parser.parse_args() completed = False + # Register function to distributask object distributask.register_function(example_function) - # First, initialize the dataset on Hugging Face - # This is idempotent, if you run it multiple times it won't delete files that already exist + # Initialize the dataset on Hugging Face distributask.initialize_dataset() # Create a file with the current date and time and save it as "datetime.txt" with open("datetime.txt", "w") as f: f.write(time.strftime("%Y-%m-%d %H:%M:%S")) - # Upload this to the repository + # Upload file to the repository distributask.upload_file("datetime.txt") - # remove the example file + # Remove the example file from local os.remove("datetime.txt") vast_api_key = distributask.get_env("VAST_API_KEY") @@ -65,7 +64,7 @@ job_configs = [] - # Submit params for the job + # Compile parameters for tasks for i in range(args.number_of_tasks): job_configs.append( { @@ -74,41 +73,37 @@ } ) - # Rent the nodes and get the node ids - # This will return a list of node ids that you can use to execute tasks + # Rent Vast.ai nodes and get list of node ids print("Renting nodes...") rented_nodes = distributask.rent_nodes( args.max_price, args.max_nodes, args.docker_image, args.module_name ) print("Total rented nodes: ", len(rented_nodes)) - print(rented_nodes) tasks = [] - repo_id = distributask.get_env("HF_REPO_ID") - - # Submit the tasks to the queue for the worker nodes to execute + # Submit the tasks to the queue for the Vast.ai worker 
nodes to execute for i in range(args.number_of_tasks): job_config = job_configs[i] print(f"Task {i}") print(job_config) print("Task params: ", job_config["task_params"]) - print("Submitting tasks...") - params = job_config["task_params"] - # queue up the function for execution on the node + # Each task executes the function "example_function", defined in shared.py task = distributask.execute_function(example_function.__name__, params) - # add the task to the list of tasks + # Add the task to the list of tasks tasks.append(task) def terminate_workers(): distributask.terminate_nodes(rented_nodes) print("Workers terminated.") + # Terminate Vast.ai nodes on exit of script atexit.register(terminate_workers) + # Monitor the status of the tasks with tqdm distributask.monitor_tasks(tasks) diff --git a/distributask/example/local.py b/distributask/example/local.py index c271fc2..f99afa1 100644 --- a/distributask/example/local.py +++ b/distributask/example/local.py @@ -9,9 +9,10 @@ if __name__ == "__main__": completed = False + # Register function to distributask object distributask.register_function(example_function) + # First, initialize the dataset on Hugging Face - # This is idempotent, if you run it multiple times it won't delete files that already exist distributask.initialize_dataset() # Create a file with the current date and time and save it as "datetime.txt" @@ -21,7 +22,7 @@ # Upload this to the repository distributask.upload_file("datetime.txt") - # remove the example file + # Remove the example file from local os.remove("datetime.txt") vast_api_key = distributask.get_env("VAST_API_KEY") @@ -31,7 +32,7 @@ job_configs = [] number_of_tasks = 3 - # Submit params for the job + # Compile parameters for tasks for i in range(number_of_tasks): job_configs.append( { @@ -44,37 +45,24 @@ repo_id = distributask.get_env("HF_REPO_ID") - # Submit the tasks - # For each task, check if the output files already exist + # Submit the tasks to the queue for the Vast.ai worker nodes to 
execute for i in range(number_of_tasks): job_config = job_configs[i] print(f"Task {i}") print(job_config) print("Task params: ", job_config["task_params"]) - # for each file in job_config["outputs"] - for output in job_config["outputs"]: - # check if the file exists in the dataset already - file_exists = distributask.file_exists(repo_id, output) - - # if the file exists, ask the user if they want to overwrite it - if file_exists: - print("Files already exist. Do you want to overwrite them? (y/n): ") - - print("Submitting tasks...") - params = job_config["task_params"] - # queue up the function for execution on the node + # Each task executes the function "example_function", defined in shared.py task = distributask.execute_function(example_function.__name__, params) - # add the task to the list of tasks + # Add the task to the list of tasks tasks.append(task) - # start the worker - + # Start the local worker docker_installed = False - # first, check if docker is installed + # Check if docker is installed try: subprocess.run(["docker", "version"], check=True) docker_installed = True @@ -83,8 +71,8 @@ print(e) docker_process = None - # if docker is installed, start local docker worker - # if docker is not installed, start local celery worker + # If docker is installed, start local Docker worker + # If docker is not installed, start local Celery worker if docker_installed is False: print("Docker is not installed. 
Starting worker locally.") celery_worker = subprocess.Popen( @@ -129,6 +117,9 @@ def kill_docker(): print("Killing docker container") docker_process.terminate() + # Terminate Docker worker on exit of script atexit.register(kill_docker) + # Monitor the status of the tasks with tqdm distributask.monitor_tasks(tasks) + diff --git a/distributask/example/shared.py b/distributask/example/shared.py index dc844bd..d2467ad 100644 --- a/distributask/example/shared.py +++ b/distributask/example/shared.py @@ -4,27 +4,26 @@ from ..distributask import create_from_config +# Create distributask instance distributask = create_from_config() - # This is the function that will be executed on the nodes # You can make your own function and pass in whatever arguments you want def example_function(index, arg1, arg2): - # As an ext result = arg1 + arg2 time.sleep(random.randint(1, 6)) - # save the result to a file + # Save the result to a file with open(f"result_{index}.txt", "w") as f: f.write(f"{str(arg1)} plus {str(arg2)} is {str(result)}") - # write the file to huggingface + # Write the file to huggingface distributask.upload_file(f"result_{index}.txt") - # now destroy the file + # Delete local file os.remove(f"result_{index}.txt") - # return the result - you can get this value from the task object + # Return the result - you can get this value from the task object return f"Task {index} completed. 
Result ({str(arg1)} + {str(arg2)}): {str(result)}" diff --git a/distributask/example/worker.py b/distributask/example/worker.py index d814258..a616692 100644 --- a/distributask/example/worker.py +++ b/distributask/example/worker.py @@ -1,5 +1,7 @@ from .shared import distributask, example_function +# Register function to worker using distributask instance distributask.register_function(example_function) +# Create Celery worker celery = distributask.app diff --git a/docs/more_info.md b/docs/more_info.md index 0d8ff6e..f5c3f0f 100644 --- a/docs/more_info.md +++ b/docs/more_info.md @@ -1,4 +1,4 @@ -# Summary of useful functions +# Summary of most relevant functions #### Settings, Environment, and Help @@ -34,7 +34,7 @@ # Docker Setup -Distributask uses a Docker image to transfer the environment and files to the Vast.ai nodes. In your implementation using Distributask, you can use the Docker file in the Distributask repository as a base for your own Docker file. If you choose to do this, be sure to add requirements.txt (and add distributask to the list of packages) to your directory as well so the Docker image has the required packages. +Distributask uses a Docker image to transfer the environment and necessary files to the Vast.ai nodes. In your implementation using Distributask, you can use the Docker file in the Distributask repository as a base for your own Docker file. If you do this, be sure to add Distributask to the list of packages to be installed in your Docker file. # Important Packages