Skip to content

Commit

Permalink
feat: add upsert lake support
Browse files Browse the repository at this point in the history
Write to a ACID upsert data lake location by using
--output-format=delta (which is now also the default instead of
parquet, though note that the files are still stored in parquet
format inside a data lake).
  • Loading branch information
mikix committed Nov 28, 2022
1 parent 9831604 commit 9362c82
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 28 deletions.
8 changes: 5 additions & 3 deletions cumulus/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ def main(args: List[str]):
parser.add_argument('dir_phi', metavar='/path/to/phi')
parser.add_argument('--input-format', default='ndjson', choices=['i2b2', 'ndjson'],
help='input format (default is ndjson)')
parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'],
help='output format (default is parquet)')
parser.add_argument('--output-format', default='delta', choices=['delta', 'json', 'ndjson', 'parquet'],
help='output format (default is delta)')
parser.add_argument('--batch-size', type=int, metavar='SIZE', default=10000000,
help='how many entries to process at once and thus '
'how many to put in one output file (default is 10M)')
Expand Down Expand Up @@ -366,7 +366,9 @@ def main(args: List[str]):
else:
config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks)

if args.output_format == 'json':
if args.output_format == 'delta':
config_store = formats.DeltaLakeFormat(root_output)
elif args.output_format == 'json':
config_store = formats.JsonTreeFormat(root_output)
elif args.output_format == 'parquet':
config_store = formats.ParquetFormat(root_output)
Expand Down
1 change: 1 addition & 0 deletions cumulus/formats/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Classes that know _how_ to write out results to the target folder"""

from .deltalake import DeltaLakeFormat
from .json_tree import JsonTreeFormat
from .ndjson import NdjsonFormat
from .parquet import ParquetFormat
54 changes: 33 additions & 21 deletions cumulus/formats/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,36 @@ class AthenaFormat(store.Format):
(i.e. one folder per data type, broken into large files)
"""

@abc.abstractmethod
def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to the output database folder"""

def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
self.write_records(job, patients, 'patient', batch)

def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
self.write_records(job, encounters, 'encounter', batch)

def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
self.write_records(job, labs, 'observation', batch)

def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
self.write_records(job, conditions, 'condition', batch)

def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
self.write_records(job, docrefs, 'documentreference', batch)

def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
self.write_records(job, observations, 'symptom', batch)


class AthenaBatchedFileFormat(AthenaFormat):
"""
Stores output files as batched individual files.
i.e. a few ndjson files that hold all the rows
"""

@property
@abc.abstractmethod
def suffix(self) -> str:
Expand All @@ -39,7 +69,7 @@ def write_format(self, df: pandas.DataFrame, path: str) -> None:
#
##########################################################################################

def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a single file"""
job.attempt += len(df)

Expand All @@ -49,36 +79,18 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> No
# our files out. What we really want is some sort of blue/green deploy of data. There's no
# satisfying fix while we are writing to the same folder. (Unless we do incremental/delta
# writes and keep all old data around still.)
parent_dir = self.root.joinpath(os.path.dirname(path))
parent_dir = self.root.joinpath(dbname)
try:
self.root.rm(parent_dir, recursive=True)
except FileNotFoundError:
pass

try:
full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}')
full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}')
self.root.makedirs(os.path.dirname(full_path))
self.write_format(df, full_path)

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
self._write_records(job, patients, 'patient/fhir_patients', batch)

def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
self._write_records(job, encounters, 'encounter/fhir_encounters', batch)

def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
self._write_records(job, labs, 'observation/fhir_observations', batch)

def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
self._write_records(job, conditions, 'condition/fhir_conditions', batch)

def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
self._write_records(job, docrefs, 'documentreference/fhir_documentreferences', batch)

def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
self._write_records(job, observations, 'symptom/fhir_symptoms', batch)
83 changes: 83 additions & 0 deletions cumulus/formats/deltalake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
An implementation of Format that writes to a Delta Lake.
See https://delta.io/
"""

import logging

import delta
import pandas
import pyspark
from pyspark.sql.utils import AnalysisException

from cumulus import store

from .athena import AthenaFormat


# This class would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code
# uses and expects (in terms of filesystem writing).
# Spark uses hadoop and both are written in Java, so we need to write some glue code and also convert S3 args.
# There is a different Delta Lake implementation (deltalake on pypi) based off native Rust code and which talks to
# Pandas & fsspec by default. But it is missing some critical features as of this writing (like merges):
# - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850

class DeltaLakeFormat(AthenaFormat):
"""
Stores data in a delta lake.
"""
def __init__(self, root: store.Root):
super().__init__(root)
builder = pyspark.sql.SparkSession.builder \
.appName('cumulus-etl') \
.config('spark.driver.memory', '2g') \
.config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
.config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
self.spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()
self._configure_fs()

def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a delta lake"""
job.attempt += len(df)
full_path = self.root.joinpath(dbname).replace('s3://', 's3a://') # hadoop uses the s3a scheme (same as s3)

try:
updates = self.spark.createDataFrame(df)

try:
table = delta.DeltaTable.forPath(self.spark, full_path)
# if batch == 0:
# table.vacuum()
# TODO: why does this keep deleting and recreating a single row...?
table.alias('table') \
.merge(source=updates.alias('updates'), condition='table.id = updates.id') \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
except AnalysisException:
# table does not exist yet, let's make an initial version
updates.write.format('delta').save(full_path)
table = delta.DeltaTable.forPath(self.spark, full_path)

# Now generate a manifest file for Athena's benefit.
# Otherwise, Athena would scan all parquet files, even ones holding removed data.
# https://docs.delta.io/latest/delta-utility.html#generate-a-manifest-file
table.generate('symlink_format_manifest')

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

def _configure_fs(self):
"""Tell spark/hadoop how to talk to S3 for us"""
fsspec_options = self.root.fsspec_options()
self.spark.conf.set('fs.s3a.sse.enabled', 'true')
self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
if kms_key:
self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
if region_name:
self.spark.conf.set('fs.s3a.endpoint.region', region_name)
4 changes: 2 additions & 2 deletions cumulus/formats/ndjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import pandas

from .athena import AthenaFormat
from .athena import AthenaBatchedFileFormat


class NdjsonFormat(AthenaFormat):
class NdjsonFormat(AthenaBatchedFileFormat):
"""Stores output files in a few flat ndjson files"""

@property
Expand Down
4 changes: 2 additions & 2 deletions cumulus/formats/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import pandas

from .athena import AthenaFormat
from .athena import AthenaBatchedFileFormat


class ParquetFormat(AthenaFormat):
class ParquetFormat(AthenaBatchedFileFormat):
"""Stores output files in a few flat parquet files"""

@property
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name = "cumulus"
requires-python = ">= 3.7"
dependencies = [
"ctakesclient >= 1.3",
"delta-spark >= 2.1",
# This branch includes some jwt fixes we need (and will hopefully be in mainline in future)
"fhirclient @ git+https://github.com/mikix/client-py.git@mikix/oauth2-jwt",
"jwcrypto",
Expand Down

0 comments on commit 9362c82

Please sign in to comment.