tests: add test for S3 orphan check and stabilize code
paulmueller committed Mar 5, 2024
1 parent 24d95d0 commit 254b9b9
Showing 3 changed files with 137 additions and 59 deletions.
40 changes: 20 additions & 20 deletions dcor_control/inspect/data_ckan_local.py
@@ -41,7 +41,7 @@ def remove_empty_folders(path):
     if not path.is_dir():
         return
 
-    # recurse into subfolders
+    # recurse into sub-folders
     for pp in path.glob("*"):
         remove_empty_folders(pp)
@@ -55,16 +55,16 @@ def remove_resource_data(resource_id, autocorrect=False):
     This includes ancillary files as well as data in the user depot.
     If `autocorrect` is False, the user is prompted before deletion.
     """
-    userdepot_path = paths.get_dcor_users_depot_path()
+    user_depot_path = paths.get_dcor_users_depot_path()
     rp = get_resource_path(resource_id)
-    todel = []
+    to_del = []
 
     # Resource file
     if rp.exists() or rp.is_symlink():  # sometimes symlinks don't "exist" :)
-        todel.append(rp)
+        to_del.append(rp)
 
     # Check for ancillary files
-    todel += sorted(rp.parent.glob(rp.name + "_*"))
+    to_del += sorted(rp.parent.glob(rp.name + "_*"))
 
     # Check for symlinks and remove the corresponding files in the user depot
     if rp.is_symlink():
@@ -75,42 +75,42 @@ def remove_resource_data(resource_id, autocorrect=False):
         target = pathlib.Path(os.path.realpath(rp))
         # Only delete symlinked files if they are in the user_depot
         # (we don't delete figshare or internal data)
-        if target.exists() and str(target).startswith(str(userdepot_path)):
-            todel.append(target)
+        if target.exists() and str(target).startswith(str(user_depot_path)):
+            to_del.append(target)
 
-    request_removal(todel, autocorrect=autocorrect)
+    request_removal(to_del, autocorrect=autocorrect)
 
 
 def request_removal(delpaths, autocorrect=False):
     """Request (user interaction) and perform removal of a list of paths"""
     resources_path = paths.get_ckan_storage_path() / "resources"
-    userdepot_path = paths.get_dcor_users_depot_path()
+    user_depot_path = paths.get_dcor_users_depot_path()
     if autocorrect:
         for pp in delpaths:
             print("Deleting {}".format(pp))
-        delok = True
+        del_ok = True
     else:
-        delok = ask(
+        del_ok = ask(
            "These files are not related to an existing resource: "
            + "".join(["\n - {}".format(pp) for pp in delpaths])
            + "\nDelete these orphaned files?"
        )
 
-    if delok:
+    if del_ok:
         for pp in delpaths:
             pp.unlink()
             # Also remove empty dirs
             if str(pp).startswith(str(resources_path)):
                 # /data/ckan-HOSTNAME/resources/00e/a65/e6-cc35-...
                 remove_empty_folders(pp.parent.parent)
-            elif str(pp).startswith(str(userdepot_path)):
+            elif str(pp).startswith(str(user_depot_path)):
                 # /data/depots/users-HOSTNAME/USER-ORG/f5/ba/pkg_rid_file.rtdc
                 remove_empty_folders(pp.parent.parent.parent)
 
 
 def check_orphaned_files(assume_yes=False):
     resources_path = paths.get_ckan_storage_path() / "resources"
-    userdepot_path = paths.get_dcor_users_depot_path()
+    user_depot_path = paths.get_dcor_users_depot_path()
     time_stop = time.time()
     click.secho("Collecting resource ids...", bold=True)
     resource_ids = get_resource_ids()
@@ -133,8 +133,8 @@ def check_orphaned_files(assume_yes=False):
         elif pp.name[30:]:
             # We have an ancillary file or a temporary garbage, like
             # .rtdc~.
-            for asuf in ALLOWED_SUFFIXES:
-                if pp.name.endswith(asuf):
+            for a_suf in ALLOWED_SUFFIXES:
+                if pp.name.endswith(a_suf):
                     # We have an ancillary file
                     break
             else:
@@ -144,15 +144,15 @@ def check_orphaned_files(assume_yes=False):
     # Scan user depot for orphans
     click.secho("Scanning local user depot tree for orphaned files...",
                 bold=True)
-    for pp in userdepot_path.rglob("*/*/*/*"):
+    for pp in user_depot_path.rglob("*/*/*/*"):
         res_id = pp.name.split("_")[1]
         if res_id not in resource_ids and res_id not in orphans_processed:
             if assume_yes:
                 print("Deleting local file {}".format(pp))
-                delok = True
+                del_ok = True
             else:
-                delok = ask("Delete orphaned local file '{}'?".format(pp))
-            if delok:
+                del_ok = ask("Delete orphaned local file '{}'?".format(pp))
+            if del_ok:
                 pp.unlink()
                 remove_empty_folders(pp.parent.parent.parent)
             orphans_processed.append(res_id)
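For orientation, a minimal sketch of how this local orphan check might be driven non-interactively, e.g. from a maintenance script (assuming a configured DCOR/CKAN host; `assume_yes=True` answers all deletion prompts):

from dcor_control import inspect

# Remove stale .rtdc~ files and orphaned resource data from the local
# CKAN storage tree and the user depot without prompting.
inspect.check_orphaned_files(assume_yes=True)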
64 changes: 49 additions & 15 deletions dcor_control/inspect/data_ckan_s3.py
@@ -13,16 +13,27 @@
 ARTIFACT_NAMES = ["condensed", "preview", "resource"]
 
 
-def check_orphaned_s3_artifacts(assume_yes=False):
-    """Check all DCOR buckets for orphaned artifacts"""
+def check_orphaned_s3_artifacts(assume_yes=False, older_than_days=7,
+                                purge_orphan_buckets=True):
+    """Check all DCOR buckets for orphaned artifacts
+
+    Parameters
+    ----------
+    assume_yes: bool
+        Set to True for non-interactive mode
+    older_than_days: int
+        Buckets must have this minimum age to be considered for deletion
+    purge_orphan_buckets: bool
+        Whether to delete buckets that are not related to any circle
+    """
     s3_client, _, s3_resource = s3.get_s3()
 
     # Find buckets that do not belong to an actual circle and delete them
     # list of actual circles
     circles_ckan = get_circles_ckan()
 
     # list of circles for which we have buckets that are older than a week
-    circles_s3 = get_circles_s3(older_than_days=0)  # TODO
+    circles_s3 = get_circles_s3(older_than_days=older_than_days)
 
     # bucket_definition
     bucket_scheme = get_ckan_config_option("dcor_object_store.bucket_name")
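The new keyword arguments make the cleanup behavior explicit; a usage sketch mirroring the test added below (`older_than_days=0` disables the age guard, which production runs would keep at the 7-day default):

from dcor_control import inspect

# Non-interactive cleanup; also purges buckets that do not belong
# to any CKAN circle (purge_orphan_buckets=True is the default).
inspect.check_orphaned_s3_artifacts(assume_yes=True,
                                    older_than_days=0,
                                    purge_orphan_buckets=True)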
@@ -32,18 +32,24 @@ def check_orphaned_s3_artifacts(assume_yes=False):
 
     # find "older_than_days" S3 circles that are not defined in CKAN
     for cs3 in circles_s3:
-        if cs3 not in circles_ckan:
+        bucket_name = bucket_scheme.format(organization_id=cs3)
+        if cs3 not in circles_ckan and purge_orphan_buckets:
+            # Purge buckets that are not representing DCOR circles.
+            # This is only done if `purge_orphan_buckets` is set.
             click.secho(f"Found S3 bucket for non-existent circle {cs3}")
-            request_bucket_removal(bucket_name=cs3, autocorrect=assume_yes)
+            request_bucket_removal(
+                bucket_name=bucket_name,
+                autocorrect=assume_yes)
+            continue
         # Iterate through the resources of that circle
         circle_resources = list_group_resources_ckan(cs3)
-        bucket_name = bucket_scheme.format(organization_id=cs3)
 
         invalid_artifacts = []
         for object_name in iter_bucket_objects_s3(bucket_name):
             artifact = object_name.split("/")[0]
             if artifact in ARTIFACT_NAMES:
                 rid = "".join(object_name.split("/")[1:])
+                assert len(rid) == 36, "sanity check"
                 if rid not in circle_resources:
                     invalid_artifacts.append(object_name)
@@ -64,7 +81,9 @@ def get_circles_ckan():
     data = sp.check_output(
         f"ckan -c {ckan_ini} list-circles",
         shell=True).decode().split("\n")
-    return [f.split()[0] for f in data if f.strip()]
+    circle_list = [f.split()[0] for f in data if f.strip()]
+    print("CKAN circles", circle_list)
+    return circle_list
 
 
 @lru_cache(maxsize=32)
@@ -91,6 +110,7 @@ def get_circles_s3(older_than_days=0):
         if r_match is not None:
             circle_id = r_match.group(1)
             circle_list.append(circle_id)
+    print("S3 circles", circle_list)
     return circle_list


@@ -101,7 +121,11 @@ def iter_bucket_objects_s3(bucket_name):
         "MaxKeys": 100
     }
     while True:
-        resp = s3_client.list_objects_v2(**kwargs)
+        try:
+            resp = s3_client.list_objects_v2(**kwargs)
+        except s3_client.exceptions.NoSuchBucket:
+            # Bucket has been deleted in the meantime
+            break
 
         for obj in resp.get("Contents", []):
             object_name = obj["Key"]
@@ -116,16 +140,21 @@ def list_group_resources_ckan(group_name_or_id):
 def list_group_resources_ckan(group_name_or_id):
     """Return list of resources for a circle or collection"""
     ckan_ini = paths.get_ckan_config_path()
-    data = sp.check_output(
-        f"ckan -c {ckan_ini} list-group-resources {group_name_or_id}",
-        shell=True).decode().split("\n")
-    return [f.strip() for f in data if f.strip()]
+    try:
+        data = sp.check_output(
+            f"ckan -c {ckan_ini} list-group-resources {group_name_or_id}",
+            shell=True).decode().split("\n")
+        resources = [f.strip() for f in data if f.strip()]
+    except sp.CalledProcessError:
+        resources = []
+    print("Resources", group_name_or_id, resources)
+    return resources
 
 
 def request_bucket_removal(bucket_name, autocorrect=False):
     """Request (user interaction) the removal of an entire bucket"""
     if autocorrect:
-        print(f"Deleting {bucket_name}")
+        print(f"Deleting bucket {bucket_name}")
         del_ok = True
     else:
         del_ok = ask(f"Completely remove orphan bucket {bucket_name}?")
@@ -135,11 +164,16 @@ def request_bucket_removal(bucket_name, autocorrect=False):
         # Delete the objects
         request_removal_from_bucket(
             bucket_name=bucket_name,
-            objects=iter_bucket_objects_s3(bucket_name)
+            objects=iter_bucket_objects_s3(bucket_name),
+            autocorrect=True
         )
         # Delete the bucket if it is empty
         if len(list(iter_bucket_objects_s3(bucket_name))) == 0:
-            s3_client.delete_bucket(Bucket=bucket_name)
+            try:
+                s3_client.delete_bucket(Bucket=bucket_name)
+            except s3_client.exceptions.NoSuchBucket:
+                # bucket has been deleted in the meantime
+                pass
 
 
 def request_removal_from_bucket(bucket_name, objects, autocorrect=False):
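The truncated `iter_bucket_objects_s3` hunk above hides the pagination; for reference, a self-contained sketch of the same pattern with the new `NoSuchBucket` guard (hypothetical helper name; only standard boto3 client calls):

def iter_bucket_objects(s3_client, bucket_name, max_keys=100):
    # Yield object keys page by page, tolerating a bucket that is
    # deleted while we iterate (the race this commit stabilizes).
    kwargs = {"Bucket": bucket_name, "MaxKeys": max_keys}
    while True:
        try:
            resp = s3_client.list_objects_v2(**kwargs)
        except s3_client.exceptions.NoSuchBucket:
            break
        for obj in resp.get("Contents", []):
            yield obj["Key"]
        if not resp.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = resp["NextContinuationToken"]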
92 changes: 68 additions & 24 deletions tests/test_inspect_data_ckan.py
@@ -1,53 +1,97 @@
 import pathlib
+from unittest import mock
 
 import ckan
 import ckan.common
 import ckan.model
 import ckan.tests.factories as factories
-import dcor_shared
+import ckan.logic
+from ckan.tests.helpers import call_action
+import ckanext.dcor_schemas.plugin
 
 from dcor_control import inspect
+import dcor_shared
+from dcor_shared import s3, s3cc
+from dcor_shared.testing import make_dataset, synchronous_enqueue_job
 
-from dcor_shared.testing import make_dataset
+import pytest
 
 
 data_path = pathlib.Path(__file__).parent / "data"
 
 
-def test_check_orphaned_files(create_with_upload, monkeypatch, ckan_config,
-                              tmpdir):
+def test_check_orphaned_files_temp(create_with_upload, monkeypatch,
+                                   ckan_config, tmpdir):
     """Make sure .rtdc~ files are removed for existing resources"""
     monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir))
     monkeypatch.setattr(ckan.lib.uploader,
                         'get_storage_path',
                         lambda: str(tmpdir))
 
-    user = factories.User()
-
-    user_obj = ckan.model.User.by_name(user["name"])
-    monkeypatch.setattr(ckan.common,
-                        'current_user',
-                        user_obj)
-
-    owner_org = factories.Organization(users=[{
-        'name': user['id'],
-        'capacity': 'admin'
-    }])
-    # Note: `call_action` bypasses authorization!
-    # create 1st dataset
-    create_context1 = {'ignore_auth': False,
-                       'user': user['name'], 'api_version': 3}
-
-    _, res = make_dataset(
-        create_context1, owner_org,
+    _, res_dict = make_dataset(
         create_with_upload=create_with_upload,
         resource_path=data_path / "calibration_beads_47.rtdc",
         activate=True,
         authors="Peter Pan")
 
-    path = dcor_shared.get_resource_path(res["id"])
+    path = dcor_shared.get_resource_path(res_dict["id"])
     path_to_delete = path.with_name(path.stem + "_peter.rtdc~")
     path_to_delete.touch()
     assert path_to_delete.exists()
     inspect.check_orphaned_files(assume_yes=True)
     assert not path_to_delete.exists()


+@pytest.mark.usefixtures('clean_db', 'with_request_context')
+@mock.patch('ckan.plugins.toolkit.enqueue_job',
+            side_effect=synchronous_enqueue_job)
+def test_check_orphaned_s3_artifacts(enqueue_job_mock, create_with_upload,
+                                     monkeypatch, ckan_config, tmpdir):
+    monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir))
+    monkeypatch.setattr(ckan.lib.uploader,
+                        'get_storage_path',
+                        lambda: str(tmpdir))
+    monkeypatch.setattr(
+        ckanext.dcor_schemas.plugin,
+        'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
+        True)
+
+    ds_dict, res_dict = make_dataset(
+        create_with_upload=create_with_upload,
+        resource_path=data_path / "calibration_beads_47.rtdc",
+        activate=True,
+        private=False,
+        authors="Peter Pan")
+
+    rid = res_dict["id"]
+
+    bucket_name, object_name = s3cc.get_s3_bucket_object_for_artifact(rid)
+
+    # Check whether the S3 resource exists
+    assert s3.object_exists(bucket_name, object_name)
+    # Check that the organization exists
+    org_list = ckan.logic.get_action("organization_list")()
+    assert ds_dict["organization"]["name"] in org_list
+
+    # Attempt to remove objects from S3; the object should still be
+    # there afterward.
+    inspect.check_orphaned_s3_artifacts(assume_yes=True,
+                                        older_than_days=0)
+    assert s3.object_exists(bucket_name, object_name)
+
+    # Delete the entire dataset
+    call_action(action_name="package_delete",
+                context={'ignore_auth': True, 'user': 'default'},
+                id=ds_dict["id"]
+                )
+    call_action(action_name="dataset_purge",
+                context={'ignore_auth': True, 'user': 'default'},
+                id=ds_dict["id"]
+                )
+
+    # Make sure that the S3 object is still there
+    assert s3.object_exists(bucket_name, object_name)
+
+    # Perform the actual cleanup
+    inspect.check_orphaned_s3_artifacts(assume_yes=True,
+                                        older_than_days=0)
+    assert not s3.object_exists(bucket_name, object_name)
