From 25a5fd3a670e67b239bdedd070917e080113bd15 Mon Sep 17 00:00:00 2001
From: Johan Schreurs
Date: Fri, 28 Jul 2023 15:04:20 +0200
Subject: [PATCH] Issue #432 Fix failing tests on Windows

---
 tests/extra/test_job_management.py | 144 ++++++++++++++++++++++++++++-
 1 file changed, 142 insertions(+), 2 deletions(-)

diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 989fd8239..00e4eb4e3 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -1,5 +1,7 @@
 import json
 import multiprocessing
+import platform
+import threading
 from unittest import mock
 
 # TODO: can we avoid using httpretty?
@@ -117,6 +119,13 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()
 
+    @pytest.mark.skipif(
+        platform.system() == "Windows",
+        reason=(
+            "Windows support for multiprocessing is too different, and the errors to "
+            "solve are too complicated: pickling certain local functions fails."
+        ),
+    )
     def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
         """Make sure the MultiBackendJobManager does not hang after all processes finish.
 
@@ -220,11 +229,10 @@ def start_job(row, connection, **kwargs):
 
         def start_worker_thread():
             manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
-            # process_finished_file.write_text("done")
 
         # This should be finished almost immediately, within a second.
         # If it takes too long then we assume it will run forever.
-        # We will check that the exit code of this process == 0, i.e. it finished normally. 
+        # We will check that the exit code of this process == 0, i.e. it finished normally.
         # If it gets killed it will have a different exit code
         # (On Linux usually -9 SIGKILL)
         proc = multiprocessing.Process(target=start_worker_thread, name="Worker process")
@@ -258,6 +266,138 @@ def start_worker_thread():
         metadata_path = manager.get_job_metadata_path(job_id="job-2021")
         assert metadata_path.exists()
 
+    def test_manager_must_exit_when_all_jobs_done_windows(self, tmp_path, requests_mock, sleep_mock):
+        """Make sure the MultiBackendJobManager does not hang after all processes finish.
+
+        Regression test for:
+        https://github.com/Open-EO/openeo-python-client/issues/432
+
+        The cause was that run_jobs had an infinite loop when jobs ended with status error.
+        """
+
+        requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
+        requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
+
+        def mock_job_status(job_id, succeeds: bool):
+            """Mock the job status polling sequence.
+            We set up one job that finishes with status error.
+            """
+            response_list = sum(
+                [
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "queued",
+                            }
+                        }
+                    ],
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "running",
+                            }
+                        }
+                    ],
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "finished" if succeeds else "error",
+                            }
+                        }
+                    ],
+                ],
+                [],
+            )
+            for backend in ["http://foo.test", "http://bar.test"]:
+                requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
+                # It also needs the job results endpoint for successful jobs and the
+                # log endpoint for a failed job. Both are dummy implementations.
+                # When the job is finished the system tries to download the
+                # results or the logs, and that is what needs these endpoints.
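+                # (Note: requests_mock serves the entries of response_list in order, one
+                # per poll, and keeps repeating the last entry, so each job advances
+                # through queued -> running -> finished/error before the endpoints
+                # registered below are ever called.)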
+                if succeeds:
+                    requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
+                else:
+                    response = {
+                        "level": "error",
+                        "logs": [
+                            {
+                                "id": "1",
+                                "code": "SampleError",
+                                "level": "error",
+                                "message": "Error for testing",
+                                "time": "2019-08-24T14:15:22Z",
+                                "data": None,
+                                "path": [],
+                                "usage": {},
+                                "links": [],
+                            }
+                        ],
+                        "links": [],
+                    }
+                    requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response)
+
+        mock_job_status("job-2018", succeeds=True)
+        mock_job_status("job-2019", succeeds=True)
+        mock_job_status("job-2020", succeeds=True)
+        mock_job_status("job-2021", succeeds=True)
+        mock_job_status("job-2022", succeeds=False)
+
+        root_dir = tmp_path / "job_mgr_root"
+        manager = MultiBackendJobManager(root_dir=root_dir)
+
+        manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
+        manager.add_backend("bar", connection=openeo.connect("http://bar.test"))
+
+        df = pd.DataFrame(
+            {
+                "year": [2018, 2019, 2020, 2021, 2022],
+                # Use simple points in WKT format to test conversion to the geometry dtype
+                "geometry": ["POINT (1 2)"] * 5,
+            }
+        )
+        output_file = tmp_path / "jobs.csv"
+
+        def start_job(row, connection, **kwargs):
+            year = row["year"]
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        is_done_file = tmp_path / "is_done.txt"
+
+        def start_worker_thread():
+            manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+            is_done_file.write_text("Done!")
+
+        thread = threading.Thread(target=start_worker_thread, name="Worker thread", daemon=True)
+
+        timeout_sec = 5.0
+        thread.start()
+        # We stop waiting for the thread after the timeout.
+        # If that happens, run_jobs is most likely stuck in an infinite loop.
+        thread.join(timeout=timeout_sec)
+
+        assert is_done_file.exists(), (
+            "MultiBackendJobManager did not finish on its own within the timeout. "
+            "An infinite loop is probable."
+        )
+
+        # Also check that we got sensible end results.
+        result = pd.read_csv(output_file)
+        assert len(result) == 5
+        assert set(result.status) == {"finished", "error"}
+        assert set(result.backend_name) == {"foo", "bar"}
+
+        # We expect that the job metadata was saved for a successful job,
+        # so verify that it exists.
+        # Checking it for one of the jobs is enough.
+        metadata_path = manager.get_job_metadata_path(job_id="job-2021")
+        assert metadata_path.exists()
+
     def test_on_error_log(self, tmp_path, requests_mock):
         backend = "http://foo.test"
         requests_mock.get(backend, json={"api_version": "1.1.0"})
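
Note: the Windows-specific test above replaces the child process of the original test with a
daemon thread that is joined with a timeout. Below is a minimal standalone sketch of that
hang-detection pattern, independent of the openEO client; the helper name detect_hang, the toy
sleep targets and the timeouts are illustrative only and are not part of this patch.

import threading
import time


def detect_hang(target, timeout_sec=5.0):
    """Run `target` in a daemon thread and report whether it finished within the timeout."""
    done = threading.Event()

    def worker():
        target()
        done.set()  # only reached if `target` returns, i.e. it did not hang

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    # Wait at most `timeout_sec`; if the thread is still busy, assume an infinite loop.
    thread.join(timeout=timeout_sec)
    return done.is_set()


if __name__ == "__main__":
    assert detect_hang(lambda: time.sleep(0.1)) is True                   # finishes in time
    assert detect_hang(lambda: time.sleep(60), timeout_sec=0.5) is False  # treated as hung
    print("hang-detection sketch works as expected")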