avoid local storage #903

Draft · wants to merge 5 commits into base: master
113 changes: 82 additions & 31 deletions openeogeotrellis/deploy/batch_job.py
@@ -37,9 +37,19 @@
from openeogeotrellis.deploy.batch_job_metadata import _assemble_result_metadata, _transform_stac_metadata, \
_convert_job_metadatafile_outputs_to_s3_urls, _get_tracker_metadata
from openeogeotrellis.integrations.hadoop import setup_kerberos_auth
from openeogeotrellis.udf import (collect_python_udf_dependencies, install_python_udf_dependencies,
UDF_PYTHON_DEPENDENCIES_FOLDER_NAME, )
from openeogeotrellis.utils import (describe_path, log_memory, get_jvm, add_permissions, json_default, )
from openeogeotrellis.udf import (
collect_python_udf_dependencies,
install_python_udf_dependencies,
UDF_PYTHON_DEPENDENCIES_FOLDER_NAME,
)
from openeogeotrellis.utils import (
describe_path,
log_memory,
get_jvm,
add_permissions,
json_default,
to_jsonable,
)

logger = logging.getLogger('openeogeotrellis.deploy.batch_job')

@@ -308,16 +318,21 @@ def run_job(
"institution": "openEO platform - Geotrellis backend: " + __version__
}

assets_metadata = {}
assets_metadata = []
ml_model_metadata = None

unique_process_ids = CollectUniqueProcessIdsVisitor().accept_process_graph(process_graph).process_ids

result_metadata = _assemble_result_metadata(tracer=tracer, result=results[0], output_file=output_file,
unique_process_ids=unique_process_ids,
asset_metadata={},
ml_model_metadata=ml_model_metadata,skip_gdal=True)
#perform a first metadata write _before_ actually computing the result. This provides a bit more info, even if the job fails.
result_metadata = _assemble_result_metadata(
tracer=tracer,
result=results[0],
output_file=output_file,
unique_process_ids=unique_process_ids,
apply_gdal=False,
asset_metadata={},
ml_model_metadata=ml_model_metadata,
)
# perform a first metadata write _before_ actually computing the result. This provides a bit more info, even if the job fails.
write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)

for result in results:
@@ -340,8 +355,7 @@ def run_job(
for name, asset in the_assets_metadata.items():
add_permissions(Path(asset["href"]), stat.S_IWGRP)
logger.info(f"wrote {len(the_assets_metadata)} assets to {output_file}")
_export_workspace(result, result_metadata, the_assets_metadata, stac_metadata_dir=job_dir)
assets_metadata = {**assets_metadata, **the_assets_metadata}
assets_metadata.append(the_assets_metadata)

if any(dependency['card4l'] for dependency in dependencies): # TODO: clean this up
logger.debug("awaiting Sentinel Hub CARD4L data...")
@@ -397,17 +411,45 @@ def run_job(
}
}]

result_metadata = _assemble_result_metadata(tracer=tracer, result=result, output_file=output_file,
unique_process_ids=unique_process_ids,
asset_metadata=assets_metadata,
ml_model_metadata=ml_model_metadata,skip_gdal=True)
result_metadata = _assemble_result_metadata(
tracer=tracer,
result=result,
output_file=output_file,
unique_process_ids=unique_process_ids,
apply_gdal=False,
asset_metadata={
# TODO: flattened instead of per-result, clean this up?
asset_key: asset_metadata
for result_assets_metadata in assets_metadata
for asset_key, asset_metadata in result_assets_metadata.items()
},
ml_model_metadata=ml_model_metadata,
)

write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)
logger.debug("Starting GDAL-based retrieval of asset metadata")
result_metadata = _assemble_result_metadata(tracer=tracer, result=result, output_file=output_file,
unique_process_ids=unique_process_ids,
asset_metadata=assets_metadata,
ml_model_metadata=ml_model_metadata,skip_gdal=False)
result_metadata = _assemble_result_metadata(
tracer=tracer,
result=result,
output_file=output_file,
unique_process_ids=unique_process_ids,
apply_gdal=True,
asset_metadata={
# TODO: flattened instead of per-result, clean this up?
asset_key: asset_metadata
for result_assets_metadata in assets_metadata
for asset_key, asset_metadata in result_assets_metadata.items()
},
ml_model_metadata=ml_model_metadata,
)

assert len(results) == len(assets_metadata)
for result, result_assets_metadata in zip(results, assets_metadata):
_export_workspace(
result, result_metadata, result_asset_keys=result_assets_metadata.keys(), stac_metadata_dir=job_dir
)

# TODO: delete exported assets from local disk/Swift
finally:
write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, job_dir)

@@ -441,20 +483,24 @@ def write_metadata(metadata, metadata_file, job_dir: Path):
_convert_job_metadatafile_outputs_to_s3_urls(metadata_file)


def _export_workspace(result: SaveResult, result_metadata: dict, assets_metadata: dict, stac_metadata_dir: Path):
asset_hrefs = [asset["href"] for asset in assets_metadata.values()]
def _export_workspace(result: SaveResult, result_metadata: dict, result_asset_keys: List[str], stac_metadata_dir: Path):
asset_hrefs = [result_metadata.get("assets", {})[asset_key]["href"] for asset_key in result_asset_keys]
stac_hrefs = [
f"file:{path}" for path in _write_exported_stac_collection(stac_metadata_dir, result_metadata, assets_metadata)
f"file:{path}"
for path in _write_exported_stac_collection(stac_metadata_dir, result_metadata, result_asset_keys)
]
result.export_workspace(
workspace_repository=backend_config_workspace_repository,
hrefs=asset_hrefs + stac_hrefs,
default_merge=OPENEO_BATCH_JOB_ID,
remove_original=True,
)


def _write_exported_stac_collection(
job_dir: Path, result_metadata: dict, assets_metadata: dict
job_dir: Path,
result_metadata: dict,
asset_keys: List[str],
) -> List[Path]: # TODO: change to Set?
def write_stac_item_file(asset_id: str, asset: dict) -> Path:
item_file = job_dir / f"{asset_id}.json"
@@ -480,21 +526,26 @@ def write_stac_item_file(asset_id: str, asset: dict) -> Path:
"properties": properties,
"links": [], # TODO
"assets": {
asset_id: dict_no_none(**{
"href": f"./{Path(asset['href']).name}",
"roles": asset.get("roles"),
"type": asset.get("type"),
"eo:bands": asset.get("bands"),
})
asset_id: dict_no_none(
**{
"href": f"./{Path(asset['href']).name}",
"roles": asset.get("roles"),
"type": asset.get("type"),
"eo:bands": asset.get("bands"),
"raster:bands": to_jsonable(asset.get("raster:bands")),
}
)
},
}

with open(item_file, "wt") as fi:
json.dump(stac_item, fi)
json.dump(stac_item, fi, allow_nan=False)

return item_file

item_files = [write_stac_item_file(asset_id, asset) for asset_id, asset in assets_metadata.items()]
item_files = [
write_stac_item_file(asset_key, result_metadata.get("assets", {})[asset_key]) for asset_key in asset_keys
]

def item_link(item_file: Path) -> dict:
return {
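For illustration, a minimal sketch of the new shape of assets_metadata: a list with one asset dict per SaveResult, flattened into a single dict only when handed to _assemble_result_metadata (asset names and paths below are hypothetical).

    # Hypothetical per-result asset metadata, one dict per SaveResult:
    assets_metadata = [
        {"openEO_2021.tif": {"href": "/data/jobs/j-123/openEO_2021.tif"}},
        {"openEO_2022.tif": {"href": "/data/jobs/j-123/openEO_2022.tif"}},
    ]

    # Flattened view as passed to _assemble_result_metadata (see the TODO above):
    flat_assets = {
        asset_key: asset_metadata
        for result_assets_metadata in assets_metadata
        for asset_key, asset_metadata in result_assets_metadata.items()
    }
    # -> {"openEO_2021.tif": {...}, "openEO_2022.tif": {...}}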
24 changes: 16 additions & 8 deletions openeogeotrellis/deploy/batch_job_metadata.py
@@ -27,9 +27,15 @@
logger = logging.getLogger(__name__)


def _assemble_result_metadata(tracer: DryRunDataTracer, result: SaveResult, output_file: Path,
unique_process_ids: Set[str], asset_metadata: Dict = None,
ml_model_metadata: Dict = None, skip_gdal=False) -> dict:
def _assemble_result_metadata(
tracer: DryRunDataTracer,
result: SaveResult,
output_file: Path,
unique_process_ids: Set[str],
apply_gdal,
asset_metadata: Dict = None,
ml_model_metadata: Dict = None,
) -> dict:
metadata = extract_result_metadata(tracer)

def epsg_code(gps_crs) -> Optional[int]:
@@ -69,17 +75,19 @@ def epsg_code(gps_crs) -> Optional[int]:
}
else:
# New approach: SaveResult has generated metadata already for us
if skip_gdal:
metadata['assets'] = asset_metadata
else:
if apply_gdal:
try:
_extract_asset_metadata(
job_result_metadata=metadata, asset_metadata=asset_metadata, job_dir=output_file.parent, epsg=epsg
job_result_metadata=metadata,
asset_metadata=asset_metadata,
job_dir=output_file.parent,
epsg=epsg,
)
except Exception as e:
error_summary = GeoPySparkBackendImplementation.summarize_exception_static(e)
logger.exception("Error while creating asset metadata: " + error_summary.summary)

else:
metadata["assets"] = asset_metadata

# _extract_asset_metadata may already fill in metadata["epsg"], but only
# if the value of epsg was None. So we don't want to overwrite it with
28 changes: 8 additions & 20 deletions openeogeotrellis/job_tracker_v2.py
@@ -53,8 +53,7 @@
DynamicEtlApiJobCostCalculator,
)
from openeogeotrellis.job_registry import DoubleJobRegistry, ZkJobRegistry, get_deletable_dependency_sources
from openeogeotrellis.utils import StatsReporter, dict_merge_recursive

from openeogeotrellis.utils import StatsReporter, dict_merge_recursive, to_jsonable

# Note: hardcoded logger name as this script is executed directly which kills the usefulness of `__name__`.
_log = logging.getLogger("openeogeotrellis.job_tracker_v2")
@@ -566,9 +565,13 @@ def _sync_job_status(
job_costs = None

total_usage = dict_merge_recursive(job_metadata.usage.to_dict(), result_metadata.get("usage", {}))
double_job_registry.set_results_metadata(job_id, user_id, costs=job_costs,
usage=self._to_jsonable(dict(total_usage)),
results_metadata=self._to_jsonable(result_metadata))
double_job_registry.set_results_metadata(
job_id,
user_id,
costs=job_costs,
usage=to_jsonable(dict(total_usage)),
results_metadata=to_jsonable(result_metadata),
)

datetime_formatter = Rfc3339(propagate_none=True)

@@ -580,21 +583,6 @@
finished=datetime_formatter.datetime(job_metadata.finish_time),
)

@staticmethod
def _to_jsonable_float(x: float) -> Union[float, str]:
return x if isfinite(x) else str(x)

@staticmethod
def _to_jsonable(x):
if isinstance(x, float):
return JobTracker._to_jsonable_float(x)
if isinstance(x, dict):
return {JobTracker._to_jsonable(key): JobTracker._to_jsonable(value) for key, value in x.items()}
elif isinstance(x, list):
return [JobTracker._to_jsonable(elem) for elem in x]

return x


class CliApp:
def main(self, *, args: Optional[List[str]] = None):
16 changes: 16 additions & 0 deletions openeogeotrellis/utils.py
@@ -752,3 +752,19 @@ def _make_set_for_key(

def map_optional(f: Callable[[T], U], optional: Optional[T]) -> Optional[U]:
return None if optional is None else f(optional)


def to_jsonable_float(x: float) -> Union[float, str]:
"""Replaces nan, inf and -inf with its string representation to allow JSON serialization."""
return x if math.isfinite(x) else str(x)


def to_jsonable(x):
if isinstance(x, float):
return to_jsonable_float(x)
if isinstance(x, dict):
return {to_jsonable(key): to_jsonable(value) for key, value in x.items()}
elif isinstance(x, list):
return [to_jsonable(elem) for elem in x]

return x
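A short usage sketch of the new helper (values are illustrative): non-finite floats are replaced by their string form, so a strict json.dump with allow_nan=False, as now used for the STAC items above, no longer raises.

    import json

    from openeogeotrellis.utils import to_jsonable

    usage = {"cpu": {"value": float("nan"), "unit": "cpu-seconds"}}

    # float("nan") becomes the string "nan", so strict serialization succeeds:
    json.dumps(to_jsonable(usage), allow_nan=False)
    # -> '{"cpu": {"value": "nan", "unit": "cpu-seconds"}}'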
21 changes: 14 additions & 7 deletions openeogeotrellis/workspace.py
@@ -14,16 +14,19 @@ class ObjectStorageWorkspace(Workspace):
def __init__(self, bucket: str):
self.bucket = bucket

def import_file(self, file: Path, merge: str):
def import_file(self, file: Path, merge: str, remove_original: bool = False):
merge = os.path.normpath(merge)
subdirectory = merge[1:] if merge.startswith("/") else merge

key = subdirectory + "/" + file.name
s3_client().upload_file(str(file), self.bucket, key)

_log.debug(f"uploaded {file.absolute()} to s3://{self.bucket}/{key}")
if remove_original:
file.unlink()

def import_object(self, s3_uri: str, merge: str):
_log.debug(f"{'moved' if remove_original else 'uploaded'} {file.absolute()} to s3://{self.bucket}/{key}")

def import_object(self, s3_uri: str, merge: str, remove_original: bool = False):
uri_parts = urlparse(s3_uri)

if not uri_parts.scheme or uri_parts.scheme.lower() != "s3":
@@ -35,8 +38,12 @@ def import_object(self, s3_uri: str, merge: str):

target_key = f"{merge}/{filename}"

s3_client().copy_object(
CopySource={"Bucket": source_bucket, "Key": source_key}, Bucket=self.bucket, Key=target_key
)
s3 = s3_client()
s3.copy_object(CopySource={"Bucket": source_bucket, "Key": source_key}, Bucket=self.bucket, Key=target_key)
if remove_original:
s3.delete_object(Bucket=source_bucket, Key=source_key)

_log.debug(f"copied s3://{source_bucket}/{source_key} to s3://{self.bucket}/{target_key}")
_log.debug(
f"{'moved' if remove_original else 'copied'} "
f"s3://{source_bucket}/{source_key} to s3://{self.bucket}/{target_key}"
)
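A minimal usage sketch of the new remove_original flag (bucket and paths are hypothetical): the asset is uploaded to object storage and the local copy is deleted afterwards, in line with the goal of avoiding local storage.

    from pathlib import Path

    from openeogeotrellis.workspace import ObjectStorageWorkspace

    workspace = ObjectStorageWorkspace(bucket="openeo-workspace")  # hypothetical bucket

    # Uploads to s3://openeo-workspace/j-123/openEO.tif, then unlinks the local file:
    workspace.import_file(Path("/data/jobs/j-123/openEO.tif"), merge="j-123", remove_original=True)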