From 2103873cf16525c4272043538b007d8c9f87e09a Mon Sep 17 00:00:00 2001
From: moon
Date: Sat, 18 May 2024 12:25:13 -0700
Subject: [PATCH] borked

---
 Dockerfile                     |   4 +-
 distributaur/__init__.py       |   0
 distributaur/batch.py          | 197 ---------------------------------
 distributaur/cli.py            |  18 +++
 distributaur/core.py           |  32 ++++++
 distributaur/decorators.py     |   5 +
 distributaur/environment.py    |  17 +++
 distributaur/example.py        |   5 -
 distributaur/task_runner.py    |  82 --------------
 distributaur/tests/__init__.py |   2 +-
 distributaur/tests/vast.py     |   2 +-
 distributaur/utils.py          | 141 -----------------------
 distributaur/vast.py           |   2 +-
 distributaur/worker.py         |  12 ++
 example.sh                     |  44 ++++++++
 15 files changed, 133 insertions(+), 430 deletions(-)
 create mode 100644 distributaur/__init__.py
 delete mode 100644 distributaur/batch.py
 create mode 100644 distributaur/cli.py
 create mode 100644 distributaur/core.py
 create mode 100644 distributaur/decorators.py
 create mode 100644 distributaur/environment.py
 delete mode 100644 distributaur/example.py
 delete mode 100644 distributaur/task_runner.py
 delete mode 100644 distributaur/utils.py
 create mode 100644 distributaur/worker.py
 create mode 100644 example.sh

diff --git a/Dockerfile b/Dockerfile
index 3e8ebbd..6b5b1cb 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM --platform=linux/x86_64 ubuntu:24.04
+FROM ubuntu:24.04
 
 RUN apt-get update && \
     apt-get install -y \
@@ -14,4 +14,4 @@ RUN python3 -m pip install --no-cache-dir -r requirements.txt
 
 COPY distributaur/ ./distributaur/
 
-CMD ["celery", "-A", "distributaur.example", "example", "--loglevel=info", "--concurrency=1"]
\ No newline at end of file
+CMD ["celery", "-A", "distributaur.core", "worker", "--loglevel=info", "--concurrency=1"]
\ No newline at end of file
diff --git a/distributaur/__init__.py b/distributaur/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/distributaur/batch.py b/distributaur/batch.py
deleted file mode 100644
index 1ca8ab6..0000000
--- a/distributaur/batch.py
+++ /dev/null
@@ -1,197 +0,0 @@
-import json
-import os
-import signal
-import sys
-import argparse
-import time
-from celery import chord, uuid
-
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../"))
-
-from distributaur.vast import (
-    rent_nodes,
-    terminate_nodes,
-    monitor_job_status,
-    handle_sigint,
-    attach_to_existing_job,
-    dump_redis_values,
-)
-from distributaur.worker import render_object, notify_completion
-
-
-def render_objects(
-    job_id,
-    start_index,
-    end_index,
-    start_frame=0,
-    end_frame=65,
-    width=1920,
-    height=1080,
-    output_dir="./renders",
-    hdri_path="./backgrounds",
-    max_price=0.1,
-    max_nodes=1,
-    image="arfx/simian-worker:latest",
-    api_key=None,
-):
-    combinations = []
-    # read combinations.json
-    with open("combinations.json", "r") as file:
-        combinations = json.load(file)
-        combinations = combinations["combinations"]
-
-    # make sure end_index is less than the number of combinations
-    end_index = min(end_index, len(combinations))
-
-    print(f"Rendering objects from {start_index} to {end_index}")
-
-    tasks = [
-        render_object.s(
-            job_id,
-            i,
-            combination,
-            width,
-            height,
-            output_dir,
-            hdri_path,
-            start_frame,
-            end_frame,
-        )
-        for i, combination in enumerate(combinations[start_index:end_index])
-    ]
-    callback = notify_completion.s(job_id)  # Pass job_id to completion callback
-    job = chord(tasks)(callback)
-
-    # Rent nodes using distributed_vast
-    nodes = rent_nodes(max_price, max_nodes, image, api_key)
-
-    # Set up signal handler for SIGINT
-    signal.signal(signal.SIGINT, lambda sig, frame: handle_sigint(nodes))
-
-    # Add delay to wait for workers to start
-    time.sleep(30)  # Adjust this time as needed
-
-    # Monitor the job status
-    monitor_job_status(job)  # Directly pass the job
-
-    # Dump Redis values for debugging
-    # dump_redis_values()
-
-    # Terminate nodes once the job is complete
-    terminate_nodes(nodes)
-
-    print("All tasks have been completed!")
-    return job
-
-
-def main():
-    parser = argparse.ArgumentParser(
-        description="Automate the rendering of objects using Celery."
-    )
-    parser.add_argument(
-        "--start_index",
-        type=int,
-        default=0,
-        help="Starting index for rendering from the combinations list.",
-    )
-    parser.add_argument(
-        "--end_index",
-        type=int,
-        default=100,
-        help="Ending index for rendering from the combinations list.",
-    )
-    parser.add_argument(
-        "--start_frame",
-        type=int,
-        default=0,
-        help="Starting frame number for the animation. Defaults to 0.",
-    )
-    parser.add_argument(
-        "--end_frame",
-        type=int,
-        default=65,
-        help="Ending frame number for the animation. Defaults to 65.",
-    )
-    parser.add_argument(
-        "--width",
-        type=int,
-        default=1920,
-        help="Width of the rendering in pixels. Defaults to 1920.",
-    )
-    parser.add_argument(
-        "--height",
-        type=int,
-        default=1080,
-        help="Height of the rendering in pixels. Defaults to 1080.",
-    )
-    parser.add_argument(
-        "--output_dir",
-        type=str,
-        default="./renders",
-        help="Directory to save rendered outputs. Defaults to './renders'.",
-    )
-    parser.add_argument(
-        "--hdri_path",
-        type=str,
-        default="./backgrounds",
-        help="Directory containing HDRI files for rendering. Defaults to './backgrounds'.",
-    )
-    parser.add_argument(
-        "--max_price",
-        type=float,
-        default=0.1,
-        help="Maximum price per hour for renting nodes. Defaults to 0.1.",
-    )
-    parser.add_argument(
-        "--max_nodes",
-        type=int,
-        default=1,
-        help="Maximum number of nodes to rent. Defaults to 1.",
-    )
-    parser.add_argument(
-        "--image",
-        type=str,
-        default="arfx/simian-worker:latest",
-        help="Docker image to use for rendering. Defaults to 'arfx/simian-worker:latest'.",
-    )
-    parser.add_argument(
-        "--api_key",
-        type=str,
-        default=None,
-        help="API key for renting nodes. Defaults to None.",
-    )
-    # add job_id
-    parser.add_argument(
-        "--job_id",
-        type=str,
-        default=str(uuid()),
-        help="Unique job ID for the batch.",
-    )
-
-    args = parser.parse_args()
-
-    job_id = args.job_id
-    # Check if attaching to an existing job
-    if attach_to_existing_job(job_id):
-        # Monitor the job status
-        monitor_job_status()
-    else:
-        render_objects(
-            job_id=job_id,
-            start_index=args.start_index,
-            end_index=args.end_index,
-            start_frame=args.start_frame,
-            end_frame=args.end_frame,
-            width=args.width,
-            height=args.height,
-            output_dir=args.output_dir,
-            hdri_path=args.hdri_path,
-            max_price=args.max_price,
-            max_nodes=args.max_nodes,
-            image=args.image,
-            api_key=args.api_key,
-        )
-
-
-if __name__ == "__main__":
-    main()
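
The deleted batch.py fanned its render tasks out with a Celery chord and a completion callback. Nothing in this patch replaces that orchestration layer, but the same fan-out is still expressible against the new core module. A minimal sketch, assuming a worker is running and the Redis broker from environment.py is reachable; the snippets avoid builtins because execute_python_code disables them:

    from celery import group

    from distributaur.core import execute_python_code

    # One task per snippet; each snippet must assign to `result`, since the
    # task returns local_scope.get('result', None).
    snippets = [f"result = {i} * {i}" for i in range(4)]
    job = group(execute_python_code.s(code) for code in snippets)()
    print(job.get(timeout=60))  # -> [0, 1, 4, 9]
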
diff --git a/distributaur/cli.py b/distributaur/cli.py
new file mode 100644
index 0000000..c7edf0c
--- /dev/null
+++ b/distributaur/cli.py
@@ -0,0 +1,18 @@
+import argparse
+import os
+import sys
+
+sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "../"))
+
+from distributaur.core import submit_task
+
+def main():
+    parser = argparse.ArgumentParser(description="Distributaur CLI")
+    parser.add_argument("--code", type=str, required=True, help="Python code to execute.")
+    args = parser.parse_args()
+
+    result = submit_task(args.code)
+    print(f"Execution Result: {result}")
+
+if __name__ == "__main__":
+    main()
diff --git a/distributaur/core.py b/distributaur/core.py
new file mode 100644
index 0000000..4d8b5bb
--- /dev/null
+++ b/distributaur/core.py
@@ -0,0 +1,32 @@
+# core.py
+
+from celery import Celery
+from .environment import get_redis_url
+
+app = Celery("distributaur_tasks", broker=get_redis_url(), backend=get_redis_url())
+
+@app.task(bind=True, name="execute_python_code")
+def execute_python_code(self, code):
+    try:
+        local_scope = {}
+        exec(code, {'__builtins__': None}, local_scope)  # Builtins are disabled; code must be self-contained
+        return local_scope.get('result', None)  # Assuming the code defines 'result'
+    except Exception as e:
+        print(f"Error executing code: {str(e)}")
+        return None
+
+def submit_task(code):
+    """
+    Submits a Python code execution task to the Celery worker.
+    Args:
+        code (str): Python code to execute.
+
+    Returns:
+        result: The result of the Python code execution or None if an error occurs.
+    """
+    try:
+        result = app.send_task('execute_python_code', args=[code])
+        return result.get(timeout=10)  # Waits for the task to complete and returns the result
+    except Exception as e:
+        print(f"Failed to submit task: {str(e)}")
+        return None
diff --git a/distributaur/decorators.py b/distributaur/decorators.py
new file mode 100644
index 0000000..ccb92a8
--- /dev/null
+++ b/distributaur/decorators.py
@@ -0,0 +1,5 @@
+def register_task(app):
+    def decorator(func):
+        task = app.task(func)
+        return task
+    return decorator
\ No newline at end of file
diff --git a/distributaur/environment.py b/distributaur/environment.py
new file mode 100644
index 0000000..b348f04
--- /dev/null
+++ b/distributaur/environment.py
@@ -0,0 +1,17 @@
+import os
+
+def get_redis_url():
+    """Constructs the Redis connection URL from environment variables."""
+    host = os.getenv("REDIS_HOST", "localhost")
+    password = os.getenv("REDIS_PASSWORD", None)
+    port = os.getenv("REDIS_PORT", 6379)
+    return f"redis://:{password}@{host}:{port}" if password else f"redis://{host}:{port}"
+
+def get_env_vars():
+    """Loads environment variables critical for Distributaur operations."""
+    required_vars = ["VAST_API_KEY", "REDIS_HOST", "REDIS_PORT"]
+    env_vars = {var: os.getenv(var) for var in required_vars}
+    if not all(env_vars.values()):
+        missing = [var for var, value in env_vars.items() if not value]
+        raise EnvironmentError(f"Missing critical environment variables: {', '.join(missing)}")
+    return env_vars
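
For reference, this is how the two environment helpers above are meant to be consumed. A small sketch with placeholder values only; the variable names are the ones environment.py actually reads, everything else is illustrative:

    import os

    from distributaur.environment import get_redis_url, get_env_vars

    # Placeholder values; point these at a real Redis instance and Vast.ai key.
    os.environ.setdefault("REDIS_HOST", "localhost")
    os.environ.setdefault("REDIS_PORT", "6379")
    os.environ.setdefault("VAST_API_KEY", "dummy-key")

    print(get_redis_url())  # redis://localhost:6379 (password prefix only when REDIS_PASSWORD is set)
    print(get_env_vars())   # raises EnvironmentError if a required variable is missing
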
diff --git a/distributaur/example.py b/distributaur/example.py
deleted file mode 100644
index 2bfadfa..0000000
--- a/distributaur/example.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from distributaur.task_runner import run_task
-
-@run_task
-def run_example_job() -> None:
-    print("Running example job")
\ No newline at end of file
diff --git a/distributaur/task_runner.py b/distributaur/task_runner.py
deleted file mode 100644
index eedb053..0000000
--- a/distributaur/task_runner.py
+++ /dev/null
@@ -1,82 +0,0 @@
-import json
-import subprocess
-import sys
-import os
-import ssl
-import time
-from celery import Celery
-from redis import ConnectionPool, Redis
-
-ssl._create_default_https_context = ssl._create_unverified_context
-
-from distributaur.utils import get_redis_values
-
-redis_url = get_redis_values()
-pool = ConnectionPool.from_url(redis_url)
-redis_client = Redis(connection_pool=pool)
-
-app = Celery("tasks", broker=redis_url, backend=redis_url)
-
-
-def run_task(task_func):
-    @app.task(name=task_func.__name__, acks_late=True, reject_on_worker_lost=True)
-    def wrapper(*args, **kwargs):
-        job_id = kwargs.get("job_id")
-        task_id = wrapper.request.id
-        print(f"Starting task {task_id} in job {job_id}")
-        update_task_status(job_id, task_id, "IN_PROGRESS")
-
-        timeout = 600  # 10 minutes in seconds
-        task_timeout = 2700  # 45 minutes in seconds
-
-        start_time = time.time()
-        print(f"Task {task_id} starting.")
-
-        while True:
-            elapsed_time = time.time() - start_time
-            if elapsed_time > timeout:
-                update_task_status(task_id, "TIMEOUT")
-                print(f"Task {task_id} timed out before starting task")
-                return
-
-            try:
-                task_start_time = time.time()
-                print(f"Task {task_id} executing task function.")
-                result = task_func(*args, **kwargs)
-                print(f"Task {task_id} completed task function.")
-
-                elapsed_task_time = time.time() - task_start_time
-                if elapsed_task_time > task_timeout:
-                    update_task_status(task_id, "TIMEOUT")
-                    print(
-                        f"Task {task_id} timed out after {elapsed_task_time} seconds of execution"
-                    )
-                    return
-
-                update_task_status(task_id, "COMPLETE")
-                print(f"Task {task_id} completed successfully")
-                return result
-
-            except subprocess.TimeoutExpired:
-                update_task_status(task_id, "TIMEOUT")
-                print(f"Task {task_id} timed out after {timeout} seconds")
-                return
-
-            except Exception as e:
-                update_task_status(job_id, task_id, "FAILED")
-                print(f"Task {task_id} failed with error: {str(e)}")
-                return
-
-    return wrapper
-
-
-def update_task_status(job_id, task_id, status):
-    key = f"celery-task-meta-{task_id}"
-    value = json.dumps({"status": status})
-    redis_client.set(key, value)
-    print(f"Updated status for task {task_id} in job {job_id} to {status}")
-
-
-if __name__ == "__main__":
-    print("Starting Celery worker...")
-    app.start(argv=["celery", "worker", "--loglevel=info"])
diff --git a/distributaur/tests/__init__.py b/distributaur/tests/__init__.py
index 272f39e..fb16209 100644
--- a/distributaur/tests/__init__.py
+++ b/distributaur/tests/__init__.py
@@ -1 +1 @@
-from .vast import *
\ No newline at end of file
+from distributaur.vast import *
\ No newline at end of file
diff --git a/distributaur/tests/vast.py b/distributaur/tests/vast.py
index cb6b02e..412aa48 100644
--- a/distributaur/tests/vast.py
+++ b/distributaur/tests/vast.py
@@ -26,7 +26,7 @@ def rented_nodes(vast_api_key):
     max_price = 0.5
     max_nodes = 1
 
-    image = "arfx/simian-worker:latest"
+    image = "arfx/distributaur-example-worker:latest"
     nodes = rent_nodes(max_price, max_nodes, image, vast_api_key)
     yield nodes
diff --git a/distributaur/utils.py b/distributaur/utils.py
deleted file mode 100644
index d44124b..0000000
--- a/distributaur/utils.py
+++ /dev/null
@@ -1,141 +0,0 @@
-import os
-from sys import platform
-
-
-def get_env_vars(path=".env"):
-    env_vars = {}
-    if not os.path.exists(path):
-        return env_vars
-    with open(path, "r") as f:
-        for line in f:
-            key, value = line.strip().split("=")
-            env_vars[key] = value
-    return env_vars
-
-
-def get_redis_values(path=".env"):
-    env_vars = get_env_vars(path)
-
-    host = env_vars.get("REDIS_HOST", os.getenv("REDIS_HOST", "localhost"))
-    password = env_vars.get("REDIS_PASSWORD", os.getenv("REDIS_PASSWORD", None))
-    port = env_vars.get("REDIS_PORT", os.getenv("REDIS_PORT", 6379))
-    username = env_vars.get("REDIS_USER", os.getenv("REDIS_USER", None))
-    if password is None:
-        redis_url = f"redis://{host}:{port}"
-    else:
-        redis_url = f"redis://{username}:{password}@{host}:{port}"
-    return redis_url
-
-
-def get_blender_path():
-    # if we are on macOS, then application_path is /Applications/Blender.app/Contents/MacOS/Blender
-    if platform.system() == "Darwin":
-        application_path = "/Applications/Blender.app/Contents/MacOS/Blender"
-    else:
-        application_path = "./blender/blender"
-    if not os.path.exists(application_path):
-        raise FileNotFoundError(f"Blender not found at {application_path}.")
-    return application_path
-
-
-def upload_outputs(output_dir):
-    # determine if s3 or huggingface environment variables are set up
-    # env_vars = get_env_vars()
-    # aws_access_key_id = env_vars.get("AWS_ACCESS_KEY_ID") or os.getenv("AWS_ACCESS_KEY_ID")
-    # huggingface_token = env_vars.get("HF_TOKEN") or os.getenv("HF_TOKEN")
-    # if aws_access_key_id and huggingface_token:
-    #     print("Warning: Both AWS and Hugging Face credentials are set. Defaulting to Huggingface. Remove credentials to default to AWS.")
-    #     upload_to_huggingface(output_dir, combination)
-    # elif aws_access_key_id is None and huggingface_token is None:
-    #     raise ValueError("No AWS or Hugging Face credentials found. Please set one.")
-    # elif aws_access_key_id:
-    #     upload_to_s3(output_dir, combination)
-    # elif huggingface_token:
-    upload_to_huggingface(output_dir)
-
-
-# def upload_to_s3(output_dir, combination):
-#     """
-#     Uploads the rendered outputs to an S3 bucket.
-
-#     Args:
-#     - output_dir (str): The directory where the rendered outputs are saved.
-#     - bucket_name (str): The name of the S3 bucket.
-#     - s3_path (str): The path in the S3 bucket where files should be uploaded.
-
-#     Returns:
-#     - None
-#     """
-#     import boto3
-#     from botocore.exceptions import NoCredentialsError, PartialCredentialsError
-
-#     env_vars = get_env_vars()
-#     aws_access_key_id = env_vars.get("AWS_ACCESS_KEY_ID") or os.getenv("AWS_ACCESS_KEY_ID")
-#     aws_secret_access_key = env_vars.get("AWS_SECRET_ACCESS_KEY") or os.getenv("AWS_SECRET_ACCESS_KEY")
-
-#     s3_client = boto3.client(
-#         "s3",
-#         aws_access_key_id,
-#         aws_secret_access_key
-#     )
-
-#     bucket_name = combination.get("bucket_name", os.getenv("AWS_BUCKET_NAME")) or env_vars.get("AWS_BUCKET_NAME")
-#     s3_path = combination.get("upload_path", os.getenv("AWS_UPLOAD_PATH")) or env_vars.get("AWS_UPLOAD_PATH")
-
-#     for root, dirs, files in os.walk(output_dir):
-#         for file in files:
-#             local_path = os.path.join(root, file)
-#             s3_file_path = os.path.join(s3_path, file) if s3_path else file
-
-#             try:
-#                 s3_client.upload_file(local_path, bucket_name, s3_file_path)
-#                 print(f"Uploaded {local_path} to s3://{bucket_name}/{s3_file_path}")
-#             except FileNotFoundError:
-#                 print(f"File not found: {local_path}")
-#             except NoCredentialsError:
-#                 print("AWS credentials not found.")
-#             except PartialCredentialsError:
-#                 print("Incomplete AWS credentials.")
-#             except Exception as e:
-#                 print(f"Failed to upload {local_path} to s3://{bucket_name}/{s3_file_path}: {e}")
-
-
-def upload_to_huggingface(output_dir):
-    """
-    Uploads the rendered outputs to a Hugging Face repository.
-
-    Args:
-    - output_dir (str): The directory where the rendered outputs are saved.
-    - repo_id (str): The repository ID on Hugging Face.
-
-    Returns:
-    - None
-    """
-    env_vars = get_env_vars()
-    hf_token = os.getenv("HF_TOKEN") or env_vars.get("HF_TOKEN")
-    repo_id = os.getenv("HF_REPO_ID") or env_vars.get("HF_REPO_ID")
-    repo_path = os.getenv("HF_PATH") or env_vars.get("HF_PATH", "")
-    from huggingface_hub import HfApi
-
-    api = HfApi(token=hf_token)
-
-    for root, dirs, files in os.walk(output_dir):
-        for file in files:
-            local_path = os.path.join(root, file)
-            path_in_repo = os.path.join(repo_path, file) if repo_path else file
-
-            try:
-                api.upload_file(
-                    path_or_fileobj=local_path,
-                    path_in_repo=path_in_repo,
-                    repo_id=repo_id,
-                    token=hf_token,
-                    repo_type="dataset",
-                )
-                print(
-                    f"Uploaded {local_path} to Hugging Face repo {repo_id} at {path_in_repo}"
-                )
-            except Exception as e:
-                print(
-                    f"Failed to upload {local_path} to Hugging Face repo {repo_id} at {path_in_repo}: {e}"
-                )
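
The search_offers hunk below switches the hardcoded price ceiling to the caller's max_price; since max_price arrives as a float, it has to pass through str() before being spliced into the query string. A sturdier construction, sketched here under the same query shape (not part of the patch), would serialize a dict instead of concatenating JSON by hand:

    import json
    from urllib.parse import quote

    def build_offers_query(max_price):
        # Same query search_offers builds by string splicing, expressed as data.
        query = {
            "gpu_ram": ">=4",
            "rentable": {"eq": True},
            "dph_total": {"lte": max_price},
            "sort_option": {"0": ["dph_total", "asc"], "1": ["total_flops", "asc"]},
        }
        return "?q=" + quote(json.dumps(query))
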
diff --git a/distributaur/vast.py b/distributaur/vast.py
index 30b3340..2e15afe 100644
--- a/distributaur/vast.py
+++ b/distributaur/vast.py
@@ -305,7 +305,7 @@ def search_offers(max_price, api_key):
     }
     url = (
         base_url
-        + '?q={"gpu_ram":">=4","rentable":{"eq":true},"dph_total":{"lte":0.1480339514817041},"sort_option":{"0":["dph_total","asc"],"1":["total_flops","asc"]}}'
+        + '?q={"gpu_ram":">=4","rentable":{"eq":true},"dph_total":{"lte":' + str(max_price) + '},"sort_option":{"0":["dph_total","asc"],"1":["total_flops","asc"]}}'
     )
     print("url", url)
diff --git a/distributaur/worker.py b/distributaur/worker.py
new file mode 100644
index 0000000..08a67ab
--- /dev/null
+++ b/distributaur/worker.py
@@ -0,0 +1,12 @@
+from distributaur.core import app
+from distributaur.decorators import register_task
+
+@register_task(app)
+def execute_python_code(code):
+    try:
+        local_scope = {}
+        exec(code, {'__builtins__': None}, local_scope)  # Builtins are disabled; code must be self-contained
+        return local_scope.get('result', None)  # Assuming the code defines 'result'
+    except Exception as e:
+        print(f"Error executing code: {str(e)}")
+        return None
diff --git a/example.sh b/example.sh
new file mode 100644
index 0000000..b1b6cc2
--- /dev/null
+++ b/example.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# Define log files
+CELERY_LOG="celery.log"
+EXAMPLE_LOG="example.log"
+
+# Function to start the Celery worker
+start_celery() {
+    celery -A distributaur.core worker --loglevel=info > "$CELERY_LOG" 2>&1
+}
+
+# Function to submit an example task through the CLI
+start_example() {
+    python distributaur/cli.py --code "result = 6 * 7" > "$EXAMPLE_LOG" 2>&1
+}
+
+# Function to display logs
+display_logs() {
+    tail -f "$CELERY_LOG" & CELERY_TAIL_PID=$!
+    tail -f "$EXAMPLE_LOG" & EXAMPLE_TAIL_PID=$!
+    wait $CELERY_TAIL_PID $EXAMPLE_TAIL_PID
+}
+
+# Function to handle cleanup on script exit
+cleanup() {
+    echo "Stopping Celery worker and example script..."
+    pkill -f "celery -A distributaur.core worker"
+    pkill -f "python distributaur/cli.py"
+    pkill -f "tail -f $CELERY_LOG"
+    pkill -f "tail -f $EXAMPLE_LOG"
+}
+
+# Trap SIGINT and SIGTERM to call cleanup
+trap cleanup SIGINT SIGTERM
+
+# Start the Celery worker and example script in separate subshells
+(start_celery) &
+(start_example) &
+
+# Wait for a few seconds to allow the processes to start
+sleep 5
+
+# Display logs from both processes
+display_logs
\ No newline at end of file
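
Taken together, the flow after this patch is: start a worker against distributaur.core, then submit code from any client process. A minimal smoke test, assuming Redis is reachable and a worker was started with "celery -A distributaur.core worker --loglevel=info" (example.sh automates the same sequence):

    from distributaur.core import submit_task

    # Submitted code runs with builtins disabled, so keep it self-contained
    # and assign the value you want back to `result`.
    value = submit_task("result = 6 * 7")
    print(value)  # 42, or None if the task failed or timed out
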