DD-1467 DD-1493 cleanup of batch processors (#55)
* moved searching for all dataset pids out of get_entries; consider util wrappers around both when needed again (a sketch follows after this list)
* reworded logging to something neutral
* dropped the batch processors introduced by DD-1493
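A minimal sketch, not part of this commit, of the kind of util wrapper the first bullet suggests: when no PID or file is given it searches for all dataset PIDs, otherwise it defers to the simplified get_entries. The function name and the search defaults are assumptions.

def get_dataset_pids(pid_or_pids_file, search_api, dry_run=False):
    # Hypothetical wrapper: fall back to searching for all datasets when no
    # explicit entries are given (query="*", subtree="root", object_type="dataset"
    # are assumed to remain the search defaults).
    if pid_or_pids_file is None:
        result = search_api.search(dry_run=dry_run)
        return map(lambda rec: rec['global_id'], result)  # lazy iterator over the result pages
    return get_entries(pid_or_pids_file)  # a list of strings read from a file, or the single value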
jo-pol authored Mar 7, 2024
1 parent 874b360 commit 823b98e
Showing 7 changed files with 200 additions and 190 deletions.
54 changes: 28 additions & 26 deletions src/datastation/common/batch_processing.py
@@ -5,31 +5,22 @@
from datastation.common.csv import CsvReport


def get_pids(pid_or_pids_file, search_api=None, query="*", subtree="root", object_type="dataset", dry_run=False):
def get_pids(pid_or_pids_file):
""" kept for backward compatibility"""
return get_entries(pid_or_pids_file, search_api, query, subtree, object_type, dry_run)
return get_entries(pid_or_pids_file)


def get_entries(entries, search_api=None, query="*", subtree="root", object_type="dataset", dry_run=False):
def get_entries(entries):
"""
Args:
entries: A string (e.g. a PID for the default object_type 'dataset'),
or a file with a list of strings or dict objects.
search_api: must be provided if entries is None
query: passed on to search_api().search
object_type: passed on to search_api().search
subtree (object): passed on to search_api().search
dry_run: Do not perform the action, but show what would be done.
Only applicable if entries is None.
Returns: an iterator with strings or dict objects.
if entries is not provided, it searches for all objects of object_type
and extracts their pids, fetching the result pages lazy.
entries: A string (e.g. a dataset PID or dataverse alias),
or a plain text file with a string per line
Returns: a list with strings
"""
if entries is None:
result = search_api.search(query=query, subtree=subtree, object_type=object_type, dry_run=dry_run)
return map(lambda rec: rec['global_id'], result)
return []
elif os.path.isfile(os.path.expanduser(entries)):
objects = []
with open(os.path.expanduser(entries)) as f:
@@ -53,15 +44,18 @@ def process_entries(self, entries, callback):
""" The callback is called for each entry in entries.
Args:
entries: a single string (e.g. PID) or dict, or a list of string or dicts
callback: a function that takes a single entry as argument
entries: a stream of arguments for the callback.
callback: a function that takes a single entry as argument.
Returns:
None
If an entry is a string or a dictionary with key 'PID',
the value is used for progress logging.
"""
if type(entries) is list:
if entries is None:
logging.info("Nothing to process")
return
elif type(entries) is list:
num_entries = len(entries)
logging.info(f"Start batch processing on {num_entries} entries")
else:
@@ -74,19 +68,27 @@ def process_entries(self, entries, callback):
if self.wait > 0 and i > 1:
logging.debug(f"Waiting {self.wait} seconds before processing next entry")
time.sleep(self.wait)
if type(obj) is dict and 'PID' in obj.keys():
logging.info(f"Processing {i} of {num_entries}: {obj['PID']}")
elif type(obj) is str:
logging.info(f"Processing {i} of {num_entries}: {obj}")
if num_entries > 1:
progress_message = f"Processing {i} of {num_entries} entries"
elif num_entries == -1:
progress_message = f"Processing entry number {i}"
else:
logging.info(f"Processing {i} of {num_entries}")
progress_message = None
if progress_message is not None:
if type(obj) is str:
logging.info(f"{progress_message}: {obj}")
elif type(obj) is dict and 'PID' in obj.keys():
logging.info(f"{progress_message}: {obj['PID']}")
else:
logging.info(progress_message)
callback(obj)
except Exception as e:
logging.exception("Exception occurred", exc_info=True)
logging.exception(f"Exception occurred on entry nr {i}", exc_info=True)
if self.fail_on_first_error:
logging.error(f"Stop processing because of an exception: {e}")
break
logging.debug("fail_on_first_error is False, continuing...")
logging.info(f"Batch processing ended: {i} entries processed")


class BatchProcessorWithReport(BatchProcessor):
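A minimal usage sketch of the reworked batch processor, assuming only the constructor keywords and callback shape that appear in this diff; the PIDs are hypothetical.

from datastation.common.batch_processing import BatchProcessor

processor = BatchProcessor(wait=1, fail_on_first_error=True)
processor.process_entries(
    ["doi:10.5072/FK2/AAA111", "doi:10.5072/FK2/BBB222"],  # hypothetical PIDs; dicts with a 'PID' key are also logged
    lambda pid: print(pid))  # called once per entry, with progress logged and an optional wait between entries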
131 changes: 0 additions & 131 deletions src/datastation/common/common_batch_processing.py

This file was deleted.

2 changes: 2 additions & 0 deletions src/datastation/common/utils.py
@@ -5,6 +5,7 @@
import argparse
import requests


def add_dry_run_arg(parser):
parser.add_argument('-d', '--dry-run', action='store_true',
help='Do not perform the action, but show what would be done.')
@@ -118,6 +119,7 @@ def sizeof_fmt(num, suffix='B'):
num /= 1024.0
return "%.1f%s%s" % (num, 'Yi', suffix)


def plural(word: str):
if word.endswith('s'):
return word + "es"
8 changes: 6 additions & 2 deletions src/datastation/dv_dataset_edit_metadata.py
@@ -55,8 +55,12 @@ def parse_value_args():
with open(args.pid_or_file, newline='') as csvfile:
# restkey must be an invalid <typeName> to prevent it from being processed
reader = DictReader(csvfile, skipinitialspace=True, restkey='rest.column')
if 'PID' not in reader.fieldnames:
parser.error(f"No column 'PID' found in " + args.pid_or_file)
if reader is None or reader.fieldnames is None or len(reader.fieldnames) == 0:
parser.error(f"{args.pid_or_file} is empty or not a CSV file.")
return
if 'PID' not in reader.fieldnames or len(reader.fieldnames) == 0:
parser.error(f"No column 'PID' (or no other columns) found in " + args.pid_or_file)
return
run(reader)
else:
run([parse_value_args()])
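For context, a hypothetical CSV input that passes the new checks: a 'PID' column plus one or more metadata-field columns (the column name 'title' is only illustrative).

PID,title
doi:10.5072/FK2/AAA111,Updated dataset title
doi:10.5072/FK2/BBB222,Another updated title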
15 changes: 10 additions & 5 deletions src/datastation/dv_dataset_get_attributes.py
@@ -1,7 +1,7 @@
import argparse
import json

from datastation.common.batch_processing import get_pids, BatchProcessor, BatchProcessorWithReport
from datastation.common.batch_processing import get_entries, BatchProcessor
from datastation.common.config import init
from datastation.common.utils import add_batch_processor_args, add_dry_run_arg
from datastation.dataverse.datasets import Datasets
@@ -14,9 +14,9 @@ def main():

attr_group = parser.add_argument_group()
attr_group.add_argument("--user-with-role", dest="user_with_role",
help="List users with a specific role on the dataset",)
help="List users with a specific role on the dataset",)
attr_group.add_argument("--storage", dest="storage", action="store_true",
help="The storage in bytes",)
help="The storage in bytes",)

group = parser.add_mutually_exclusive_group(required=True)

@@ -40,8 +40,13 @@ def main():
dataverse_client = DataverseClient(config["dataverse"])

datasets = Datasets(dataverse_client, dry_run=args.dry_run)
BatchProcessor(wait=args.wait, fail_on_first_error=args.fail_fast).process_pids(
get_pids(args.pid_or_pids_file, dataverse_client.search_api(), dry_run=args.dry_run),
if args.all_datasets:
search_result = dataverse_client.search_api().search(dry_run=args.dry_run)
pids = map(lambda rec: rec['global_id'], search_result) # lazy iterator
else:
pids = get_entries(args.pid_or_pids_file)
BatchProcessor(wait=args.wait, fail_on_first_error=args.fail_fast).process_entries(
pids,
lambda pid: print(json.dumps(datasets.get_dataset_attributes(pid, **attribute_options), skipkeys=True)))


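The map over the search result keeps iteration lazy, so result pages are only fetched while the batch processor consumes PIDs. A small stand-in sketch (the generator below merely simulates search_api.search):

def simulated_search():  # stand-in for dataverse_client.search_api().search(...)
    for n in range(3):
        print(f"fetching page {n}")  # in the real client each page would be an HTTP request
        yield {'global_id': f'doi:10.5072/FK2/PAGE{n}'}

pids = map(lambda rec: rec['global_id'], simulated_search())  # nothing fetched yet
for pid in pids:  # fetching happens here, entry by entry
    print(pid)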
12 changes: 6 additions & 6 deletions src/datastation/dv_dataverse_role_assignment.py
@@ -1,6 +1,6 @@
import argparse

from datastation.common.common_batch_processing import get_aliases, DataverseBatchProcessorWithReport
from datastation.common.batch_processing import get_entries, BatchProcessorWithReport
from datastation.common.config import init
from datastation.common.utils import add_batch_processor_args, add_dry_run_arg
from datastation.dataverse.dataverse_client import DataverseClient
@@ -13,8 +13,8 @@ def list_role_assignments(args, dataverse_client: DataverseClient):

def add_role_assignments(args, dataverse_client: DataverseClient):
role_assignment = DataverseRole(dataverse_client, args.dry_run)
aliases = get_aliases(args.alias_or_alias_file)
create_batch_processor(args).process_aliases(
aliases = get_entries(args.alias_or_alias_file)
create_batch_processor(args).process_entries(
aliases,
lambda alias,
csv_report: role_assignment.add_role_assignment(args.role_assignment,
@@ -25,8 +25,8 @@ def add_role_assignments(args, dataverse_client: DataverseClient):

def remove_role_assignments(args, dataverse_client: DataverseClient):
role_assignment = DataverseRole(dataverse_client, args.dry_run)
aliases = get_aliases(args.alias_or_alias_file)
create_batch_processor(args).process_aliases(
aliases = get_entries(args.alias_or_alias_file)
create_batch_processor(args).process_entries(
aliases,
lambda alias,
csv_report: role_assignment.remove_role_assignment(args.role_assignment,
@@ -36,7 +36,7 @@ def remove_role_assignments(args, dataverse_client: DataverseClient):


def create_batch_processor(args):
return DataverseBatchProcessorWithReport(
return BatchProcessorWithReport(
wait=args.wait,
fail_on_first_error=args.fail_fast,
report_file=args.report_file,
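A minimal sketch of the switch to the shared BatchProcessorWithReport, assuming only the constructor keywords and the two-argument callback visible in this diff; the aliases and report path are hypothetical.

from datastation.common.batch_processing import BatchProcessorWithReport

processor = BatchProcessorWithReport(
    wait=1, fail_on_first_error=False,
    report_file='role-report.csv')           # further keyword arguments are cut off in this diff
processor.process_entries(
    ["root", "subtree-alias"],               # hypothetical dataverse aliases
    lambda alias, csv_report: print(alias))  # the CSV report object is passed alongside each entry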