Skip to content

Commit

Permalink
SDK/python: Update dsort job info query and related tests
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Wilson <[email protected]>
  • Loading branch information
aaronnw committed Aug 22, 2023
1 parent 3986c03 commit 46c1fd1
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 67 deletions.
16 changes: 8 additions & 8 deletions python/aistore/sdk/dsort.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
DSORT_ABORT,
DSORT_UUID,
)
from aistore.sdk.dsort_types import DsortMetrics
from aistore.sdk.dsort_types import JobInfo
from aistore.sdk.errors import Timeout
from aistore.sdk.utils import validate_file, probing_frequency

Expand Down Expand Up @@ -56,17 +56,17 @@ def abort(self):
HTTP_METHOD_DELETE, path=f"{URL_PATH_DSORT}/{DSORT_ABORT}", params=qparam
)

def metrics(self) -> Dict[str, DsortMetrics]:
def get_job_info(self) -> Dict[str, JobInfo]:
"""
Get metrics for a dsort job
Get info for a dsort job
Returns:
Dictionary of metrics for jobs associated with this dsort job
Dictionary of job info for all jobs associated with this dsort
"""
qparam = {DSORT_UUID: [self._dsort_id]}
return self._client.request_deserialize(
HTTP_METHOD_GET,
path=URL_PATH_DSORT,
res_model=Dict[str, DsortMetrics],
res_model=Dict[str, JobInfo],
params=qparam,
)

