Skip to content

Commit

Permalink
Refactor whitespace in geocoding and USGSTransformer classes for impr…
Browse files Browse the repository at this point in the history
…oved readability
  • Loading branch information
emmanuelmathot committed Feb 5, 2025
1 parent 487edde commit 30aff67
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 32 deletions.
47 changes: 25 additions & 22 deletions pystac_monty/geocoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ def get_geometry_from_admin_units(self, admin_units: str) -> Optional[Dict[str,
@abstractmethod
def get_geometry_by_country_name(self, country_name: str) -> Optional[Dict[str, Any]]:
pass

@abstractmethod
def get_iso3_from_geometry(self, geometry: Dict[str, Any]) -> Optional[str]:
pass

@abstractmethod
def get_geometry_from_iso3(self, iso3: str) -> Optional[Dict[str, Any]]:
pass



WORLD_ADMIN_BOUNDARIES_FGB = "world_admin_boundaries.fgb"


class WorldAdministrativeBoundariesGeocoder(MontyGeoCoder):
def __init__(self, fgb_path: str, simplify_tolerance: float = 0.01) -> None:
self.fgb_path = fgb_path
Expand All @@ -35,7 +37,7 @@ def __init__(self, fgb_path: str, simplify_tolerance: float = 0.01) -> None:
self._simplify_tolerance = simplify_tolerance
self._cache: Dict[str, Union[Dict[str, Any], int, None]] = {}
self._initialize_path()

def _initialize_path(self) -> None:
if self._is_zip_file(self.fgb_path):
fgb_name = self._find_fgb_in_zip(self.fgb_path)
Expand All @@ -44,15 +46,15 @@ def _initialize_path(self) -> None:
self._path = f"zip://{self.fgb_path}!/{fgb_name}"
else:
self._path = self.fgb_path

def _is_zip_file(self, file_path: str) -> bool:
"""Check if a file is a ZIP file"""
try:
with zipfile.ZipFile(file_path, "r"):
return True
except zipfile.BadZipFile:
return False

def _find_fgb_in_zip(self, zip_path: str) -> Optional[str]:
"""Find the first .fgb file in a ZIP archive"""
with zipfile.ZipFile(zip_path, "r") as zf:
Expand All @@ -61,11 +63,11 @@ def _find_fgb_in_zip(self, zip_path: str) -> Optional[str]:
if name.lower().endswith(".fgb"):
return name
return None

def get_iso3_from_geometry(self, geometry: Dict[str, Any]) -> Optional[str]:
if not geometry or not self._path:
return None

try:
point = shape(geometry)
with fiona.open(self._path, layer=self._layer) as src:
Expand All @@ -75,19 +77,19 @@ def get_iso3_from_geometry(self, geometry: Dict[str, Any]) -> Optional[str]:
except Exception as e:
print(f"Error getting ISO3 from geometry: {str(e)}")
return None

return None

def get_geometry_from_admin_units(self, admin_units: str) -> Optional[Dict[str, Any]]:
raise NotImplementedError("Method not implemented")

def get_geometry_by_country_name(self, country_name: str) -> Optional[Dict[str, Any]]:
raise NotImplementedError("Method not implemented")

def get_geometry_from_iso3(self, iso3: str) -> Optional[Dict[str, Any]]:
if not iso3 or not self._path:
return None

try:
with fiona.open(self._path, layer=self._layer) as src:
for feature in src:
Expand All @@ -97,12 +99,13 @@ def get_geometry_from_iso3(self, iso3: str) -> Optional[Dict[str, Any]]:
except Exception as e:
print(f"Error getting geometry from ISO3: {str(e)}")
return None

return None


GAUL2014_2015_GPCK_ZIP = "gaul2014_2015.gpkg"


class GAULGeocoder(MontyGeoCoder):
"""
Implementation of MontyGeoCoder using GAUL geopackage for geocoding.
Expand Down Expand Up @@ -318,10 +321,10 @@ def get_geometry_by_country_name(self, country_name: str) -> Optional[Dict[str,
except Exception as e:
print(f"Error getting country geometry for {country_name}: {str(e)}")
return None

def get_iso3_from_geometry(self, geometry: Dict[str, Any]) -> Optional[str]:
raise NotImplementedError("Method not implemented")

def get_geometry_from_iso3(self, iso3: str) -> Optional[Dict[str, Any]]:
raise NotImplementedError("Method not implemented")

Expand Down Expand Up @@ -413,7 +416,7 @@ def get_geometry_by_country_name(self, country_name: str) -> Optional[Dict[str,
except Exception as e:
print(f"Error getting mock country geometry: {str(e)}")
return None

def get_iso3_from_geometry(self, geometry: Dict[str, Any]) -> Optional[str]:
"""
Get ISO3 code for a geometry.
Expand All @@ -431,23 +434,23 @@ def get_iso3_from_geometry(self, geometry: Dict[str, Any]) -> Optional[str]:
try:
# Convert input geometry to shapely
input_shape = shape(geometry)

# Test intersection with all test geometries
for iso3, test_geom in self._test_geometries.items():
# Skip non-country geometries (like 'admin1')
if len(iso3) != 3:
continue

test_shape = shape(test_geom["geometry"])
if input_shape.intersects(test_shape):
return iso3

return None

except Exception as e:
print(f"Error getting mock ISO3 from geometry: {str(e)}")
return None

def get_geometry_from_iso3(self, iso3: str) -> Optional[Dict[str, Any]]:
"""
Get geometry for an ISO3 code.
Expand All @@ -466,5 +469,5 @@ def get_geometry_from_iso3(self, iso3: str) -> Optional[Dict[str, Any]]:
return self._test_geometries.get(iso3)
except Exception as e:
print(f"Error getting mock geometry from ISO3: {str(e)}")

return None
29 changes: 19 additions & 10 deletions pystac_monty/sources/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,22 @@ def make_hazard_event_item(self) -> Item:
]
hazard_item.bbox = extent
# polygon from extent
hazard_item.geometry = mapping(shape({"type": "Polygon", "coordinates": [[
[extent[0], extent[1]],
[extent[2], extent[1]],
[extent[2], extent[3]],
[extent[0], extent[3]],
[extent[0], extent[1]],
]]}))
hazard_item.geometry = mapping(
shape(
{
"type": "Polygon",
"coordinates": [
[
[extent[0], extent[1]],
[extent[2], extent[1]],
[extent[2], extent[3]],
[extent[0], extent[3]],
[extent[0], extent[1]],
]
],
}
)
)

# Set collection and roles
hazard_item.set_collection(self.get_hazard_collection())
Expand Down Expand Up @@ -480,7 +489,7 @@ def make_impact_items(self, hazard_item: Item) -> List[Item]:
country["fatalities"],
"people",
country["country_code"],
hazard_item
hazard_item,
)
impact_items.append(fatalities_item)

Expand All @@ -496,7 +505,7 @@ def make_impact_items(self, hazard_item: Item) -> List[Item]:
country["us_dollars"],
"usd",
country["country_code"],
hazard_item
hazard_item,
)
impact_items.append(economic_item)

Expand All @@ -510,7 +519,7 @@ def _create_impact_item_from_losses(
value: float,
unit: str,
iso2: str,
hazard_item: Item
hazard_item: Item,
) -> Item:
"""Helper method to create impact items from PAGER losses data."""
event_item = self.make_source_event_item()
Expand Down
1 change: 1 addition & 0 deletions tests/extensions/test_usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

geocoder = WorldAdministrativeBoundariesGeocoder(get_data_file("world-administrative-boundaries.fgb"), 0.1)


def load_scenarios(scenarios: list[tuple[str, str, str]]) -> list[USGSTransformer]:
"""Load test scenarios for USGS transformation testing.
Expand Down

0 comments on commit 30aff67

Please sign in to comment.