Skip to content

Commit

Permalink
Merge pull request #88 from smart-on-fhir/mikix/last-run
Browse files Browse the repository at this point in the history
etl: add new context.json file in the PHI/build dir
  • Loading branch information
mikix authored Nov 30, 2022
2 parents 9831604 + 35f9c08 commit eab9670
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 11 deletions.
17 changes: 13 additions & 4 deletions cumulus/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,20 +241,29 @@ def print_header(name: str) -> None:
#
###############################################################################

def datetime_now() -> datetime.datetime:
    """
    Return the current moment as a timezone-aware datetime in UTC.

    The value is suitable for use as a FHIR 'instant' data type.
    """
    utc = datetime.timezone.utc
    return datetime.datetime.now(utc)


def timestamp_datetime(time: datetime.datetime = None) -> str:
    """
    Human-readable UTC date and time

    :param time: datetime to format; defaults to the current UTC time
    :return: YYYY-MM-DD hh:mm:ss
    """
    # Explicit None check (rather than `time or ...`) so intent is clear:
    # we only substitute the current time when no argument was given.
    if time is None:
        time = datetime_now()
    return time.strftime('%Y-%m-%d %H:%M:%S')


def timestamp_filename(time: datetime.datetime = None) -> str:
    """
    Human-readable UTC date and time suitable for a filesystem path

    In particular, there are no characters that need awkward escaping.

    :param time: datetime to format; defaults to the current UTC time
    :return: YYYY-MM-DD__hh.mm.ss
    """
    # Explicit None check (rather than `time or ...`) so intent is clear:
    # we only substitute the current time when no argument was given.
    if time is None:
        time = datetime_now()
    return time.strftime('%Y-%m-%d__%H.%M.%S')
4 changes: 3 additions & 1 deletion cumulus/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""ETL job config with summary"""

import datetime
import os
from socket import gethostname

Expand All @@ -15,6 +16,7 @@ def __init__(
dir_input: str,
store_format: store.Format,
dir_phi: store.Root,
timestamp: datetime.datetime = None,
comment: str = None,
batch_size: int = 1, # this default is never really used - overridden by command line args
):
Expand All @@ -28,7 +30,7 @@ def __init__(
self.dir_input = dir_input
self.format = store_format
self.dir_phi = dir_phi
self.timestamp = common.timestamp_filename()
self.timestamp = common.timestamp_filename(timestamp)
self.hostname = gethostname()
self.comment = comment or ''
self.batch_size = batch_size
Expand Down
71 changes: 71 additions & 0 deletions cumulus/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
ETL job context, holding some persistent state between runs.
"""

import datetime
from typing import Optional

from cumulus import common


class JobContext:
    """
    Persistent state for an ETL job, carried between runs.

    This is not user-provided settings or config. The prime example is the
    "last successful run time", which a later run may want to consult (e.g. to
    only extract data changed since then) but which is otherwise ephemeral.
    It could also someday hold past config values (like the last output format)
    so a user can correct a mistake or convert between formats.

    Stored in the phi/build directory, so it is safe to hold possible PHI.
    """

    _LAST_SUCCESSFUL_DATETIME = 'last_successful_datetime'
    _LAST_SUCCESSFUL_INPUT_DIR = 'last_successful_input_dir'
    _LAST_SUCCESSFUL_OUTPUT_DIR = 'last_successful_output_dir'

    def __init__(self, path: str):
        """
        :param path: path to context file
        """
        self._path: str = path
        try:
            loaded = common.read_json(path)
        except (FileNotFoundError, PermissionError):
            # Missing or unreadable file just means a fresh, empty context
            loaded = {}
        self._data = loaded

    @property
    def last_successful_datetime(self) -> Optional[datetime.datetime]:
        # Stored as an ISO 8601 string; parse back to a datetime on read
        stamp = self._data.get(self._LAST_SUCCESSFUL_DATETIME)
        return None if stamp is None else datetime.datetime.fromisoformat(stamp)

    @last_successful_datetime.setter
    def last_successful_datetime(self, value: datetime.datetime) -> None:
        self._data[self._LAST_SUCCESSFUL_DATETIME] = value.isoformat()

    @property
    def last_successful_input_dir(self) -> Optional[str]:
        return self._data.get(self._LAST_SUCCESSFUL_INPUT_DIR)

    @last_successful_input_dir.setter
    def last_successful_input_dir(self, value: str) -> None:
        self._data[self._LAST_SUCCESSFUL_INPUT_DIR] = value

    @property
    def last_successful_output_dir(self) -> Optional[str]:
        return self._data.get(self._LAST_SUCCESSFUL_OUTPUT_DIR)

    @last_successful_output_dir.setter
    def last_successful_output_dir(self, value: str) -> None:
        self._data[self._LAST_SUCCESSFUL_OUTPUT_DIR] = value

    def save(self) -> None:
        # Pretty-print, since this file stays small and humans may read it
        common.write_json(self._path, self.as_json(), indent=4)

    def as_json(self) -> dict:
        # Return a shallow copy so callers can't mutate our internal state
        return {**self._data}
21 changes: 16 additions & 5 deletions cumulus/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from fhirclient.models.patient import Patient
from fhirclient.models.resource import Resource

from cumulus import common, ctakes, deid, formats, loaders, store
from cumulus import common, context, ctakes, deid, formats, loaders, store
from cumulus.config import JobConfig, JobSummary

