diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py
index b4b8aa3..0c895fb 100644
--- a/src/openeo_aggregator/backend.py
+++ b/src/openeo_aggregator/backend.py
@@ -84,6 +84,7 @@
     streaming_flask_response,
 )
 from openeo_aggregator.constants import (
+    CROSSBACKEND_GRAPH_SPLIT_METHOD,
     JOB_OPTION_FORCE_BACKEND,
     JOB_OPTION_SPLIT_STRATEGY,
     JOB_OPTION_TILE_GRID,
@@ -102,6 +103,7 @@ from openeo_aggregator.partitionedjobs import PartitionedJob
 from openeo_aggregator.partitionedjobs.crossbackend import (
     CrossBackendJobSplitter,
+    DeepGraphSplitter,
     LoadCollectionGraphSplitter,
 )
 from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
@@ -806,25 +808,33 @@ def create_job(
         if "process_graph" not in process:
             raise ProcessGraphMissingException()

+        # Coverage of messy "split_strategy" job option https://github.com/Open-EO/openeo-aggregator/issues/156
         # TODO: better, more generic/specific job_option(s)?
-        if job_options and (job_options.get(JOB_OPTION_SPLIT_STRATEGY) or job_options.get(JOB_OPTION_TILE_GRID)):
-            if job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "crossbackend":
-                # TODO this is temporary feature flag to trigger "crossbackend" splitting
-                return self._create_crossbackend_job(
-                    user_id=user_id,
-                    process=process,
-                    api_version=api_version,
-                    metadata=metadata,
-                    job_options=job_options,
-                )
-            else:
-                return self._create_partitioned_job(
-                    user_id=user_id,
-                    process=process,
-                    api_version=api_version,
-                    metadata=metadata,
-                    job_options=job_options,
-                )
+        split_strategy = (job_options or {}).get(JOB_OPTION_SPLIT_STRATEGY)
+        # TODO: this job option "tile_grid" is quite generic and not very explicit about being a job splitting approach
+        tile_grid = (job_options or {}).get(JOB_OPTION_TILE_GRID)
+
+        crossbackend_mode = (
+            split_strategy == "crossbackend" or isinstance(split_strategy, dict) and "crossbackend" in split_strategy
+        )
+        spatial_split_mode = tile_grid or split_strategy == "flimsy"
+
+        if crossbackend_mode:
+            return self._create_crossbackend_job(
+                user_id=user_id,
+                process=process,
+                api_version=api_version,
+                metadata=metadata,
+                job_options=job_options,
+            )
+        elif spatial_split_mode:
+            return self._create_partitioned_job(
+                user_id=user_id,
+                process=process,
+                api_version=api_version,
+                metadata=metadata,
+                job_options=job_options,
+            )
         else:
             return self._create_job_standard(
                 user_id=user_id,
@@ -939,16 +949,42 @@ def _create_crossbackend_job(
         if not self.partitioned_job_tracker:
             raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")

-        def backend_for_collection(collection_id) -> str:
-            return self._catalog.get_backends_for_collection(cid=collection_id)[0]
+        split_strategy = (job_options or {}).get(JOB_OPTION_SPLIT_STRATEGY)
+        if split_strategy == "crossbackend":
+            graph_split_method = CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE
+        elif isinstance(split_strategy, dict) and isinstance(split_strategy.get("crossbackend"), dict):
+            graph_split_method = split_strategy.get("crossbackend", {}).get(
+                "method", CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE
+            )
+        else:
+            raise ValueError(f"Invalid split strategy {split_strategy!r}")
+
+        _log.info(f"_create_crossbackend_job: {graph_split_method=} from {split_strategy=}")
+        if graph_split_method == CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE:
+
+            def backend_for_collection(collection_id) -> str:
+                return self._catalog.get_backends_for_collection(cid=collection_id)[0]

-        splitter = CrossBackendJobSplitter(
-            graph_splitter=LoadCollectionGraphSplitter(
+            graph_splitter = LoadCollectionGraphSplitter(
                 backend_for_collection=backend_for_collection,
                 # TODO: job option for `always_split` feature?
                 always_split=True,
             )
-        )
+        elif graph_split_method == CROSSBACKEND_GRAPH_SPLIT_METHOD.DEEP:
+
+            def supporting_backends(node_id: str, node: dict) -> Union[List[str], None]:
+                if node["process_id"] == "load_collection":
+                    collection_id = node["arguments"]["id"]
+                    return self._catalog.get_backends_for_collection(cid=collection_id)
+
+            graph_splitter = DeepGraphSplitter(
+                supporting_backends=supporting_backends,
+                primary_backend=split_strategy.get("crossbackend", {}).get("primary_backend"),
+            )
+        else:
+            raise ValueError(f"Invalid graph split strategy {graph_split_method!r}")
+
+        splitter = CrossBackendJobSplitter(graph_splitter=graph_splitter)

         pjob_id = self.partitioned_job_tracker.create_crossbackend_pjob(
             user_id=user_id, process=process, metadata=metadata, job_options=job_options, splitter=splitter
diff --git a/src/openeo_aggregator/constants.py b/src/openeo_aggregator/constants.py
index 68621bd..02a7a4a 100644
--- a/src/openeo_aggregator/constants.py
+++ b/src/openeo_aggregator/constants.py
@@ -4,3 +4,9 @@

 # Experimental feature to force a certain upstream back-end through job options
 JOB_OPTION_FORCE_BACKEND = "_agg_force_backend"
+
+
+class CROSSBACKEND_GRAPH_SPLIT_METHOD:
+    # Poor-man's StrEnum
+    SIMPLE = "simple"
+    DEEP = "deep"
diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index f55b206..7fc7d76 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -743,7 +743,7 @@ def next_nodes(node_id: NodeId) -> Iterable[NodeId]:

         return up, down

-    def produce_split_locations(self, limit: int = 4) -> Iterator[List[NodeId]]:
+    def produce_split_locations(self, limit: int = 10) -> Iterator[List[NodeId]]:
         """
         Produce disjoint subgraphs that can be processed independently.

@@ -811,49 +811,55 @@ class DeepGraphSplitter(ProcessGraphSplitterInterface):
     More advanced graph splitting (compared to just splitting off `load_collection` nodes)
     """

-    def __init__(self, supporting_backends: SupportingBackendsMapper):
+    def __init__(self, supporting_backends: SupportingBackendsMapper, primary_backend: Optional[BackendId] = None):
         self._supporting_backends_mapper = supporting_backends
+        self._primary_backend = primary_backend

     def split(self, process_graph: FlatPG) -> _PGSplitResult:
         graph = _GraphViewer.from_flat_graph(
             flat_graph=process_graph, supporting_backends=self._supporting_backends_mapper
         )
-        # TODO: make picking "optimal" split location set a bit more deterministic (e.g. sort first)
-        (split_nodes,) = graph.produce_split_locations(limit=1)
-        _log.debug(f"DeepGraphSplitter.split: split nodes: {split_nodes=}")
-
-        secondary_graphs: List[_SubGraphData] = []
-        graph_to_split = graph
-        for split_node_id in split_nodes:
-            up, down = graph_to_split.split_at(split_node_id=split_node_id)
-            # Use upstream graph as secondary graph
-            node_ids = set(nid for nid, _ in up.iter_nodes())
-            backend_candidates = up.get_backend_candidates_for_node_set(node_ids)
-            # TODO: better backend selection?
-            # TODO handle case where backend_candidates is None?
-            backend_id = sorted(backend_candidates)[0]
-            _log.debug(f"DeepGraphSplitter.split: secondary graph: from {split_node_id=}: {backend_id=} {node_ids=}")
-            secondary_graphs.append(
-                _SubGraphData(
-                    split_node=split_node_id,
-                    node_ids=node_ids,
-                    backend_id=backend_id,
+        for split_nodes in graph.produce_split_locations():
+            _log.debug(f"DeepGraphSplitter.split: evaluating split nodes: {split_nodes=}")
+
+            secondary_graphs: List[_SubGraphData] = []
+            graph_to_split = graph
+            for split_node_id in split_nodes:
+                up, down = graph_to_split.split_at(split_node_id=split_node_id)
+                # Use upstream graph as secondary graph
+                node_ids = set(nid for nid, _ in up.iter_nodes())
+                backend_candidates = up.get_backend_candidates_for_node_set(node_ids)
+                # TODO: better backend selection?
+                # TODO handle case where backend_candidates is None?
+                backend_id = sorted(backend_candidates)[0]
+                _log.debug(
+                    f"DeepGraphSplitter.split: secondary graph: from {split_node_id=}: {backend_id=} {node_ids=}"
+                )
+                secondary_graphs.append(
+                    _SubGraphData(
+                        split_node=split_node_id,
+                        node_ids=node_ids,
+                        backend_id=backend_id,
+                    )
                 )
-            )
-
-            # Prepare for next split (if any)
-            graph_to_split = down
-        # Remaining graph is primary graph
-        primary_graph = graph_to_split
-        primary_node_ids = set(n for n, _ in primary_graph.iter_nodes())
-        backend_candidates = primary_graph.get_backend_candidates_for_node_set(primary_node_ids)
-        primary_backend_id = sorted(backend_candidates)[0]
-        _log.debug(f"DeepGraphSplitter.split: primary graph: {primary_backend_id=} {primary_node_ids=}")
+                # Prepare for next split (if any)
+                graph_to_split = down
+
+            # Remaining graph is primary graph
+            primary_graph = graph_to_split
+            primary_node_ids = set(n for n, _ in primary_graph.iter_nodes())
+            backend_candidates = primary_graph.get_backend_candidates_for_node_set(primary_node_ids)
+            primary_backend_id = sorted(backend_candidates)[0]
+            _log.debug(f"DeepGraphSplitter.split: primary graph: {primary_backend_id=} {primary_node_ids=}")
+
+            if self._primary_backend is None or primary_backend_id == self._primary_backend:
+                _log.debug(f"DeepGraphSplitter.split: current split matches constraints")
+                return _PGSplitResult(
+                    primary_node_ids=primary_node_ids,
+                    primary_backend_id=primary_backend_id,
+                    secondary_graphs=secondary_graphs,
+                )

-        return _PGSplitResult(
-            primary_node_ids=primary_node_ids,
-            primary_backend_id=primary_backend_id,
-            secondary_graphs=secondary_graphs,
-        )
+        raise GraphSplitException("DeepGraphSplitter.split: No matching split found.")
diff --git a/src/openeo_aggregator/testing.py b/src/openeo_aggregator/testing.py
index 6cf60d8..8ec90de 100644
--- a/src/openeo_aggregator/testing.py
+++ b/src/openeo_aggregator/testing.py
@@ -103,6 +103,8 @@ def approx_now(abs=10):
 class ApproxStr:
     """Pytest helper in style of `pytest.approx`, but for string checking, based on prefix, body and or suffix"""

+    # TODO: port to dirty_equals
+
     def __init__(
         self,
         prefix: Optional[str] = None,
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index c22cc40..826eb5e 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -52,7 +52,7 @@ def __init__(self, date: str):
 def dummy1(backend1, requests_mock) -> DummyBackend:
     # TODO: rename this fixture to dummy_backed1 for clarity
     dummy = DummyBackend(requests_mock=requests_mock, backend_url=backend1, job_id_template="1-jb-{i}")
-    dummy.setup_basic_requests_mocks()
+    dummy.setup_basic_requests_mocks(collections=["S1", "S2"])
     dummy.register_user(bearer_token=TEST_USER_BEARER_TOKEN, user_id=TEST_USER)
     return dummy

@@ -61,7 +61,7 @@ def dummy1(backend1, requests_mock) -> DummyBackend:
 def dummy2(backend2, requests_mock) -> DummyBackend:
     # TODO: rename this fixture to dummy_backed2 for clarity
     dummy = DummyBackend(requests_mock=requests_mock, backend_url=backend2, job_id_template="2-jb-{i}")
-    dummy.setup_basic_requests_mocks(collections=["S22"])
+    dummy.setup_basic_requests_mocks(collections=["T11", "T22"])
     dummy.register_user(bearer_token=TEST_USER_BEARER_TOKEN, user_id=TEST_USER)
     return dummy

@@ -685,7 +685,15 @@ def _partitioned_job_tracking(self, zk_client):
         yield

     @now.mock
-    def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
+    @pytest.mark.parametrize(
+        "split_strategy",
+        [
+            "crossbackend",
+            {"crossbackend": {"method": "simple"}},
+            {"crossbackend": {"method": "deep"}},
+        ],
+    )
+    def test_create_job_simple(self, flask_app, api100, zk_db, dummy1, split_strategy):
         """Handling of single "load_collection" process graph"""
         api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)

@@ -695,7 +703,7 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
             "/jobs",
             json={
                 "process": {"process_graph": pg},
-                "job_options": {"split_strategy": "crossbackend"},
+                "job_options": {"split_strategy": split_strategy},
             },
         ).assert_status_code(201)

@@ -719,7 +727,7 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
             "created": self.now.epoch,
             "process": {"process_graph": pg},
             "metadata": {"log_level": "info"},
-            "job_options": {"split_strategy": "crossbackend"},
+            "job_options": {"split_strategy": split_strategy},
             "result_jobs": ["main"],
         }

@@ -753,12 +761,20 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
         assert pg == {"lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection", "result": True}}

     @now.mock
-    def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock):
+    @pytest.mark.parametrize(
+        "split_strategy",
+        [
+            "crossbackend",
+            {"crossbackend": {"method": "simple"}},
+            {"crossbackend": {"method": "deep", "primary_backend": "b1"}},
+        ],
+    )
+    def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, dummy2, requests_mock, split_strategy):
         api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)

         pg = {
             "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
-            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}},
             "merge": {
                 "process_id": "merge_cubes",
                 "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
                 "result": True,
             },
         }

         requests_mock.get(
-            "https://b1.test/v1/jobs/1-jb-0/results?partial=true",
-            json={"links": [{"rel": "canonical", "href": "https://data.b1.test/123abc"}]},
+            "https://b2.test/v1/jobs/2-jb-0/results?partial=true",
+            json={"links": [{"rel": "canonical", "href": "https://data.b2.test/123abc"}]},
         )

         res = api100.post(
             "/jobs",
-            json={"process": {"process_graph": pg}, "job_options": {"split_strategy": "crossbackend"}},
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": split_strategy},
+            },
         ).assert_status_code(201)

         pjob_id = "pj-20220119-123456"
@@ -796,7 +815,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock)
             "created": self.now.epoch,
             "process": {"process_graph": pg},
             "metadata": {"log_level": "info"},
-            "job_options": {"split_strategy": "crossbackend"},
+            "job_options": {"split_strategy": split_strategy},
             "result_jobs": ["main"],
         }

@@ -810,17 +829,17 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock)
         # Inspect stored subjob metadata
         subjobs = zk_db.list_subjobs(user_id=TEST_USER, pjob_id=pjob_id)
         assert subjobs == {
-            "b1:lc2": {
-                "backend_id": "b1",
+            "b2:lc2": {
+                "backend_id": "b2",
                 "process_graph": {
-                    "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+                    "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}},
                     "_agg_crossbackend_save_result": {
                         "process_id": "save_result",
                         "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"},
                         "result": True,
                     },
                 },
-                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='b1:lc2'",
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='b2:lc2'",
             },
             "main": {
                 "backend_id": "b1",
@@ -828,7 +847,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock)
                     "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
                     "lc2": {
                         "process_id": "load_stac",
-                        "arguments": {"url": "https://data.b1.test/123abc"},
+                        "arguments": {"url": "https://data.b2.test/123abc"},
                     },
                     "merge": {
                         "process_id": "merge_cubes",
@@ -841,7 +860,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock)
         }

         sjob_id = "main"
-        expected_job_id = "1-jb-1"
+        expected_job_id = "1-jb-0"
         assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
             "status": "created",
             "timestamp": self.now.epoch,
@@ -853,7 +872,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock)
             "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
             "lc2": {
                 "process_id": "load_stac",
-                "arguments": {"url": "https://data.b1.test/123abc"},
+                "arguments": {"url": "https://data.b2.test/123abc"},
             },
             "merge": {
                 "process_id": "merge_cubes",
             },
         }

-        sjob_id = "b1:lc2"
-        expected_job_id = "1-jb-0"
+        sjob_id = "b2:lc2"
+        expected_job_id = "2-jb-0"
         assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
             "status": "created",
             "timestamp": self.now.epoch,
             "message": None,
         }
         assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
-        assert dummy1.get_job_status(TEST_USER, expected_job_id) == "created"
-        assert dummy1.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
-            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+        assert dummy2.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy2.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}},
             "_agg_crossbackend_save_result": {
                 "process_id": "save_result",
                 "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"},
                 "result": True,
@@ -881,13 +900,21 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock)
         }

     @now.mock
-    def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, requests_mock):
+    @pytest.mark.parametrize(
+        "split_strategy",
+        [
+            "crossbackend",
+            {"crossbackend": {"method": "simple"}},
+            {"crossbackend": {"method": "deep", "primary_backend": "b1"}},
+        ],
+    )
+    def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, dummy2, requests_mock, split_strategy):
         """Run the jobs and get results"""
get results""" api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) pg = { "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, - "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}}, + "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}}, "merge": { "process_id": "merge_cubes", "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}}, @@ -896,15 +923,15 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, requests_ } requests_mock.get( - "https://b1.test/v1/jobs/1-jb-0/results?partial=true", - json={"links": [{"rel": "canonical", "href": "https://data.b1.test/123abc"}]}, + "https://b2.test/v1/jobs/2-jb-0/results?partial=true", + json={"links": [{"rel": "canonical", "href": "https://data.b2.test/123abc"}]}, ) res = api100.post( "/jobs", json={ "process": {"process_graph": pg}, - "job_options": {"split_strategy": "crossbackend"}, + "job_options": {"split_strategy": split_strategy}, }, ).assert_status_code(201) @@ -923,21 +950,21 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, requests_ # start job api100.post(f"/jobs/{expected_job_id}/results").assert_status_code(202) - dummy1.set_job_status(TEST_USER, "1-jb-0", status="running") - dummy1.set_job_status(TEST_USER, "1-jb-1", status="queued") + dummy2.set_job_status(TEST_USER, "2-jb-0", status="running") + dummy1.set_job_status(TEST_USER, "1-jb-0", status="queued") res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200) assert res.json == DictSubSet({"id": expected_job_id, "status": "running", "progress": 0}) # First job is ready - dummy1.set_job_status(TEST_USER, "1-jb-0", status="finished") - dummy1.setup_assets(job_id=f"1-jb-0", assets=["1-jb-0-result.tif"]) - dummy1.set_job_status(TEST_USER, "1-jb-1", status="running") + dummy2.set_job_status(TEST_USER, "2-jb-0", status="finished") + dummy2.setup_assets(job_id=f"2-jb-0", assets=["2-jb-0-result.tif"]) + dummy1.set_job_status(TEST_USER, "1-jb-0", status="running") res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200) assert res.json == DictSubSet({"id": expected_job_id, "status": "running", "progress": 50}) # Main job is ready too - dummy1.set_job_status(TEST_USER, "1-jb-1", status="finished") - dummy1.setup_assets(job_id=f"1-jb-1", assets=["1-jb-1-result.tif"]) + dummy1.set_job_status(TEST_USER, "1-jb-0", status="finished") + dummy1.setup_assets(job_id=f"1-jb-0", assets=["1-jb-0-result.tif"]) res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200) assert res.json == DictSubSet({"id": expected_job_id, "status": "finished", "progress": 100}) @@ -947,10 +974,10 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, requests_ { "id": expected_job_id, "assets": { - "main-1-jb-1-result.tif": { - "href": "https://b1.test/v1/jobs/1-jb-1/results/1-jb-1-result.tif", + "main-1-jb-0-result.tif": { + "href": "https://b1.test/v1/jobs/1-jb-0/results/1-jb-0-result.tif", "roles": ["data"], - "title": "main-1-jb-1-result.tif", + "title": "main-1-jb-0-result.tif", "type": "application/octet-stream", }, }, @@ -958,14 +985,22 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, requests_ ) @now.mock - def test_failing_create(self, flask_app, api100, zk_db, dummy1): + @pytest.mark.parametrize( + "split_strategy", + [ + "crossbackend", + {"crossbackend": {"method": "simple"}}, + {"crossbackend": {"method": "deep", "primary_backend": "b1"}}, + ], + ) + def test_failing_create(self, flask_app, api100, zk_db, dummy1, 
dummy2, split_strategy): """Run what happens when creation of sub batch job fails on upstream backend""" api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) - dummy1.fail_create_job = True + dummy2.fail_create_job = True pg = { "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, - "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}}, + "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}}, "merge": { "process_id": "merge_cubes", "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}}, @@ -977,7 +1012,7 @@ def test_failing_create(self, flask_app, api100, zk_db, dummy1): "/jobs", json={ "process": {"process_graph": pg}, - "job_options": {"split_strategy": "crossbackend"}, + "job_options": {"split_strategy": split_strategy}, }, ).assert_status_code(201)
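
Usage note (not part of the patch): a minimal sketch of how a client could exercise the reworked "split_strategy" job option against an aggregator's /jobs endpoint. The aggregator URL, bearer token and process graph below are illustrative placeholders; the job option values mirror the formats accepted by create_job in this diff.

import requests

AGGREGATOR_URL = "https://aggregator.example.test/openeo/1.1"  # placeholder URL
HEADERS = {"Authorization": "Bearer <access_token>"}  # placeholder token

# Illustrative process graph with two load_collection nodes on different back-ends.
process_graph = {
    "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
    "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}},
    "merge": {
        "process_id": "merge_cubes",
        "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
        "result": True,
    },
}

# Legacy string form: simple cross-backend split (split off load_collection nodes).
job_options_simple = {"split_strategy": "crossbackend"}
# Explicit dict form, equivalent to the string form above.
job_options_simple_explicit = {"split_strategy": {"crossbackend": {"method": "simple"}}}
# Deep graph split, optionally pinning the primary back-end.
job_options_deep = {"split_strategy": {"crossbackend": {"method": "deep", "primary_backend": "b1"}}}

# Create the (partitioned) batch job with the chosen split strategy.
resp = requests.post(
    f"{AGGREGATOR_URL}/jobs",
    headers=HEADERS,
    json={"process": {"process_graph": process_graph}, "job_options": job_options_deep},
)
resp.raise_for_status()
print("Created job:", resp.headers.get("OpenEO-Identifier"))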