diff --git a/cumulus/etl.py b/cumulus/etl.py index 29bca3d1..ba5966ba 100644 --- a/cumulus/etl.py +++ b/cumulus/etl.py @@ -333,8 +333,8 @@ def main(args: List[str]): parser.add_argument('dir_phi', metavar='/path/to/phi') parser.add_argument('--input-format', default='ndjson', choices=['i2b2', 'ndjson'], help='input format (default is ndjson)') - parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'], - help='output format (default is parquet)') + parser.add_argument('--output-format', default='iceberg', choices=['delta', 'iceberg', 'json', 'ndjson', 'parquet'], + help='output format (default is iceberg)') parser.add_argument('--batch-size', type=int, metavar='SIZE', default=10000000, help='how many entries to process at once and thus ' 'how many to put in one output file (default is 10M)') @@ -366,7 +366,11 @@ def main(args: List[str]): else: config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks) - if args.output_format == 'json': + if args.output_format == 'delta': + config_store = formats.DeltaLakeFormat(root_output) + elif args.output_format == 'iceberg': + config_store = formats.IcebergFormat(root_output) + elif args.output_format == 'json': config_store = formats.JsonTreeFormat(root_output) elif args.output_format == 'parquet': config_store = formats.ParquetFormat(root_output) diff --git a/cumulus/formats/__init__.py b/cumulus/formats/__init__.py index 6d2ec483..13b252bb 100644 --- a/cumulus/formats/__init__.py +++ b/cumulus/formats/__init__.py @@ -1,5 +1,6 @@ """Classes that know _how_ to write out results to the target folder""" +from .deltalake import DeltaLakeFormat, IcebergFormat from .json_tree import JsonTreeFormat from .ndjson import NdjsonFormat from .parquet import ParquetFormat diff --git a/cumulus/formats/athena.py b/cumulus/formats/athena.py index 0a151818..2c50ec84 100644 --- a/cumulus/formats/athena.py +++ b/cumulus/formats/athena.py @@ -18,6 +18,36 @@ class AthenaFormat(store.Format): (i.e. one folder per data type, broken into large files) """ + @abc.abstractmethod + def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None: + """Writes the whole dataframe to the output database folder""" + + def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None: + self.write_records(job, patients, 'patient', batch) + + def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None: + self.write_records(job, encounters, 'encounter', batch) + + def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None: + self.write_records(job, labs, 'observation', batch) + + def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None: + self.write_records(job, conditions, 'condition', batch) + + def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None: + self.write_records(job, docrefs, 'documentreference', batch) + + def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None: + self.write_records(job, observations, 'symptom', batch) + + +class AthenaBatchedFileFormat(AthenaFormat): + """ + Stores output files as batched individual files. + + i.e. a few ndjson files that hold all the rows + """ + @property @abc.abstractmethod def suffix(self) -> str: @@ -39,7 +69,7 @@ def write_format(self, df: pandas.DataFrame, path: str) -> None: # ########################################################################################## - def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None: + def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None: """Writes the whole dataframe to a single file""" job.attempt += len(df) @@ -49,14 +79,14 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> No # our files out. What we really want is some sort of blue/green deploy of data. There's no # satisfying fix while we are writing to the same folder. (Unless we do incremental/delta # writes and keep all old data around still.) - parent_dir = self.root.joinpath(os.path.dirname(path)) + parent_dir = self.root.joinpath(dbname) try: self.root.rm(parent_dir, recursive=True) except FileNotFoundError: pass try: - full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}') + full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}') self.root.makedirs(os.path.dirname(full_path)) self.write_format(df, full_path) @@ -64,21 +94,3 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> No job.success_rate(1) except Exception: # pylint: disable=broad-except logging.exception('Could not process data records') - - def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None: - self._write_records(job, patients, 'patient/fhir_patients', batch) - - def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None: - self._write_records(job, encounters, 'encounter/fhir_encounters', batch) - - def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None: - self._write_records(job, labs, 'observation/fhir_observations', batch) - - def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None: - self._write_records(job, conditions, 'condition/fhir_conditions', batch) - - def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None: - self._write_records(job, docrefs, 'documentreference/fhir_documentreferences', batch) - - def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None: - self._write_records(job, observations, 'symptom/fhir_symptoms', batch) diff --git a/cumulus/formats/deltalake.py b/cumulus/formats/deltalake.py new file mode 100644 index 00000000..bfb08804 --- /dev/null +++ b/cumulus/formats/deltalake.py @@ -0,0 +1,133 @@ +""" +An implementation of Format that writes to a Delta Lake. + +See https://delta.io/ +""" + +import logging + +import delta +import pandas +import pyspark +from pyspark.sql.utils import AnalysisException + +from cumulus import store + +from .athena import AthenaFormat + + +# This class would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code +# uses and expects (in terms of filesystem writing). +# Spark uses hadoop and both are written in Java, so we need to write some glue code and also convert S3 args. +# There is a different Delta Lake implementation (deltalake on pypi) based off native Rust code and which talks to +# Pandas & fsspec by default. But it is missing some critical features as of this writing (like merges): +# - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850 + +class IcebergFormat(AthenaFormat): + """ + Stores data in an iceberg data lake. + """ + def __init__(self, root: store.Root): + super().__init__(root) + # Package link: https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-extensions-3.3 + self.spark = pyspark.sql.SparkSession.builder \ + .appName('cumulus-etl') \ + .config('spark.driver.memory', '2g') \ + .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-3.3_2.12:1.0.0,' + 'org.apache.iceberg:iceberg-spark-extensions-3.3_2.12:1.0.0') \ + .config('spark.sql.catalog.cumulus', 'org.apache.iceberg.spark.SparkCatalog') \ + .config('spark.sql.catalog.cumulus.type', 'hadoop') \ + .config('spark.sql.catalog.cumulus.warehouse', root.path) \ + .config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog') \ + .config('spark.sql.defaultCatalog', 'cumulus') \ + .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \ + .getOrCreate() + #self._configure_fs() + + def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None: + """Writes the whole dataframe to a delta lake""" + job.attempt += len(df) + full_path = self.root.joinpath(dbname).replace('s3://', 's3a://') # hadoop uses the s3a: scheme instead of s3: + + try: + updates = self.spark.createDataFrame(df) + updates.createOrReplaceTempView('updates') + + try: + #_table = self.spark.table(dbname).alias('table') + self.spark.sql( + f'MERGE INTO {dbname} table ' + 'USING updates ' + 'ON table.id = updates.id ' + 'WHEN MATCHED THEN UPDATE SET * ' + 'WHEN NOT MATCHED THEN INSERT * ' + ) + except AnalysisException as exc: + print(exc) + # table does not exist yet, let's make an initial version + updates.writeTo(dbname).create() + + job.success += len(df) + job.success_rate(1) + except Exception: # pylint: disable=broad-except + logging.exception('Could not process data records') + + +class DeltaLakeFormat(AthenaFormat): + """ + Stores data in a delta lake. + """ + def __init__(self, root: store.Root): + super().__init__(root) + builder = pyspark.sql.SparkSession.builder \ + .appName('cumulus-etl') \ + .config('spark.driver.memory', '2g') \ + .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \ + .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') + self.spark = delta.configure_spark_with_delta_pip(builder).getOrCreate() + self._configure_fs() + + def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None: + """Writes the whole dataframe to a delta lake""" + job.attempt += len(df) + full_path = self.root.joinpath(dbname).replace('s3://', 's3a://') # hadoop uses the s3a scheme (same as s3) + + try: + updates = self.spark.createDataFrame(df) + + try: + table = delta.DeltaTable.forPath(self.spark, full_path) + # if batch == 0: + # table.vacuum() + # TODO: why does this keep deleting and recreating a single row...? + table.alias('table') \ + .merge(source=updates.alias('updates'), condition='table.id = updates.id') \ + .whenMatchedUpdateAll() \ + .whenNotMatchedInsertAll() \ + .execute() + except AnalysisException: + # table does not exist yet, let's make an initial version + updates.write.format('delta').save(full_path) + table = delta.DeltaTable.forPath(self.spark, full_path) + + # Now generate a manifest file for Athena's benefit. + # Otherwise, Athena would scan all parquet files, even ones holding removed data. + # https://docs.delta.io/latest/delta-utility.html#generate-a-manifest-file + table.generate('symlink_format_manifest') + + job.success += len(df) + job.success_rate(1) + except Exception: # pylint: disable=broad-except + logging.exception('Could not process data records') + + def _configure_fs(self): + """Tell spark/hadoop how to talk to S3 for us""" + fsspec_options = self.root.fsspec_options() + self.spark.conf.set('fs.s3a.sse.enabled', 'true') + self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS') + kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId') + if kms_key: + self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key) + region_name = fsspec_options.get('client_kwargs', {}).get('region_name') + if region_name: + self.spark.conf.set('fs.s3a.endpoint.region', region_name) diff --git a/cumulus/formats/ndjson.py b/cumulus/formats/ndjson.py index 21e3f7e6..77700834 100644 --- a/cumulus/formats/ndjson.py +++ b/cumulus/formats/ndjson.py @@ -2,10 +2,10 @@ import pandas -from .athena import AthenaFormat +from .athena import AthenaBatchedFileFormat -class NdjsonFormat(AthenaFormat): +class NdjsonFormat(AthenaBatchedFileFormat): """Stores output files in a few flat ndjson files""" @property diff --git a/cumulus/formats/parquet.py b/cumulus/formats/parquet.py index b11133de..018e1c7a 100644 --- a/cumulus/formats/parquet.py +++ b/cumulus/formats/parquet.py @@ -2,10 +2,10 @@ import pandas -from .athena import AthenaFormat +from .athena import AthenaBatchedFileFormat -class ParquetFormat(AthenaFormat): +class ParquetFormat(AthenaBatchedFileFormat): """Stores output files in a few flat parquet files""" @property diff --git a/pyproject.toml b/pyproject.toml index 2cfac23e..24f6240e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,11 +3,13 @@ name = "cumulus" requires-python = ">= 3.7" dependencies = [ "ctakesclient >= 1.3", + "delta-spark >= 2.1", # This branch includes some jwt fixes we need (and will hopefully be in mainline in future) "fhirclient @ git+https://github.com/mikix/client-py.git@mikix/oauth2-jwt", "jwcrypto", "pandas", "pyarrow", + "pyiceberg[glue] @ git+https://github.com/apache/iceberg.git#subdirectory=python", "s3fs", ] authors = [