From 823b98e9552d99bec0903b681ec1f89fa113aa88 Mon Sep 17 00:00:00 2001 From: Jo Pol Date: Thu, 7 Mar 2024 10:36:57 +0100 Subject: [PATCH] DD-1467 DD-1493 cleanup of batch processors (#55) * moved searching for all dataset pids out of get_entries, consider util wrappers around both when needed again * reworded logging to something neutral * dropped the batch processors introduced by DD-1493 --- src/datastation/common/batch_processing.py | 54 +++--- .../common/common_batch_processing.py | 131 -------------- src/datastation/common/utils.py | 2 + src/datastation/dv_dataset_edit_metadata.py | 8 +- src/datastation/dv_dataset_get_attributes.py | 15 +- .../dv_dataverse_role_assignment.py | 12 +- src/tests/test_batch_processing.py | 168 +++++++++++++++--- 7 files changed, 200 insertions(+), 190 deletions(-) delete mode 100644 src/datastation/common/common_batch_processing.py diff --git a/src/datastation/common/batch_processing.py b/src/datastation/common/batch_processing.py index baf50f9..f0a3471 100644 --- a/src/datastation/common/batch_processing.py +++ b/src/datastation/common/batch_processing.py @@ -5,31 +5,22 @@ from datastation.common.csv import CsvReport -def get_pids(pid_or_pids_file, search_api=None, query="*", subtree="root", object_type="dataset", dry_run=False): +def get_pids(pid_or_pids_file): """ kept for backward compatibility""" - return get_entries(pid_or_pids_file, search_api, query, subtree, object_type, dry_run) + return get_entries(pid_or_pids_file) -def get_entries(entries, search_api=None, query="*", subtree="root", object_type="dataset", dry_run=False): +def get_entries(entries): """ Args: - entries: A string (e.g. a PID for the default object_type 'dataset'), - or a file with a list of strings or dict objects. - search_api: must be provided if entries is None - query: passed on to search_api().search - object_type: passed on to search_api().search - subtree (object): passed on to search_api().search - dry_run: Do not perform the action, but show what would be done. - Only applicable if entries is None. - - Returns: an iterator with strings or dict objects. - if entries is not provided, it searches for all objects of object_type - and extracts their pids, fetching the result pages lazy. + entries: A string (e.g. a dataset PID or dataverse alias), + or a plain text file with a string per line + + Returns: a list with strings """ if entries is None: - result = search_api.search(query=query, subtree=subtree, object_type=object_type, dry_run=dry_run) - return map(lambda rec: rec['global_id'], result) + return [] elif os.path.isfile(os.path.expanduser(entries)): objects = [] with open(os.path.expanduser(entries)) as f: @@ -53,15 +44,18 @@ def process_entries(self, entries, callback): """ The callback is called for each entry in entries. Args: - entries: a single string (e.g. PID) or dict, or a list of string or dicts - callback: a function that takes a single entry as argument + entries: a stream of arguments for the callback. + callback: a function that takes a single entry as argument. Returns: None If an entry is a string or a dictionary with key 'PID', the value is used for progress logging. 
""" - if type(entries) is list: + if entries is None: + logging.info("Nothing to process") + return + elif type(entries) is list: num_entries = len(entries) logging.info(f"Start batch processing on {num_entries} entries") else: @@ -74,19 +68,27 @@ def process_entries(self, entries, callback): if self.wait > 0 and i > 1: logging.debug(f"Waiting {self.wait} seconds before processing next entry") time.sleep(self.wait) - if type(obj) is dict and 'PID' in obj.keys(): - logging.info(f"Processing {i} of {num_entries}: {obj['PID']}") - elif type(obj) is str: - logging.info(f"Processing {i} of {num_entries}: {obj}") + if num_entries > 1: + progress_message = f"Processing {i} of {num_entries} entries" + elif num_entries == -1: + progress_message = f"Processing entry number {i}" else: - logging.info(f"Processing {i} of {num_entries}") + progress_message = None + if progress_message is not None: + if type(obj) is str: + logging.info(f"{progress_message}: {obj}") + elif type(obj) is dict and 'PID' in obj.keys(): + logging.info(f"{progress_message}: {obj['PID']}") + else: + logging.info(progress_message) callback(obj) except Exception as e: - logging.exception("Exception occurred", exc_info=True) + logging.exception(f"Exception occurred on entry nr {i}", exc_info=True) if self.fail_on_first_error: logging.error(f"Stop processing because of an exception: {e}") break logging.debug("fail_on_first_error is False, continuing...") + logging.info(f"Batch processing ended: {i} entries processed") class BatchProcessorWithReport(BatchProcessor): diff --git a/src/datastation/common/common_batch_processing.py b/src/datastation/common/common_batch_processing.py deleted file mode 100644 index c9ada7a..0000000 --- a/src/datastation/common/common_batch_processing.py +++ /dev/null @@ -1,131 +0,0 @@ -import logging -import os -import time - -from datastation.common.csv import CsvReport -from datastation.common.utils import plural - - -# Base class for batch processing of items -class CommonBatchProcessor: - def __init__(self, item_name="item", wait=0.1, fail_on_first_error=True): - self.item_name = item_name - self.wait = wait - self.fail_on_first_error = fail_on_first_error - - def process_items(self, items, callback): - if type(items) is list: - num_items = len(items) - logging.info(f"Start batch processing on {num_items} {plural(self.item_name)}") - else: - logging.info(f"Start batch processing on unknown number of {plural(self.item_name)}") - num_items = -1 - i = 0 - for item in items: - i += 1 - try: - if self.wait > 0 and i > 1: - logging.debug(f"Waiting {self.wait} seconds before processing next {self.item_name}") - time.sleep(self.wait) - logging.info(f"Processing {i} of {num_items}: {item}") - callback(item) - except Exception as e: - logging.exception("Exception occurred", exc_info=True) - if self.fail_on_first_error: - logging.error(f"Stop processing because of an exception: {e}") - break - logging.debug("fail_on_first_error is False, continuing...") - - -def get_provided_items_iterator(item_or_items_file, item_name="item"): - if item_or_items_file is None: - logging.debug(f"No {plural(item_name)} provided.") - return None - elif os.path.isfile(os.path.expanduser(item_or_items_file)): - items = [] - with open(os.path.expanduser(item_or_items_file)) as f: - for line in f: - items.append(line.strip()) - return items - else: - return [item_or_items_file] - - -def get_pids(pid_or_pids_file, search_api=None, query="*", subtree="root", object_type="dataset", dry_run=False): - """ - - Args: - pid_or_pids_file: The 
dataset pid, or a file with a list of pids. - search_api: must be provided if pid_or_pids_file is None - query: passed on to search_api().search - object_type: passed on to search_api().search - subtree (object): passed on to search_api().search - dry_run: Do not perform the action, but show what would be done. - Only applicable if pid_or_pids_file is None. - - Returns: an iterator with pids, - if pid_or_pids_file is not provided, it searches for all datasets - and extracts their pids, fetching the result pages lazy. - """ - if pid_or_pids_file is None: - result = search_api.search(query=query, subtree=subtree, object_type=object_type, dry_run=dry_run) - return map(lambda rec: rec['global_id'], result) - else: - return get_provided_items_iterator(pid_or_pids_file, "pid") - - -def get_aliases(alias_or_aliases_file): - """ - - Args: - alias_or_aliases_file: The dataverse alias, or a file with a list of aliases. - - Returns: an iterator with aliases - """ - return get_provided_items_iterator(alias_or_aliases_file, "alias") - - -class DatasetBatchProcessor(CommonBatchProcessor): - - def __init__(self, wait=0.1, fail_on_first_error=True): - super().__init__("pid", wait, fail_on_first_error) - - def process_pids(self, pids, callback): - super().process_items(pids, callback) - - -class DatasetBatchProcessorWithReport(DatasetBatchProcessor): - - def __init__(self, report_file=None, headers=None, wait=0.1, fail_on_first_error=True): - super().__init__(wait, fail_on_first_error) - if headers is None: - headers = ["DOI", "Modified", "Change"] - self.report_file = report_file - self.headers = headers - - def process_pids(self, pids, callback): - with CsvReport(os.path.expanduser(self.report_file), self.headers) as csv_report: - super().process_pids(pids, lambda pid: callback(pid, csv_report)) - - -class DataverseBatchProcessor(CommonBatchProcessor): - - def __init__(self, wait=0.1, fail_on_first_error=True): - super().__init__("alias", wait, fail_on_first_error) - - def process_aliases(self, aliases, callback): - super().process_items(aliases, callback) - - -class DataverseBatchProcessorWithReport(DataverseBatchProcessor): - - def __init__(self, report_file=None, headers=None, wait=0.1, fail_on_first_error=True): - super().__init__(wait, fail_on_first_error) - if headers is None: - headers = ["alias", "Modified", "Change"] - self.report_file = report_file - self.headers = headers - - def process_aliases(self, aliases, callback): - with CsvReport(os.path.expanduser(self.report_file), self.headers) as csv_report: - super().process_aliases(aliases, lambda alias: callback(alias, csv_report)) diff --git a/src/datastation/common/utils.py b/src/datastation/common/utils.py index d4b4155..c0f63db 100644 --- a/src/datastation/common/utils.py +++ b/src/datastation/common/utils.py @@ -5,6 +5,7 @@ import argparse import requests + def add_dry_run_arg(parser): parser.add_argument('-d', '--dry-run', action='store_true', help='Do not perform the action, but show what would be done.') @@ -118,6 +119,7 @@ def sizeof_fmt(num, suffix='B'): num /= 1024.0 return "%.1f%s%s" % (num, 'Yi', suffix) + def plural(word: str): if word.endswith('s'): return word + "es" diff --git a/src/datastation/dv_dataset_edit_metadata.py b/src/datastation/dv_dataset_edit_metadata.py index 4702629..7aa05f3 100644 --- a/src/datastation/dv_dataset_edit_metadata.py +++ b/src/datastation/dv_dataset_edit_metadata.py @@ -55,8 +55,12 @@ def parse_value_args(): with open(args.pid_or_file, newline='') as csvfile: # restkey must be an invalid to prevent 
it from being processed reader = DictReader(csvfile, skipinitialspace=True, restkey='rest.column') - if 'PID' not in reader.fieldnames: - parser.error(f"No column 'PID' found in " + args.pid_or_file) + if reader is None or reader.fieldnames is None or len(reader.fieldnames) == 0: + parser.error(f"{args.pid_or_file} is empty or not a CSV file.") + return + if 'PID' not in reader.fieldnames or len(reader.fieldnames) == 0: + parser.error(f"No column 'PID' (or no other columns) found in " + args.pid_or_file) + return run(reader) else: run([parse_value_args()]) diff --git a/src/datastation/dv_dataset_get_attributes.py b/src/datastation/dv_dataset_get_attributes.py index de6fe2c..4c151d1 100644 --- a/src/datastation/dv_dataset_get_attributes.py +++ b/src/datastation/dv_dataset_get_attributes.py @@ -1,7 +1,7 @@ import argparse import json -from datastation.common.batch_processing import get_pids, BatchProcessor, BatchProcessorWithReport +from datastation.common.batch_processing import get_entries, BatchProcessor from datastation.common.config import init from datastation.common.utils import add_batch_processor_args, add_dry_run_arg from datastation.dataverse.datasets import Datasets @@ -14,9 +14,9 @@ def main(): attr_group = parser.add_argument_group() attr_group.add_argument("--user-with-role", dest="user_with_role", - help="List users with a specific role on the dataset",) + help="List users with a specific role on the dataset",) attr_group.add_argument("--storage", dest="storage", action="store_true", - help="The storage in bytes",) + help="The storage in bytes",) group = parser.add_mutually_exclusive_group(required=True) @@ -40,8 +40,13 @@ def main(): dataverse_client = DataverseClient(config["dataverse"]) datasets = Datasets(dataverse_client, dry_run=args.dry_run) - BatchProcessor(wait=args.wait, fail_on_first_error=args.fail_fast).process_pids( - get_pids(args.pid_or_pids_file, dataverse_client.search_api(), dry_run=args.dry_run), + if args.all_datasets: + search_result = dataverse_client.search_api().search(dry_run=args.dry_run) + pids = map(lambda rec: rec['global_id'], search_result) # lazy iterator + else: + pids = get_entries(args.pid_or_pids_file) + BatchProcessor(wait=args.wait, fail_on_first_error=args.fail_fast).process_entries( + pids, lambda pid: print(json.dumps(datasets.get_dataset_attributes(pid, **attribute_options), skipkeys=True))) diff --git a/src/datastation/dv_dataverse_role_assignment.py b/src/datastation/dv_dataverse_role_assignment.py index 474ccbd..6dc2827 100644 --- a/src/datastation/dv_dataverse_role_assignment.py +++ b/src/datastation/dv_dataverse_role_assignment.py @@ -1,6 +1,6 @@ import argparse -from datastation.common.common_batch_processing import get_aliases, DataverseBatchProcessorWithReport +from datastation.common.batch_processing import get_entries, BatchProcessorWithReport from datastation.common.config import init from datastation.common.utils import add_batch_processor_args, add_dry_run_arg from datastation.dataverse.dataverse_client import DataverseClient @@ -13,8 +13,8 @@ def list_role_assignments(args, dataverse_client: DataverseClient): def add_role_assignments(args, dataverse_client: DataverseClient): role_assignment = DataverseRole(dataverse_client, args.dry_run) - aliases = get_aliases(args.alias_or_alias_file) - create_batch_processor(args).process_aliases( + aliases = get_entries(args.alias_or_alias_file) + create_batch_processor(args).process_entries( aliases, lambda alias, csv_report: 
role_assignment.add_role_assignment(args.role_assignment, @@ -25,8 +25,8 @@ def add_role_assignments(args, dataverse_client: DataverseClient): def remove_role_assignments(args, dataverse_client: DataverseClient): role_assignment = DataverseRole(dataverse_client, args.dry_run) - aliases = get_aliases(args.alias_or_alias_file) - create_batch_processor(args).process_aliases( + aliases = get_entries(args.alias_or_alias_file) + create_batch_processor(args).process_entries( aliases, lambda alias, csv_report: role_assignment.remove_role_assignment(args.role_assignment, @@ -36,7 +36,7 @@ def remove_role_assignments(args, dataverse_client: DataverseClient): def create_batch_processor(args): - return DataverseBatchProcessorWithReport( + return BatchProcessorWithReport( wait=args.wait, fail_on_first_error=args.fail_fast, report_file=args.report_file, diff --git a/src/tests/test_batch_processing.py b/src/tests/test_batch_processing.py index 8bb9819..3b703b7 100644 --- a/src/tests/test_batch_processing.py +++ b/src/tests/test_batch_processing.py @@ -1,4 +1,3 @@ -import os import time from datetime import datetime @@ -7,50 +6,132 @@ class TestBatchProcessor: - def test_process_pids(self, capsys): + def test_process_pids(self, capsys, caplog): + caplog.set_level('DEBUG') batch_processor = BatchProcessor() - pids = ["1", "2", "3"] + pids = ["A", "B", "C"] callback = lambda pid: print(pid) batch_processor.process_pids(pids, callback) captured = capsys.readouterr() - assert captured.out == "1\n2\n3\n" + assert captured.out == "A\nB\nC\n" + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on 3 entries') + assert (caplog.records[1].message == 'Processing 1 of 3 entries: A') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing 2 of 3 entries: B') + assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[5].message == 'Processing 3 of 3 entries: C') + assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed') - def test_process_objects_with_pids(self, capsys): + def test_process_dicts_with_pids(self, capsys, caplog): + caplog.set_level('DEBUG') batch_processor = BatchProcessor() - objects = [{"PID": "1", "param": "value1"}, - {"PID": "2", "param": "value2"}, - {"PID": "1", "param": "value3"}] + objects = [{"PID": "a", "param": "value1"}, + {"PID": "b", "param": "value2"}, + {"PID": "a", "param": "value3"}] batch_processor.process_pids(objects, lambda obj: print(obj)) captured = capsys.readouterr() - assert captured.out == ("{'PID': '1', 'param': 'value1'}\n" - "{'PID': '2', 'param': 'value2'}\n" - "{'PID': '1', 'param': 'value3'}\n") + assert captured.out == ("{'PID': 'a', 'param': 'value1'}\n" + "{'PID': 'b', 'param': 'value2'}\n" + "{'PID': 'a', 'param': 'value3'}\n") + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on 3 entries') + assert (caplog.records[1].message == 'Processing 1 of 3 entries: a') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing 2 of 3 entries: b') + assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[5].message == 'Processing 3 of 3 entries: a') + assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed') - def test_process_objects_without_pids(self, 
capsys): + def test_empty_stream(self, capsys, caplog): + caplog.set_level('DEBUG') + batch_processor = BatchProcessor() + objects = map(lambda rec: rec[''], []) # lazy empty iterator + batch_processor.process_pids(objects, lambda obj: print(obj)) + assert capsys.readouterr().out == "" + assert caplog.text == ( + 'INFO root:batch_processing.py:62 Start batch processing on unknown number of entries\n' + 'INFO root:batch_processing.py:91 Batch processing ended: 0 entries processed\n') + assert len(caplog.records) == 2 + assert (caplog.records[0].message == 'Start batch processing on unknown number of entries') + assert (caplog.records[1].message == 'Batch processing ended: 0 entries processed') + + def test_single_item(self, capsys, caplog): + caplog.set_level('DEBUG') + batch_processor = BatchProcessor() + batch_processor.process_entries(['a'], lambda obj: print(obj)) + assert capsys.readouterr().out == "a\n" + assert caplog.text == ( + 'INFO root:batch_processing.py:60 Start batch processing on 1 entries\n' + 'INFO root:batch_processing.py:91 Batch processing ended: 1 entries processed\n') + assert len(caplog.records) == 2 + assert (caplog.records[0].message == 'Start batch processing on 1 entries') + assert (caplog.records[1].message == 'Batch processing ended: 1 entries processed') + + def test_no_entries(self, capsys, caplog): + caplog.set_level('DEBUG') + batch_processor = BatchProcessor() + batch_processor.process_pids(None, lambda obj: print(obj)) + assert capsys.readouterr().out == "" + assert caplog.text == 'INFO root:batch_processing.py:56 Nothing to process\n' + assert len(caplog.records) == 1 + assert (caplog.records[0].message == 'Nothing to process') + + def test_empty_list(self, capsys, caplog): + caplog.set_level('DEBUG') + batch_processor = BatchProcessor() + batch_processor.process_pids([], lambda obj: print(obj)) + assert capsys.readouterr().out == "" + assert caplog.text == ('INFO root:batch_processing.py:60 Start batch processing on 0 entries\n' + 'INFO root:batch_processing.py:91 Batch processing ended: 0 entries ' + 'processed\n') + assert len(caplog.records) == 2 + assert (caplog.records[0].message == 'Start batch processing on 0 entries') + assert (caplog.records[1].message == 'Batch processing ended: 0 entries processed') + + def test_process_objects_without_pids(self, capsys, caplog): + caplog.set_level('DEBUG') batch_processor = BatchProcessor() objects = [{"no-pid": "1", "param": "value1"}, {"no-pid": "2", "param": "value2"}, {"no-pid": "1", "param": "value3"}] batch_processor.process_pids(objects, lambda obj: print(obj['param'])) captured = capsys.readouterr() - assert captured.out == ("value1\nvalue2\nvalue3\n") + assert captured.out == "value1\nvalue2\nvalue3\n" + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on 3 entries') + assert (caplog.records[1].message == 'Processing 1 of 3 entries') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing 2 of 3 entries') + assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[5].message == 'Processing 3 of 3 entries') + assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed') - def test_process_pids_with_wait_on_iterator(self, capsys): + def test_process_pids_with_wait_on_iterator(self, capsys, caplog): + caplog.set_level('DEBUG') batch_processor = BatchProcessor(wait=0.1) def as_is(rec): 
time.sleep(0.1) print(f"lazy-{rec}") return rec - pids = map(as_is, ["1", "2", "3"]) + pids = map(as_is, ["a", "b", "c"]) callback = lambda pid: print(pid) start_time = datetime.now() batch_processor.process_pids(pids, callback) end_time = datetime.now() captured = capsys.readouterr() # as_is is called alternated with callback - assert captured.out == "lazy-1\n1\nlazy-2\n2\nlazy-3\n3\n" + assert captured.out == "lazy-a\na\nlazy-b\nb\nlazy-c\nc\n" assert (end_time - start_time).total_seconds() >= 0.5 + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on unknown number of entries') + assert (caplog.records[1].message == 'Processing entry number 1: a') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing entry number 2: b') + assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[5].message == 'Processing entry number 3: c') + assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed') def test_process_pids_with_wait_on_list(self, capsys): def as_is(rec): @@ -58,7 +139,7 @@ def as_is(rec): print(f"lazy-{rec}") return rec batch_processor = BatchProcessor(wait=0.1) - pids = [as_is(rec) for rec in ["1", "2", "3"]] + pids = [as_is(rec) for rec in ["1", "2", "3"]] callback = lambda pid: print(pid) start_time = datetime.now() batch_processor.process_pids(pids, callback) @@ -69,13 +150,60 @@ def as_is(rec): assert captured.out == "lazy-1\nlazy-2\nlazy-3\n1\n2\n3\n" assert (end_time - start_time).total_seconds() >= 0.2 + def test_process_exception(self, caplog): + caplog.set_level('DEBUG') + + def raise_second(rec): + if rec == "b": + raise Exception("b is not allowed") + + batch_processor = BatchProcessor(wait=0.1) + pids = ["a", "b", "c"] + callback = lambda pid: raise_second(pid) + batch_processor.process_pids(pids, callback) + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on 3 entries') + assert (caplog.records[1].message == 'Processing 1 of 3 entries: a') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing 2 of 3 entries: b') + assert (caplog.records[4].message == 'Exception occurred on entry nr 2') + assert (caplog.records[5].message == 'Stop processing because of an exception: b is not allowed') + + # actually the 2nd entry is not processed + assert (caplog.records[6].message == 'Batch processing ended: 2 entries processed') + + def test_continue_after_exception(self, caplog): + caplog.set_level('DEBUG') + + def raise_second(rec): + if rec == "b": + raise Exception("b is not allowed") + + batch_processor = BatchProcessor(wait=0.1, fail_on_first_error=False) + pids = ["a", "b", "c"] + callback = lambda pid: raise_second(pid) + batch_processor.process_pids(pids, callback) + assert len(caplog.records) == 9 + assert (caplog.records[5].message == 'fail_on_first_error is False, continuing...') + assert (caplog.records[6].message == 'Waiting 0.1 seconds before processing next entry') + # see other tests for other lines + def test_get_single_pid(self): pids = get_pids('doi:10.5072/DAR/ATALUT') assert pids == ['doi:10.5072/DAR/ATALUT'] def test_get_pids_from_file(self, tmp_path): - with open(os.path.join(tmp_path, 'pids.txt'), 'w') as f: + pids_file = tmp_path / "pids.txt" + with open(pids_file, 'w') as f: 
f.write('doi:10.5072/DAR/ATALUT\ndoi:10.17026/dans-xfg-s8q3\n') f.close() - pids = get_pids(f.name) - assert pids == ['doi:10.5072/DAR/ATALUT','doi:10.17026/dans-xfg-s8q3'] + pids = get_pids(pids_file) + assert pids == ['doi:10.5072/DAR/ATALUT', 'doi:10.17026/dans-xfg-s8q3'] + + def test_get_pids_from_empty_file(self, tmp_path): + pids_file = tmp_path / "empty.txt" + open(pids_file, 'w').close() + assert get_pids(pids_file) == [] + + def test_no_pids_or_file(self): + assert get_pids(None) == []
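Usage sketch (illustrative only, not part of the applied diff): the snippet below shows how the simplified helpers fit together after this change — `get_entries` now only resolves a single value or a plain-text file, and callers that want "all datasets" query the search API themselves and pass the lazy iterator to `BatchProcessor.process_entries`. The names `dataverse_client` and the literal file name are placeholders; the authoritative wiring is in dv_dataset_get_attributes.py above.

    from datastation.common.batch_processing import get_entries, BatchProcessor

    # Resolve entries from a single value (PID or alias) or a plain-text file, one entry per line.
    pids = get_entries("pids.txt")  # returns a list of strings; returns [] when given None

    # Alternatively, since searching was moved out of get_entries, fetch all dataset PIDs
    # via the search API and pass the lazy iterator directly (as dv_dataset_get_attributes.py does):
    # search_result = dataverse_client.search_api().search(dry_run=False)
    # pids = map(lambda rec: rec['global_id'], search_result)

    # Process the entries; progress and exceptions are logged per entry.
    processor = BatchProcessor(wait=0.1, fail_on_first_error=True)
    processor.process_entries(pids, lambda pid: print(pid))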