Skip to content

Commit

Permalink
Merge pull request #89 from DeepAI-Research/antbaez/fixing_issues
Browse files Browse the repository at this point in the history
added comments, improved readme
  • Loading branch information
antbaez9 authored Jul 17, 2024
2 parents d91d2fb + f568629 commit cf9fce8
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 63 deletions.
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ A simple way to distribute rendering tasks across multiple machines.

[![Lint and Test](https://github.com/DeepAI-Research/Distributask/actions/workflows/test.yml/badge.svg)](https://github.com/DeepAI-Research/Distributask/actions/workflows/test.yml)
[![PyPI version](https://badge.fury.io/py/distributask.svg)](https://badge.fury.io/py/distributask)
[![License](https://img.shields.io/badge/License-MIT-blue)](https://github.com/RaccoonResearch/distributask/blob/main/LICENSE)
[![forks - distributask](https://img.shields.io/github/forks/RaccoonResearch/distributask?style=social)](https://github.com/RaccoonResearch/distributask)
[![License](https://img.shields.io/badge/License-MIT-blue)](https://github.com/DeepAI-Research/Distributask/blob/main/LICENSE)


# Description

Distributask is a package that automatically queues, executes, and uploads the result of any task you want using Vast.ai, a decentralized network of GPUs. It works by first creating a Celery queue of the tasks, which is code that you want to be ran on a GPU. The tasks are then passed to the Vast.ai GPU workers using Redis as a message broker. Once the worker has completed the task, the result is uploaded to Huggingface.
Distributask is a package that automatically queues, executes, and uploads the result of any task you want using Vast.ai, a decentralized network of GPUs. It works by first creating a Celery queue of the tasks, which contain the code that you want to be ran on a GPU. The tasks are then passed to the Vast.ai GPU workers using Redis as a message broker. Once a worker has completed a task, the result is uploaded to Hugging Face.

# Installation

Expand All @@ -35,25 +35,25 @@ Install the required packages:
pip install -r requirements.txt
```

Install the distributask package:
Or install Distributask as a package:

```bash
python setup.py install
pip install distributask
```

### Configuration

Create a `.env` file in the root directory of your project or set environment variables to create your desired setup:

```plaintext
REDIS_HOST=redis_host
REDIS_PORT=redis_port
REDIS_USER=redis_user
REDIS_PASSWORD=redis_password
VAST_API_KEY=your_vastai_api_key
HF_TOKEN=your_huggingface_token
HF_REPO_ID=your_huggingface_repo
BROKER_POOL_LIMIT=broker_pool_limit
REDIS_HOST="name of your redis server"
REDIS_PORT="port of your redis server
REDIS_USER="username to login to redis server"
REDIS_PASSWORD="password to login to redis server"
VAST_API_KEY="your Vast.ai API key"
HF_TOKEN="your Hugging Face token"
HF_REPO_ID="name of your Hugging Face repository"
BROKER_POOL_LIMIT="your broker pool limit setting"
```

## Getting Started
Expand Down Expand Up @@ -83,11 +83,11 @@ This script configures the environment, registers a sample function, creates a q

## Documentation

For more info checkout our in-depth [documentation]!
For more info checkout our in-depth [documentation](https://deepai-research.github.io/Distributask)!

## Contributing

Contributions are welcome! For major changes, please open an issue first to discuss what you would like to change.
Contributions are welcome! For any changes you would like to see, please open an issue to discuss what you would like to see changed or to change yourself.

## License

Expand Down
2 changes: 1 addition & 1 deletion distributask/distributask.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ def create_instance(
self, offer_id: str, image: str, module_name: str, env_settings: Dict, command: str
) -> Dict:
"""
Create an instance on the Vast.ai platform.
Create an instance on the Vast.ai platform. Passes in some useful Celery settings by default.
Args:
offer_id (str): The ID of the offer to create the instance from.
Expand Down
27 changes: 11 additions & 16 deletions distributask/example/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,24 @@
"--number_of_tasks", type=int, default=10, help="Number of tasks (default: 10)"
)

# Parse the arguments
args = parser.parse_args()

completed = False

# Register function to distributask object
distributask.register_function(example_function)

# First, initialize the dataset on Hugging Face
# This is idempotent, if you run it multiple times it won't delete files that already exist
# Initialize the dataset on Hugging Face
distributask.initialize_dataset()

# Create a file with the current date and time and save it as "datetime.txt"
with open("datetime.txt", "w") as f:
f.write(time.strftime("%Y-%m-%d %H:%M:%S"))

# Upload this to the repository
# Upload file to the repository
distributask.upload_file("datetime.txt")

# remove the example file
# Remove the example file from local
os.remove("datetime.txt")

vast_api_key = distributask.get_env("VAST_API_KEY")
Expand All @@ -65,7 +64,7 @@

job_configs = []

# Submit params for the job
# Compile parameters for tasks
for i in range(args.number_of_tasks):
job_configs.append(
{
Expand All @@ -74,41 +73,37 @@
}
)

# Rent the nodes and get the node ids
# This will return a list of node ids that you can use to execute tasks
# Rent Vast.ai nodes and get list of node ids
print("Renting nodes...")
rented_nodes = distributask.rent_nodes(
args.max_price, args.max_nodes, args.docker_image, args.module_name
)

print("Total rented nodes: ", len(rented_nodes))
print(rented_nodes)

tasks = []

repo_id = distributask.get_env("HF_REPO_ID")

# Submit the tasks to the queue for the worker nodes to execute
# Submit the tasks to the queue for the Vast.ai worker nodes to execute
for i in range(args.number_of_tasks):
job_config = job_configs[i]
print(f"Task {i}")
print(job_config)
print("Task params: ", job_config["task_params"])

print("Submitting tasks...")

params = job_config["task_params"]

# queue up the function for execution on the node
# Each task executes the function "example_function", defined in shared.py
task = distributask.execute_function(example_function.__name__, params)

# add the task to the list of tasks
# Add the task to the list of tasks
tasks.append(task)

def terminate_workers():
distributask.terminate_nodes(rented_nodes)
print("Workers terminated.")

# Terminate Vast.ai nodes on exit of script
atexit.register(terminate_workers)

# Monitor the status of the tasks with tqdm
distributask.monitor_tasks(tasks)
37 changes: 14 additions & 23 deletions distributask/example/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
if __name__ == "__main__":
completed = False

# Register function to distributask object
distributask.register_function(example_function)

# First, initialize the dataset on Hugging Face
# This is idempotent, if you run it multiple times it won't delete files that already exist
distributask.initialize_dataset()

# Create a file with the current date and time and save it as "datetime.txt"
Expand All @@ -21,7 +22,7 @@
# Upload this to the repository
distributask.upload_file("datetime.txt")

# remove the example file
# Remove the example file from local
os.remove("datetime.txt")

vast_api_key = distributask.get_env("VAST_API_KEY")
Expand All @@ -31,7 +32,7 @@
job_configs = []
number_of_tasks = 3

# Submit params for the job
# Compile parameters for tasks
for i in range(number_of_tasks):
job_configs.append(
{
Expand All @@ -44,37 +45,24 @@

repo_id = distributask.get_env("HF_REPO_ID")

# Submit the tasks
# For each task, check if the output files already exist
# Submit the tasks to the queue for the Vast.ai worker nodes to execute
for i in range(number_of_tasks):
job_config = job_configs[i]
print(f"Task {i}")
print(job_config)
print("Task params: ", job_config["task_params"])

# for each file in job_config["outputs"]
for output in job_config["outputs"]:
# check if the file exists in the dataset already
file_exists = distributask.file_exists(repo_id, output)

# if the file exists, ask the user if they want to overwrite it
if file_exists:
print("Files already exist. Do you want to overwrite them? (y/n): ")

print("Submitting tasks...")

params = job_config["task_params"]

# queue up the function for execution on the node
# Each task executes the function "example_function", defined in shared.py
task = distributask.execute_function(example_function.__name__, params)

# add the task to the list of tasks
# Add the task to the list of tasks
tasks.append(task)

# start the worker

# Start the local worker
docker_installed = False
# first, check if docker is installed
# Check if docker is installed
try:
subprocess.run(["docker", "version"], check=True)
docker_installed = True
Expand All @@ -83,8 +71,8 @@
print(e)

docker_process = None
# if docker is installed, start local docker worker
# if docker is not installed, start local celery worker
# If docker is installed, start local Docker worker
# If docker is not installed, start local Celery worker
if docker_installed is False:
print("Docker is not installed. Starting worker locally.")
celery_worker = subprocess.Popen(
Expand Down Expand Up @@ -129,6 +117,9 @@ def kill_docker():
print("Killing docker container")
docker_process.terminate()

# Terminate Docker worker on exit of script
atexit.register(kill_docker)

# Monitor the status of the tasks with tqdm
distributask.monitor_tasks(tasks)

11 changes: 5 additions & 6 deletions distributask/example/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,26 @@

from ..distributask import create_from_config

# Create distributask instance
distributask = create_from_config()


# This is the function that will be executed on the nodes
# You can make your own function and pass in whatever arguments you want
def example_function(index, arg1, arg2):

# As an ext
result = arg1 + arg2

time.sleep(random.randint(1, 6))

# save the result to a file
# Save the result to a file
with open(f"result_{index}.txt", "w") as f:
f.write(f"{str(arg1)} plus {str(arg2)} is {str(result)}")

# write the file to huggingface
# Write the file to huggingface
distributask.upload_file(f"result_{index}.txt")

# now destroy the file
# Delete local file
os.remove(f"result_{index}.txt")

# return the result - you can get this value from the task object
# Return the result - you can get this value from the task object
return f"Task {index} completed. Result ({str(arg1)} + {str(arg2)}): {str(result)}"
2 changes: 2 additions & 0 deletions distributask/example/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .shared import distributask, example_function

# Register function to worker using distributask instance
distributask.register_function(example_function)

# Create Celery worker
celery = distributask.app
4 changes: 2 additions & 2 deletions docs/more_info.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Summary of useful functions
# Summary of most relevant functions

#### Settings, Environment, and Help

Expand Down Expand Up @@ -34,7 +34,7 @@

# Docker Setup

Distributask uses a Docker image to transfer the environment and files to the Vast.ai nodes. In your implementation using Distributask, you can use the Docker file in the Distributask repository as a base for your own Docker file. If you choose to do this, be sure to add requirements.txt (and add distributask to the list of packages) to your directory as well so the Docker image has the required packages.
Distributask uses a Docker image to transfer the environment and neccessary files to the Vast.ai nodes. In your implementation using Distributask, you can use the Docker file in the Distributask repository as a base for your own Docker file. If you do this, be sure to add Distributask to the list of packages to be installed on your Docker file.

# Important Packages

Expand Down

0 comments on commit cf9fce8

Please sign in to comment.