Drop Python 3.7 (dask#562)
* Drop Python 3.7

* Fix cleanup fixture problem (see distributed#9137)

* Override cleanup distributed fixture, and reconfigure dask-jobqueue when called

* Use power of 2 for the memory checks in tests (see dask#7484)

* Apply Black

* Apply correct version of black...

* conda install Python in HTCondor Docker image

* Fix HTCondor Dockerfile

* Fix PBS Issue

* Add a timeout for every wait_for_workers call

* Fix HTCondor tests, leave room for the scheduling cycle to take place

* flake check

* debugging HTCondor tests on CI

* Flake

* reduce negotiator interval for faster job queuing, more debugging logs

* move condor command to the right place

* always run cleanup step, and print more things on HTCondor

* Clean up temporary debug code and other unnecessary modifications

* Disable HTCondor CI for now

* Import loop_in_thread fixture from distributed

* Override method from distributed to make tests pass
guillaumeeb authored Aug 7, 2022
1 parent a04811a commit 5308486
Showing 21 changed files with 265 additions and 155 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yaml
@@ -12,7 +12,8 @@ jobs:
strategy:
fail-fast: false
matrix:
jobqueue: ["htcondor", "pbs", "sge", "slurm", "none"]
#HTCondor disabled for now
jobqueue: ["pbs", "sge", "slurm", "none"]

steps:
- name: Cancel previous runs
@@ -51,6 +52,7 @@ jobs:
jobqueue_script
- name: Cleanup
if: always()
run: |
source ci/${{ matrix.jobqueue }}.sh
jobqueue_after_script
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@ log
.cache/
.pytest_cache
docs/source/generated
dask-worker-space/
2 changes: 1 addition & 1 deletion ci/environment.yml
@@ -3,7 +3,7 @@ channels:
- conda-forge
- defaults
dependencies:
- python=3.7
- python=3.8
- dask
- distributed
- flake8
7 changes: 6 additions & 1 deletion ci/htcondor.sh
@@ -9,6 +9,8 @@ function jobqueue_before_install {
docker-compose pull
docker-compose build
./start-htcondor.sh
docker-compose exec -T submit /bin/bash -c "condor_status"
docker-compose exec -T submit /bin/bash -c "condor_q"
cd -

docker ps -a
@@ -23,12 +25,15 @@ function jobqueue_install {

function jobqueue_script {
cd ./ci/htcondor
docker-compose exec -T --user submituser submit /bin/bash -c "cd; pytest /dask-jobqueue/dask_jobqueue --verbose -E htcondor -s"
docker-compose exec -T --user submituser submit /bin/bash -c "cd; pytest /dask-jobqueue/dask_jobqueue --log-cli-level DEBUG --capture=tee-sys --verbose -E htcondor "
cd -
}

function jobqueue_after_script {
cd ./ci/htcondor
docker-compose exec -T submit /bin/bash -c "condor_q"
docker-compose exec -T submit /bin/bash -c "condor_status"
docker-compose exec -T submit /bin/bash -c "condor_history"
docker-compose exec -T cm /bin/bash -c " grep -R \"\" /var/log/condor/ "
cd -
}
17 changes: 10 additions & 7 deletions ci/htcondor/Dockerfile
@@ -1,10 +1,13 @@
FROM htcondor/submit:el7 as submit

RUN yum install -y gcc git
RUN yum install -y python3-devel python3-pip
RUN pip3 install dask distributed pytest
RUN curl -o miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && \
bash miniconda.sh -f -b -p /opt/anaconda && \
/opt/anaconda/bin/conda clean -tipy && \
rm -f miniconda.sh
ENV PATH /opt/anaconda/bin:$PATH
RUN conda install --yes -c conda-forge python=3.8 dask distributed flake8 pytest pytest-asyncio

FROM htcondor/execute:el7 as execute
RUN yum install -y python3
COPY --from=submit /usr/local/lib/python3.6 /usr/local/lib/python3.6
COPY --from=submit /usr/local/lib64/python3.6 /usr/local/lib64/python3.6
FROM htcondor/execute:el7 as execute

COPY --from=submit /opt/anaconda /opt/anaconda
ENV PATH /opt/anaconda/bin:$PATH
1 change: 1 addition & 0 deletions ci/htcondor/condor_config.local
@@ -0,0 +1 @@
NEGOTIATOR_INTERVAL=10
4 changes: 4 additions & 0 deletions ci/htcondor/docker-compose.yml
@@ -8,6 +8,7 @@ services:
- USE_POOL_PASSWORD=yes
volumes:
- secrets:/root/secrets
- ./condor_config.local:/etc/condor/condor_config.local
command: bash -c 'condor_store_cred -p password -f /root/secrets/pool_password ; exec bash -x /start.sh'

submit:
@@ -24,6 +25,7 @@
volumes:
- secrets:/root/secrets
- ../..:/dask-jobqueue
- ./condor_config.local:/etc/condor/condor_config.local

execute1:
image: daskdev/dask-jobqueue:htcondor-execute
@@ -38,6 +40,7 @@
- cm
volumes:
- secrets:/root/secrets
- ./condor_config.local:/etc/condor/condor_config.local

execute2:
image: daskdev/dask-jobqueue:htcondor-execute
@@ -52,6 +55,7 @@
- cm
volumes:
- secrets:/root/secrets
- ./condor_config.local:/etc/condor/condor_config.local

volumes:
secrets:
2 changes: 1 addition & 1 deletion ci/pbs/Dockerfile
@@ -30,7 +30,7 @@ RUN curl -o miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-latest-L
bash miniconda.sh -f -b -p /opt/anaconda && \
/opt/anaconda/bin/conda clean -tipy && \
rm -f miniconda.sh
RUN conda install --yes -c conda-forge python=3.7 dask distributed flake8 pytest pytest-asyncio
RUN conda install --yes -c conda-forge python=3.8 dask distributed flake8 pytest pytest-asyncio

# Copy entrypoint and other needed scripts
COPY ./*.sh /
6 changes: 3 additions & 3 deletions ci/sge/docker-compose.yml
@@ -8,7 +8,7 @@ services:
context: .
target: master
args:
PYTHON_VERSION: 3.7
PYTHON_VERSION: 3.8
container_name: sge_master
hostname: sge_master
#network_mode: host
@@ -22,7 +22,7 @@
context: .
target: slave
args:
PYTHON_VERSION: 3.7
PYTHON_VERSION: 3.8
container_name: slave_one
hostname: slave_one
#network_mode: host
@@ -40,7 +40,7 @@
context: .
target: slave
args:
PYTHON_VERSION: 3.7
PYTHON_VERSION: 3.8
container_name: slave_two
hostname: slave_two
#network_mode: host
2 changes: 1 addition & 1 deletion ci/slurm/Dockerfile
@@ -7,7 +7,7 @@ RUN curl -o miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-latest-L
/opt/anaconda/bin/conda clean -tipy && \
rm -f miniconda.sh
ENV PATH /opt/anaconda/bin:$PATH
RUN conda install --yes -c conda-forge python=3.7 dask distributed flake8 pytest pytest-asyncio
RUN conda install --yes -c conda-forge python=3.8 dask distributed flake8 pytest pytest-asyncio

ENV LC_ALL en_US.UTF-8

27 changes: 26 additions & 1 deletion conftest.py
@@ -1,12 +1,15 @@
# content of conftest.py

# Make loop fixture available in all tests
from distributed.utils_test import loop # noqa: F401
from distributed.utils_test import loop, loop_in_thread # noqa: F401

import pytest

import dask_jobqueue.config
import dask_jobqueue.lsf
import dask
import distributed.utils_test
import copy

from dask_jobqueue import (
PBSCluster,
@@ -94,6 +97,28 @@ def mock_lsf_version(monkeypatch, request):
}


# Overriding the cleanup fixture from distributed that has been added to the loop
# fixture, because it just wipes the main loop in our tests, and dask-jobqueue is
# not ready for this.
# FIXME
@pytest.fixture
def cleanup():
dask_jobqueue.config.reconfigure()
yield


# Overriding the distributed.utils_test.reset_config() method because it resets
# the config used by our tests.
# FIXME
def reset_config():
dask.config.config.clear()
dask.config.config.update(copy.deepcopy(distributed.utils_test.original_config))
dask_jobqueue.config.reconfigure()


distributed.utils_test.reset_config = reset_config


@pytest.fixture(
params=[pytest.param(v, marks=[pytest.mark.env(k)]) for (k, v) in all_envs.items()]
)
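For context on the two overrides above: pytest resolves fixtures by name from the nearest conftest.py, so this cleanup fixture shadows the one distributed ships, while the module-level assignment swaps reset_config for anything that looks it up through distributed.utils_test. A minimal sketch of that second mechanism, with hypothetical names, assuming distributed calls reset_config through its module namespace rather than a saved local reference:

import distributed.utils_test

def quiet_reset_config():
    # hypothetical stand-in for the reset_config override defined above
    print("custom reset_config called")

# Rebind the module attribute; later lookups via distributed.utils_test
# now find the override instead of the original function.
distributed.utils_test.reset_config = quiet_reset_config
distributed.utils_test.reset_config()  # prints "custom reset_config called"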
15 changes: 10 additions & 5 deletions dask_jobqueue/config.py
@@ -3,10 +3,15 @@
import dask
import yaml

fn = os.path.join(os.path.dirname(__file__), "jobqueue.yaml")
dask.config.ensure_file(source=fn)

with open(fn) as f:
defaults = yaml.safe_load(f)
def reconfigure():
fn = os.path.join(os.path.dirname(__file__), "jobqueue.yaml")
dask.config.ensure_file(source=fn)

dask.config.update(dask.config.config, defaults, priority="old")
with open(fn) as f:
defaults = yaml.safe_load(f)

dask.config.update_defaults(defaults)


reconfigure()
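With the module-level loading moved into reconfigure(), test fixtures can re-apply the jobqueue defaults after distributed's cleanup wipes dask.config. A hedged usage sketch (the asserted key assumes jobqueue.yaml keeps its top-level "jobqueue" block):

import dask
import dask_jobqueue.config

# simulate what distributed's cleanup machinery does between tests
dask.config.config.clear()

# re-read jobqueue.yaml and merge its defaults back into dask.config
dask_jobqueue.config.reconfigure()

# the jobqueue section is available again
assert "pbs" in dask.config.get("jobqueue")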
15 changes: 8 additions & 7 deletions dask_jobqueue/tests/test_htcondor.py
@@ -11,7 +11,7 @@
from dask_jobqueue import HTCondorCluster
from dask_jobqueue.core import Job

QUEUE_WAIT = 30 # seconds
from . import QUEUE_WAIT


def test_header():
@@ -67,20 +67,18 @@ def test_job_script():

@pytest.mark.env("htcondor")
def test_basic(loop):
with HTCondorCluster(cores=1, memory="100MB", disk="100MB", loop=loop) as cluster:
with HTCondorCluster(cores=1, memory="100MiB", disk="100MiB", loop=loop) as cluster:
with Client(cluster) as client:

cluster.scale(2)

start = time()
client.wait_for_workers(2)
client.wait_for_workers(2, timeout=QUEUE_WAIT)

future = client.submit(lambda x: x + 1, 10)
assert future.result(QUEUE_WAIT) == 11

workers = list(client.scheduler_info()["workers"].values())
w = workers[0]
assert w["memory_limit"] == 1e8
assert w["memory_limit"] == 100 * 1024**2
assert w["nthreads"] == 1

cluster.scale(0)
@@ -104,7 +102,7 @@ def test_extra_args_broken_cancel(loop):

cluster.scale(2)

client.wait_for_workers(2)
client.wait_for_workers(2, timeout=QUEUE_WAIT)
workers = Job._call(["condor_q", "-af", "jobpid"]).strip()
assert workers, "we got dask workers"

@@ -120,6 +118,9 @@ def test_extra_args_broken_cancel(loop):
if time() > start + QUEUE_WAIT // 3:
return

# Remove running jobs, as they prevent other jobs from executing in subsequent tests
Job._call(["condor_rm", "-all"]).strip()


def test_config_name_htcondor_takes_custom_config():
conf = {
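The memory assertion change above is the "power of 2" fix from the commit message: dask parses decimal and binary unit suffixes differently, so switching the cluster spec from "100MB" to "100MiB" changes the expected memory_limit from 1e8 to 100 * 1024**2 bytes. The arithmetic, checked with dask's real parser:

from dask.utils import parse_bytes

assert parse_bytes("100MB") == 100 * 10**6     # decimal: 100_000_000 bytes
assert parse_bytes("100MiB") == 100 * 1024**2  # binary:  104_857_600 bytes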
11 changes: 6 additions & 5 deletions dask_jobqueue/tests/test_job.py
@@ -5,6 +5,7 @@
from dask_jobqueue.pbs import PBSJob
from dask_jobqueue.core import JobQueueCluster
from dask.distributed import Scheduler, Client
from dask_jobqueue.tests import QUEUE_WAIT
from distributed.core import Status

import pytest
@@ -23,7 +24,7 @@ async def test_job(EnvSpecificCluster):
job = job_cls(scheduler=s.address, name="foo", cores=1, memory="1GB")
job = await job
async with Client(s.address, asynchronous=True) as client:
await client.wait_for_workers(1)
await client.wait_for_workers(1, timeout=QUEUE_WAIT)
assert list(s.workers.values())[0].name == "foo"

await job.close()
@@ -47,7 +48,7 @@ async def test_cluster(EnvSpecificCluster):
assert len(cluster.workers) == 2
assert all(isinstance(w, job_cls) for w in cluster.workers.values())
assert all(w.status == Status.running for w in cluster.workers.values())
await client.wait_for_workers(2)
await client.wait_for_workers(2, timeout=QUEUE_WAIT)

cluster.scale(1)
start = time()
@@ -64,7 +65,7 @@ async def test_adapt(EnvSpecificCluster):
1, cores=1, memory="1GB", job_cls=job_cls, asynchronous=True, name="foo"
) as cluster:
async with Client(cluster, asynchronous=True) as client:
await client.wait_for_workers(1)
await client.wait_for_workers(1, timeout=QUEUE_WAIT)
cluster.adapt(minimum=0, maximum=4, interval="10ms")

start = time()
@@ -75,7 +76,7 @@ async def test_adapt(EnvSpecificCluster):
assert not cluster.workers

future = client.submit(lambda: 0)
await client.wait_for_workers(1)
await client.wait_for_workers(1, timeout=QUEUE_WAIT)

del future

@@ -121,7 +122,7 @@ async def test_nprocs_scale():
async with Client(cluster, asynchronous=True) as client:
cluster.scale(cores=2)
await cluster
await client.wait_for_workers(2)
await client.wait_for_workers(2, timeout=QUEUE_WAIT)
assert len(cluster.workers) == 1 # two workers, one job
assert len(s.workers) == 2
assert cluster.plan == {ws.name for ws in s.workers.values()}
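Each wait_for_workers call above now carries the shared QUEUE_WAIT timeout, so a job stuck in the batch queue fails the test instead of hanging CI indefinitely. A minimal sketch of the pattern, assuming QUEUE_WAIT is the timeout in seconds that dask_jobqueue.tests now exports:

from dask.distributed import Client
from dask_jobqueue.tests import QUEUE_WAIT

def wait_for_two_workers(client: Client) -> None:
    # raises TimeoutError after QUEUE_WAIT seconds instead of blocking forever
    client.wait_for_workers(2, timeout=QUEUE_WAIT)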