Merge pull request #147 from nansencenter/bioargo
Add parameterized entry_id prefix to tabledap crawler
aperrin66 authored Oct 8, 2024
2 parents 2083f64 + 35cbd87 commit 31575a5
Showing 5 changed files with 140 additions and 58 deletions.
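The gist of the change: datasets are now identified by a tuple of attributes (id_attrs) rather than a single id_attr, and each entry_id is built by joining those values behind a configurable entry_id_prefix. A minimal Python sketch of the resulting naming scheme, using the argo_profile settings from the config below (the attribute values themselves are invented for illustration):

    # Sketch of the new entry_id construction; values are illustrative only.
    entry_id_prefix = 'argo_profile_'
    id_attributes = {'platform_number': '6902746', 'cycle_number': 42}

    entry_id = entry_id_prefix + '_'.join(map(str, id_attributes.values()))
    print(entry_id)  # argo_profile_6902746_42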
63 changes: 53 additions & 10 deletions geospaas_harvesting/config.yml
@@ -57,17 +57,19 @@ providers:
type: 'netcdf'
longitude_attribute: 'LONGITUDE'
latitude_attribute: 'LATITUDE'
argo:
argo_profile:
type: 'tabledap'
url: 'https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats.json'
id_attr: 'platform_type'
entry_id_prefix: 'argo_profile_'
id_attrs: ['platform_number', 'cycle_number']
longitude_attr: 'longitude'
latitude_attr: 'latitude'
time_attr: 'time'
position_qc_attr: 'position_qc'
time_qc_attr: 'time_qc'
valid_qc_codes: ['1', '2', '8']
variables:
- 'platform_number'
- 'cycle_number'
- 'pres'
- 'pres_qc'
@@ -84,16 +86,57 @@
- 'psal_adjusted'
- 'psal_adjusted_qc'
- 'psal_adjusted_error'
argo_trajectory:
type: 'tabledap'
url: 'https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats.json'
entry_id_prefix: 'argo_trajectory_'
id_attrs: ['platform_number']
longitude_attr: 'longitude'
latitude_attr: 'latitude'
time_attr: 'time'
position_qc_attr: 'position_qc'
time_qc_attr: 'time_qc'
valid_qc_codes: ['1', '2', '8']
variables:
- 'platform_number'
bioargo_profile:
type: 'tabledap'
url: 'https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats-synthetic-BGC.json'
entry_id_prefix: 'bioargo_profile_'
id_attrs: ['platform_number', 'cycle_number']
longitude_attr: 'longitude'
latitude_attr: 'latitude'
time_attr: 'time'
position_qc_attr: 'position_qc'
time_qc_attr: 'time_qc'
valid_qc_codes: ['1', '2', '8']
variables:
- 'platform_number'
- 'cycle_number'
- 'pres'
- 'pres_qc'
- 'pres_adjusted'
- 'pres_adjusted_qc'
- 'pres_adjusted_error'
- 'doxy'
- 'doxy_qc'
- 'temp_doxy'
- 'temp_doxy_qc'
- 'molar_doxy'
- 'molar_doxy_qc'
- 'turbidity'
- 'turbidity_qc'
- 'doxy_adjusted'
- 'doxy_adjusted_qc'
- 'chla'
- 'chla_qc'
- 'nitrate'
- 'nitrate_qc'
- 'chla_adjusted'
- 'chla_adjusted_qc'
bioargo_trajectory:
type: 'tabledap'
url: 'https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats-synthetic-BGC.json'
entry_id_prefix: 'bioargo_trajectory_'
id_attrs: ['platform_number']
longitude_attr: 'longitude'
latitude_attr: 'latitude'
time_attr: 'time'
position_qc_attr: 'position_qc'
time_qc_attr: 'time_qc'
valid_qc_codes: ['1', '2', '8']
variables:
- 'platform_number'
...
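For reference, a sketch of the distinct-identifiers query that the updated get_ids() builds from one of these entries; the search term is a hypothetical filter added for illustration:

    # Mirrors ERDDAPTableCrawler.get_ids(): list distinct id-attribute tuples.
    url = 'https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats.json'
    id_attrs = ['platform_number', 'cycle_number']
    search_terms = ['time>=2024-01-01T00:00:00Z']  # hypothetical filter

    query = '&'.join([f"{url}?{','.join(id_attrs)}&distinct()"] + search_terms)
    print(query)
    # https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats.json?platform_number,cycle_number&distinct()&time>=2024-01-01T00:00:00Z

Each row of the JSON response then yields one identifier tuple (row[:len(id_attrs)]) instead of a single value, as the crawlers.py diff below shows.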
57 changes: 41 additions & 16 deletions geospaas_harvesting/crawlers.py
@@ -907,7 +907,8 @@ class ERDDAPTableCrawler(Crawler):
logger = logging.getLogger(__name__ + '.ERDDAPTableCrawler')

def __init__(self, url,
id_attr,
id_attrs,
entry_id_prefix='',
longitude_attr='longitude', latitude_attr='latitude', time_attr='time',
position_qc_attr='', time_qc_attr='', valid_qc_codes=None,
search_terms=None, variables=None,
@@ -917,7 +918,8 @@ def __init__(self, url,
self.url = url
else:
raise ValueError("The URL should end with .json")
self.id_attr = id_attr
self.id_attrs = id_attrs
self.entry_id_prefix = entry_id_prefix
self.longitude_attr = longitude_attr
self.latitude_attr = latitude_attr
self.time_attr = time_attr
@@ -930,7 +932,7 @@ def __init__(self, url,
def __eq__(self, other):
return (
self.url == other.url and
self.id_attr == other.id_attr and
self.id_attrs == other.id_attrs and
self.longitude_attr == other.longitude_attr and
self.latitude_attr == other.latitude_attr and
self.time_attr == other.time_attr and
@@ -946,7 +948,7 @@ def set_initial_state(self):

def get_ids(self):
"""Fetch identifiers matching the search terms"""
url = f"{self.url}?{self.id_attr}&distinct()"
url = f"{self.url}?{','.join(self.id_attrs)}&distinct()"
kwargs = {}
url = '&'.join([url] + self.search_terms)
try:
@@ -956,18 +958,36 @@ def get_ids(self):
url, error.response.content, exc_info=True)
raise
for row in response.json()['table']['rows']:
yield row[0]
yield row[:len(self.id_attrs)]

def _make_condition_parameters(self, parameters):
"""Prepare the parameters to filter a query using the id
attributes. Necessary because the API requires different
formats depending on the type of parameter
"""
params = {}
for key, value in parameters.items():
if isinstance(value, str):
params[key] = f'"{value}"'
else:
params[key] = value
return params

def crawl(self):
attributes = [self.time_attr, self.longitude_attr, self.latitude_attr]
for qc_attr in (self.time_qc_attr, self.position_qc_attr):
if qc_attr:
attributes.append(qc_attr)
attributes.extend(self.variables)
for dataset_id in self.get_ids():
for id_values in self.get_ids():
id_attrs = dict(zip(self.id_attrs, id_values))
id_condition = '&'.join(
f"{id_attr}={id_value}"
for id_attr, id_value in self._make_condition_parameters(id_attrs).items()
)
yield DatasetInfo(
f'{self.url}?{",".join(attributes)}&{self.id_attr}="{dataset_id}"',
{'entry_id': dataset_id})
f'{self.url}?{",".join(attributes)}&{id_condition}',
{'id_attributes': id_attrs})

def _check_qc(self, qc_value):
"""Return True if the QC value indicates valid data or the
@@ -984,16 +1004,16 @@ def _make_coverage_url(self):
qc_attributes +
f'&distinct()&orderBy("{self.time_attr}")')

def get_coverage(self, dataset_id):
def get_coverage(self, id_attributes):
"""Get the temporal and spatial coverage for a specific dataset
"""
try:
response = self._http_get(self._make_coverage_url(), request_parameters={
'params': {self.id_attr: f'"{dataset_id}"'}
'params': self._make_condition_parameters(id_attributes)
})
except requests.HTTPError as error:
self.logger.error("Could not get coverage for dataset %s: %s",
dataset_id, error.response.content)
id_attributes, error.response.content)
raise
rows = response.json()['table']['rows']

@@ -1004,8 +1024,9 @@ def get_coverage(self, id_attributes):
for row in rows:
if time_coverage_start is None and self._check_qc(row[3]):
time_coverage_start = row[0]
if self._check_qc(row[4]):
trajectory.append((row[1], row[2]))
point = (row[1], row[2])
if point not in trajectory and self._check_qc(row[4]):
trajectory.append(point)

# get the last time with valid QC
time_coverage_end = None
@@ -1015,7 +1036,7 @@ def get_coverage(self, id_attributes):
break

if time_coverage_start is None or time_coverage_end is None or not trajectory:
raise RuntimeError(f"Could not determine coverage for dataset {dataset_id}")
raise RuntimeError(f"Could not determine coverage for dataset {id_attributes}")

return ((time_coverage_start, time_coverage_end), trajectory)
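Two behavioural details are worth noting in the coverage handling above: positions with valid QC are deduplicated before being stored, and (in the next hunk) the resulting points are serialized as a MultiPoint rather than a LineString. An isolated sketch with shapely, using invented coordinates:

    import shapely.geometry

    # Duplicate positions are skipped, as in get_coverage() above.
    points = [(10.0, 60.0), (10.0, 60.0), (10.5, 60.2)]
    trajectory = []
    for point in points:
        if point not in trajectory:
            trajectory.append(point)

    print(shapely.geometry.MultiPoint(trajectory).wkt)
    # MULTIPOINT (10 60, 10.5 60.2)  (exact WKT formatting may vary by shapely version)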

Expand All @@ -1041,9 +1062,13 @@ def get_normalized_attributes(self, dataset_info, **kwargs):
"""Use metanorm to normalize a DatasetInfo's raw attributes"""
raw_attributes = dataset_info.metadata
self.add_url(dataset_info.url, raw_attributes)
coverage = self.get_coverage(dataset_info.metadata['entry_id'])
coverage = self.get_coverage(dataset_info.metadata['id_attributes'])
raw_attributes['entry_id'] = (
self.entry_id_prefix +
'_'.join(map(str, dataset_info.metadata['id_attributes'].values()))
)
raw_attributes['temporal_coverage'] = coverage[0]
raw_attributes['trajectory'] = shapely.geometry.LineString(coverage[1]).wkt
raw_attributes['trajectory'] = shapely.geometry.MultiPoint(coverage[1]).wkt
raw_attributes['product_metadata'] = self.get_product_metadata()

normalized_attributes = self._metadata_handler.get_parameters(raw_attributes)
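To make the quoting rule in _make_condition_parameters concrete, a standalone sketch of how crawl() assembles the per-dataset filter (attribute values invented):

    def make_condition_parameters(parameters):
        # ERDDAP tabledap requires double quotes around strings but bare numbers.
        return {key: f'"{value}"' if isinstance(value, str) else value
                for key, value in parameters.items()}

    id_attrs = {'platform_number': '6902746', 'cycle_number': 42}
    condition = '&'.join(f'{attr}={value}'
                         for attr, value in make_condition_parameters(id_attrs).items())
    print(condition)  # platform_number="6902746"&cycle_number=42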
6 changes: 4 additions & 2 deletions geospaas_harvesting/providers/erddap.py
@@ -9,7 +9,8 @@ class ERDDAPTableProvider(Provider):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.url = kwargs['url'].rstrip('/')
self.id_attr = kwargs['id_attr']
self.entry_id_prefix = kwargs.get('entry_id_prefix', '')
self.id_attrs = kwargs['id_attrs']
self.longitude_attr = kwargs['longitude_attr']
self.latitude_attr = kwargs['latitude_attr']
self.time_attr = kwargs['time_attr']
@@ -27,7 +28,8 @@ def _make_crawler(self, parameters):
search_terms.extend(self._make_temporal_condition(time_range))
return ERDDAPTableCrawler(
self.url,
self.id_attr,
self.id_attrs,
entry_id_prefix=self.entry_id_prefix,
longitude_attr=self.longitude_attr,
latitude_attr=self.latitude_attr,
time_attr=self.time_attr,
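A usage sketch for the updated provider, mirroring the test below but with the argo_profile values from the config (any constructor arguments beyond those shown in this diff are assumptions):

    provider = ERDDAPTableProvider(
        url='https://erddap.ifremer.fr/erddap/tabledap/ArgoFloats.json',
        entry_id_prefix='argo_profile_',
        id_attrs=['platform_number', 'cycle_number'],
        longitude_attr='longitude',
        latitude_attr='latitude',
        time_attr='time',
    )

Since entry_id_prefix is read with kwargs.get(..., ''), existing configurations that omit it keep working and simply produce unprefixed entry_ids.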
4 changes: 2 additions & 2 deletions tests/providers/test_erddap.py
@@ -14,7 +14,7 @@ class ERDDAPTableProviderTest(unittest.TestCase):
def setUp(self):
self.provider = ERDDAPTableProvider(
url='https://foo.json',
id_attr='id',
id_attrs=['id'],
longitude_attr='lon',
latitude_attr='lat',
time_attr='time',
@@ -35,7 +35,7 @@ def test_make_crawler(self):
'search_terms': ['platform_number="123456"']}),
ERDDAPTableCrawler(
'https://foo.json',
'id',
['id'],
longitude_attr='lon',
latitude_attr='lat',
time_attr='time',