Skip to content

Commit

Permalink
make device_capabilities async running on a thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexCheema committed Dec 16, 2024
1 parent 036224f commit 1b14be6
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 66 deletions.
27 changes: 26 additions & 1 deletion exo/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,29 @@ async def shutdown(signal, loop, server):
def is_frozen():
return getattr(sys, 'frozen', False) or os.path.basename(sys.executable) == "exo" \
or ('Contents/MacOS' in str(os.path.dirname(sys.executable))) \
or '__nuitka__' in globals() or getattr(sys, '__compiled__', False)
or '__nuitka__' in globals() or getattr(sys, '__compiled__', False)

async def get_mac_system_info() -> Tuple[str, str, int]:
"""Get Mac system information using system_profiler."""
try:
output = await asyncio.get_running_loop().run_in_executor(
subprocess_pool,
lambda: subprocess.check_output(["system_profiler", "SPHardwareDataType"]).decode("utf-8")
)

model_line = next((line for line in output.split("\n") if "Model Name" in line), None)
model_id = model_line.split(": ")[1] if model_line else "Unknown Model"

chip_line = next((line for line in output.split("\n") if "Chip" in line), None)
chip_id = chip_line.split(": ")[1] if chip_line else "Unknown Chip"

memory_line = next((line for line in output.split("\n") if "Memory" in line), None)
memory_str = memory_line.split(": ")[1] if memory_line else "Unknown Memory"
memory_units = memory_str.split()
memory_value = int(memory_units[0])
memory = memory_value * 1024 if memory_units[1] == "GB" else memory_value