Expand Down Expand Up @@ -97,12 +97,12 @@ def wait(
if passed > timeout:
raise Timeout("dsort job to finish")
finished = True
for metric in self.metrics().values():
if metric.aborted:
for job_info in self.get_job_info().values():
if job_info.metrics.aborted:
logger.info("DSort job '%s' aborted", self._dsort_id)
return
# Shard creation is the last phase, so check if it's finished
finished = metric.shard_creation.finished and finished
finished = job_info.metrics.shard_creation.finished and finished
if finished:
logger.info("DSort job '%s' finished", self._dsort_id)
return
Expand Down
31 changes: 30 additions & 1 deletion python/aistore/sdk/dsort_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from typing import List

from pydantic import BaseModel
from pydantic import BaseModel, Field

from aistore.sdk.types import BucketModel


# See ext/dsort/metric.go for cluster-side type definitions

# pylint: disable=too-few-public-methods


class TimeStats(BaseModel):
"""
Expand Down Expand Up @@ -99,3 +104,27 @@ class DsortMetrics(BaseModel):
warnings: List[str] = None
errors: List[str] = None
extended: bool = None


class JobInfo(BaseModel):
    """
    Info about a dsort Job, including metrics

    Fields declared with an alias are parsed from that key in the response
    payload; the Config below additionally allows populating them by their
    Python field name.
    """

    # NOTE(review): `id` and `bytes` shadow builtins inside the class body only;
    # they are kept because callers and the payload rely on these names.
    id: str
    # Required fields: `...` marks them as having no default
    src_bck: BucketModel = Field(..., alias="src-bck")
    dst_bck: BucketModel = Field(..., alias="dst-bck")
    started_time: str = None
    finish_time: str = None
    # NOTE(review): these three are named "*_duration" but are read from keys
    # that look like phase timestamps -- confirm against the cluster-side types
    extracted_duration: str = Field(None, alias="started_meta_sorting")
    sorting_duration: str = Field(None, alias="started_shard_creation")
    creation_duration: str = Field(None, alias="finished_shard_creation")
    objects: int = Field(..., alias="loc-objs")
    bytes: int = Field(..., alias="loc-bytes")
    metrics: DsortMetrics = Field(..., alias="Metrics")
    aborted: bool
    archived: bool

    # pylint: disable=missing-class-docstring
    class Config:
        # Accept both the alias and the Python field name when constructing
        allow_population_by_field_name = True
7 changes: 3 additions & 4 deletions python/tests/integration/sdk/test_dsort_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ def test_abort(self):
)
dsort.abort()
dsort.wait(timeout=TEST_TIMEOUT)
metrics = dsort.metrics()
for metric in metrics.values():
self.assertTrue(metric.aborted)
self.assertEqual(1, len(metric.errors))
for job_info in dsort.get_job_info().values():
self.assertTrue(job_info.metrics.aborted)
self.assertEqual(1, len(job_info.metrics.errors))
104 changes: 50 additions & 54 deletions python/tests/unit/sdk/test_dsort.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
HTTP_METHOD_GET,
)
from aistore.sdk.dsort import Dsort
from aistore.sdk.dsort_types import DsortMetrics
from aistore.sdk.dsort_types import DsortMetrics, JobInfo
from aistore.sdk.errors import Timeout
from aistore.sdk.utils import probing_frequency

Expand All @@ -22,6 +22,15 @@ def setUp(self) -> None:
self.dsort_id = "123"
self.dsort = Dsort(client=self.mock_client, dsort_id=self.dsort_id)

@staticmethod
def _get_mock_job_info(finished, aborted=False):
    """
    Build a mocked JobInfo whose metrics report the given state

    Args:
        finished: Value exposed as metrics.shard_creation.finished
        aborted: Value exposed as metrics.aborted

    Returns:
        Mock specced on JobInfo, with a DsortMetrics-specced mock as metrics
    """
    job_info = Mock(JobInfo)
    job_info.metrics = Mock(DsortMetrics)
    job_info.metrics.aborted = aborted
    job_info.metrics.shard_creation = Mock(finished=finished)
    return job_info

def test_properties(self):
self.assertEqual(self.dsort_id, self.dsort.dsort_id)

Expand Down Expand Up @@ -52,88 +61,81 @@ def test_abort(self):
params={DSORT_UUID: [self.dsort_id]},
)

def test_metrics(self):
metrics = {"id_1": Mock(DsortMetrics)}
self.mock_client.request_deserialize.return_value = metrics
res = self.dsort.metrics()
self.assertEqual(metrics, res)
def test_get_job_info(self):
mock_job_info = {"id_1": Mock(JobInfo)}
self.mock_client.request_deserialize.return_value = mock_job_info
res = self.dsort.get_job_info()
self.assertEqual(mock_job_info, res)
self.mock_client.request_deserialize.assert_called_with(
HTTP_METHOD_GET,
path=URL_PATH_DSORT,
res_model=Dict[str, DsortMetrics],
res_model=Dict[str, JobInfo],
params={DSORT_UUID: [self.dsort_id]},
)

@patch("aistore.sdk.dsort.time.sleep")
@patch("aistore.sdk.dsort.Dsort.metrics")
def test_wait_default_timeout(self, mock_metrics, mock_sleep):
@patch("aistore.sdk.dsort.Dsort.get_job_info")
def test_wait_default_timeout(self, mock_get_job_info, mock_sleep):
timeout = 300
frequency = probing_frequency(timeout)
expected_metrics_calls = [
expected_job_info_calls = [
call(),
call(),
call(),
]
expected_sleep_calls = [call(frequency), call(frequency)]
self._wait_test_helper(
self.dsort,
mock_metrics,
mock_get_job_info,
mock_sleep,
expected_metrics_calls,
expected_job_info_calls,
expected_sleep_calls,
)

@patch("aistore.sdk.dsort.time.sleep")
@patch("aistore.sdk.dsort.Dsort.metrics")
def test_wait(self, mock_status, mock_sleep):
@patch("aistore.sdk.dsort.Dsort.get_job_info")
def test_wait(self, mock_get_job_info, mock_sleep):
timeout = 20
frequency = probing_frequency(timeout)
expected_metrics_calls = [call(), call(), call()]
expected_job_info_calls = [call(), call(), call()]
expected_sleep_calls = [call(frequency), call(frequency)]
self._wait_test_helper(
self.dsort,
mock_status,
mock_get_job_info,
mock_sleep,
expected_metrics_calls,
expected_job_info_calls,
expected_sleep_calls,
timeout=timeout,
)

@patch("aistore.sdk.dsort.time.sleep")
@patch("aistore.sdk.dsort.Dsort.metrics")
@patch("aistore.sdk.dsort.Dsort.get_job_info")
# pylint: disable=unused-argument
def test_wait_timeout(self, mock_metrics, mock_sleep):
mock_metric = Mock(DsortMetrics)
mock_metric.aborted = False
mock_metric.shard_creation = Mock(finished=False)
mock_metrics.return_value = {"key": mock_metric}
def test_wait_timeout(self, mock_get_job_info, mock_sleep):
mock_get_job_info.return_value = {
"key": self._get_mock_job_info(finished=False, aborted=False)
}
self.assertRaises(Timeout, self.dsort.wait)

@patch("aistore.sdk.dsort.time.sleep")
@patch("aistore.sdk.dsort.Dsort.metrics")
def test_wait_aborted(self, mock_metrics, mock_sleep):
@patch("aistore.sdk.dsort.Dsort.get_job_info")
def test_wait_aborted(self, mock_get_job_info, mock_sleep):
timeout = 300
frequency = probing_frequency(timeout)
expected_metrics_calls = [
call(),
call(),
]
expected_sleep_calls = [call(frequency)]
unfinished_metric = Mock(DsortMetrics)
unfinished_metric.aborted = False
unfinished_metric.shard_creation = Mock(finished=False)
aborted_metric = Mock(DsortMetrics)
aborted_metric.aborted = True
aborted_metric.shard_creation = Mock(finished=False)
mock_metrics.side_effect = [
{"key": unfinished_metric},
{"key": aborted_metric},
{"key": unfinished_metric},
mock_get_job_info.side_effect = [
{"key": self._get_mock_job_info(finished=False)},
{"key": self._get_mock_job_info(finished=False, aborted=True)},
{"key": self._get_mock_job_info(finished=False)},
]

self._wait_exec_assert(
self.dsort,
mock_metrics,
mock_get_job_info,
mock_sleep,
expected_metrics_calls,
expected_sleep_calls,
Expand All @@ -143,44 +145,38 @@ def test_wait_aborted(self, mock_metrics, mock_sleep):
def _wait_test_helper(
self,
dsort,
mock_metrics,
mock_get_job_info,
mock_sleep,
expected_metrics_calls,
expected_job_info_calls,
expected_sleep_calls,
**kwargs,
):
unfinished_metric = Mock(DsortMetrics)
unfinished_metric.aborted = False
unfinished_metric.shard_creation = Mock(finished=False)
finished_metric = Mock(DsortMetrics)
finished_metric.aborted = False
finished_metric.shard_creation = Mock(finished=True)
mock_metrics.side_effect = [
{"key": unfinished_metric},
{"key": unfinished_metric},
{"key": finished_metric},
mock_get_job_info.side_effect = [
{"job_id": self._get_mock_job_info(finished=False)},
{"job_id": self._get_mock_job_info(finished=False)},
{"job_id": self._get_mock_job_info(finished=True)},
]
self._wait_exec_assert(
dsort,
mock_metrics,
mock_get_job_info,
mock_sleep,
expected_metrics_calls,
expected_job_info_calls,
expected_sleep_calls,
**kwargs,
)

def _wait_exec_assert(
self,
dsort,
mock_metrics,
mock_get_job_info,
mock_sleep,
expected_metrics_calls,
expected_job_info_calls,
expected_sleep_calls,
**kwargs,
):
dsort.wait(**kwargs)

mock_metrics.assert_has_calls(expected_metrics_calls)
mock_get_job_info.assert_has_calls(expected_job_info_calls)
mock_sleep.assert_has_calls(expected_sleep_calls)
self.assertEqual(len(expected_metrics_calls), mock_metrics.call_count)
self.assertEqual(len(expected_job_info_calls), mock_get_job_info.call_count)
self.assertEqual(len(expected_sleep_calls), mock_sleep.call_count)

0 comments on commit 46c1fd1

Please sign in to comment.