diff --git a/cumulus/common.py b/cumulus/common.py
index 0cc43cf0..75e0024f 100644
--- a/cumulus/common.py
+++ b/cumulus/common.py
@@ -241,15 +241,23 @@ def print_header(name: str) -> None:
 #
 ###############################################################################
 
-def timestamp_datetime() -> str:
+def datetime_now() -> datetime.datetime:
+    """
+    UTC date and time, suitable for use as a FHIR 'instant' data type
+    """
+    return datetime.datetime.now(datetime.timezone.utc)
+
+
+def timestamp_datetime(time: datetime.datetime = None) -> str:
     """
     Human-readable UTC date and time
 
     :return: MMMM-DD-YYY hh:mm:ss
     """
-    return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
+    time = time or datetime_now()
+    return time.strftime('%Y-%m-%d %H:%M:%S')
 
 
-def timestamp_filename() -> str:
+def timestamp_filename(time: datetime.datetime = None) -> str:
     """
     Human-readable UTC date and time suitable for a filesystem path
 
@@ -257,4 +265,5 @@ def timestamp_filename() -> str:
     :return: MMMM-DD-YYY__hh.mm.ss
     """
-    return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d__%H.%M.%S')
+    time = time or datetime_now()
+    return time.strftime('%Y-%m-%d__%H.%M.%S')
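For orientation, here is how the reworked helpers are meant to fit together: capture one instant with `datetime_now()` and feed it to both formatters, so every artifact of a run agrees on the time. This is a minimal sketch, not part of the patch, and the commented output values are illustrative only:

```python
from cumulus import common

# Capture a single UTC instant up front...
now = common.datetime_now()

# ...then format that same instant everywhere it is needed.
print(common.timestamp_datetime(now))   # e.g. '2021-09-14 21:23:45'
print(common.timestamp_filename(now))   # e.g. '2021-09-14__21.23.45'

# With no argument, both formatters still default to the current time.
print(common.timestamp_datetime())
```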
+ """ + + _LAST_SUCCESSFUL_DATETIME = 'last_successful_datetime' + _LAST_SUCCESSFUL_INPUT_DIR = 'last_successful_input_dir' + _LAST_SUCCESSFUL_OUTPUT_DIR = 'last_successful_output_dir' + + def __init__(self, path: str): + """ + :param path: path to context file + """ + self._path: str = path + try: + self._data = common.read_json(path) + except (FileNotFoundError, PermissionError): + self._data = {} + + @property + def last_successful_datetime(self) -> Optional[datetime.datetime]: + value = self._data.get(self._LAST_SUCCESSFUL_DATETIME) + if value is not None: + return datetime.datetime.fromisoformat(value) + return None + + @last_successful_datetime.setter + def last_successful_datetime(self, value: datetime.datetime) -> None: + self._data[self._LAST_SUCCESSFUL_DATETIME] = value.isoformat() + + @property + def last_successful_input_dir(self) -> Optional[str]: + return self._data.get(self._LAST_SUCCESSFUL_INPUT_DIR) + + @last_successful_input_dir.setter + def last_successful_input_dir(self, value: str) -> None: + self._data[self._LAST_SUCCESSFUL_INPUT_DIR] = value + + @property + def last_successful_output_dir(self) -> Optional[str]: + return self._data.get(self._LAST_SUCCESSFUL_OUTPUT_DIR) + + @last_successful_output_dir.setter + def last_successful_output_dir(self, value: str) -> None: + self._data[self._LAST_SUCCESSFUL_OUTPUT_DIR] = value + + def save(self) -> None: + common.write_json(self._path, self.as_json(), indent=4) # pretty-print this since it isn't large + + def as_json(self) -> dict: + return dict(self._data) diff --git a/cumulus/etl.py b/cumulus/etl.py index 29bca3d1..01c82d9e 100644 --- a/cumulus/etl.py +++ b/cumulus/etl.py @@ -24,7 +24,7 @@ from fhirclient.models.patient import Patient from fhirclient.models.resource import Resource -from cumulus import common, ctakes, deid, formats, loaders, store +from cumulus import common, context, ctakes, deid, formats, loaders, store from cumulus.config import JobConfig, JobSummary ############################################################################### @@ -361,6 +361,9 @@ def main(args: List[str]): root_output = store.Root(args.dir_output, create=True) root_phi = store.Root(args.dir_phi, create=True) + job_context = context.JobContext(root_phi.joinpath('context.json')) + job_datetime = common.datetime_now() # grab timestamp before we do anything + if args.input_format == 'i2b2': config_loader = loaders.I2b2Loader(root_input) else: @@ -375,19 +378,27 @@ def main(args: List[str]): deid_dir = load_and_deidentify(config_loader) + # Prepare config for jobs config = JobConfig(config_loader, deid_dir.name, config_store, root_phi, comment=args.comment, - batch_size=args.batch_size) - + batch_size=args.batch_size, timestamp=job_datetime) + common.write_json(config.path_config(), config.as_json(), indent=4) common.print_header('Configuration:') print(json.dumps(config.as_json(), indent=4)) - common.write_json(config.path_config(), config.as_json(), indent=4) - + # Finally, actually run the meat of the pipeline! 
diff --git a/cumulus/etl.py b/cumulus/etl.py
index 29bca3d1..01c82d9e 100644
--- a/cumulus/etl.py
+++ b/cumulus/etl.py
@@ -24,7 +24,7 @@
 from fhirclient.models.patient import Patient
 from fhirclient.models.resource import Resource
 
-from cumulus import common, ctakes, deid, formats, loaders, store
+from cumulus import common, context, ctakes, deid, formats, loaders, store
 from cumulus.config import JobConfig, JobSummary
 
 ###############################################################################
@@ -361,6 +361,9 @@ def main(args: List[str]):
     root_output = store.Root(args.dir_output, create=True)
     root_phi = store.Root(args.dir_phi, create=True)
 
+    job_context = context.JobContext(root_phi.joinpath('context.json'))
+    job_datetime = common.datetime_now()  # grab timestamp before we do anything
+
     if args.input_format == 'i2b2':
         config_loader = loaders.I2b2Loader(root_input)
     else:
@@ -375,19 +378,27 @@ def main(args: List[str]):
     deid_dir = load_and_deidentify(config_loader)
 
+    # Prepare config for jobs
     config = JobConfig(config_loader, deid_dir.name, config_store, root_phi, comment=args.comment,
-                       batch_size=args.batch_size)
-
+                       batch_size=args.batch_size, timestamp=job_datetime)
+    common.write_json(config.path_config(), config.as_json(), indent=4)
     common.print_header('Configuration:')
     print(json.dumps(config.as_json(), indent=4))
-    common.write_json(config.path_config(), config.as_json(), indent=4)
-
+    # Finally, actually run the meat of the pipeline!
     summaries = etl_job(config)
+
+    # Print results to the console
     common.print_header('Results:')
     for summary in summaries:
         print(json.dumps(summary.as_json(), indent=4))
 
+    # Update job context for future runs
+    job_context.last_successful_datetime = job_datetime
+    job_context.last_successful_input_dir = root_input.path
+    job_context.last_successful_output_dir = root_output.path
+    job_context.save()
+
 
 def main_cli():
     main(sys.argv[1:])
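The docstring's motivating use case, incremental extraction, is explicitly not implemented by this patch. Purely as a hypothetical sketch (placeholder path, invented messages), a later change could consult the saved timestamp along these lines:

```python
from cumulus import context

job_context = context.JobContext('/path/to/phi/context.json')  # placeholder path

# Hypothetical future behavior, not part of this patch:
since = job_context.last_successful_datetime
if since is None:
    print('no prior successful run; extracting everything')
else:
    print(f'extracting only data changed since {since.isoformat()}')
```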
when the job fails""" + input_context = { + 'last_successful_datetime': '2000-01-01T10:10:10+00:00', + 'last_successful_input': '/input', + 'last_successful_output': '/output', + } + common.write_json(self.context_path, input_context) + + with mock.patch('cumulus.etl.etl_job', side_effect=ZeroDivisionError): + with self.assertRaises(ZeroDivisionError): + self.run_etl() + + # Confirm we didn't change anything + self.assertEqual(input_context, common.read_json(self.context_path)) + + class TestI2b2EtlBatches(BaseI2b2EtlSimple): """Test case for etl batching"""