Merge pull request #89 from smart-on-fhir/mikix/acidlake
feat: add Delta Lake support
mikix authored Dec 28, 2022
2 parents 395a1f1 + 674e772 commit fa2fb01
Showing 31 changed files with 250 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.7", "3.8", "3.9", "3.10"]

steps:
- uses: actions/checkout@v3
5 changes: 3 additions & 2 deletions .gitignore
@@ -1,7 +1,8 @@
# Project specific

example-output/
example-phi-build/
/.idea/
/example-output/
/example-phi-build/

# Python gitignore https://github.com/github/gitignore/blob/main/Python.gitignore
# Byte-compiled / optimized / DLL files
6 changes: 4 additions & 2 deletions cumulus/etl.py
@@ -364,7 +364,7 @@ def main(args: List[str]):
parser.add_argument('dir_phi', metavar='/path/to/phi')
parser.add_argument('--input-format', default='ndjson', choices=['i2b2', 'ndjson'],
help='input format (default is ndjson)')
parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'],
parser.add_argument('--output-format', default='parquet', choices=['deltalake', 'json', 'ndjson', 'parquet'],
help='output format (default is parquet)')
parser.add_argument('--batch-size', type=int, metavar='SIZE', default=200000,
help='how many entries to process at once and thus '
@@ -401,7 +401,9 @@ def main(args: List[str]):
else:
config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks)

if args.output_format == 'json':
if args.output_format == 'deltalake':
config_store = formats.DeltaLakeFormat(root_output)
elif args.output_format == 'json':
config_store = formats.JsonTreeFormat(root_output)
elif args.output_format == 'parquet':
config_store = formats.ParquetFormat(root_output)
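
For orientation, a minimal sketch of driving the new output format through main(), using only what this hunk shows. The first two positional directories are assumptions (they do not appear above); only dir_phi and the --output-format choices are visible in the diff.

from cumulus import etl

etl.main([
    '/path/to/input',    # assumed positional input directory (not shown in this hunk)
    '/path/to/output',   # assumed positional output directory (not shown in this hunk)
    '/path/to/phi',
    '--output-format', 'deltalake',
])
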
1 change: 1 addition & 0 deletions cumulus/formats/__init__.py
@@ -1,5 +1,6 @@
"""Classes that know _how_ to write out results to the target folder"""

from .deltalake import DeltaLakeFormat
from .json_tree import JsonTreeFormat
from .ndjson import NdjsonFormat
from .parquet import ParquetFormat
54 changes: 33 additions & 21 deletions cumulus/formats/athena.py
@@ -18,6 +18,36 @@ class AthenaFormat(store.Format):
(i.e. one folder per data type, broken into large files)
"""

@abc.abstractmethod
def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to the output database folder"""

def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
self.write_records(job, patients, 'patient', batch)

def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
self.write_records(job, encounters, 'encounter', batch)

def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
self.write_records(job, labs, 'observation', batch)

def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
self.write_records(job, conditions, 'condition', batch)

def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
self.write_records(job, docrefs, 'documentreference', batch)

def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
self.write_records(job, observations, 'symptom', batch)


class AthenaBatchedFileFormat(AthenaFormat):
"""
Stores output files as batched individual files.
i.e. a few ndjson files that hold all the rows
"""

@property
@abc.abstractmethod
def suffix(self) -> str:
@@ -39,7 +69,7 @@ def write_format(self, df: pandas.DataFrame, path: str) -> None:
#
##########################################################################################

def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a single file"""
job.attempt += len(df)

@@ -49,36 +79,18 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
# our files out. What we really want is some sort of blue/green deploy of data. There's no
# satisfying fix while we are writing to the same folder. (Unless we do incremental/delta
# writes and keep all old data around still.)
parent_dir = self.root.joinpath(os.path.dirname(path))
parent_dir = self.root.joinpath(dbname)
try:
self.root.rm(parent_dir, recursive=True)
except FileNotFoundError:
pass

try:
full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}')
full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}')
self.root.makedirs(os.path.dirname(full_path))
self.write_format(df, full_path)

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
self._write_records(job, patients, 'patient/fhir_patients', batch)

def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
self._write_records(job, encounters, 'encounter/fhir_encounters', batch)

def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
self._write_records(job, labs, 'observation/fhir_observations', batch)

def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
self._write_records(job, conditions, 'condition/fhir_conditions', batch)

def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
self._write_records(job, docrefs, 'documentreference/fhir_documentreferences', batch)

def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
self._write_records(job, observations, 'symptom/fhir_symptoms', batch)
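
With this refactor, AthenaFormat owns the store_*() to write_records() mapping, and AthenaBatchedFileFormat keeps the one-file-per-batch behavior, so a concrete format only supplies a suffix and a write_format(). A minimal sketch of a hypothetical extra subclass (CsvFormat is not part of this change; the real ndjson and parquet formats below follow the same pattern):

import pandas

from .athena import AthenaBatchedFileFormat


class CsvFormat(AthenaBatchedFileFormat):
    """Stores output files in a few flat csv files (hypothetical example)"""

    @property
    def suffix(self) -> str:
        return 'csv'

    def write_format(self, df: pandas.DataFrame, path: str) -> None:
        # pandas accepts local paths and fsspec-style URLs alike
        df.to_csv(path, index=False)
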
111 changes: 111 additions & 0 deletions cumulus/formats/deltalake.py
@@ -0,0 +1,111 @@
"""
An implementation of Format that writes to a Delta Lake.
See https://delta.io/
"""

import contextlib
import logging
import os

import delta
import pandas
import pyspark
from pyspark.sql.utils import AnalysisException

from cumulus import store

from .athena import AthenaFormat

# This class would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code
# uses and expects (in terms of filesystem writing).
#
# There is a first-party Delta Lake implementation (`deltalake`) based on native Rust code, which talks to
# fsspec & pandas by default. But it is missing some critical features as of this writing (mostly merges):
# - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850


@contextlib.contextmanager
def _suppress_output():
"""
Totally hides stdout and stderr unless there is an error, and then stderr is printed.
This is a more powerful version of contextlib.redirect_stdout that also works for subprocesses / threads.
"""
stdout = os.dup(1)
stderr = os.dup(2)
silent = os.open(os.devnull, os.O_WRONLY)
os.dup2(silent, 1)
os.dup2(silent, 2)

try:
yield
finally:
os.dup2(stdout, 1)
os.dup2(stderr, 2)


class DeltaLakeFormat(AthenaFormat):
"""
Stores data in a delta lake.
"""
def __init__(self, root: store.Root):
super().__init__(root)

# This _suppress_output call is because pyspark is SO NOISY during session creation. Like 40 lines of trivial
# output. Progress reports of downloading the jars. Comments about default logging level and the hostname.
# I could not find a way to set the log level before the session is created. So here we just suppress
# stdout/stderr entirely.
with _suppress_output():
# Prep the builder with various config options
builder = pyspark.sql.SparkSession.builder \
.appName('cumulus-etl') \
.config('spark.driver.memory', '2g') \
.config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
.config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')

# Now add delta's packages and actually build the session
self.spark = delta.configure_spark_with_delta_pip(builder, extra_packages=[
'org.apache.hadoop:hadoop-aws:3.3.4',
]).getOrCreate()

self.spark.sparkContext.setLogLevel('ERROR')
self._configure_fs()

def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a delta lake"""
job.attempt += len(df)
full_path = self.root.joinpath(dbname).replace('s3://', 's3a://') # hadoop uses the s3a: scheme instead of s3:

try:
updates = self.spark.createDataFrame(df)

try:
table = delta.DeltaTable.forPath(self.spark, full_path)
if batch == 0:
table.vacuum() # Clean up unused data files older than retention policy (default 7 days)
table.alias('table') \
.merge(source=updates.alias('updates'), condition='table.id = updates.id') \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
except AnalysisException:
# table does not exist yet, let's make an initial version
updates.write.save(path=full_path, format='delta')

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

def _configure_fs(self):
"""Tell spark/hadoop how to talk to S3 for us"""
fsspec_options = self.root.fsspec_options()
self.spark.conf.set('fs.s3a.sse.enabled', 'true')
self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
if kms_key:
self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
if region_name:
self.spark.conf.set('fs.s3a.endpoint.region', region_name)
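
A hedged read-back sketch: once write_records() has merged a batch, the same Spark session can load the per-resource Delta table directly. The path and resource name here are placeholders; the tests below use the same spark.read pattern.

from cumulus import formats, store

lake = formats.DeltaLakeFormat(store.Root('/path/to/output'))  # hypothetical output root
patients = (
    lake.spark.read
    .format('delta')
    .load('/path/to/output/patient')  # one Delta table folder per resource
    .sort('id')
    .toPandas()
)
print(patients.head())
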
4 changes: 2 additions & 2 deletions cumulus/formats/ndjson.py
@@ -2,10 +2,10 @@

import pandas

from .athena import AthenaFormat
from .athena import AthenaBatchedFileFormat


class NdjsonFormat(AthenaFormat):
class NdjsonFormat(AthenaBatchedFileFormat):
"""Stores output files in a few flat ndjson files"""

@property
4 changes: 2 additions & 2 deletions cumulus/formats/parquet.py
@@ -2,10 +2,10 @@

import pandas

from .athena import AthenaFormat
from .athena import AthenaBatchedFileFormat


class ParquetFormat(AthenaFormat):
class ParquetFormat(AthenaBatchedFileFormat):
"""Stores output files in a few flat parquet files"""

@property
1 change: 1 addition & 0 deletions pyproject.toml
@@ -3,6 +3,7 @@ name = "cumulus"
requires-python = ">= 3.7"
dependencies = [
"ctakesclient >= 1.3",
"delta-spark >= 2.1",
# This branch includes some jwt fixes we need (and will hopefully be in mainline in future)
"fhirclient @ git+https://github.com/mikix/client-py.git@mikix/oauth2-jwt",
"jwcrypto",
76 changes: 76 additions & 0 deletions tests/test_deltalake.py
@@ -0,0 +1,76 @@
"""Tests for Delta Lake support"""

import os
import shutil
import tempfile
import unittest

import pandas
from pyspark.sql.utils import AnalysisException
from cumulus import config, formats, store


class TestDeltaLake(unittest.TestCase):
"""
Test case for the Delta Lake format writer.
i.e. tests for deltalake.py
"""

@classmethod
def setUpClass(cls):
super().setUpClass()
output_tempdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with
cls.output_tempdir = output_tempdir
cls.output_dir = output_tempdir.name

# It is expensive to create a DeltaLakeFormat instance because of all the pyspark jar downloading etc.
# So we only do it once per class suite. (And erase all folder contents per-test)
cls.deltalake = formats.DeltaLakeFormat(store.Root(output_tempdir.name))

def setUp(self):
super().setUp()
shutil.rmtree(self.output_dir, ignore_errors=True)
self.job = config.JobSummary()

@staticmethod
def df(**kwargs) -> pandas.DataFrame:
"""
Creates a dummy DataFrame with ids & values equal to each kwarg provided.
"""
rows = [{'id': k, 'value': v} for k, v in kwargs.items()]
return pandas.DataFrame(rows)

def store(self, df: pandas.DataFrame, batch: int = 10) -> None:
"""
Writes a single batch of data to the data lake.
:param df: the data to insert
:param batch: which batch number this is, defaulting to 10 to avoid triggering any first/last batch logic
"""
self.deltalake.store_patients(self.job, df, batch)

def assert_lake_equal(self, df: pandas.DataFrame, when: int = None) -> None:
table_path = os.path.join(self.output_dir, 'patient')

reader = self.deltalake.spark.read
if when is not None:
reader = reader.option('versionAsOf', when)

table_df = reader.format('delta').load(table_path).sort('id').toPandas()
self.assertDictEqual(df.to_dict(), table_df.to_dict())

def test_creates_if_empty(self):
"""Verify that the lake is created when empty"""
# sanity check that it doesn't exist yet
with self.assertRaises(AnalysisException):
self.assert_lake_equal(self.df())

self.store(self.df(a=1))
self.assert_lake_equal(self.df(a=1))

def test_upsert(self):
"""Verify that we can update and insert data"""
self.store(self.df(a=1, b=2))
self.store(self.df(b=20, c=3))
self.assert_lake_equal(self.df(a=1, b=20, c=3))
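
One more hedged sketch, not part of this change: the when parameter of assert_lake_equal maps to Delta's versionAsOf time travel, so a further test inside TestDeltaLake could pin earlier table versions. The version numbers assume Delta's usual commit numbering (0 for the initial write, 1 for the merge).

    def test_upsert_keeps_history(self):
        """Hypothetical: earlier lake versions stay readable via time travel"""
        self.store(self.df(a=1, b=2))
        self.store(self.df(b=20, c=3))
        self.assert_lake_equal(self.df(a=1, b=2), when=0)
        self.assert_lake_equal(self.df(a=1, b=20, c=3), when=1)
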