Skip to content

Commit

Permalink
removed test dependency on inotify-tools (inotifywait)
Browse files Browse the repository at this point in the history
and related tweaks to improve test reliability across systems
  • Loading branch information
jesteria committed Jun 28, 2023
1 parent 9bad4b2 commit 1ccd9bb
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-library.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Check out repository
uses: actions/checkout@v3

- name: Install dependencies
- name: Install library dependencies
run: |
pip install poetry~=1.4
poetry install --only main --only test
Expand Down
18 changes: 17 additions & 1 deletion test/test_sched/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
import pytest

from .fixture import SchedulerFixture
from .fixture import (
LockingTaskFixture,
SchedulerFixture,
)


@pytest.fixture
def schedpatch(confpatch):
return SchedulerFixture(confpatch.conf, confpatch.logger)


@pytest.fixture
def locking_task(tmp_path):
lock = LockingTaskFixture(tmp_path)

lock.acquire()

try:
yield lock
finally:
if lock.locked:
lock.release()
1 change: 1 addition & 0 deletions test/test_sched/fixture/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .sched import SchedulerFixture # noqa: F401
from .task import LockingTaskFixture # noqa: F401
50 changes: 50 additions & 0 deletions test/test_sched/fixture/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import fcntl
import textwrap

from descriptors import cachedproperty


class LockingTaskFixture:

def __init__(self, basedir, path='opt/done.lock', result='done\n'):
self.lock_path = basedir / path
self.result = result
self.locked = False

@property
def conf(self):
return {
'shell': {
'executable': 'python',
'script': textwrap.dedent(
'''\
import fcntl, json, sys
param = json.load(sys.stdin)
with open(param['lock_path'], 'w') as fd:
fcntl.flock(fd, fcntl.LOCK_EX)
print(param['result'], end='')
fcntl.flock(fd, fcntl.LOCK_UN)
'''
),
},
'param': {
'lock_path': str(self.lock_path),
'result': self.result,
},
}

@cachedproperty
def lock_fd(self):
self.lock_path.parent.mkdir()
return self.lock_path.open('w')

def acquire(self):
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.locked = True

def release(self):
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
self.lock_fd.close()
self.locked = False
84 changes: 46 additions & 38 deletions test/test_sched/test_tiered_tenancy.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def test_skips(confpatch, schedpatch, monkeypatch):
assert logs.field_equals(msg='skipped: suppressed by if/unless condition')


def test_refill_primary_cohort(confpatch, schedpatch, monkeypatch, tmp_path):
def test_refill_primary_cohort(locking_task, confpatch, schedpatch, monkeypatch, tmp_path):
#
# configure a long-running task kicked off at minute 0 and another task at minute 1
#
Expand All @@ -96,29 +96,17 @@ def test_refill_primary_cohort(confpatch, schedpatch, monkeypatch, tmp_path):
# because there's no tenancy-related hold-up, the primary cohort (initial check) is
# immediately enqueued, and the refill simply recreates this cohort.
#
# (really the initial task will just wait on the creation of a file ... which will be
# created by the subsequent task. as such, the initial task will run only as long as needed
# for the test.)
# (really the initial task will just wait on release of a file lock.
# as such, the initial task will run only as long as needed for the test.)
#
release_dir = tmp_path / 'opt'
release_dir.mkdir()

release_path = release_dir / 'done.d'

confpatch.set_tasks(
{
'runs-long': {
'exec': [
'inotifywait',
'-qq',
'--event', 'create',
'--timeout', '3',
str(release_dir),
],
**locking_task.conf,
'schedule': '0 * * * *',
},
'runs-late': {
'exec': ['touch', str(release_path)],
'exec': ['echo', 'done'],
'schedule': '1 * * * *',
},
}
Expand All @@ -138,42 +126,69 @@ def test_refill_primary_cohort(confpatch, schedpatch, monkeypatch, tmp_path):
)
)

# scheduler loop must also check current time
#
# recheck 0: one minute into the epoch: cron minute is 1
# recheck 1: (non-refill): nothing to do
# recheck n: (non-refill): nothing to do (number depends on OS scheduler)
check_times = (step / 10 for step in range(600, 610))

monkeypatch.setattr(
'fate.sched.tiered_tenancy.time',
TimeMock(
60.0, # recheck: one minute into the epoch: cron minute is 1
*check_times,
)
)

