feat: add Delta Lake support
Pass --output-format=deltalake to enable it, and make sure your AWS
Glue configuration is set up to support it.

This is not enabled by default (yet), and this commit does not yet do
anything clever with incremental bulk exports. But it is a foundation
to build upon.
mikix committed Dec 21, 2022
1 parent e000c23 commit 58e1291
Showing 30 changed files with 249 additions and 45 deletions.
5 changes: 3 additions & 2 deletions .gitignore
@@ -1,7 +1,8 @@
# Project specific

example-output/
example-phi-build/
/.idea/
/example-output/
/example-phi-build/

# Python gitignore https://github.com/github/gitignore/blob/main/Python.gitignore
# Byte-compiled / optimized / DLL files
6 changes: 4 additions & 2 deletions cumulus/etl.py
@@ -364,7 +364,7 @@ def main(args: List[str]):
parser.add_argument('dir_phi', metavar='/path/to/phi')
parser.add_argument('--input-format', default='ndjson', choices=['i2b2', 'ndjson'],
help='input format (default is ndjson)')
parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'],
parser.add_argument('--output-format', default='parquet', choices=['deltalake', 'json', 'ndjson', 'parquet'],
help='output format (default is parquet)')
parser.add_argument('--batch-size', type=int, metavar='SIZE', default=10000000,
help='how many entries to process at once and thus '
@@ -401,7 +401,9 @@ def main(args: List[str]):
else:
config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks)

if args.output_format == 'json':
if args.output_format == 'deltalake':
config_store = formats.DeltaLakeFormat(root_output)
elif args.output_format == 'json':
config_store = formats.JsonTreeFormat(root_output)
elif args.output_format == 'parquet':
config_store = formats.ParquetFormat(root_output)
1 change: 1 addition & 0 deletions cumulus/formats/__init__.py
@@ -1,5 +1,6 @@
"""Classes that know _how_ to write out results to the target folder"""

from .deltalake import DeltaLakeFormat
from .json_tree import JsonTreeFormat
from .ndjson import NdjsonFormat
from .parquet import ParquetFormat
54 changes: 33 additions & 21 deletions cumulus/formats/athena.py
@@ -18,6 +18,36 @@ class AthenaFormat(store.Format):
(i.e. one folder per data type, broken into large files)
"""

@abc.abstractmethod
def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to the output database folder"""

def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
self.write_records(job, patients, 'patient', batch)

def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
self.write_records(job, encounters, 'encounter', batch)

def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
self.write_records(job, labs, 'observation', batch)

def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
self.write_records(job, conditions, 'condition', batch)

def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
self.write_records(job, docrefs, 'documentreference', batch)

def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
self.write_records(job, observations, 'symptom', batch)


class AthenaBatchedFileFormat(AthenaFormat):
"""
Stores output files as batched individual files.
i.e. a few ndjson files that hold all the rows
"""

@property
@abc.abstractmethod
def suffix(self) -> str:
@@ -39,7 +69,7 @@ def write_format(self, df: pandas.DataFrame, path: str) -> None:
#
##########################################################################################

def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a single file"""
job.attempt += len(df)

@@ -49,36 +79,18 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
# our files out. What we really want is some sort of blue/green deploy of data. There's no
# satisfying fix while we are writing to the same folder. (Unless we do incremental/delta
# writes and keep all old data around still.)
parent_dir = self.root.joinpath(os.path.dirname(path))
parent_dir = self.root.joinpath(dbname)
try:
self.root.rm(parent_dir, recursive=True)
except FileNotFoundError:
pass

try:
full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}')
full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}')
self.root.makedirs(os.path.dirname(full_path))
self.write_format(df, full_path)

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
self._write_records(job, patients, 'patient/fhir_patients', batch)

def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
self._write_records(job, encounters, 'encounter/fhir_encounters', batch)

def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
self._write_records(job, labs, 'observation/fhir_observations', batch)

def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
self._write_records(job, conditions, 'condition/fhir_conditions', batch)

def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
self._write_records(job, docrefs, 'documentreference/fhir_documentreferences', batch)

def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
self._write_records(job, observations, 'symptom/fhir_symptoms', batch)
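
With this split, AthenaFormat routes every resource type (patient, encounter, observation, condition, documentreference, symptom) through a single abstract write_records() hook, while AthenaBatchedFileFormat keeps the old batched-file behavior. A minimal sketch of what a new batched subclass could look like (CsvFormat is purely illustrative and not part of this commit):

import pandas

from .athena import AthenaBatchedFileFormat


class CsvFormat(AthenaBatchedFileFormat):
    """Hypothetical example: stores each batch as a flat CSV file"""

    @property
    def suffix(self) -> str:
        # Used by the base class when naming files, e.g. patient/patient.000.csv
        return 'csv'

    def write_format(self, df: pandas.DataFrame, path: str) -> None:
        # The base class has already cleared the old folder and created the parent directory
        df.to_csv(path, index=False)

This mirrors how NdjsonFormat and ParquetFormat are updated below to subclass AthenaBatchedFileFormat instead of AthenaFormat.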
111 changes: 111 additions & 0 deletions cumulus/formats/deltalake.py
@@ -0,0 +1,111 @@
"""
An implementation of Format that writes to a Delta Lake.
See https://delta.io/
"""

import contextlib
import logging
import os

import delta
import pandas
import pyspark
from pyspark.sql.utils import AnalysisException

from cumulus import store

from .athena import AthenaFormat

# This class would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code
# uses and expects (in terms of filesystem writing).
#
# There is a first-party Delta Lake implementation (`deltalake`), based on native Rust code, that talks to
# fsspec & pandas by default. But it is missing some critical features as of this writing (mostly merges):
# - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850


@contextlib.contextmanager
def _suppress_output():
"""
Totally hides stdout and stderr while the block runs (anything written during that time is discarded).
This is a more powerful version of contextlib.redirect_stdout that also works for subprocesses / threads.
"""
stdout = os.dup(1)
stderr = os.dup(2)
silent = os.open(os.devnull, os.O_WRONLY)
os.dup2(silent, 1)
os.dup2(silent, 2)

try:
yield
finally:
os.dup2(stdout, 1)
os.dup2(stderr, 2)


class DeltaLakeFormat(AthenaFormat):
"""
Stores data in a delta lake.
"""
def __init__(self, root: store.Root):
super().__init__(root)

# This _suppress_output call is because pyspark is SO NOISY during session creation. Like 40 lines of trivial
# output. Progress reports of downloading the jars. Comments about default logging level and the hostname.
# I could not find a way to set the log level before the session is created. So here we just suppress
# stdout/stderr entirely.
with _suppress_output():
# Prep the builder with various config options
builder = pyspark.sql.SparkSession.builder \
.appName('cumulus-etl') \
.config('spark.driver.memory', '2g') \
.config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
.config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')

# Now add delta's packages and actually build the session
self.spark = delta.configure_spark_with_delta_pip(builder, extra_packages=[
'org.apache.hadoop:hadoop-aws:3.3.4',
]).getOrCreate()

self.spark.sparkContext.setLogLevel('ERROR')
self._configure_fs()

def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a delta lake"""
job.attempt += len(df)
full_path = self.root.joinpath(dbname).replace('s3://', 's3a://') # hadoop uses the s3a: scheme instead of s3:

try:
updates = self.spark.createDataFrame(df)

try:
table = delta.DeltaTable.forPath(self.spark, full_path)
if batch == 0:
table.vacuum() # Clean up unused data files older than retention policy (default 7 days)
table.alias('table') \
.merge(source=updates.alias('updates'), condition='table.id = updates.id') \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
except AnalysisException:
# table does not exist yet, let's make an initial version
updates.write.save(path=full_path, format='delta')

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

def _configure_fs(self):
"""Tell spark/hadoop how to talk to S3 for us"""
fsspec_options = self.root.fsspec_options()
self.spark.conf.set('fs.s3a.sse.enabled', 'true')
self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
if kms_key:
self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
if region_name:
self.spark.conf.set('fs.s3a.endpoint.region', region_name)
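
As a usage sketch (the local path and sample rows below are placeholders, mirroring the new tests further down), writing two batches performs an initial table creation followed by an id-based upsert:

import pandas

from cumulus import config, formats, store

# A local folder for illustration; an s3:// URL also works (translated to s3a:// above)
lake = formats.DeltaLakeFormat(store.Root('/tmp/delta-output'))
job = config.JobSummary()

# Batch 0: the table does not exist yet, so an initial Delta table is written
lake.store_patients(job, pandas.DataFrame([{'id': 'a', 'value': 1}, {'id': 'b', 'value': 2}]), 0)

# Batch 1: merged on 'id', so 'b' is updated in place and 'c' is inserted
lake.store_patients(job, pandas.DataFrame([{'id': 'b', 'value': 20}, {'id': 'c', 'value': 3}]), 1)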
4 changes: 2 additions & 2 deletions cumulus/formats/ndjson.py
@@ -2,10 +2,10 @@

import pandas

from .athena import AthenaFormat
from .athena import AthenaBatchedFileFormat


class NdjsonFormat(AthenaFormat):
class NdjsonFormat(AthenaBatchedFileFormat):
"""Stores output files in a few flat ndjson files"""

@property
4 changes: 2 additions & 2 deletions cumulus/formats/parquet.py
@@ -2,10 +2,10 @@

import pandas

from .athena import AthenaFormat
from .athena import AthenaBatchedFileFormat


class ParquetFormat(AthenaFormat):
class ParquetFormat(AthenaBatchedFileFormat):
"""Stores output files in a few flat parquet files"""

@property
1 change: 1 addition & 0 deletions pyproject.toml
@@ -3,6 +3,7 @@ name = "cumulus"
requires-python = ">= 3.7"
dependencies = [
"ctakesclient >= 1.3",
"delta-spark >= 2.1",
# This branch includes some jwt fixes we need (and will hopefully be in mainline in future)
"fhirclient @ git+https://github.com/mikix/client-py.git@mikix/oauth2-jwt",
"jwcrypto",
76 changes: 76 additions & 0 deletions tests/test_deltalake.py
@@ -0,0 +1,76 @@
"""Tests for Delta Lake support"""

import os
import shutil
import tempfile
import unittest

import pandas
from pyspark.sql.utils import AnalysisException
from cumulus import config, formats, store


class TestDeltaLake(unittest.TestCase):
"""
Test case for the Delta Lake format writer.
i.e. tests for deltalake.py
"""

@classmethod
def setUpClass(cls):
super().setUpClass()
output_tempdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with
cls.output_tempdir = output_tempdir
cls.output_dir = output_tempdir.name

# It is expensive to create a DeltaLakeFormat instance because of all the pyspark jar downloading etc.
# So we only do it once per class suite. (And erase all folder contents per-test)
cls.deltalake = formats.DeltaLakeFormat(store.Root(output_tempdir.name))

def setUp(self):
super().setUp()
shutil.rmtree(self.output_dir, ignore_errors=True)
self.job = config.JobSummary()

@staticmethod
def df(**kwargs) -> pandas.DataFrame:
"""
Creates a dummy DataFrame with ids & values equal to each kwarg provided.
"""
rows = [{'id': k, 'value': v} for k, v in kwargs.items()]
return pandas.DataFrame(rows)

def store(self, df: pandas.DataFrame, batch: int = 10) -> None:
"""
Writes a single batch of data to the data lake.
:param df: the data to insert
:param batch: which batch number this is, defaulting to 10 to avoid triggering any first/last batch logic
"""
self.deltalake.store_patients(self.job, df, batch)

def assert_lake_equal(self, df: pandas.DataFrame, when: int = None) -> None:
table_path = os.path.join(self.output_dir, 'patient')

reader = self.deltalake.spark.read
if when is not None:
reader = reader.option('versionAsOf', when)

table_df = reader.format('delta').load(table_path).sort('id').toPandas()
self.assertDictEqual(df.to_dict(), table_df.to_dict())

def test_creates_if_empty(self):
"""Verify that the lake is created when empty"""
# sanity check that it doesn't exist yet
with self.assertRaises(AnalysisException):
self.assert_lake_equal(self.df())

self.store(self.df(a=1))
self.assert_lake_equal(self.df(a=1))

def test_upsert(self):
"""Verify that we can update and insert data"""
self.store(self.df(a=1, b=2))
self.store(self.df(b=20, c=3))
self.assert_lake_equal(self.df(a=1, b=20, c=3))