feat: add Delta Lake support #89

Merged 1 commit on Dec 28, 2022.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.7", "3.8", "3.9", "3.10"]
Contributor Author:
OK, how do we feel about this? PySpark 3.4 officially supports py3.11, but delta-spark is still pinned to Spark 3.3.

So... how much do we feel the need to support py3.11? Our Docker image right now is still 3.10, so this doesn't affect our shipped artifacts yet.

Contributor:
I generally have no interest in being on the leading edge, but we should watch it.
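A minimal sketch of how this constraint could be made explicit at runtime, assuming the Python ceiling tracks delta-spark's Spark 3.3 pin (the guard and its message are illustrative, not part of this PR):

```python
import sys

# Hypothetical guard: delta-spark 2.x targets Spark 3.3, which is only
# validated on CPython 3.7-3.10 at the time of this change.
if sys.version_info >= (3, 11):
    raise RuntimeError(
        'The deltalake output format needs Python 3.10 or older until '
        'delta-spark moves to a Spark release that supports 3.11.'
    )
```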


steps:
- uses: actions/checkout@v3
5 changes: 3 additions & 2 deletions .gitignore
@@ -1,7 +1,8 @@
# Project specific

- example-output/
- example-phi-build/
+ /.idea/
+ /example-output/
+ /example-phi-build/

# Python gitignore https://github.com/github/gitignore/blob/main/Python.gitignore
# Byte-compiled / optimized / DLL files
6 changes: 4 additions & 2 deletions cumulus/etl.py
@@ -364,7 +364,7 @@ def main(args: List[str]):
parser.add_argument('dir_phi', metavar='/path/to/phi')
parser.add_argument('--input-format', default='ndjson', choices=['i2b2', 'ndjson'],
help='input format (default is ndjson)')
- parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'],
+ parser.add_argument('--output-format', default='parquet', choices=['deltalake', 'json', 'ndjson', 'parquet'],
help='output format (default is parquet)')
parser.add_argument('--batch-size', type=int, metavar='SIZE', default=200000,
help='how many entries to process at once and thus '
@@ -401,7 +401,9 @@ def main(args: List[str]):
else:
config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks)

- if args.output_format == 'json':
+ if args.output_format == 'deltalake':
+ config_store = formats.DeltaLakeFormat(root_output)
+ elif args.output_format == 'json':
config_store = formats.JsonTreeFormat(root_output)
elif args.output_format == 'parquet':
config_store = formats.ParquetFormat(root_output)
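For context, a rough sketch of how the new choice would be exercised end to end, assuming the positional arguments are input, output, and PHI directories as the parser above suggests (the paths and the first two positional arguments are illustrative assumptions, not taken from this PR):

```python
from cumulus import etl

# Illustrative invocation only: the paths and the first two positional
# arguments are assumptions based on the parser shown above.
etl.main([
    's3://my-bucket/ndjson-input',
    's3://my-bucket/delta-output',
    '/path/to/phi',
    '--output-format=deltalake',
])
```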
1 change: 1 addition & 0 deletions cumulus/formats/__init__.py
@@ -1,5 +1,6 @@
"""Classes that know _how_ to write out results to the target folder"""

+ from .deltalake import DeltaLakeFormat
from .json_tree import JsonTreeFormat
from .ndjson import NdjsonFormat
from .parquet import ParquetFormat
54 changes: 33 additions & 21 deletions cumulus/formats/athena.py
@@ -18,6 +18,36 @@ class AthenaFormat(store.Format):
(i.e. one folder per data type, broken into large files)
"""

+ @abc.abstractmethod
+ def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
+ """Writes the whole dataframe to the output database folder"""
+
+ def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
+ self.write_records(job, patients, 'patient', batch)
+
+ def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
+ self.write_records(job, encounters, 'encounter', batch)
+
+ def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
+ self.write_records(job, labs, 'observation', batch)
+
+ def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
+ self.write_records(job, conditions, 'condition', batch)
+
+ def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
+ self.write_records(job, docrefs, 'documentreference', batch)
+
+ def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
+ self.write_records(job, observations, 'symptom', batch)
+
+
+ class AthenaBatchedFileFormat(AthenaFormat):
+ """
+ Stores output files as batched individual files.
+
+ i.e. a few ndjson files that hold all the rows
+ """
+
@property
@abc.abstractmethod
def suffix(self) -> str:
@@ -39,7 +69,7 @@ def write_format(self, df: pandas.DataFrame, path: str) -> None:
#
##########################################################################################

- def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
+ def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a single file"""
job.attempt += len(df)

@@ -49,36 +79,18 @@ def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
# our files out. What we really want is some sort of blue/green deploy of data. There's no
# satisfying fix while we are writing to the same folder. (Unless we do incremental/delta
# writes and keep all old data around still.)
- parent_dir = self.root.joinpath(os.path.dirname(path))
+ parent_dir = self.root.joinpath(dbname)
try:
self.root.rm(parent_dir, recursive=True)
except FileNotFoundError:
pass

try:
- full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}')
+ full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}')
Contributor Author:
This change right here renames our output files from condition/fhir_conditions.000.ndjson to condition/condition.000.ndjson. It's not an important part of this PR, but it made it easier to handle all output tables the same way. It also caused a lot of file/test changes, sorry.

self.root.makedirs(os.path.dirname(full_path))
self.write_format(df, full_path)

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

- def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
- self._write_records(job, patients, 'patient/fhir_patients', batch)
-
- def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
- self._write_records(job, encounters, 'encounter/fhir_encounters', batch)
-
- def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
- self._write_records(job, labs, 'observation/fhir_observations', batch)
-
- def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
- self._write_records(job, conditions, 'condition/fhir_conditions', batch)
-
- def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
- self._write_records(job, docrefs, 'documentreference/fhir_documentreferences', batch)
-
- def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
- self._write_records(job, observations, 'symptom/fhir_symptoms', batch)
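To illustrate the refactor above: the store_*() methods now live on AthenaFormat and dispatch through write_records(), so a batched file format only needs to supply a suffix and a write_format() body. A minimal sketch under that assumption (CsvFormat is hypothetical, not part of this PR):

```python
import pandas

from cumulus.formats.athena import AthenaBatchedFileFormat


class CsvFormat(AthenaBatchedFileFormat):
    """Hypothetical example subclass, not part of this PR."""

    @property
    def suffix(self) -> str:
        # Used by the base class when it builds paths like 'condition/condition.000.csv'.
        return 'csv'

    def write_format(self, df: pandas.DataFrame, path: str) -> None:
        # The base class has already cleared the table folder and chosen the batch path.
        df.to_csv(path, index=False)
```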
111 changes: 111 additions & 0 deletions cumulus/formats/deltalake.py
@@ -0,0 +1,111 @@
"""
An implementation of Format that writes to a Delta Lake.

See https://delta.io/
"""

import contextlib
import logging
import os

import delta
import pandas
import pyspark
from pyspark.sql.utils import AnalysisException

from cumulus import store

from .athena import AthenaFormat

# This class would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code
# uses and expects (in terms of filesystem writing).
#
# There is a first-party Delta Lake implementation (`deltalake`) based on native Rust code, which talks to
# fsspec & pandas by default. But it is missing some critical features as of this writing (mostly merges):
# - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850


@contextlib.contextmanager
def _suppress_output():
"""
Totally hides stdout and stderr while the block runs; both are restored on exit, so any exception raised inside still surfaces normally.

This is a more powerful version of contextlib.redirect_stdout that also works for subprocesses / threads.
"""
stdout = os.dup(1)
stderr = os.dup(2)
silent = os.open(os.devnull, os.O_WRONLY)
os.dup2(silent, 1)
os.dup2(silent, 2)

try:
yield
finally:
os.dup2(stdout, 1)
os.dup2(stderr, 2)


class DeltaLakeFormat(AthenaFormat):
Contributor Author:
I'm not proud of much of this class: the output suppression, the hard-coded list of jars and versions, the conversion of fsspec S3 params to Hadoop S3 params. But it does seem to technically work...

Contributor:
How would you feel about separating the write and the merge? Like, just have the ETL process create the data and push it to S3, and have some kind of service in AWS pick up those files and do the Delta Lake insertion?

Contributor Author:
Sorry, can you explain the intent there? Is it separating out the gross Java-requiring code into another step of the process, or is there a fancy AWS thing you're thinking of that saves us overall code, or something else?

I'm leery of the control we'd be giving up there. For example, in the short term I'm going to add support for the FHIR server telling us which resources have been deleted since we last exported. How would that look with what you're proposing?

Contributor:
Grosser: you'd have to set up some way to call an API or otherwise execute a query.

But by doing the data lake ingest in AWS, you might be able to use a Glue job rather than worry about hand-managing it: https://aws.amazon.com/marketplace/pp/prodview-seypofzqhdueq - it might be easier overall?

Contributor Author:
For this, I think I prefer hand-managing it: more control over the flow (like deleting), less AWS dependency, and fewer moving parts / less infrastructure. And the cost feels light -- basically 20 lines of "upsert or create".

Are your trade-off sliders set differently, or were you musing aloud?

Contributor:
Mostly musing, especially given the frequency we're running jobs at.

"""
Stores data in a delta lake.
"""
def __init__(self, root: store.Root):
super().__init__(root)

# This _suppress_output call is because pyspark is SO NOISY during session creation. Like 40 lines of trivial
# output. Progress reports of downloading the jars. Comments about default logging level and the hostname.
# I could not find a way to set the log level before the session is created. So here we just suppress
# stdout/stderr entirely.
with _suppress_output():
# Prep the builder with various config options
builder = pyspark.sql.SparkSession.builder \
.appName('cumulus-etl') \
.config('spark.driver.memory', '2g') \
.config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
.config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')

# Now add delta's packages and actually build the session
self.spark = delta.configure_spark_with_delta_pip(builder, extra_packages=[
'org.apache.hadoop:hadoop-aws:3.3.4',
]).getOrCreate()

self.spark.sparkContext.setLogLevel('ERROR')
self._configure_fs()

def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a delta lake"""
job.attempt += len(df)
full_path = self.root.joinpath(dbname).replace('s3://', 's3a://') # hadoop uses the s3a: scheme instead of s3:

try:
updates = self.spark.createDataFrame(df)

try:
table = delta.DeltaTable.forPath(self.spark, full_path)
if batch == 0:
table.vacuum() # Clean up unused data files older than retention policy (default 7 days)
table.alias('table') \
.merge(source=updates.alias('updates'), condition='table.id = updates.id') \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
except AnalysisException:
# table does not exist yet, let's make an initial version
updates.write.save(path=full_path, format='delta')

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

def _configure_fs(self):
"""Tell spark/hadoop how to talk to S3 for us"""
fsspec_options = self.root.fsspec_options()
self.spark.conf.set('fs.s3a.sse.enabled', 'true')
self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
if kms_key:
self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
if region_name:
self.spark.conf.set('fs.s3a.endpoint.region', region_name)
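As a footnote to the module comment above about the native delta-rs bindings: they can already append to or overwrite a table from pandas without a JVM, which is roughly the simpler shape this class could take once merge support lands upstream. A rough sketch, assuming the deltalake package's write_deltalake/DeltaTable API (not used by this PR):

```python
import deltalake
import pandas

# Append-only sketch of the fsspec/pandas-friendly path mentioned in the module
# comment; it cannot yet express the merge (upsert) that write_records() needs.
df = pandas.DataFrame([{'id': 'a', 'value': 1}])
deltalake.write_deltalake('example-output/patient', df, mode='append')

table = deltalake.DeltaTable('example-output/patient')
print(table.to_pandas())
```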
4 changes: 2 additions & 2 deletions cumulus/formats/ndjson.py
@@ -2,10 +2,10 @@

import pandas

- from .athena import AthenaFormat
+ from .athena import AthenaBatchedFileFormat


- class NdjsonFormat(AthenaFormat):
+ class NdjsonFormat(AthenaBatchedFileFormat):
"""Stores output files in a few flat ndjson files"""

@property
4 changes: 2 additions & 2 deletions cumulus/formats/parquet.py
@@ -2,10 +2,10 @@

import pandas

- from .athena import AthenaFormat
+ from .athena import AthenaBatchedFileFormat


- class ParquetFormat(AthenaFormat):
+ class ParquetFormat(AthenaBatchedFileFormat):
"""Stores output files in a few flat parquet files"""

@property
1 change: 1 addition & 0 deletions pyproject.toml
@@ -3,6 +3,7 @@ name = "cumulus"
requires-python = ">= 3.7"
dependencies = [
"ctakesclient >= 1.3",
"delta-spark >= 2.1",
# This branch includes some jwt fixes we need (and will hopefully be in mainline in future)
"fhirclient @ git+https://github.com/mikix/client-py.git@mikix/oauth2-jwt",
"jwcrypto",
76 changes: 76 additions & 0 deletions tests/test_deltalake.py
@@ -0,0 +1,76 @@
"""Tests for Delta Lake support"""

import os
import shutil
import tempfile
import unittest

import pandas
from pyspark.sql.utils import AnalysisException
from cumulus import config, formats, store


class TestDeltaLake(unittest.TestCase):
"""
Test case for the Delta Lake format writer.

i.e. tests for deltalake.py
"""

@classmethod
def setUpClass(cls):
super().setUpClass()
output_tempdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with
cls.output_tempdir = output_tempdir
cls.output_dir = output_tempdir.name

# It is expensive to create a DeltaLakeFormat instance because of all the pyspark jar downloading etc.
# So we only do it once per class suite. (And erase all folder contents per-test)
cls.deltalake = formats.DeltaLakeFormat(store.Root(output_tempdir.name))

def setUp(self):
super().setUp()
shutil.rmtree(self.output_dir, ignore_errors=True)
self.job = config.JobSummary()

@staticmethod
def df(**kwargs) -> pandas.DataFrame:
"""
Creates a dummy DataFrame with ids & values equal to each kwarg provided.
"""
rows = [{'id': k, 'value': v} for k, v in kwargs.items()]
return pandas.DataFrame(rows)

def store(self, df: pandas.DataFrame, batch: int = 10) -> None:
"""
Writes a single batch of data to the data lake.

:param df: the data to insert
:param batch: which batch number this is, defaulting to 10 to avoid triggering any first/last batch logic
"""
self.deltalake.store_patients(self.job, df, batch)

def assert_lake_equal(self, df: pandas.DataFrame, when: int = None) -> None:
table_path = os.path.join(self.output_dir, 'patient')

reader = self.deltalake.spark.read
if when is not None:
reader = reader.option('versionAsOf', when)

table_df = reader.format('delta').load(table_path).sort('id').toPandas()
self.assertDictEqual(df.to_dict(), table_df.to_dict())

def test_creates_if_empty(self):
"""Verify that the lake is created when empty"""
# sanity check that it doesn't exist yet
with self.assertRaises(AnalysisException):
self.assert_lake_equal(self.df())

self.store(self.df(a=1))
self.assert_lake_equal(self.df(a=1))

def test_upsert(self):
"""Verify that we can update and insert data"""
self.store(self.df(a=1, b=2))
self.store(self.df(b=20, c=3))
self.assert_lake_equal(self.df(a=1, b=20, c=3))