added function to get log from worker #75

Merged · 1 commit · Jul 10, 2024
47 changes: 33 additions & 14 deletions distributask/distributask.py
@@ -18,8 +18,8 @@

class Distributask:
"""
The Distributask class contains the core features of distributask, including creating and
executing the task queue, managing workers using the Vast.ai API, and uploading files and directories
using the Hugging Face API.
"""

@@ -40,7 +40,7 @@ def __init__(
broker_pool_limit=os.getenv("BROKER_POOL_LIMIT", 1),
) -> None:
"""
Initialize the Distributask object with the provided configuration parameters. Also sets some
default settings in Celery and handles cleanup of Celery queue and Redis server on exit.

Args:
@@ -283,7 +283,7 @@ def initialize_dataset(self, **kwargs) -> None:
Initialize a Hugging Face repository if it doesn't exist. Reads Hugging Face info from config or .env

Args:
kwargs: kwargs that can be passed into the HfApi.create_repo function.

Raises:
HTTPError: If repo cannot be created due to connection error other than repo not existing
@@ -543,7 +543,7 @@ def create_instance(
raise ValueError("VAST_API_KEY is not set in the environment")

if command is None:
command = f"celery -A {module_name} worker --loglevel=info --concurrency=1"
command = f"celery -A {module_name} worker --loglevel=info --concurrency=1 --without-heartbeat"

json_blob = {
"client_id": "me",
@@ -590,7 +590,7 @@ def rent_nodes(
command: str = None,
) -> List[Dict]:
"""
Rent nodes as an instance on the Vast.ai platform.

Args:
max_price (float): The maximum price per hour for the nodes.
@@ -648,6 +648,26 @@ def rent_nodes(
break
return rented_nodes

def get_node_log(self, node: Dict, wait_time: int = 2):
    """
    Get the log of a specific node (instance) on Vast.ai.

    Args:
        node (Dict): The node to retrieve the log from. Must contain an "instance_id" key.
        wait_time (int): Number of seconds to wait for Vast.ai to prepare the log before downloading it.

    Returns:
        requests.Response: The response whose text contains the requested log.
    """
    node_id = node["instance_id"]
    url = f"https://console.vast.ai/api/v0/instances/request_logs/{node_id}/"

    payload = {"tail": "1000"}
    headers = {
        "Accept": "application/json",
        # The Vast.ai API key is read from the environment (VAST_API_KEY).
        "Authorization": f"Bearer {os.getenv('VAST_API_KEY')}",
    }

    # Ask Vast.ai to generate the log file, then wait for it to become available.
    response = requests.request(
        "PUT", url, headers=headers, json=payload, timeout=5
    )
    log_url = response.json()["result_url"]
    time.sleep(wait_time)
    log_response = requests.get(log_url, timeout=5)

    return log_response
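
For context, a minimal usage sketch of the new helper (not part of this diff; it assumes distributask was built with create_from_config and nodes is the list returned by rent_nodes):

    # Hypothetical usage: fetch and print the worker log for each rented node.
    for node in nodes:
        log_response = distributask.get_node_log(node, wait_time=2)
        if log_response.status_code == 200:
            print(node["instance_id"], log_response.text)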

def terminate_nodes(self, nodes: List[Dict]) -> None:
"""
Terminate the instances of rented nodes on Vast.ai.
@@ -666,7 +686,9 @@ def terminate_nodes(self, nodes: List[Dict]) -> None:
f"Error terminating node: {node['instance_id']}, {str(e)}", "error"
)

def monitor_tasks(
    self, tasks, update_interval=1, show_time_left=True, print_statements=True
):
"""
Monitor the status of the tasks on the Vast.ai nodes.

@@ -675,7 +697,7 @@ def monitor_tasks(self, tasks, update_interval = 1, show_time_left=True, print_s
update_interval (int): Number of seconds between updates of the task status.
show_time_left (bool): Show the estimated time left to complete tasks using the tqdm progress bar.
print_statements (bool): Allow printing of the status of the task queue.

Raises:
Exception: If an error occurs while executing the tasks.
"""
@@ -691,23 +713,20 @@ def monitor_tasks(self, tasks, update_interval = 1, show_time_left=True, print_s
pbar.update(current_tasks - pbar.n)
time.sleep(update_interval)
except Exception as e:
self.log(f"Error in executing tasks on nodes, {str(e)}")

if all(task.ready() for task in tasks):
print("All tasks completed.")



distributask = None


def create_from_config(config_path="config.json", env_path=".env") -> Distributask:
"""
Create a Distributask object using settings merged from the config.json and .env files present in the distributask directory.
If there are conflicting values, the .env file takes priority.

Args:
config_path (str): path to config.json file
env_path (str): path to .env file
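
For context, a small usage sketch of this entry point (the paths are the defaults documented above):

    # Hypothetical usage: build the shared Distributask object from local config files;
    # values in .env override conflicting values from config.json.
    distributask = create_from_config(config_path="config.json", env_path=".env")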