return model_id, chip_id, memory
except Exception as e:
if DEBUG >= 2: print(f"Error getting Mac system info: {e}")
return "Unknown Model", "Unknown Chip", 0
2 changes: 1 addition & 1 deletion exo/networking/tailscale/tailscale_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(
self.update_task = None

async def start(self):
self.device_capabilities = device_capabilities()
self.device_capabilities = await device_capabilities()
self.discovery_task = asyncio.create_task(self.task_discover_peers())
self.cleanup_task = asyncio.create_task(self.task_cleanup_peers())
self.update_task = asyncio.create_task(self.task_update_device_posture_attributes())
Expand Down
2 changes: 1 addition & 1 deletion exo/networking/udp/udp_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
self.cleanup_task = None

async def start(self):
self.device_capabilities = device_capabilities()
self.device_capabilities = await device_capabilities()
self.broadcast_task = asyncio.create_task(self.task_broadcast_presence())
self.listen_task = asyncio.create_task(self.task_listen_for_peers())
self.cleanup_task = asyncio.create_task(self.task_cleanup_peers())
Expand Down
5 changes: 3 additions & 2 deletions exo/orchestration/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from exo.networking import Discovery, PeerHandle, Server
from exo.inference.inference_engine import InferenceEngine, Shard
from exo.topology.topology import Topology
from exo.topology.device_capabilities import device_capabilities
from exo.topology.device_capabilities import device_capabilities, UNKNOWN_DEVICE_CAPABILITIES
from exo.topology.partitioning_strategy import Partition, PartitioningStrategy, map_partitions_to_shards
from exo import DEBUG
from exo.helpers import AsyncCallbackSystem
Expand Down Expand Up @@ -37,7 +37,7 @@ def __init__(
self.partitioning_strategy = partitioning_strategy
self.peers: List[PeerHandle] = {}
self.topology: Topology = Topology()
self.device_capabilities = device_capabilities()
self.device_capabilities = UNKNOWN_DEVICE_CAPABILITIES
self.buffered_token_output: Dict[str, Tuple[List[int], bool]] = {}
self.buffered_logits: Dict[str, List[np.ndarray]] = {}
self.buffered_inputs: Dict[str, List[np.ndarray]] = {}
Expand All @@ -56,6 +56,7 @@ def __init__(
self.outstanding_requests = {}

async def start(self, wait_for_peers: int = 0) -> None:
self.device_capabilities = await device_capabilities()
await self.server.start()
await self.discovery.start()
await self.update_peers(wait_for_peers)
Expand Down
9 changes: 9 additions & 0 deletions exo/orchestration/test_node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest
from unittest.mock import Mock, AsyncMock
import numpy as np
import pytest

from .node import Node
from exo.networking.peer_handle import PeerHandle
Expand Down Expand Up @@ -55,3 +56,11 @@ async def test_process_tensor_calls_inference_engine(self):
await self.node.process_tensor(input_tensor, None)

self.node.inference_engine.process_shard.assert_called_once_with(input_tensor)

@pytest.mark.asyncio
async def test_node_capabilities():
node = Node()
await node.initialize()
caps = await node.get_device_capabilities()
assert caps is not None
assert caps.model != ""
37 changes: 15 additions & 22 deletions exo/topology/device_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from exo import DEBUG
import subprocess
import psutil
import asyncio
from exo.helpers import get_mac_system_info, subprocess_pool

TFLOPS = 1.00

Expand Down Expand Up @@ -144,11 +146,11 @@ def to_dict(self):
CHIP_FLOPS.update({f"{key} Laptop GPU": value for key, value in CHIP_FLOPS.items()})


def device_capabilities() -> DeviceCapabilities:
async def device_capabilities() -> DeviceCapabilities:
if psutil.MACOS:
return mac_device_capabilities()
return await mac_device_capabilities()
elif psutil.LINUX:
return linux_device_capabilities()
return await linux_device_capabilities()
else:
return DeviceCapabilities(
model="Unknown Device",
Expand All @@ -158,27 +160,18 @@ def device_capabilities() -> DeviceCapabilities:
)


def mac_device_capabilities() -> DeviceCapabilities:
# Fetch the model of the Mac using system_profiler
model = subprocess.check_output(["system_profiler", "SPHardwareDataType"]).decode("utf-8")
model_line = next((line for line in model.split("\n") if "Model Name" in line), None)
model_id = model_line.split(": ")[1] if model_line else "Unknown Model"
chip_line = next((line for line in model.split("\n") if "Chip" in line), None)
chip_id = chip_line.split(": ")[1] if chip_line else "Unknown Chip"
memory_line = next((line for line in model.split("\n") if "Memory" in line), None)
memory_str = memory_line.split(": ")[1] if memory_line else "Unknown Memory"
memory_units = memory_str.split()
memory_value = int(memory_units[0])
if memory_units[1] == "GB":
memory = memory_value*1024
else:
memory = memory_value

# Assuming static values for other attributes for demonstration
return DeviceCapabilities(model=model_id, chip=chip_id, memory=memory, flops=CHIP_FLOPS.get(chip_id, DeviceFlops(fp32=0, fp16=0, int8=0)))
async def mac_device_capabilities() -> DeviceCapabilities:
model_id, chip_id, memory = await get_mac_system_info()

return DeviceCapabilities(
model=model_id,
chip=chip_id,
memory=memory,
flops=CHIP_FLOPS.get(chip_id, DeviceFlops(fp32=0, fp16=0, int8=0))
)


def linux_device_capabilities() -> DeviceCapabilities:
async def linux_device_capabilities() -> DeviceCapabilities:
import psutil
from tinygrad import Device

Expand Down
4 changes: 3 additions & 1 deletion exo/topology/partitioning_strategy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from abc import ABC, abstractmethod
from typing import List
from typing import List, Dict
from dataclasses import dataclass
from .topology import Topology
from exo.inference.shard import Shard
from exo.topology.device_capabilities import device_capabilities
import asyncio


# Partitions shard-space into pieces of contiguous shards, represented by floating point range [start, end) between 0 and 1
Expand Down
79 changes: 41 additions & 38 deletions exo/topology/test_device_capabilities.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import unittest
import pytest
from unittest.mock import patch
from exo.topology.device_capabilities import mac_device_capabilities, DeviceCapabilities, DeviceFlops, TFLOPS
from exo.topology.device_capabilities import mac_device_capabilities, DeviceCapabilities, DeviceFlops, TFLOPS, device_capabilities


class TestMacDeviceCapabilities(unittest.TestCase):
@patch("subprocess.check_output")
def test_mac_device_capabilities_pro(self, mock_check_output):
@pytest.mark.asyncio
@patch("subprocess.check_output")
async def test_mac_device_capabilities_pro(mock_check_output):
# Mock the subprocess output
mock_check_output.return_value = b"""
Hardware:
Expand All @@ -27,20 +27,19 @@ def test_mac_device_capabilities_pro(self, mock_check_output):
"""

# Call the function
result = mac_device_capabilities()
result = await mac_device_capabilities()

# Check the results
self.assertIsInstance(result, DeviceCapabilities)
self.assertEqual(result.model, "MacBook Pro")
self.assertEqual(result.chip, "Apple M3 Max")
self.assertEqual(result.memory, 131072) # 16 GB in MB
self.assertEqual(
str(result),
"Model: MacBook Pro. Chip: Apple M3 Max. Memory: 131072MB. Flops: 14.20 TFLOPS, fp16: 28.40 TFLOPS, int8: 56.80 TFLOPS",
)

@patch("subprocess.check_output")
def test_mac_device_capabilities_air(self, mock_check_output):
assert isinstance(result, DeviceCapabilities)
assert result.model == "MacBook Pro"
assert result.chip == "Apple M3 Max"
assert result.memory == 131072 # 128 GB in MB
assert str(result) == "Model: MacBook Pro. Chip: Apple M3 Max. Memory: 131072MB. Flops: 14.20 TFLOPS, fp16: 28.40 TFLOPS, int8: 56.80 TFLOPS"


@pytest.mark.asyncio
@patch("subprocess.check_output")
async def test_mac_device_capabilities_air(mock_check_output):
# Mock the subprocess output
mock_check_output.return_value = b"""
Hardware:
Expand All @@ -62,30 +61,34 @@ def test_mac_device_capabilities_air(self, mock_check_output):
"""

# Call the function
result = mac_device_capabilities()
result = await mac_device_capabilities()

# Check the results
self.assertIsInstance(result, DeviceCapabilities)
self.assertEqual(result.model, "MacBook Air")
self.assertEqual(result.chip, "Apple M2")
self.assertEqual(result.memory, 8192) # 8 GB in MB
assert isinstance(result, DeviceCapabilities)
assert result.model == "MacBook Air"
assert result.chip == "Apple M2"
assert result.memory == 8192 # 8 GB in MB


@unittest.skip("Unskip this test when running on a MacBook Pro, Apple M3 Max, 128GB")
def test_mac_device_capabilities_real(self):
@pytest.mark.skip(reason="Unskip this test when running on a MacBook Pro, Apple M3 Max, 128GB")
@pytest.mark.asyncio
async def test_mac_device_capabilities_real():
# Call the function without mocking
result = mac_device_capabilities()
result = await mac_device_capabilities()

# Check the results
self.assertIsInstance(result, DeviceCapabilities)
self.assertEqual(result.model, "MacBook Pro")
self.assertEqual(result.chip, "Apple M3 Max")
self.assertEqual(result.memory, 131072) # 128 GB in MB
self.assertEqual(result.flops, DeviceFlops(fp32=14.20*TFLOPS, fp16=28.40*TFLOPS, int8=56.80*TFLOPS))
self.assertEqual(
str(result),
"Model: MacBook Pro. Chip: Apple M3 Max. Memory: 131072MB. Flops: 14.20 TFLOPS, fp16: 28.40 TFLOPS, int8: 56.80 TFLOPS",
)


if __name__ == "__main__":
unittest.main()
assert isinstance(result, DeviceCapabilities)
assert result.model == "MacBook Pro"
assert result.chip == "Apple M3 Max"
assert result.memory == 131072 # 128 GB in MB
assert result.flops == DeviceFlops(fp32=14.20*TFLOPS, fp16=28.40*TFLOPS, int8=56.80*TFLOPS)
assert str(result) == "Model: MacBook Pro. Chip: Apple M3 Max. Memory: 131072MB. Flops: 14.20 TFLOPS, fp16: 28.40 TFLOPS, int8: 56.80 TFLOPS"


@pytest.mark.asyncio
async def test_device_capabilities():
caps = await device_capabilities()
assert caps.model != ""
assert caps.chip != ""
assert caps.memory > 0
assert caps.flops is not None

0 comments on commit 1b14be6

Please sign in to comment.