Skip to content

Commit

Permalink
Merge pull request #42 from skrityak/master
Browse files Browse the repository at this point in the history
Wait for job completion after reading all logs
  • Loading branch information
skrityak authored Aug 17, 2020
2 parents 44918a0 + 5f2ff37 commit dd88346
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
2 changes: 2 additions & 0 deletions eastern/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def wait_completion(self, idle_timeout=None):

args = self.kubectl.get_launch_args() + ["logs", "-f", pod_name]
ProcessTimeout(idle_timeout, *args).run_sync()
# wait for job completion
retry(lambda: self.is_completed(), count=10)

def get_pod_name(self):
"""
Expand Down
26 changes: 24 additions & 2 deletions tests/test_job_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import pytest

from eastern.job_manager import JobManager
from eastern.kubectl import JobStatus
from eastern.timeout import ProcessTimeout
from eastern import Kubectl
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

JOB_NAME = "test-job"
MULTI_POD_NAMES = "test-job-9vmlg test-job-2abcd"
Expand All @@ -13,7 +15,6 @@
def kubectl():
return MagicMock(Kubectl)


def test_get_pod_names(kubectl):
kubectl.get_job_pod_name.return_value = MULTI_POD_NAMES
job_manager = JobManager(kubectl, JOB_NAME)
Expand All @@ -30,3 +31,24 @@ def test_get_pod_name(kubectl):
kubectl.get_job_pod_name.return_value = MULTI_POD_NAMES
job_manager = JobManager(kubectl, JOB_NAME)
assert job_manager.get_pod_name() == "test-job-2abcd"


def test_wait_completion_success(kubectl):
kubectl.get_job_pod_name.return_value = JOB_NAME
kubectl.get_pod_phase.return_value = 'RUNNING'
job_manager = JobManager(kubectl, JOB_NAME)
kubectl.get_job_status.return_value = JobStatus({"active" : 0, "succeeded" : 1})
job_manager.wait_completion()


@patch.object(ProcessTimeout, 'run_sync', MagicMock(return_value=None))
def test_wait_completion(kubectl):
kubectl.get_job_pod_name.return_value = JOB_NAME
kubectl.get_pod_phase.return_value = 'RUNNING'

job_manager = JobManager(kubectl, JOB_NAME)
kubectl.get_job_status.side_effect = [ JobStatus({"active" : 1}), JobStatus({"active" : 1}), JobStatus({"active" : 0, "succeeded": 1}) ]

job_manager.wait_completion()


0 comments on commit dd88346

Please sign in to comment.