From 95ebcd42c5b8461a4430146673509334b97fc52b Mon Sep 17 00:00:00 2001
From: Johan Schreurs
Date: Thu, 27 Jul 2023 18:24:26 +0200
Subject: [PATCH 1/6] Issue #432 Fix: MultiBackendJobManager should also
 finish when some batch jobs finish with error status.

---
 openeo/extra/job_management.py     |   1 +
 tests/extra/test_job_management.py | 132 +++++++++++++++++++++++++++++
 2 files changed, 133 insertions(+)

diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index b050a7031..2dedf64fb 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -256,6 +256,7 @@ def run_jobs(
                 (df.status != "finished")
                 & (df.status != "skipped")
                 & (df.status != "start_failed")
+                & (df.status != "error")
             ].size
             > 0
         ):
diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index f151561c5..8357dc277 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -116,6 +116,138 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()
 
+    def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
+        requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
+        requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
+
+        def mock_job_status(job_id, succeeds: bool):
+            """Mock job status polling sequence"""
+            response_list = sum(
+                [
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "queued",
+                            }
+                        }
+                    ],
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "running",
+                            }
+                        }
+                    ],
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "finished" if succeeds else "error",
+                            }
+                        }
+                    ],
+                ],
+                [],
+            )
+            for backend in ["http://foo.test", "http://bar.test"]:
+                requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
+                # It also needs the job results endpoint, though that can be a dummy implementation.
+                # When the job is finished the system tries to download the results and that is what
+                # needs this endpoint.
+                if succeeds:
+                    requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
+                else:
+                    response = {
+                        "level": "error",
+                        "logs": [
+                            {
+                                "id": "1",
+                                "code": "SampleError",
+                                "level": "error",
+                                "message": "Error for testing",
+                                "time": "2019-08-24T14:15:22Z",
+                                "data": None,
+                                "path": [],
+                                "usage": {},
+                                "links": [],
+                            }
+                        ],
+                        "links": [],
+                    }
+                    requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response)
+
+        mock_job_status("job-2018", succeeds=True)
+        mock_job_status("job-2019", succeeds=True)
+        mock_job_status("job-2020", succeeds=True)
+        mock_job_status("job-2021", succeeds=True)
+        mock_job_status("job-2022", succeeds=False)
+
+        root_dir = tmp_path / "job_mgr_root"
+        manager = MultiBackendJobManager(root_dir=root_dir)
+
+        manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
+        manager.add_backend("bar", connection=openeo.connect("http://bar.test"))
+
+        df = pd.DataFrame(
+            {
+                "year": [2018, 2019, 2020, 2021, 2022],
+                # Use simple points in WKT format to test conversion to the geometry dtype
+                "geometry": ["POINT (1 2)"] * 5,
+            }
+        )
+        output_file = tmp_path / "jobs.csv"
+
+        def start_job(row, connection, **kwargs):
+            year = row["year"]
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        # manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+        ## assert sleep_mock.call_count > 1000
+
+        import multiprocessing
+        import datetime as dt
+        from pathlib import Path
+
+        duration_file: Path = tmp_path / "duration.txt"
+
+        def start_worker_thread():
+            time_start = dt.datetime.now()
+            manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+            time_end = dt.datetime.now()
+            dur: dt.timedelta = time_end - time_start
+            duration_file.write_text(str(dur.total_seconds()))
+
+        # This should finish almost immediately, within a second.
+        # If it takes too long then we assume it will run forever.
+        proc = multiprocessing.Process(target=start_worker_thread, name="Worker process")
+
+        timeout_sec = 5.0
+        proc.start()
+        proc.join(timeout=timeout_sec)
+        if proc.is_alive():
+            # now forcibly kill the process.
+            proc.kill()
+            proc.join()
+
+        assert duration_file.exists(), "MultiBackendJobManager did not stop on its own and was killed"
+        duration = float(duration_file.read_text())
+        assert duration < timeout_sec, "MultiBackendJobManager did stop on its own but took longer than expected to finish"
+
+        result = pd.read_csv(output_file)
+        assert len(result) == 5
+        assert set(result.status) == {"finished", "error"}
+        assert set(result.backend_name) == {"foo", "bar"}
+
+        # We expect that the job metadata was saved, so verify that it exists.
+        # Checking for one of the jobs is enough.
+        metadata_path = manager.get_job_metadata_path(job_id="job-2021")
+        assert metadata_path.exists()
+
     def test_on_error_log(self, tmp_path, requests_mock):
         backend = "http://foo.test"
         requests_mock.get(backend, json={"api_version": "1.1.0"})

From 22340df40e33a18209e29c50ffbdfac5465b017e Mon Sep 17 00:00:00 2001
From: Johan Schreurs
Date: Fri, 28 Jul 2023 11:21:50 +0200
Subject: [PATCH 2/6] Issue #432 Simplified unit test.
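This patch replaces the duration-file bookkeeping with a plain exit-code
check. As a standalone sketch of the idea (illustrative only: `quick` and
`hangs` are hypothetical helpers, not part of the test suite), a child
process that returns normally exits with code 0, while one that has to be
stopped after a timeout ends with a signal-based exit code (negative on
Linux), so no scratch file is needed to tell the two cases apart:

    import multiprocessing
    import time

    def quick():
        pass  # returns immediately -> exitcode 0

    def hangs():
        while True:  # never returns -> must be stopped from outside
            time.sleep(0.1)

    if __name__ == "__main__":
        for target in (quick, hangs):
            proc = multiprocessing.Process(target=target)
            proc.start()
            proc.join(timeout=1.0)
            if proc.is_alive():
                proc.terminate()  # Process.kill() only exists on Python >= 3.7
                proc.join()
            # quick -> 0, hangs -> -15 (SIGTERM) on Linux
            print(target.__name__, proc.exitcode)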
---
 tests/extra/test_job_management.py | 55 +++++++++++++++++++-----------
 1 file changed, 35 insertions(+), 20 deletions(-)

diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 8357dc277..08c5ccb9c 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -1,4 +1,7 @@
+import datetime as dt
 import json
+import multiprocessing
+from pathlib import Path
 from unittest import mock
 
 # TODO: can we avoid using httpretty?
@@ -117,11 +120,21 @@ def start_job(row, connection, **kwargs):
         assert metadata_path.exists()
 
     def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
+        """Make sure the MultiBackendJobManager does not hang after all jobs finish.
+
+        Regression test for:
+        https://github.com/Open-EO/openeo-python-client/issues/432
+
+        The cause was that run_jobs had an infinite loop when jobs ended with status error.
+        """
+
         requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
         requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
 
         def mock_job_status(job_id, succeeds: bool):
-            """Mock job status polling sequence"""
+            """Mock job status polling sequence.
+            We set up one job that finishes with status error.
+            """
             response_list = sum(
                 [
                     [
@@ -156,9 +169,10 @@ def mock_job_status(job_id, succeeds: bool):
             )
             for backend in ["http://foo.test", "http://bar.test"]:
                 requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
-                # It also needs the job results endpoint, though that can be a dummy implementation.
-                # When the job is finished the system tries to download the results and that is what
-                # needs this endpoint.
+                # It also needs the job results endpoint for successful jobs and the
+                # log endpoint for a failed job. Both are dummy implementations.
+                # When the job is finished the system tries to download the
+                # results or the logs and that is what needs these endpoints.
                 if succeeds:
                     requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
                 else:
@@ -209,42 +223,43 @@ def start_job(row, connection, **kwargs):
         # manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
         ## assert sleep_mock.call_count > 1000
 
-        import multiprocessing
-        import datetime as dt
-        from pathlib import Path
-
-        duration_file: Path = tmp_path / "duration.txt"
-
         def start_worker_thread():
-            time_start = dt.datetime.now()
             manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
-            time_end = dt.datetime.now()
-            dur: dt.timedelta = time_end - time_start
-            duration_file.write_text(str(dur.total_seconds()))
+            # process_finished_file.write_text("done")
 
         # This should finish almost immediately, within a second.
         # If it takes too long then we assume it will run forever.
+        # We will check that the exit code of this process == 0, i.e. it finished normally. 
+        # If it gets killed it will have a different exit code
+        # (On Linux usually -9 SIGKILL)
         proc = multiprocessing.Process(target=start_worker_thread, name="Worker process")
 
         timeout_sec = 5.0
         proc.start()
+        # We stop waiting for the process after the timeout.
+        # If that happens it is likely we detected that run_jobs will loop infinitely.
         proc.join(timeout=timeout_sec)
+
         if proc.is_alive():
-            # now forcibly kill the process.
+            # now forcibly kill the process, then have to join it again.
             proc.kill()
             proc.join()
 
-        assert duration_file.exists(), "MultiBackendJobManager did not stop on its own and was killed"
-        duration = float(duration_file.read_text())
-        assert duration < timeout_sec, "MultiBackendJobManager did stop on its own but took longer than expected to finish"
+        assert proc.exitcode == 0, (
+            "MultiBackendJobManager did not finish on its own and was killed. "
+            + "Infinite loop is probable. Expected exit code == 0, but found "
+            + f"proc.exitcode={proc.exitcode!r}, proc={proc!r}"
+        )
 
+        # Also check that we got sensible end results.
         result = pd.read_csv(output_file)
         assert len(result) == 5
         assert set(result.status) == {"finished", "error"}
         assert set(result.backend_name) == {"foo", "bar"}
 
-        # We expect that the job metadata was saved, so verify that it exists.
-        # Checking for one of the jobs is enough.
+        # We expect that the job metadata was saved for a successful job,
+        # so verify that it exists.
+        # Checking it for one of the jobs is enough.
         metadata_path = manager.get_job_metadata_path(job_id="job-2021")
         assert metadata_path.exists()

From f7be3470e2618b63224cf80366c9b60c72fd1044 Mon Sep 17 00:00:00 2001
From: Johan Schreurs
Date: Fri, 28 Jul 2023 11:27:47 +0200
Subject: [PATCH 3/6] Issue #432 Remove unused imports

---
 tests/extra/test_job_management.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 08c5ccb9c..5679e7dac 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -1,7 +1,5 @@
-import datetime as dt
 import json
 import multiprocessing
-from pathlib import Path
 from unittest import mock
 
 # TODO: can we avoid using httpretty?

From 4dc2ff5bcd27e853280e1092195b2e4f50acff61 Mon Sep 17 00:00:00 2001
From: Johan Schreurs
Date: Fri, 28 Jul 2023 12:16:21 +0200
Subject: [PATCH 4/6] Issue #432 Fix unit test for Python 3.6 (Process.kill()
 did not exist yet)

---
 tests/extra/test_job_management.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 5679e7dac..989fd8239 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -218,9 +218,6 @@ def start_job(row, connection, **kwargs):
             year = row["year"]
             return BatchJob(job_id=f"job-{year}", connection=connection)
 
-        # manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
-        ## assert sleep_mock.call_count > 1000
-
         def start_worker_thread():
             manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
             # process_finished_file.write_text("done")
@@ -240,7 +237,7 @@ def start_worker_thread():
 
         if proc.is_alive():
             # now forcibly kill the process, then have to join it again.
-            proc.kill()
+            proc.terminate()
             proc.join()

From 25a5fd3a670e67b239bdedd070917e080113bd15 Mon Sep 17 00:00:00 2001
From: Johan Schreurs
Date: Fri, 28 Jul 2023 15:04:20 +0200
Subject: [PATCH 5/6] Issue #432 Fix failing tests on Windows

---
 tests/extra/test_job_management.py | 144 ++++++++++++++++++++++++++++-
 1 file changed, 142 insertions(+), 2 deletions(-)

diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 989fd8239..00e4eb4e3 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -1,5 +1,7 @@
 import json
 import multiprocessing
+import platform
+import threading
 from unittest import mock
 
 # TODO: can we avoid using httpretty?
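The Windows-friendly rework below boils down to a thread-based watchdog. A
minimal sketch of that pattern (assuming a callable `target` that may never
return; `finishes_within` is a hypothetical helper, not code from this test
suite):

    import threading

    def finishes_within(target, timeout_sec: float) -> bool:
        """Run `target` in a daemon thread and report whether it returned in time."""
        done = threading.Event()

        def runner():
            target()
            done.set()

        worker = threading.Thread(target=runner, daemon=True)
        worker.start()
        worker.join(timeout=timeout_sec)
        return done.is_set()

A daemon thread that never finishes is simply abandoned at interpreter exit;
unlike a child process, it shares the requests_mock patching and needs no
pickling of local functions, which is exactly what failed on Windows.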
@@ -117,6 +119,13 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()
 
+    @pytest.mark.skipif(
+        platform.system() == "Windows",
+        reason=(
+            "Windows support for multiprocessing is too different, the errors to "
+            "solve are too complicated: pickling certain local functions fails."
+        ),
+    )
     def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
         """Make sure the MultiBackendJobManager does not hang after all jobs finish.
 
         Regression test for:
         https://github.com/Open-EO/openeo-python-client/issues/432
 
         The cause was that run_jobs had an infinite loop when jobs ended with status error.
@@ -220,11 +229,10 @@ def start_job(row, connection, **kwargs):
 
         def start_worker_thread():
             manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
-            # process_finished_file.write_text("done")
 
         # This should finish almost immediately, within a second.
         # If it takes too long then we assume it will run forever.
-        # We will check that the exit code of this process == 0, i.e. it finished normally. 
+        # We will check that the exit code of this process == 0, i.e. it finished normally.
         # If it gets killed it will have a different exit code
         # (On Linux usually -9 SIGKILL)
         proc = multiprocessing.Process(target=start_worker_thread, name="Worker process")
@@ -258,6 +266,138 @@ def start_worker_thread():
         metadata_path = manager.get_job_metadata_path(job_id="job-2021")
         assert metadata_path.exists()
 
+    def test_manager_must_exit_when_all_jobs_done_windows(self, tmp_path, requests_mock, sleep_mock):
+        """Make sure the MultiBackendJobManager does not hang after all jobs finish.
+
+        Regression test for:
+        https://github.com/Open-EO/openeo-python-client/issues/432
+
+        The cause was that run_jobs had an infinite loop when jobs ended with status error.
+        """
+
+        requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
+        requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
+
+        def mock_job_status(job_id, succeeds: bool):
+            """Mock job status polling sequence.
+            We set up one job that finishes with status error.
+            """
+            response_list = sum(
+                [
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "queued",
+                            }
+                        }
+                    ],
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "running",
+                            }
+                        }
+                    ],
+                    [
+                        {
+                            "json": {
+                                "id": job_id,
+                                "title": f"Job {job_id}",
+                                "status": "finished" if succeeds else "error",
+                            }
+                        }
+                    ],
+                ],
+                [],
+            )
+            for backend in ["http://foo.test", "http://bar.test"]:
+                requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
+                # It also needs the job results endpoint for successful jobs and the
+                # log endpoint for a failed job. Both are dummy implementations.
+                # When the job is finished the system tries to download the
+                # results or the logs and that is what needs these endpoints.
+                if succeeds:
+                    requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []})
+                else:
+                    response = {
+                        "level": "error",
+                        "logs": [
+                            {
+                                "id": "1",
+                                "code": "SampleError",
+                                "level": "error",
+                                "message": "Error for testing",
+                                "time": "2019-08-24T14:15:22Z",
+                                "data": None,
+                                "path": [],
+                                "usage": {},
+                                "links": [],
+                            }
+                        ],
+                        "links": [],
+                    }
+                    requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response)
+
+        mock_job_status("job-2018", succeeds=True)
+        mock_job_status("job-2019", succeeds=True)
+        mock_job_status("job-2020", succeeds=True)
+        mock_job_status("job-2021", succeeds=True)
+        mock_job_status("job-2022", succeeds=False)
+
+        root_dir = tmp_path / "job_mgr_root"
+        manager = MultiBackendJobManager(root_dir=root_dir)
+
+        manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
+        manager.add_backend("bar", connection=openeo.connect("http://bar.test"))
+
+        df = pd.DataFrame(
+            {
+                "year": [2018, 2019, 2020, 2021, 2022],
+                # Use simple points in WKT format to test conversion to the geometry dtype
+                "geometry": ["POINT (1 2)"] * 5,
+            }
+        )
+        output_file = tmp_path / "jobs.csv"
+
+        def start_job(row, connection, **kwargs):
+            year = row["year"]
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        is_done_file = tmp_path / "is_done.txt"
+
+        def start_worker_thread():
+            manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
+            is_done_file.write_text("Done!")
+
+        thread = threading.Thread(target=start_worker_thread, name="Worker thread", daemon=True)
+
+        timeout_sec = 5.0
+        thread.start()
+        # We stop waiting for the thread after the timeout.
+        # If that happens it is likely we detected that run_jobs will loop infinitely.
+        thread.join(timeout=timeout_sec)
+
+        assert is_done_file.exists(), (
+            "MultiBackendJobManager did not finish on its own within the timeout. "
+            "An infinite loop is probable."
+        )
+
+        # Also check that we got sensible end results.
+        result = pd.read_csv(output_file)
+        assert len(result) == 5
+        assert set(result.status) == {"finished", "error"}
+        assert set(result.backend_name) == {"foo", "bar"}
+
+        # We expect that the job metadata was saved for a successful job,
+        # so verify that it exists.
+        # Checking it for one of the jobs is enough.
+        metadata_path = manager.get_job_metadata_path(job_id="job-2021")
+        assert metadata_path.exists()
+
 
     def test_on_error_log(self, tmp_path, requests_mock):
         backend = "http://foo.test"
         requests_mock.get(backend, json={"api_version": "1.1.0"})

From 0e37fb21cf57a2998410924e6032f4813c9ed818 Mon Sep 17 00:00:00 2001
From: Johan Schreurs
Date: Mon, 31 Jul 2023 09:58:28 +0200
Subject: [PATCH 6/6] fixup! Issue #432 Fix failing tests on Windows

---
 tests/extra/test_job_management.py | 147 -----------------------------
 1 file changed, 147 deletions(-)

diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 00e4eb4e3..952b40ef6 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -119,13 +119,6 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()
 
-    @pytest.mark.skipif(
-        platform.system() == "Windows",
-        reason=(
-            "Windows support for multiprocessing is too different, the errors to "
-            "solve are too complicated: pickling certain local functions fails."
-        ),
-    )
     def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
         """Make sure the MultiBackendJobManager does not hang after all jobs finish.
 
@@ -223,146 +216,6 @@ def mock_job_status(job_id, succeeds: bool):
         )
         output_file = tmp_path / "jobs.csv"
 
-        def start_job(row, connection, **kwargs):
-            year = row["year"]
-            return BatchJob(job_id=f"job-{year}", connection=connection)
-
-        def start_worker_thread():
-            manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
-
-        # This should finish almost immediately, within a second.
-        # If it takes too long then we assume it will run forever.
-        # We will check that the exit code of this process == 0, i.e. it finished normally.
-        # If it gets killed it will have a different exit code
-        # (On Linux usually -9 SIGKILL)
-        proc = multiprocessing.Process(target=start_worker_thread, name="Worker process")
-
-        timeout_sec = 5.0
-        proc.start()
-        # We stop waiting for the process after the timeout.
-        # If that happens it is likely we detected that run_jobs will loop infinitely.
-        proc.join(timeout=timeout_sec)
-
-        if proc.is_alive():
-            # now forcibly kill the process, then have to join it again.
-            proc.terminate()
-            proc.join()
-
-        assert proc.exitcode == 0, (
-            "MultiBackendJobManager did not finish on its own and was killed. "
-            + "Infinite loop is probable. Expected exit code == 0, but found "
-            + f"proc.exitcode={proc.exitcode!r}, proc={proc!r}"
-        )
-
-        # Also check that we got sensible end results.
-        result = pd.read_csv(output_file)
-        assert len(result) == 5
-        assert set(result.status) == {"finished", "error"}
-        assert set(result.backend_name) == {"foo", "bar"}
-
-        # We expect that the job metadata was saved for a successful job,
-        # so verify that it exists.
-        # Checking it for one of the jobs is enough.
-        metadata_path = manager.get_job_metadata_path(job_id="job-2021")
-        assert metadata_path.exists()
-
-    def test_manager_must_exit_when_all_jobs_done_windows(self, tmp_path, requests_mock, sleep_mock):
-        """Make sure the MultiBackendJobManager does not hang after all jobs finish.
-
-        Regression test for:
-        https://github.com/Open-EO/openeo-python-client/issues/432
-
-        The cause was that run_jobs had an infinite loop when jobs ended with status error.
-        """
-
-        requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
-        requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})
-
-        def mock_job_status(job_id, succeeds: bool):
-            """Mock job status polling sequence.
-            We set up one job that finishes with status error.
-            """
-            response_list = sum(
-                [
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "queued",
-                            }
-                        }
-                    ],
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "running",
-                            }
-                        }
-                    ],
-                    [
-                        {
-                            "json": {
-                                "id": job_id,
-                                "title": f"Job {job_id}",
-                                "status": "finished" if succeeds else "error",
-                            }
-                        }
-                    ],
-                ],
-                [],
-            )
-            for backend in ["http://foo.test", "http://bar.test"]:
-                requests_mock.get(f"{backend}/jobs/{job_id}", response_list)
-                # It also needs the job results endpoint for successful jobs and the
-                # log endpoint for a failed job. Both are dummy implementations.
-                # When the job is finished the system tries to download the
-                # results or the logs and that is what needs these endpoints.
- if succeeds: - requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []}) - else: - response = { - "level": "error", - "logs": [ - { - "id": "1", - "code": "SampleError", - "level": "error", - "message": "Error for testing", - "time": "2019-08-24T14:15:22Z", - "data": None, - "path": [], - "usage": {}, - "links": [], - } - ], - "links": [], - } - requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response) - - mock_job_status("job-2018", succeeds=True) - mock_job_status("job-2019", succeeds=True) - mock_job_status("job-2020", succeeds=True) - mock_job_status("job-2021", succeeds=True) - mock_job_status("job-2022", succeeds=False) - - root_dir = tmp_path / "job_mgr_root" - manager = MultiBackendJobManager(root_dir=root_dir) - - manager.add_backend("foo", connection=openeo.connect("http://foo.test")) - manager.add_backend("bar", connection=openeo.connect("http://bar.test")) - - df = pd.DataFrame( - { - "year": [2018, 2019, 2020, 2021, 2022], - # Use simple points in WKT format to test conversion to the geometry dtype - "geometry": ["POINT (1 2)"] * 5, - } - ) - output_file = tmp_path / "jobs.csv" - def start_job(row, connection, **kwargs): year = row["year"] return BatchJob(job_id=f"job-{year}", connection=connection)
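For reference, the status polling simulated throughout these tests relies on
requests_mock's response-list feature: registering a list of response dicts
makes consecutive requests walk through the list, with the last entry
repeated afterwards. A minimal standalone sketch of that technique (not part
of the patch; the URL and job id are just the tests' placeholders):

    import requests
    import requests_mock

    with requests_mock.Mocker() as m:
        m.get(
            "http://foo.test/jobs/job-2022",
            [
                {"json": {"id": "job-2022", "status": "queued"}},
                {"json": {"id": "job-2022", "status": "running"}},
                {"json": {"id": "job-2022", "status": "error"}},
            ],
        )
        # Each GET consumes the next response in the list.
        seen = [requests.get("http://foo.test/jobs/job-2022").json()["status"] for _ in range(3)]
        assert seen == ["queued", "running", "error"]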