Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

search: add new plugin CopDataSpaceSearch #1231

Draft
wants to merge 6 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 123 additions & 1 deletion eodag/plugins/search/qssearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
# limitations under the License.
from __future__ import annotations

import concurrent
import logging
import re
from collections.abc import Iterable
from copy import copy as copy_copy
from queue import Empty, Queue
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -55,7 +57,7 @@
from requests.adapters import HTTPAdapter
from requests.auth import AuthBase

from eodag.api.product import EOProduct
from eodag.api.product import AssetsDict, EOProduct
from eodag.api.product.metadata_mapping import (
NOT_AVAILABLE,
format_query_params,
Expand All @@ -81,6 +83,7 @@
format_dict_items,
get_args,
get_ssl_context,
guess_file_type,
quote,
string_to_jsonpath,
update_nested_dict,
Expand All @@ -99,8 +102,14 @@
)

if TYPE_CHECKING:
from concurrent.futures import Future

from eodag.config import PluginConfig


DATA_EXTENSIONS = ["jp2", "tiff", "nc", "grib"]


logger = logging.getLogger("eodag.search.qssearch")


Expand Down Expand Up @@ -1243,9 +1252,122 @@ def normalize_results(
# once metadata pre-mapping applied execute QueryStringSearch.normalize_results
products = super(ODataV4Search, self).normalize_results(results, **kwargs)

_ODataV4NodeCrawler(self).discover_assets(products)

return products


class _ODataV4NodeCrawler:
    """A class that crawls through the node tree structure of OData v.4 products.

    Every file (leaf node) found in the tree is added as an asset of the
    product it belongs to."""

    def __init__(self, search_plugin: ODataV4Search, max_connections: int = 100):
        """Init the crawler.

        :param search_plugin: The OData search plugin
        :type search_plugin: :class:`~eodag.plugins.search.ODataV4Search`
        :param max_connections: (optional) Maximum number of connections for HTTP requests
        :type max_connections: int
        """
        self.search_plugin = search_plugin
        self.max_connections = max_connections
        # Queue of (product, node URL) pairs still to be fetched. Carrying the
        # product alongside the URL keeps each node attributed to the right
        # product when several products are crawled at once.
        self.nodes_queue: Queue = Queue()

    def _fetch_node(self, product: EOProduct, url: str) -> tuple[EOProduct, str, dict]:
        """Send the HTTP request of a single node.

        :param product: The EO product associated with the given URL
        :type product: :class:`~eodag.api.product._product.EOProduct`
        :param url: URL of the node to fetch
        :type url: string
        :returns: The EO Product, the node URL, the JSON response
        :rtype: tuple[:class:`~eodag.api.product._product.EOProduct`, str, dict]
        """
        response = QueryStringSearch._request(
            self.search_plugin,
            PreparedSearch(
                url=url,
                exception_message=f"Skipping error while fetching node {url}",
            ),
        )
        return (product, url, response.json())

    def _scrape_nodes(self, future: Future) -> None:
        """Scrape child nodes.

        Given a node, scrape all its children: directories are queued for
        further crawling and files are registered as assets of the product.

        NOTE(review): this runs as a ``Future`` done-callback; exceptions
        raised here are captured by ``concurrent.futures`` and not propagated
        to the main thread — confirm this best-effort behavior is intended.

        :param future: The result of `_fetch_node()`
        :type future: :class:`~concurrent.futures.Future`
        """
        product, url, json_response = future.result()
        # loop over the list of children
        for node in json_response["result"]:
            if node["ChildrenNumber"] > 0:
                # directory-like node: queue its children, keeping the product
                # association so the assets end up on the correct product
                self.nodes_queue.put((product, node["Nodes"]["uri"]))
            elif node["ContentLength"] > 0:
                # this is a file
                # replace suffix "/Nodes" with "/$value" in the download link
                file_url = node["Nodes"]["uri"]
                file_url = file_url[: -len("/Nodes")] + "/$value"
                # build the asset's key as the sequence of all the node's names (skip the first node)
                url_parse = urlparse(file_url)
                node_path = [
                    p for p in url_parse.path.split("/") if p.startswith("Nodes(")
                ]
                if node_path:
                    # skip the first node (the product's own root node)
                    node_path = node_path[1:]
                if node_path:
                    # extract the names by removing the "Nodes(...)" wrapper
                    node_path = [p[6:-1] for p in node_path]
                    asset_key = "/".join(node_path)
                    role = (
                        "data"
                        if node["Id"].split(".")[-1] in DATA_EXTENSIONS
                        else "metadata"
                    )
                    product.assets[asset_key] = {
                        "title": node["Id"],
                        "roles": [role],
                        "href": file_url,
                    }
                    if mime_type := guess_file_type(node["Id"]):
                        product.assets[asset_key]["type"] = mime_type

    def discover_assets(self, products: list[EOProduct]) -> None:
        """Add discovered assets to the products in the given list.

        Does nothing unless the plugin configuration enables ``discover_assets``.

        :param products: List of EO products whose assets are discovered
        :type products: list[:class:`~eodag.api.product._product.EOProduct`]
        """

        if not getattr(self.search_plugin.config, "discover_assets", False):
            return

        # local import: the module-level `import concurrent` does not guarantee
        # that the `concurrent.futures` submodule is loaded
        from concurrent.futures import ThreadPoolExecutor

        discover_endpoint = self.search_plugin.config.api_endpoint.rstrip("/")
        with ThreadPoolExecutor(max_workers=self.max_connections) as executor:
            self.nodes_queue = Queue()
            # init the queue with the root node of each product, paired with
            # the product itself so every fetched node stays attributed to it
            for product in products:
                product.assets = AssetsDict(product)
                product_id = product.properties.get("uid")
                self.nodes_queue.put((product, f"{discover_endpoint}({product_id})/Nodes"))
            # start consuming the queue and submitting jobs; the queue is also
            # fed by the `_scrape_nodes` callbacks while jobs are in flight
            while True:
                try:
                    product, url = self.nodes_queue.get(timeout=HTTP_REQ_TIMEOUT)
                except Empty:
                    # no new node appeared within the timeout: crawl is done
                    return
                job = executor.submit(self._fetch_node, product, url)
                job.add_done_callback(self._scrape_nodes)


class PostJsonSearch(QueryStringSearch):
"""A specialisation of a QueryStringSearch that uses POST method"""

Expand Down
3 changes: 2 additions & 1 deletion eodag/resources/providers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3898,6 +3898,7 @@
- "ContentDate/Start lt {completionTimeFromAscendingNode#to_iso_utc_datetime}"
- "ContentDate/End gt {startTimeFromAscendingNode#to_iso_utc_datetime}"
- contains(Name,'{id}')
discover_assets: true
discover_metadata:
auto_discovery: true
metadata_pattern: '^(?!collection)[a-zA-Z0-9]+$'
Expand Down Expand Up @@ -4003,7 +4004,7 @@
realm: 'CDSE'
client_id: 'cdse-public'
client_secret: null
token_provision: qs
token_provision: header
token_qs_key: 'token'
auth_error_code: 401
ssl_verify: true
Expand Down
94 changes: 93 additions & 1 deletion tests/units/test_download_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import yaml

from eodag.api.product.metadata_mapping import DEFAULT_METADATA_MAPPING
from eodag.utils import ProgressCallback
from eodag.utils import MockResponse, ProgressCallback
from eodag.utils.exceptions import DownloadError
from tests import TEST_RESOURCES_PATH
from tests.context import (
Expand Down Expand Up @@ -1999,3 +1999,95 @@ def test_plugins_download_creodias_s3(
self.assertEqual(mock_finalize_s2_safe_product.call_count, 0)
self.assertEqual(mock_check_manifest_file_list.call_count, 0)
self.assertEqual(mock_flatten_top_directories.call_count, 1)


class TestDownloadPluginODataV4(BaseDownloadPluginTest):
    """Tests for asset discovery through the OData v4 product node tree."""

    @staticmethod
    def _node(name: str, uri: str, content_length: int, children_number: int) -> dict:
        """Build a minimal OData node entry as found under the "result" key.

        :param name: node name, used for both "Id" and "Name"
        :param uri: the node's "Nodes" URI
        :param content_length: file size in bytes (0 for directories)
        :param children_number: number of child nodes (0 for files)
        :returns: the node dict as returned by the OData API
        """
        return {
            "Id": name,
            "Name": name,
            "ContentLength": content_length,
            "ChildrenNumber": children_number,
            "Nodes": {"uri": uri},
        }

    @mock.patch("eodag.plugins.download.http.ODataV4Download._request", autospec=True)
    def test_plugins_download_odatav4_discover_assets(self, mock__request):
        """ODataV4Download.discover_assets must add additional nodes as assets if configured"""
        product = EOProduct(
            "cop_dataspace",
            dict(
                geometry="POINT (0 0)",
                title="dummy_product",
                id="dummy",
            ),
        )
        plugin = self.get_download_plugin(product)
        api_endpoint = plugin.config.discover_assets_endpoint.rstrip("/")
        root = f"{api_endpoint}(1234-5678)/Nodes(dummy)"
        # response to /Products(1234-5678)/Nodes(dummy)/Nodes:
        # one directory ("bar") and one file ("metadata.xml")
        response_1 = {
            "result": [
                self._node("bar", f"{root}/Nodes(bar)/Nodes", 0, 2),
                self._node("metadata.xml", f"{root}/Nodes(metadata.xml)/Nodes", 100, 0),
            ]
        }
        # response to /Products(1234-5678)/Nodes(dummy)/Nodes(bar)/Nodes:
        # two image files
        response_2 = {
            "result": [
                self._node("img_1.jp2", f"{root}/Nodes(bar)/Nodes(img_1.jp2)/Nodes", 100, 0),
                self._node("img_2.jp2", f"{root}/Nodes(bar)/Nodes(img_2.jp2)/Nodes", 100, 0),
            ]
        }
        # the crawler must request the root node first, then the "bar" directory
        mock__request.side_effect = [
            MockResponse(response_1, 200),
            MockResponse(response_2, 200),
        ]

        # file nodes become assets keyed by their path below the root node,
        # with "/Nodes" replaced by "/$value" in the download link
        expected_assets = {
            "metadata.xml": {
                "title": "metadata.xml",
                "roles": ["metadata"],
                "href": f"{root}/Nodes(metadata.xml)/$value",
                "type": "text/xml",
            },
            "bar/img_1.jp2": {
                "title": "img_1.jp2",
                "roles": ["data"],
                "href": f"{root}/Nodes(bar)/Nodes(img_1.jp2)/$value",
                "type": "image/jpeg2000",
            },
            "bar/img_2.jp2": {
                "title": "img_2.jp2",
                "roles": ["data"],
                "href": f"{root}/Nodes(bar)/Nodes(img_2.jp2)/$value",
                "type": "image/jpeg2000",
            },
        }
        product = plugin.discover_assets(product)
        self.assertDictEqual(product.assets.as_dict(), expected_assets)
Loading