###############################################################################
Expand Down Expand Up @@ -361,6 +361,9 @@ def main(args: List[str]):
root_output = store.Root(args.dir_output, create=True)
root_phi = store.Root(args.dir_phi, create=True)

job_context = context.JobContext(root_phi.joinpath('context.json'))
job_datetime = common.datetime_now() # grab timestamp before we do anything

if args.input_format == 'i2b2':
config_loader = loaders.I2b2Loader(root_input)
else:
Expand All @@ -375,19 +378,27 @@ def main(args: List[str]):

deid_dir = load_and_deidentify(config_loader)

# Prepare config for jobs
config = JobConfig(config_loader, deid_dir.name, config_store, root_phi, comment=args.comment,
batch_size=args.batch_size)

batch_size=args.batch_size, timestamp=job_datetime)
common.write_json(config.path_config(), config.as_json(), indent=4)
common.print_header('Configuration:')
print(json.dumps(config.as_json(), indent=4))

common.write_json(config.path_config(), config.as_json(), indent=4)

# Finally, actually run the meat of the pipeline!
summaries = etl_job(config)

# Print results to the console
common.print_header('Results:')
for summary in summaries:
print(json.dumps(summary.as_json(), indent=4))

# Update job context for future runs
job_context.last_successful_datetime = job_datetime
job_context.last_successful_input_dir = root_input.path
job_context.last_successful_output_dir = root_output.path
job_context.save()


def main_cli():
main(sys.argv[1:])
Expand Down
49 changes: 49 additions & 0 deletions tests/test_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Tests for context.py"""

import datetime
import json
import tempfile
import unittest

from cumulus.context import JobContext


class TestJobContext(unittest.TestCase):
    """Unit tests covering JobContext loading, saving, and properties"""

    def test_missing_file_context(self):
        # A nonexistent backing file should yield an empty context
        self.assertEqual({}, JobContext('nope').as_json())

    def test_save_and_load(self):
        initial = {'last_successful_input_dir': '/input/dir'}
        with tempfile.NamedTemporaryFile(mode='w+') as f:
            json.dump(initial, f)
            f.flush()

            ctx = JobContext(f.name)
            self.assertEqual(initial, ctx.as_json())

            ctx.last_successful_datetime = datetime.datetime(2008, 5, 1, 14, 30, 30, tzinfo=datetime.timezone.utc)
            expected = dict(initial)
            expected['last_successful_datetime'] = '2008-05-01T14:30:30+00:00'
            self.assertEqual(expected, ctx.as_json())

            ctx.save()
            reloaded = JobContext(f.name)
            self.assertEqual(ctx.as_json(), reloaded.as_json())

    def test_last_successful_props(self):
        ctx = JobContext('nope')
        ctx.last_successful_datetime = datetime.datetime(2008, 5, 1, 14, 30, 30, tzinfo=datetime.timezone.utc)
        ctx.last_successful_input_dir = '/input'
        ctx.last_successful_output_dir = '/output'
        self.assertEqual(
            {
                'last_successful_datetime': '2008-05-01T14:30:30+00:00',
                'last_successful_input_dir': '/input',
                'last_successful_output_dir': '/output',
            },
            ctx.as_json(),
        )
34 changes: 33 additions & 1 deletion tests/test_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import s3fs
from fhirclient.models.extension import Extension

from cumulus import common, config, deid, etl, store
from cumulus import common, config, context, deid, etl, store
from cumulus.loaders.i2b2 import extract

from tests.ctakesmock import CtakesMixin, fake_ctakes_extract
Expand Down Expand Up @@ -191,6 +191,38 @@ def test_comment(self):
self.assertEqual(config_file['comment'], 'Run by foo on machine bar')


class TestI2b2EtlJobContext(BaseI2b2EtlSimple):
    """Test case for the job context data written to the PHI dir"""

    def setUp(self):
        super().setUp()
        # The ETL writes its context file into the PHI directory
        self.context_path = os.path.join(self.phi_path, 'context.json')

    def test_context_updated_on_success(self):
        """Verify that we update the success timestamp etc. when the job succeeds"""
        self.run_etl()
        saved = context.JobContext(self.context_path)
        self.assertEqual('2021-09-14T21:23:45+00:00', saved.last_successful_datetime.isoformat())
        self.assertEqual(self.input_path, saved.last_successful_input_dir)
        self.assertEqual(self.output_path, saved.last_successful_output_dir)

    def test_context_not_updated_on_failure(self):
        """Verify that we don't update the success timestamp etc. when the job fails"""
        input_context = {
            'last_successful_datetime': '2000-01-01T10:10:10+00:00',
            'last_successful_input': '/input',
            'last_successful_output': '/output',
        }
        common.write_json(self.context_path, input_context)

        # Force the pipeline itself to blow up partway through
        with mock.patch('cumulus.etl.etl_job', side_effect=ZeroDivisionError), \
                self.assertRaises(ZeroDivisionError):
            self.run_etl()

        # Confirm we didn't change anything
        self.assertEqual(input_context, common.read_json(self.context_path))


class TestI2b2EtlBatches(BaseI2b2EtlSimple):
"""Test case for etl batching"""

Expand Down

0 comments on commit eab9670

Please sign in to comment.