From 35f9c08b1c17f6e138226a8a71b35540f438f083 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Wed, 30 Nov 2022 14:55:13 -0500 Subject: [PATCH] etl: add new context.json file in the PHI/build dir This file holds some run-to-run key/value pairs. It's starting with some info from the last successful run (intended to be used for incremental bulk exports) but might grow other status information over time, like what the current output format is, etc. --- cumulus/common.py | 17 ++++++++--- cumulus/config.py | 4 ++- cumulus/context.py | 71 +++++++++++++++++++++++++++++++++++++++++++ cumulus/etl.py | 21 ++++++++++--- tests/test_context.py | 49 +++++++++++++++++++++++++++++ tests/test_etl.py | 34 ++++++++++++++++++++- 6 files changed, 185 insertions(+), 11 deletions(-) create mode 100644 cumulus/context.py create mode 100644 tests/test_context.py diff --git a/cumulus/common.py b/cumulus/common.py index 0cc43cf0..75e0024f 100644 --- a/cumulus/common.py +++ b/cumulus/common.py @@ -241,15 +241,23 @@ def print_header(name: str) -> None: # ############################################################################### -def timestamp_datetime() -> str: +def datetime_now() -> datetime.datetime: + """ + UTC date and time, suitable for use as a FHIR 'instant' data type + """ + return datetime.datetime.now(datetime.timezone.utc) + + +def timestamp_datetime(time: datetime.datetime = None) -> str: """ Human-readable UTC date and time :return: MMMM-DD-YYY hh:mm:ss """ - return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + time = time or datetime_now() + return time.strftime('%Y-%m-%d %H:%M:%S') -def timestamp_filename() -> str: +def timestamp_filename(time: datetime.datetime = None) -> str: """ Human-readable UTC date and time suitable for a filesystem path @@ -257,4 +265,5 @@ def timestamp_filename() -> str: :return: MMMM-DD-YYY__hh.mm.ss """ - return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d__%H.%M.%S') + time = time or 
datetime_now() + return time.strftime('%Y-%m-%d__%H.%M.%S') diff --git a/cumulus/config.py b/cumulus/config.py index 0f18af5f..a81e4c80 100644 --- a/cumulus/config.py +++ b/cumulus/config.py @@ -1,5 +1,6 @@ """ETL job config with summary""" +import datetime import os from socket import gethostname @@ -15,6 +16,7 @@ def __init__( dir_input: str, store_format: store.Format, dir_phi: store.Root, + timestamp: datetime.datetime = None, comment: str = None, batch_size: int = 1, # this default is never really used - overridden by command line args ): @@ -28,7 +30,7 @@ def __init__( self.dir_input = dir_input self.format = store_format self.dir_phi = dir_phi - self.timestamp = common.timestamp_filename() + self.timestamp = common.timestamp_filename(timestamp) self.hostname = gethostname() self.comment = comment or '' self.batch_size = batch_size diff --git a/cumulus/context.py b/cumulus/context.py new file mode 100644 index 00000000..0b3445e4 --- /dev/null +++ b/cumulus/context.py @@ -0,0 +1,71 @@ +""" +ETL job context, holding some persistent state between runs. +""" + +import datetime +from typing import Optional + +from cumulus import common + + +class JobContext: + """ + Context for an ETL job. + + This is not really settings or config in the sense of user-derived values. + A prime example is "last successful run time" which the next run might want to use as part of its work + (like to only extract the changed data since the last run) but is otherwise ephemeral information. + + Another possible (but not yet implemented) use might be to store some past config values, + like the last used output format. To either give the user a chance to correct a mistake or to know + how to change from one format to another. + + This is stored in the phi/build directory and is thus safe to store possible PHI. 
+ """ + + _LAST_SUCCESSFUL_DATETIME = 'last_successful_datetime' + _LAST_SUCCESSFUL_INPUT_DIR = 'last_successful_input_dir' + _LAST_SUCCESSFUL_OUTPUT_DIR = 'last_successful_output_dir' + + def __init__(self, path: str): + """ + :param path: path to context file + """ + self._path: str = path + try: + self._data = common.read_json(path) + except (FileNotFoundError, PermissionError): + self._data = {} + + @property + def last_successful_datetime(self) -> Optional[datetime.datetime]: + value = self._data.get(self._LAST_SUCCESSFUL_DATETIME) + if value is not None: + return datetime.datetime.fromisoformat(value) + return None + + @last_successful_datetime.setter + def last_successful_datetime(self, value: datetime.datetime) -> None: + self._data[self._LAST_SUCCESSFUL_DATETIME] = value.isoformat() + + @property + def last_successful_input_dir(self) -> Optional[str]: + return self._data.get(self._LAST_SUCCESSFUL_INPUT_DIR) + + @last_successful_input_dir.setter + def last_successful_input_dir(self, value: str) -> None: + self._data[self._LAST_SUCCESSFUL_INPUT_DIR] = value + + @property + def last_successful_output_dir(self) -> Optional[str]: + return self._data.get(self._LAST_SUCCESSFUL_OUTPUT_DIR) + + @last_successful_output_dir.setter + def last_successful_output_dir(self, value: str) -> None: + self._data[self._LAST_SUCCESSFUL_OUTPUT_DIR] = value + + def save(self) -> None: + common.write_json(self._path, self.as_json(), indent=4) # pretty-print this since it isn't large + + def as_json(self) -> dict: + return dict(self._data) diff --git a/cumulus/etl.py b/cumulus/etl.py index 29bca3d1..01c82d9e 100644 --- a/cumulus/etl.py +++ b/cumulus/etl.py @@ -24,7 +24,7 @@ from fhirclient.models.patient import Patient from fhirclient.models.resource import Resource -from cumulus import common, ctakes, deid, formats, loaders, store +from cumulus import common, context, ctakes, deid, formats, loaders, store from cumulus.config import JobConfig, JobSummary 
############################################################################### @@ -361,6 +361,9 @@ def main(args: List[str]): root_output = store.Root(args.dir_output, create=True) root_phi = store.Root(args.dir_phi, create=True) + job_context = context.JobContext(root_phi.joinpath('context.json')) + job_datetime = common.datetime_now() # grab timestamp before we do anything + if args.input_format == 'i2b2': config_loader = loaders.I2b2Loader(root_input) else: @@ -375,19 +378,27 @@ def main(args: List[str]): deid_dir = load_and_deidentify(config_loader) + # Prepare config for jobs config = JobConfig(config_loader, deid_dir.name, config_store, root_phi, comment=args.comment, - batch_size=args.batch_size) - + batch_size=args.batch_size, timestamp=job_datetime) + common.write_json(config.path_config(), config.as_json(), indent=4) common.print_header('Configuration:') print(json.dumps(config.as_json(), indent=4)) - common.write_json(config.path_config(), config.as_json(), indent=4) - + # Finally, actually run the meat of the pipeline! 
summaries = etl_job(config) + + # Print results to the console common.print_header('Results:') for summary in summaries: print(json.dumps(summary.as_json(), indent=4)) + # Update job context for future runs + job_context.last_successful_datetime = job_datetime + job_context.last_successful_input_dir = root_input.path + job_context.last_successful_output_dir = root_output.path + job_context.save() + def main_cli(): main(sys.argv[1:]) diff --git a/tests/test_context.py b/tests/test_context.py new file mode 100644 index 00000000..4928bba3 --- /dev/null +++ b/tests/test_context.py @@ -0,0 +1,49 @@ +"""Tests for context.py""" + +import datetime +import json +import tempfile +import unittest + +from cumulus.context import JobContext + + +class TestJobContext(unittest.TestCase): + """Test case for JobContext""" + + def test_missing_file_context(self): + context = JobContext('nope') + self.assertEqual({}, context.as_json()) + + def test_save_and_load(self): + with tempfile.NamedTemporaryFile(mode='w+') as f: + json.dump({ + 'last_successful_input_dir': '/input/dir', + }, f) + f.flush() + + context = JobContext(f.name) + self.assertEqual({ + 'last_successful_input_dir': '/input/dir', + }, context.as_json()) + + context.last_successful_datetime = datetime.datetime(2008, 5, 1, 14, 30, 30, tzinfo=datetime.timezone.utc) + self.assertEqual({ + 'last_successful_datetime': '2008-05-01T14:30:30+00:00', + 'last_successful_input_dir': '/input/dir', + }, context.as_json()) + + context.save() + context2 = JobContext(f.name) + self.assertEqual(context.as_json(), context2.as_json()) + + def test_last_successful_props(self): + context = JobContext('nope') + context.last_successful_datetime = datetime.datetime(2008, 5, 1, 14, 30, 30, tzinfo=datetime.timezone.utc) + context.last_successful_input_dir = '/input' + context.last_successful_output_dir = '/output' + self.assertEqual({ + 'last_successful_datetime': '2008-05-01T14:30:30+00:00', + 'last_successful_input_dir': '/input', + 
'last_successful_output_dir': '/output', + }, context.as_json()) diff --git a/tests/test_etl.py b/tests/test_etl.py index 01a20075..f65d6ade 100644 --- a/tests/test_etl.py +++ b/tests/test_etl.py @@ -16,7 +16,7 @@ import s3fs from fhirclient.models.extension import Extension -from cumulus import common, config, deid, etl, store +from cumulus import common, config, context, deid, etl, store from cumulus.loaders.i2b2 import extract from tests.ctakesmock import CtakesMixin, fake_ctakes_extract @@ -191,6 +191,38 @@ def test_comment(self): self.assertEqual(config_file['comment'], 'Run by foo on machine bar') +class TestI2b2EtlJobContext(BaseI2b2EtlSimple): + """Test case for the job context data""" + + def setUp(self): + super().setUp() + self.context_path = os.path.join(self.phi_path, 'context.json') + + def test_context_updated_on_success(self): + """Verify that we update the success timestamp etc. when the job succeeds""" + self.run_etl() + job_context = context.JobContext(self.context_path) + self.assertEqual('2021-09-14T21:23:45+00:00', job_context.last_successful_datetime.isoformat()) + self.assertEqual(self.input_path, job_context.last_successful_input_dir) + self.assertEqual(self.output_path, job_context.last_successful_output_dir) + + def test_context_not_updated_on_failure(self): + """Verify that we don't update the success timestamp etc. when the job fails""" + input_context = { + 'last_successful_datetime': '2000-01-01T10:10:10+00:00', + 'last_successful_input_dir': '/input', + 'last_successful_output_dir': '/output', + } + common.write_json(self.context_path, input_context) + + with mock.patch('cumulus.etl.etl_job', side_effect=ZeroDivisionError): + with self.assertRaises(ZeroDivisionError): + self.run_etl() + + # Confirm we didn't change anything + self.assertEqual(input_context, common.read_json(self.context_path)) + + class TestI2b2EtlBatches(BaseI2b2EtlSimple): """Test case for etl batching"""