Skip to content

Commit

Permalink
remove files/objects exported to workspace
Browse files: browse the repository at this point in the history
  • Loading branch information
bossie committed Oct 17, 2024
1 parent 2ad7f2c commit bbe2ed9
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 21 deletions.
1 change: 1 addition & 0 deletions openeogeotrellis/deploy/batch_job.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -493,6 +493,7 @@ def _export_workspace(result: SaveResult, result_metadata: dict, result_asset_ke
workspace_repository=backend_config_workspace_repository,
hrefs=asset_hrefs + stac_hrefs,
default_merge=OPENEO_BATCH_JOB_ID,
remove_original=True,
)


Expand Down
21 changes: 14 additions & 7 deletions openeogeotrellis/workspace.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -14,16 +14,19 @@ class ObjectStorageWorkspace(Workspace):
def __init__(self, bucket: str):
self.bucket = bucket

def import_file(self, file: Path, merge: str):
def import_file(self, file: Path, merge: str, remove_original: bool = False):
merge = os.path.normpath(merge)
subdirectory = merge[1:] if merge.startswith("/") else merge

key = subdirectory + "/" + file.name
s3_client().upload_file(str(file), self.bucket, key)

_log.debug(f"uploaded {file.absolute()} to s3://{self.bucket}/{key}")
if remove_original:
file.unlink()

def import_object(self, s3_uri: str, merge: str):
_log.debug(f"{'moved' if remove_original else 'uploaded'} {file.absolute()} to s3://{self.bucket}/{key}")

def import_object(self, s3_uri: str, merge: str, remove_original: bool = False):
uri_parts = urlparse(s3_uri)

if not uri_parts.scheme or uri_parts.scheme.lower() != "s3":
Expand All @@ -35,8 +38,12 @@ def import_object(self, s3_uri: str, merge: str):

target_key = f"{merge}/{filename}"

s3_client().copy_object(
CopySource={"Bucket": source_bucket, "Key": source_key}, Bucket=self.bucket, Key=target_key
)
s3 = s3_client()
s3.copy_object(CopySource={"Bucket": source_bucket, "Key": source_key}, Bucket=self.bucket, Key=target_key)
if remove_original:
s3.delete_object(Bucket=source_bucket, Key=source_key)

_log.debug(f"copied s3://{source_bucket}/{source_key} to s3://{self.bucket}/{target_key}")
_log.debug(
f"{'moved' if remove_original else 'copied'} "
f"s3://{source_bucket}/{source_key} to s3://{self.bucket}/{target_key}"
)
4 changes: 2 additions & 2 deletions tests/test_batch_result.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -956,7 +956,7 @@ def test_export_workspace(tmp_path):

job_dir_files = set(os.listdir(tmp_path))
assert len(job_dir_files) > 0
# assert "openEO_2021-01-05Z.tif" not in job_dir_files # TODO: uncomment
assert "openEO_2021-01-05Z.tif" not in job_dir_files

workspace_files = set(os.listdir(workspace_dir))
assert workspace_files == {"collection.json", "openEO_2021-01-05Z.tif", "openEO_2021-01-05Z.tif.json"}
Expand Down Expand Up @@ -1393,7 +1393,7 @@ def test_multiple_save_result_single_export_workspace(tmp_path):
job_dir_files = set(os.listdir(tmp_path))
assert len(job_dir_files) > 0
assert "openEO.nc" in job_dir_files
# assert "openEO.tif" not in job_dir_files # TODO: uncomment
assert "openEO.tif" not in job_dir_files

workspace_files = set(os.listdir(workspace_dir))
assert workspace_files == {"collection.json", "openEO.tif", "openEO.tif.json"}
Expand Down
46 changes: 34 additions & 12 deletions tests/test_object_storage_workspace.py
Original file line number | Diff line number | Diff line change
@@ -1,36 +1,58 @@
from pathlib import Path
from typing import Set

import pytest


from openeogeotrellis.workspace import ObjectStorageWorkspace


def test_import_file(mock_s3_client, mock_s3_bucket):
some_file = Path(__file__)
@pytest.mark.parametrize("remove_original", [False, True])
def test_import_file(tmp_path, mock_s3_client, mock_s3_bucket, remove_original):
source_directory = tmp_path / "src"
source_directory.mkdir()
source_file = source_directory / "file"
source_file.touch()

merge = "some/target"

workspace = ObjectStorageWorkspace(bucket="openeo-fake-bucketname")
workspace.import_file(some_file, merge=merge)
workspace.import_file(source_file, merge=merge, remove_original=remove_original)

assert _workspace_keys(mock_s3_client, workspace.bucket, merge) == {f"{merge}/{some_file.name}"}
assert _workspace_keys(mock_s3_client, workspace.bucket, prefix=merge) == {f"{merge}/{source_file.name}"}
assert source_file.exists() != remove_original


def test_import_object(mock_s3_client, mock_s3_bucket):
some_file = Path(__file__)
@pytest.mark.parametrize("remove_original", [False, True])
def test_import_object(tmp_path, mock_s3_client, mock_s3_bucket, remove_original):
source_bucket = target_bucket = "openeo-fake-bucketname"
source_key = "some/source/object"
merge = "some/target"
assert source_key != merge

with open(some_file, "rb") as f:
source_directory = tmp_path / "src"
source_directory.mkdir()
source_file = source_directory / "file"
source_file.touch()

with open(source_file, "rb") as f:
mock_s3_client.put_object(Bucket=source_bucket, Key=source_key, Body=f.read())

assert _workspace_keys(mock_s3_client, source_bucket, prefix=source_key) == {source_key}

workspace = ObjectStorageWorkspace(bucket=target_bucket)
workspace.import_object(f"s3://{source_bucket}/{source_key}", merge=merge)
workspace.import_object(f"s3://{source_bucket}/{source_key}", merge=merge, remove_original=remove_original)

assert _workspace_keys(mock_s3_client, workspace.bucket, prefix=merge) == {"some/target/object"}

if remove_original:
assert _workspace_keys(mock_s3_client, source_bucket) == {"some/target/object"}
else:
assert _workspace_keys(mock_s3_client, source_bucket) == {"some/source/object", "some/target/object"}

assert _workspace_keys(mock_s3_client, workspace.bucket, merge) == {"some/target/object"}

def _workspace_keys(s3_client, bucket, prefix=None) -> Set[str]:
kwargs = dict(Bucket=bucket)
if prefix:
kwargs["Prefix"] = prefix

def _workspace_keys(s3_client, bucket, prefix) -> Set[str]:
return {obj["Key"] for obj in s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", [])}
return {obj["Key"] for obj in s3_client.list_objects_v2(**kwargs).get("Contents", [])}

0 comments on commit bbe2ed9

Please sign in to comment.