diff --git a/.gitignore b/.gitignore
index 50959b43..c5352919 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,8 @@
 # Project specific
-example-output/
-example-phi-build/
+/.idea/
+/example-output/
+/example-phi-build/
 
 # Python gitignore https://github.com/github/gitignore/blob/main/Python.gitignore
 
 # Byte-compiled / optimized / DLL files
diff --git a/cumulus/etl.py b/cumulus/etl.py
index 88a0a95c..d2273ab5 100644
--- a/cumulus/etl.py
+++ b/cumulus/etl.py
@@ -364,7 +364,7 @@ def main(args: List[str]):
     parser.add_argument('dir_phi', metavar='/path/to/phi')
     parser.add_argument('--input-format', default='ndjson', choices=['i2b2', 'ndjson'],
                         help='input format (default is ndjson)')
-    parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'],
+    parser.add_argument('--output-format', default='parquet', choices=['deltalake', 'json', 'ndjson', 'parquet'],
                         help='output format (default is parquet)')
     parser.add_argument('--batch-size', type=int, metavar='SIZE', default=10000000,
                         help='how many entries to process at once and thus '
@@ -401,7 +401,9 @@ def main(args: List[str]):
     else:
         config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks)
 
-    if args.output_format == 'json':
+    if args.output_format == 'deltalake':
+        config_store = formats.DeltaLakeFormat(root_output)
+    elif args.output_format == 'json':
         config_store = formats.JsonTreeFormat(root_output)
     elif args.output_format == 'parquet':
         config_store = formats.ParquetFormat(root_output)
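Editor's note: the new `deltalake` choice plugs into the existing `--output-format` switch above, so selecting it is just another command-line flag. Below is a hedged, illustrative sketch (not part of the patch) of driving `etl.main()` with it; the positional order of the input/output arguments is an assumption, since only `dir_phi` is visible in this hunk.

```python
# Illustrative sketch only -- not part of the patch. Paths are hypothetical, and the
# positional argument order (input, output, phi) is assumed from context.
from cumulus import etl

etl.main([
    '/path/to/ndjson-input',        # assumed input folder of ndjson resources
    's3://my-bucket/etl-output',    # assumed output location; a local folder also works
    '/path/to/phi',                 # PHI build folder (the dir_phi argument shown above)
    '--output-format', 'deltalake',
])
```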
diff --git a/cumulus/formats/__init__.py b/cumulus/formats/__init__.py
index 6d2ec483..298610d2 100644
--- a/cumulus/formats/__init__.py
+++ b/cumulus/formats/__init__.py
@@ -1,5 +1,6 @@
 """Classes that know _how_ to write out results to the target folder"""
 
+from .deltalake import DeltaLakeFormat
 from .json_tree import JsonTreeFormat
 from .ndjson import NdjsonFormat
 from .parquet import ParquetFormat
diff --git a/cumulus/formats/athena.py b/cumulus/formats/athena.py
index 0a151818..2c50ec84 100644
--- a/cumulus/formats/athena.py
+++ b/cumulus/formats/athena.py
@@ -18,6 +18,36 @@ class AthenaFormat(store.Format):
     (i.e. one folder per data type, broken into large files)
     """
 
+    @abc.abstractmethod
+    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
+        """Writes the whole dataframe to the output database folder"""
+
+    def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, patients, 'patient', batch)
+
+    def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, encounters, 'encounter', batch)
+
+    def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, labs, 'observation', batch)
+
+    def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, conditions, 'condition', batch)
+
+    def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, docrefs, 'documentreference', batch)
+
+    def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, observations, 'symptom', batch)
+
+
+class AthenaBatchedFileFormat(AthenaFormat):
+    """
+    Stores output files as batched individual files.
+
+    i.e. a few ndjson files that hold all the rows
+    """
+
     @property
     @abc.abstractmethod
     def suffix(self) -> str:
@@ -39,7 +69,7 @@ def write_format(self, df: pandas.DataFrame, path: str) -> None:
     #
     ##########################################################################################
 
-    def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
+    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
         """Writes the whole dataframe to a single file"""
         job.attempt += len(df)
 
@@ -49,14 +79,14 @@ def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> No
         # our files out. What we really want is some sort of blue/green deploy of data. There's no
         # satisfying fix while we are writing to the same folder. (Unless we do incremental/delta
         # writes and keep all old data around still.)
-        parent_dir = self.root.joinpath(os.path.dirname(path))
+        parent_dir = self.root.joinpath(dbname)
         try:
             self.root.rm(parent_dir, recursive=True)
         except FileNotFoundError:
             pass
 
         try:
-            full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}')
+            full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}')
             self.root.makedirs(os.path.dirname(full_path))
             self.write_format(df, full_path)
 
@@ -64,21 +94,3 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> No
             job.success_rate(1)
         except Exception:  # pylint: disable=broad-except
             logging.exception('Could not process data records')
-
-    def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, patients, 'patient/fhir_patients', batch)
-
-    def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, encounters, 'encounter/fhir_encounters', batch)
-
-    def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, labs, 'observation/fhir_observations', batch)
-
-    def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, conditions, 'condition/fhir_conditions', batch)
-
-    def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, docrefs, 'documentreference/fhir_documentreferences', batch)
-
-    def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, observations, 'symptom/fhir_symptoms', batch)
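Editor's note: with this refactor, `AthenaFormat` routes all six resource-specific `store_*()` hooks through a single abstract `write_records(job, df, dbname, batch)`, while `AthenaBatchedFileFormat` keeps the old file-per-batch behavior for the ndjson/parquet writers. As a rough, hypothetical sketch (not part of the patch), a new backend only has to implement that one method:

```python
# Hypothetical example only -- an in-memory backend to show the single hook a
# subclass now implements. The class name is made up; the job counters mirror
# how the real writers report progress.
import pandas

from cumulus import store
from cumulus.formats.athena import AthenaFormat


class InMemoryFormat(AthenaFormat):
    """Collects each batch per database name instead of writing files."""

    def __init__(self, root: store.Root):
        super().__init__(root)
        self.tables = {}  # dbname -> list of DataFrames

    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
        job.attempt += len(df)
        self.tables.setdefault(dbname, []).append(df)
        job.success += len(df)
        job.success_rate(1)
```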
diff --git a/cumulus/formats/deltalake.py b/cumulus/formats/deltalake.py
new file mode 100644
index 00000000..63227dd5
--- /dev/null
+++ b/cumulus/formats/deltalake.py
@@ -0,0 +1,111 @@
+"""
+An implementation of Format that writes to a Delta Lake.
+
+See https://delta.io/
+"""
+
+import contextlib
+import logging
+import os
+
+import delta
+import pandas
+import pyspark
+from pyspark.sql.utils import AnalysisException
+
+from cumulus import store
+
+from .athena import AthenaFormat
+
+# This class would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code
+# uses and expects (in terms of filesystem writing).
+#
+# There is a 1st party Delta Lake implementation (`deltalake`) based off native Rust code and which talks to
+# fsspec & pandas by default. But it is missing some critical features as of this writing (mostly merges):
+# - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850
+
+
+@contextlib.contextmanager
+def _suppress_output():
+    """
+    Totally hides stdout and stderr unless there is an error, and then stderr is printed.
+
+    This is a more powerful version of contextlib.redirect_stdout that also works for subprocesses / threads.
+    """
+    stdout = os.dup(1)
+    stderr = os.dup(2)
+    silent = os.open(os.devnull, os.O_WRONLY)
+    os.dup2(silent, 1)
+    os.dup2(silent, 2)
+
+    try:
+        yield
+    finally:
+        os.dup2(stdout, 1)
+        os.dup2(stderr, 2)
+
+
+class DeltaLakeFormat(AthenaFormat):
+    """
+    Stores data in a delta lake.
+    """
+    def __init__(self, root: store.Root):
+        super().__init__(root)
+
+        # This _suppress_output call is because pyspark is SO NOISY during session creation. Like 40 lines of trivial
+        # output. Progress reports of downloading the jars. Comments about default logging level and the hostname.
+        # I could not find a way to set the log level before the session is created. So here we just suppress
+        # stdout/stderr entirely.
+        with _suppress_output():
+            # Prep the builder with various config options
+            builder = pyspark.sql.SparkSession.builder \
+                .appName('cumulus-etl') \
+                .config('spark.driver.memory', '2g') \
+                .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
+                .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
+
+            # Now add delta's packages and actually build the session
+            self.spark = delta.configure_spark_with_delta_pip(builder, extra_packages=[
+                'org.apache.hadoop:hadoop-aws:3.3.4',
+            ]).getOrCreate()
+
+        self.spark.sparkContext.setLogLevel('ERROR')
+        self._configure_fs()
+
+    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
+        """Writes the whole dataframe to a delta lake"""
+        job.attempt += len(df)
+        full_path = self.root.joinpath(dbname).replace('s3://', 's3a://')  # hadoop uses the s3a: scheme instead of s3:
+
+        try:
+            updates = self.spark.createDataFrame(df)
+
+            try:
+                table = delta.DeltaTable.forPath(self.spark, full_path)
+                if batch == 0:
+                    table.vacuum()  # Clean up unused data files older than retention policy (default 7 days)
+                table.alias('table') \
+                    .merge(source=updates.alias('updates'), condition='table.id = updates.id') \
+                    .whenMatchedUpdateAll() \
+                    .whenNotMatchedInsertAll() \
+                    .execute()
+            except AnalysisException:
+                # table does not exist yet, let's make an initial version
+                updates.write.save(path=full_path, format='delta')
+
+            job.success += len(df)
+            job.success_rate(1)
+        except Exception:  # pylint: disable=broad-except
+            logging.exception('Could not process data records')
+
+    def _configure_fs(self):
+        """Tell spark/hadoop how to talk to S3 for us"""
+        fsspec_options = self.root.fsspec_options()
+        self.spark.conf.set('fs.s3a.sse.enabled', 'true')
+        self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
+        kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
+        if kms_key:
+            self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
+        region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
+        if region_name:
+            self.spark.conf.set('fs.s3a.endpoint.region', region_name)
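Editor's note: the heart of `write_records()` above is a Delta Lake merge keyed on the resource `id`, so re-running a batch upserts rows instead of appending duplicates, and the `AnalysisException` fallback creates the table on first write. For readers new to the delta-spark API, here is the same pattern as a standalone, hedged sketch (not part of the patch; the path and rows are made up, and a plain local directory works as a lake):

```python
# Illustrative sketch only -- the same merge-or-create pattern used in write_records
# above, outside of the cumulus classes. The path and data are hypothetical.
import delta
import pandas
import pyspark
from pyspark.sql.utils import AnalysisException

builder = pyspark.sql.SparkSession.builder \
    .appName('delta-merge-demo') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()

path = '/tmp/demo-lake/patient'  # hypothetical table location
updates = spark.createDataFrame(pandas.DataFrame([{'id': 'a', 'value': 2}]))

try:
    table = delta.DeltaTable.forPath(spark, path)
    table.alias('table') \
        .merge(source=updates.alias('updates'), condition='table.id = updates.id') \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
except AnalysisException:
    # first write: the table does not exist yet, so create it
    updates.write.save(path=path, format='delta')
```

Because the merge condition is `table.id = updates.id`, incoming resources replace earlier versions of themselves, which is what lets a batch be re-run safely.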
diff --git a/cumulus/formats/ndjson.py b/cumulus/formats/ndjson.py
index 21e3f7e6..77700834 100644
--- a/cumulus/formats/ndjson.py
+++ b/cumulus/formats/ndjson.py
@@ -2,10 +2,10 @@
 
 import pandas
 
-from .athena import AthenaFormat
+from .athena import AthenaBatchedFileFormat
 
 
-class NdjsonFormat(AthenaFormat):
+class NdjsonFormat(AthenaBatchedFileFormat):
     """Stores output files in a few flat ndjson files"""
 
     @property
diff --git a/cumulus/formats/parquet.py b/cumulus/formats/parquet.py
index b11133de..018e1c7a 100644
--- a/cumulus/formats/parquet.py
+++ b/cumulus/formats/parquet.py
@@ -2,10 +2,10 @@
 
 import pandas
 
-from .athena import AthenaFormat
+from .athena import AthenaBatchedFileFormat
 
 
-class ParquetFormat(AthenaFormat):
+class ParquetFormat(AthenaBatchedFileFormat):
     """Stores output files in a few flat parquet files"""
 
     @property
diff --git a/pyproject.toml b/pyproject.toml
index f69e47b5..4b7a11a1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,6 +3,7 @@ name = "cumulus"
 requires-python = ">= 3.7"
 dependencies = [
     "ctakesclient >= 1.3",
+    "delta-spark >= 2.1",
     # This branch includes some jwt fixes we need (and will hopefully be in mainline in future)
     "fhirclient @ git+https://github.com/mikix/client-py.git@mikix/oauth2-jwt",
     "jwcrypto",
diff --git a/tests/data/simple/batched-ndjson-output/condition/fhir_conditions.000.ndjson b/tests/data/simple/batched-ndjson-output/condition/condition.000.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/condition/fhir_conditions.000.ndjson
rename to tests/data/simple/batched-ndjson-output/condition/condition.000.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/condition/fhir_conditions.001.ndjson b/tests/data/simple/batched-ndjson-output/condition/condition.001.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/condition/fhir_conditions.001.ndjson
rename to tests/data/simple/batched-ndjson-output/condition/condition.001.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/documentreference/fhir_documentreferences.000.ndjson b/tests/data/simple/batched-ndjson-output/documentreference/documentreference.000.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/documentreference/fhir_documentreferences.000.ndjson
rename to tests/data/simple/batched-ndjson-output/documentreference/documentreference.000.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/documentreference/fhir_documentreferences.001.ndjson b/tests/data/simple/batched-ndjson-output/documentreference/documentreference.001.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/documentreference/fhir_documentreferences.001.ndjson
rename to tests/data/simple/batched-ndjson-output/documentreference/documentreference.001.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/encounter/fhir_encounters.000.ndjson b/tests/data/simple/batched-ndjson-output/encounter/encounter.000.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/encounter/fhir_encounters.000.ndjson
rename to tests/data/simple/batched-ndjson-output/encounter/encounter.000.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/encounter/fhir_encounters.001.ndjson b/tests/data/simple/batched-ndjson-output/encounter/encounter.001.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/encounter/fhir_encounters.001.ndjson
rename to tests/data/simple/batched-ndjson-output/encounter/encounter.001.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/observation/fhir_observations.000.ndjson b/tests/data/simple/batched-ndjson-output/observation/observation.000.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/observation/fhir_observations.000.ndjson
rename to tests/data/simple/batched-ndjson-output/observation/observation.000.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/observation/fhir_observations.001.ndjson b/tests/data/simple/batched-ndjson-output/observation/observation.001.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/observation/fhir_observations.001.ndjson
rename to tests/data/simple/batched-ndjson-output/observation/observation.001.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/patient/fhir_patients.000.ndjson b/tests/data/simple/batched-ndjson-output/patient/patient.000.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/patient/fhir_patients.000.ndjson
rename to tests/data/simple/batched-ndjson-output/patient/patient.000.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/patient/fhir_patients.001.ndjson b/tests/data/simple/batched-ndjson-output/patient/patient.001.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/patient/fhir_patients.001.ndjson
rename to tests/data/simple/batched-ndjson-output/patient/patient.001.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/symptom/fhir_symptoms.000.ndjson b/tests/data/simple/batched-ndjson-output/symptom/symptom.000.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/symptom/fhir_symptoms.000.ndjson
rename to tests/data/simple/batched-ndjson-output/symptom/symptom.000.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/symptom/fhir_symptoms.001.ndjson b/tests/data/simple/batched-ndjson-output/symptom/symptom.001.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/symptom/fhir_symptoms.001.ndjson
rename to tests/data/simple/batched-ndjson-output/symptom/symptom.001.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/symptom/fhir_symptoms.002.ndjson b/tests/data/simple/batched-ndjson-output/symptom/symptom.002.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/symptom/fhir_symptoms.002.ndjson
rename to tests/data/simple/batched-ndjson-output/symptom/symptom.002.ndjson
diff --git a/tests/data/simple/batched-ndjson-output/symptom/fhir_symptoms.003.ndjson b/tests/data/simple/batched-ndjson-output/symptom/symptom.003.ndjson
similarity index 100%
rename from tests/data/simple/batched-ndjson-output/symptom/fhir_symptoms.003.ndjson
rename to tests/data/simple/batched-ndjson-output/symptom/symptom.003.ndjson
diff --git a/tests/data/simple/ndjson-output/condition/fhir_conditions.000.ndjson b/tests/data/simple/ndjson-output/condition/condition.000.ndjson
similarity index 100%
rename from tests/data/simple/ndjson-output/condition/fhir_conditions.000.ndjson
rename to tests/data/simple/ndjson-output/condition/condition.000.ndjson
diff --git a/tests/data/simple/ndjson-output/documentreference/fhir_documentreferences.000.ndjson b/tests/data/simple/ndjson-output/documentreference/documentreference.000.ndjson
similarity index 100%
rename from tests/data/simple/ndjson-output/documentreference/fhir_documentreferences.000.ndjson
rename to tests/data/simple/ndjson-output/documentreference/documentreference.000.ndjson
diff --git a/tests/data/simple/ndjson-output/encounter/fhir_encounters.000.ndjson b/tests/data/simple/ndjson-output/encounter/encounter.000.ndjson
similarity index 100%
rename from tests/data/simple/ndjson-output/encounter/fhir_encounters.000.ndjson
rename to tests/data/simple/ndjson-output/encounter/encounter.000.ndjson
diff --git a/tests/data/simple/ndjson-output/observation/fhir_observations.000.ndjson b/tests/data/simple/ndjson-output/observation/observation.000.ndjson
similarity index 100%
rename from tests/data/simple/ndjson-output/observation/fhir_observations.000.ndjson
rename to tests/data/simple/ndjson-output/observation/observation.000.ndjson
diff --git a/tests/data/simple/ndjson-output/patient/fhir_patients.000.ndjson b/tests/data/simple/ndjson-output/patient/patient.000.ndjson
similarity index 100%
rename from tests/data/simple/ndjson-output/patient/fhir_patients.000.ndjson
rename to tests/data/simple/ndjson-output/patient/patient.000.ndjson
diff --git a/tests/data/simple/ndjson-output/symptom/fhir_symptoms.000.ndjson b/tests/data/simple/ndjson-output/symptom/symptom.000.ndjson
similarity index 100%
rename from tests/data/simple/ndjson-output/symptom/fhir_symptoms.000.ndjson
rename to tests/data/simple/ndjson-output/symptom/symptom.000.ndjson
diff --git a/tests/test_deltalake.py b/tests/test_deltalake.py
new file mode 100644
index 00000000..49c73092
--- /dev/null
+++ b/tests/test_deltalake.py
@@ -0,0 +1,76 @@
+"""Tests for Delta Lake support"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pandas
+from pyspark.sql.utils import AnalysisException
+from cumulus import config, formats, store
+
+
+class TestDeltaLake(unittest.TestCase):
+    """
+    Test case for the Delta Lake format writer.
+
+    i.e. tests for deltalake.py
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        output_tempdir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
+        cls.output_tempdir = output_tempdir
+        cls.output_dir = output_tempdir.name
+
+        # It is expensive to create a DeltaLakeFormat instance because of all the pyspark jar downloading etc.
+        # So we only do it once per class suite. (And erase all folder contents per-test)
+        cls.deltalake = formats.DeltaLakeFormat(store.Root(output_tempdir.name))
+
+    def setUp(self):
+        super().setUp()
+        shutil.rmtree(self.output_dir, ignore_errors=True)
+        self.job = config.JobSummary()
+
+    @staticmethod
+    def df(**kwargs) -> pandas.DataFrame:
+        """
+        Creates a dummy DataFrame with ids & values equal to each kwarg provided.
+        """
+        rows = [{'id': k, 'value': v} for k, v in kwargs.items()]
+        return pandas.DataFrame(rows)
+
+    def store(self, df: pandas.DataFrame, batch: int = 10) -> None:
+        """
+        Writes a single batch of data to the data lake.
+
+        :param df: the data to insert
+        :param batch: which batch number this is, defaulting to 10 to avoid triggering any first/last batch logic
+        """
+        self.deltalake.store_patients(self.job, df, batch)
+
+    def assert_lake_equal(self, df: pandas.DataFrame, when: int = None) -> None:
+        table_path = os.path.join(self.output_dir, 'patient')
+
+        reader = self.deltalake.spark.read
+        if when is not None:
+            reader = reader.option('versionAsOf', when)
+
+        table_df = reader.format('delta').load(table_path).sort('id').toPandas()
+        self.assertDictEqual(df.to_dict(), table_df.to_dict())
+
+    def test_creates_if_empty(self):
+        """Verify that the lake is created when empty"""
+        # sanity check that it doesn't exist yet
+        with self.assertRaises(AnalysisException):
+            self.assert_lake_equal(self.df())
+
+        self.store(self.df(a=1))
+        self.assert_lake_equal(self.df(a=1))
+
+    def test_upsert(self):
+        """Verify that we can update and insert data"""
+        self.store(self.df(a=1, b=2))
+        self.store(self.df(b=20, c=3))
+        self.assert_lake_equal(self.df(a=1, b=20, c=3))
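Editor's note: `assert_lake_equal()` above accepts an optional `when` argument so a test can pin its read to a specific table version via Delta's `versionAsOf` option (time travel). A hedged aside, reusing the Spark session owned by the format instance just as the test does; the root path is hypothetical and the `patient` layout mirrors the test:

```python
# Illustrative sketch only -- reading the lake back, including an older snapshot,
# the same way assert_lake_equal() does when `when` is passed. The root path is
# hypothetical and assumes a table was already written under 'patient'.
import os

from cumulus import formats, store

lake = formats.DeltaLakeFormat(store.Root('/tmp/demo-lake'))
table_path = os.path.join('/tmp/demo-lake', 'patient')

latest = lake.spark.read.format('delta').load(table_path).sort('id').toPandas()
version_zero = lake.spark.read.option('versionAsOf', 0).format('delta').load(table_path).toPandas()
```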
diff --git a/tests/test_etl.py b/tests/test_etl.py
index ce0c9a12..fcbf8856 100644
--- a/tests/test_etl.py
+++ b/tests/test_etl.py
@@ -217,7 +217,7 @@ def fake_load_all(internal_self, resources):
 
         # Confirm we only wrote the one resource
         self.assertEqual({'observation', 'JobConfig'}, set(os.listdir(self.output_path)))
-        self.assertEqual(['fhir_observations.000.ndjson'], os.listdir(os.path.join(self.output_path, 'observation')))
+        self.assertEqual(['observation.000.ndjson'], os.listdir(os.path.join(self.output_path, 'observation')))
 
     def test_multiple_tasks(self):
         # Grab all observations before we mock anything
@@ -234,8 +234,8 @@ def fake_load_all(internal_self, resources):
 
         # Confirm we only wrote the one resource
         self.assertEqual({'observation', 'patient', 'JobConfig'}, set(os.listdir(self.output_path)))
-        self.assertEqual(['fhir_observations.000.ndjson'], os.listdir(os.path.join(self.output_path, 'observation')))
-        self.assertEqual(['fhir_patients.000.ndjson'], os.listdir(os.path.join(self.output_path, 'patient')))
+        self.assertEqual(['observation.000.ndjson'], os.listdir(os.path.join(self.output_path, 'observation')))
+        self.assertEqual(['patient.000.ndjson'], os.listdir(os.path.join(self.output_path, 'patient')))
 
 
 class TestI2b2EtlJobConfig(BaseI2b2EtlSimple):
@@ -388,12 +388,12 @@ def test_etl_job_parquet(self):
 
         self.assertEqual(
             {
-                'condition/fhir_conditions.000.parquet',
-                'documentreference/fhir_documentreferences.000.parquet',
-                'encounter/fhir_encounters.000.parquet',
-                'observation/fhir_observations.000.parquet',
-                'patient/fhir_patients.000.parquet',
-                'symptom/fhir_symptoms.000.parquet',
+                'condition/condition.000.parquet',
+                'documentreference/documentreference.000.parquet',
+                'encounter/encounter.000.parquet',
+                'observation/observation.000.parquet',
+                'patient/patient.000.parquet',
+                'symptom/symptom.000.parquet',
             },
             set(all_files))
 
@@ -408,12 +408,12 @@ def test_etl_job_s3(self):
 
         all_files = {x for x in fs.find('mockbucket/root') if '/JobConfig/' not in x}
         self.assertEqual({
-            'mockbucket/root/condition/fhir_conditions.000.ndjson',
-            'mockbucket/root/documentreference/fhir_documentreferences.000.ndjson',
-            'mockbucket/root/encounter/fhir_encounters.000.ndjson',
-            'mockbucket/root/observation/fhir_observations.000.ndjson',
-            'mockbucket/root/patient/fhir_patients.000.ndjson',
-            'mockbucket/root/symptom/fhir_symptoms.000.ndjson',
+            'mockbucket/root/condition/condition.000.ndjson',
+            'mockbucket/root/documentreference/documentreference.000.ndjson',
+            'mockbucket/root/encounter/encounter.000.ndjson',
+            'mockbucket/root/observation/observation.000.ndjson',
+            'mockbucket/root/patient/patient.000.ndjson',
+            'mockbucket/root/symptom/symptom.000.ndjson',
         }, all_files)
 
         # Confirm we did not accidentally create an 's3:' directory locally
@@ -478,7 +478,7 @@ def test_does_not_hit_server_if_cache_exists(self):
         self.assertEqual(0, self.nlp_mock.call_count)
 
         # And we should see our fake cached results in the output
-        with open(os.path.join(self.output_path, 'symptom', 'fhir_symptoms.000.ndjson'), 'r', encoding='utf8') as f:
+        with open(os.path.join(self.output_path, 'symptom', 'symptom.000.ndjson'), 'r', encoding='utf8') as f:
             lines = f.readlines()
             symptoms = [json.loads(line) for line in lines]
             self.assertEqual(2, len(symptoms))