Skip to content

Commit

Permalink
Issue Open-EO#571 improve reconstruction of geometries in CsvJobDatab…
Browse files Browse the repository at this point in the history
…ase/ParquetJobDatabase
  • Loading branch information
soxofaan committed Jul 23, 2024
1 parent b272791 commit 4b8593c
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 22 deletions.
19 changes: 16 additions & 3 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path
from typing import Callable, Dict, NamedTuple, Optional, Union

import geopandas
import pandas as pd
import requests
import shapely.errors
Expand Down Expand Up @@ -556,13 +557,13 @@ def _is_valid_wkt(self, wkt: str) -> bool:

def read(self) -> pd.DataFrame:
df = pd.read_csv(self.path)
# `df.to_csv` in `persist()` will encode geometries as WKT, so we decode that here.
if (
"geometry" in df.columns
and df["geometry"].dtype.name != "geometry"
and self._is_valid_wkt(df["geometry"].iloc[0])
):
df["geometry"] = df["geometry"].apply(shapely.wkt.loads)
# `df.to_csv()` in `persist()` has encoded geometries as WKT, so we decode that here.
df = geopandas.GeoDataFrame(df, geometry=geopandas.GeoSeries.from_wkt(df["geometry"]))
return df

def persist(self, df: pd.DataFrame):
Expand Down Expand Up @@ -590,7 +591,19 @@ def exists(self) -> bool:
return self.path.exists()

def read(self) -> pd.DataFrame:
return pd.read_parquet(self.path)
# Unfortunately, a naive `pandas.read_parquet()` does not easily allow
# reconstructing geometries from a GeoPandas Parquet file.
# And vice-versa, `geopandas.read_parquet()` does not support reading
# Parquet file without geometries.
# So we have to guess which case we have.
# TODO is there a cleaner way to do this?
import pyarrow.parquet

metadata = pyarrow.parquet.read_metadata(self.path)
if b"geo" in metadata.metadata:
return geopandas.read_parquet(self.path)
else:
return pd.read_parquet(self.path)

def persist(self, df: pd.DataFrame):
self.path.parent.mkdir(parents=True, exist_ok=True)
Expand Down
68 changes: 49 additions & 19 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import json
import textwrap
import threading
from unittest import mock

import geopandas

# TODO: can we avoid using httpretty?
# We need it for testing the resilience, which uses an HTTPadapter with Retry
# but requests-mock also uses an HTTPAdapter for the mocking and basically
# erases the HTTPAdapter we have set up.
# httpretty avoids this specific problem because it mocks at the socket level,
# But I would rather not have two dependencies with almost the same goal.
import httpretty
import pandas
import pandas as pd
import pytest
import requests
import shapely.geometry.point as shpt
import shapely.geometry

import openeo
from openeo import BatchJob
Expand Down Expand Up @@ -456,6 +460,26 @@ def start_job(row, connection_provider, connection, **kwargs):
assert set(result.backend_name) == {"foo"}


JOB_DB_DF_BASICS = pd.DataFrame(
{
"numbers": [3, 2, 1],
"names": ["apple", "banana", "coconut"],
}
)
JOB_DB_GDF_WITH_GEOMETRY = geopandas.GeoDataFrame(
{
"numbers": [11, 22],
"geometry": [shapely.geometry.Point(1, 2), shapely.geometry.Point(2, 1)],
},
)
JOB_DB_DF_WITH_GEOJSON_STRING = pd.DataFrame(
{
"numbers": [11, 22],
"geometry": ['{"type":"Point","coordinates":[1,2]}', '{"type":"Point","coordinates":[1,2]}'],
}
)


class TestCsvJobDatabase:
def test_read_wkt(self, tmp_path):
wkt_df = pd.DataFrame(
Expand All @@ -467,7 +491,7 @@ def test_read_wkt(self, tmp_path):
path = tmp_path / "jobs.csv"
wkt_df.to_csv(path, index=False)
df = CsvJobDatabase(path).read()
assert isinstance(df.geometry[0], shpt.Point)
assert isinstance(df.geometry[0], shapely.geometry.Point)

def test_read_non_wkt(self, tmp_path):
non_wkt_df = pd.DataFrame(
Expand All @@ -481,34 +505,40 @@ def test_read_non_wkt(self, tmp_path):
df = CsvJobDatabase(path).read()
assert isinstance(df.geometry[0], str)

def test_persist_and_read(self, tmp_path):
orig = pd.DataFrame(
{
"numbers": [3, 2, 1],
"names": ["apple", "banana", "coconut"],
}
)
path = tmp_path / "jobs.csv"
@pytest.mark.parametrize(
["orig"],
[
pytest.param(JOB_DB_DF_BASICS, id="pandas basics"),
pytest.param(JOB_DB_GDF_WITH_GEOMETRY, id="geopandas with geometry"),
pytest.param(JOB_DB_DF_WITH_GEOJSON_STRING, id="pandas with geojson string as geometry"),
],
)
def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
path = tmp_path / "jobs.parquet"
CsvJobDatabase(path).persist(orig)
assert path.exists()

loaded = CsvJobDatabase(path).read()
assert list(loaded.dtypes) == list(orig.dtypes)
assert loaded.dtypes.to_dict() == orig.dtypes.to_dict()
assert loaded.equals(orig)
assert type(orig) is type(loaded)


class TestParquetJobDatabase:
def test_persist_and_read(self, tmp_path):
orig = pd.DataFrame(
{
"numbers": [3, 2, 1],
"names": ["apple", "banana", "coconut"],
}
)
@pytest.mark.parametrize(
["orig"],
[
pytest.param(JOB_DB_DF_BASICS, id="pandas basics"),
pytest.param(JOB_DB_GDF_WITH_GEOMETRY, id="geopandas with geometry"),
pytest.param(JOB_DB_DF_WITH_GEOJSON_STRING, id="pandas with geojson string as geometry"),
],
)
def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
path = tmp_path / "jobs.parquet"
ParquetJobDatabase(path).persist(orig)
assert path.exists()

loaded = ParquetJobDatabase(path).read()
assert list(loaded.dtypes) == list(orig.dtypes)
assert loaded.dtypes.to_dict() == orig.dtypes.to_dict()
assert loaded.equals(orig)
assert type(orig) is type(loaded)

0 comments on commit 4b8593c

Please sign in to comment.