diff --git a/distributask/distributask.py b/distributask/distributask.py index 57d0025..259025a 100644 --- a/distributask/distributask.py +++ b/distributask/distributask.py @@ -18,8 +18,8 @@ class Distributask: """ - The Distributask class contains the core features of distributask, including creating and - executing the task queue, managing workers using the Vast.ai API, and uploading files and directories + The Distributask class contains the core features of distributask, including creating and + executing the task queue, managing workers using the Vast.ai API, and uploading files and directories using the Hugging Face API. """ @@ -40,7 +40,7 @@ def __init__( broker_pool_limit=os.getenv("BROKER_POOL_LIMIT", 1), ) -> None: """ - Initialize the Distributask object with the provided configuration parameters. Also sets some + Initialize the Distributask object with the provided configuration parameters. Also sets some default settings in Celery and handles cleanup of Celery queue and Redis server on exit. Args: @@ -283,7 +283,7 @@ def initialize_dataset(self, **kwargs) -> None: Initialize a Hugging Face repository if it doesn't exist. Reads Hugging Face info from config or .env Args: - kwargs: kwargs that can be passed into the HfApi.create_repo function. + kwargs: kwargs that can be passed into the HfApi.create_repo function. Raises: HTTPError: If repo cannot be created due to connection error other than repo not existing @@ -543,7 +543,7 @@ def create_instance( raise ValueError("VAST_API_KEY is not set in the environment") if command is None: - command = f"celery -A {module_name} worker --loglevel=info --concurrency=1" + command = f"celery -A {module_name} worker --loglevel=info --concurrency=1 --without-heartbeat" json_blob = { "client_id": "me", @@ -590,7 +590,7 @@ def rent_nodes( command: str = None, ) -> List[Dict]: """ - Rent nodes as an instance on the Vast.ai platform. + Rent nodes as an instance on the Vast.ai platform. Args: max_price (float): The maximum price per hour for the nodes. @@ -648,6 +648,26 @@ def rent_nodes( break return rented_nodes + def get_node_log(self, node: Dict, wait_time: int = 2): + + node_id = node["instance_id"] + url = f"https://console.vast.ai/api/v0/instances/request_logs/{node_id}/" + + payload = {"tail": "1000"} + headers = { + "Accept": "application/json", + "Authorization": "Bearer ac8b1195eb3f71e5d3520b6c2cbd81b671b05619d5f1b276eaaf25f5177b0599", + } + + response = requests.request( + "PUT", url, headers=headers, json=payload, timeout=5 + ) + log_url = response.json()["result_url"] + time.sleep(wait_time) + log_response = requests.get(log_url, timeout=5) + + return log_response + def terminate_nodes(self, nodes: List[Dict]) -> None: """ Terminate the instances of rented nodes on Vast.ai. @@ -666,7 +686,9 @@ def terminate_nodes(self, nodes: List[Dict]) -> None: f"Error terminating node: {node['instance_id']}, {str(e)}", "error" ) - def monitor_tasks(self, tasks, update_interval = 1, show_time_left=True, print_statements=True): + def monitor_tasks( + self, tasks, update_interval=1, show_time_left=True, print_statements=True + ): """ Monitor the status of the tasks on the Vast.ai nodes. @@ -675,7 +697,7 @@ def monitor_tasks(self, tasks, update_interval = 1, show_time_left=True, print_s update_interval (bool): Number of seconds the status of tasks are updated. show_time_left (bool): Show the estimated time left to complete tasks using the tqdm progress bar print_statments (bool): Allow printing of status of task queue - + Raises: Exception: If error in the process of executing the tasks """ @@ -691,15 +713,12 @@ def monitor_tasks(self, tasks, update_interval = 1, show_time_left=True, print_s pbar.update(current_tasks - pbar.n) time.sleep(update_interval) except Exception as e: - self.log( - f"Error in executing tasks on nodes, {str(e)}" - ) - + self.log(f"Error in executing tasks on nodes, {str(e)}") + if all(task.ready() for task in tasks): print("All tasks completed.") - distributask = None @@ -707,7 +726,7 @@ def create_from_config(config_path="config.json", env_path=".env") -> Distributa """ Create Distributask object using settings that merge config.json and .env files present in distributask directory. If there are conflicting values, the .env takes priority. - + Args: config_path (str): path to config.json file env_path (str): path to .env file