#
# execute scheduler with captured logs
#
with confpatch.caplog() as logs:
#
# task "runs-long" is blocked and we've patched the scheduler loop's time s.t.
# a minute will immediately appear to have passed -- therefore the first task
# to complete should be "runs-late", enqueued by the re-check.
#
tasks = schedpatch.scheduler()

task0 = next(tasks)

assert task0.__name__ == 'runs-long'
assert task0.__name__ == 'runs-late'
assert task0.exception() is None
assert task0.returncode == 0
assert task0.stdout == 'done\n'
assert task0.stderr == ''

#
# the primary cohort will have enqueued twice -- for "runs-long" and then
# for the refill's "runs-late".
#
assert logs.field_count(level='debug', cohort=0, size=1, msg="enqueued cohort") == 2

assert logs.field_equals(level='debug', active=1, msg="launched pool")
assert logs.field_equals(level='debug', active=1, msg="expanded pool")
assert logs.field_equals(level='debug', active=2, msg="filled pool")

# permit "runs-long" to (finally) complete
locking_task.release()

# exhaust the scheduler of completed tasks
(task1,) = tasks

assert task1.__name__ == 'runs-late'
assert task1.__name__ == 'runs-long'
assert task1.exception() is None
assert task1.returncode == 0
assert task1.stdout == locking_task.result
assert task1.stderr == ''

assert logs.field_equals(level='debug', completed=2, total=2, active=0)
assert logs.field_equals(level='debug', completed=1, total=1, active=1)
assert logs.field_equals(level='debug', completed=1, total=2, active=0)

assert tasks.info.count == 2
assert tasks.info.next == 3600 # one hour past the epoch


def test_refill_secondary_cohort(confpatch, schedpatch, monkeypatch, tmp_path):
def test_refill_secondary_cohort(locking_task, confpatch, schedpatch, monkeypatch, tmp_path):
#
# configure a long-running single-tenancy task kicked off at minute 0, along with another
# task also at minute 0, and another task at minute 1.
Expand All @@ -184,24 +199,13 @@ def test_refill_secondary_cohort(confpatch, schedpatch, monkeypatch, tmp_path):
# remains in the queue. the long-running task (eventually) forces a "recheck" / "refill", and
# the third task must be added to a second (lower-priority) cohort.
#
# (really the initial task will just wait on the creation of a file ... which we'll create in
# test / with a patch, to fully control the task's timing.)
# (really the initial task will just wait on release of a file lock ... which we'll control
# in test / with a patch, to fully control the task's timing.)
#
release_dir = tmp_path / 'opt'
release_dir.mkdir()

release_path = release_dir / 'done.d'

confpatch.set_tasks(
{
'runs-long': {
'exec': [
'inotifywait',
'-qq',
'--event', 'create',
'--timeout', '3',
str(release_dir),
],
**locking_task.conf,
'schedule': '0 * * * *',
'scheduling': {'tenancy': 1},
},
Expand All @@ -222,7 +226,8 @@ def test_refill_secondary_cohort(confpatch, schedpatch, monkeypatch, tmp_path):
#
def patched_sleep(duration):
"""release task "runs-long" during first sleep"""
release_path.touch()
if locking_task.locked:
locking_task.release()

return time.sleep(duration)

Expand Down Expand Up @@ -261,7 +266,8 @@ def patched_sleep(duration):
task0 = next(tasks)

assert task0.__name__ == 'runs-long'
assert task0.stdout == ''
assert task0.exception() is None
assert task0.stdout == locking_task.result
assert task0.stderr == ''

assert logs.field_equals(level='debug', cohort=0, size=2, msg="enqueued cohort")
Expand All @@ -276,6 +282,7 @@ def patched_sleep(duration):
task1 = next(tasks)

assert task1.__name__ == 'on-deck'
assert task1.exception() is None
assert task1.stdout == 'done\n'
assert task1.stderr == ''

Expand All @@ -285,6 +292,7 @@ def patched_sleep(duration):
(task2,) = tasks

assert task2.__name__ == 'runs-late'
assert task2.exception() is None
assert task2.stdout == 'done\n'
assert task2.stderr == ''

Expand Down

0 comments on commit 1ccd9bb

Please sign in to comment.