feat: add upsert data lake support #86

Closed
10 changes: 7 additions & 3 deletions cumulus/etl.py
@@ -333,8 +333,8 @@ def main(args: List[str]):
    parser.add_argument('dir_phi', metavar='/path/to/phi')
    parser.add_argument('--input-format', default='ndjson', choices=['i2b2', 'ndjson'],
                        help='input format (default is ndjson)')
-    parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'],
-                        help='output format (default is parquet)')
+    parser.add_argument('--output-format', default='iceberg', choices=['delta', 'iceberg', 'json', 'ndjson', 'parquet'],
+                        help='output format (default is iceberg)')
    parser.add_argument('--batch-size', type=int, metavar='SIZE', default=10000000,
                        help='how many entries to process at once and thus '
                             'how many to put in one output file (default is 10M)')
@@ -366,7 +366,11 @@ def main(args: List[str]):
    else:
        config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks)

-    if args.output_format == 'json':
+    if args.output_format == 'delta':
+        config_store = formats.DeltaLakeFormat(root_output)
+    elif args.output_format == 'iceberg':
+        config_store = formats.IcebergFormat(root_output)
+    elif args.output_format == 'json':
        config_store = formats.JsonTreeFormat(root_output)
    elif args.output_format == 'parquet':
        config_store = formats.ParquetFormat(root_output)
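With this change, a lake format can be chosen at the command line via --output-format. A rough usage sketch, assuming only the CLI surface shown above (the input/output positional paths are placeholders, and the other positional arguments are not visible in this hunk):

    from cumulus import etl

    # Placeholder paths; '--output-format' now also accepts 'delta' and
    # 'iceberg', and 'iceberg' becomes the default.
    etl.main(['/path/to/input', '/path/to/output', '/path/to/phi',
              '--output-format', 'delta'])
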
1 change: 1 addition & 0 deletions cumulus/formats/__init__.py
@@ -1,5 +1,6 @@
"""Classes that know _how_ to write out results to the target folder"""

+from .deltalake import DeltaLakeFormat, IcebergFormat
from .json_tree import JsonTreeFormat
from .ndjson import NdjsonFormat
from .parquet import ParquetFormat
54 changes: 33 additions & 21 deletions cumulus/formats/athena.py
@@ -18,6 +18,36 @@ class AthenaFormat(store.Format):
    (i.e. one folder per data type, broken into large files)
    """

+    @abc.abstractmethod
+    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
+        """Writes the whole dataframe to the output database folder"""
+
+    def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, patients, 'patient', batch)
+
+    def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, encounters, 'encounter', batch)
+
+    def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, labs, 'observation', batch)
+
+    def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, conditions, 'condition', batch)
+
+    def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, docrefs, 'documentreference', batch)
+
+    def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
+        self.write_records(job, observations, 'symptom', batch)
+
+
+class AthenaBatchedFileFormat(AthenaFormat):
+    """
+    Stores output files as batched individual files.
+    i.e. a few ndjson files that hold all the rows
+    """

    @property
    @abc.abstractmethod
    def suffix(self) -> str:
@@ -39,7 +69,7 @@ def write_format(self, df: pandas.DataFrame, path: str) -> None:
    #
    ##########################################################################################

-    def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
+    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
        """Writes the whole dataframe to a single file"""
        job.attempt += len(df)

@@ -49,36 +79,18 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None:
        # our files out. What we really want is some sort of blue/green deploy of data. There's no
        # satisfying fix while we are writing to the same folder. (Unless we do incremental/delta
        # writes and keep all old data around still.)
-        parent_dir = self.root.joinpath(os.path.dirname(path))
+        parent_dir = self.root.joinpath(dbname)
        try:
            self.root.rm(parent_dir, recursive=True)
        except FileNotFoundError:
            pass

        try:
-            full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}')
+            full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}')
            self.root.makedirs(os.path.dirname(full_path))
            self.write_format(df, full_path)

            job.success += len(df)
            job.success_rate(1)
        except Exception: # pylint: disable=broad-except
            logging.exception('Could not process data records')

-    def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, patients, 'patient/fhir_patients', batch)
-
-    def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, encounters, 'encounter/fhir_encounters', batch)
-
-    def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, labs, 'observation/fhir_observations', batch)
-
-    def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, conditions, 'condition/fhir_conditions', batch)
-
-    def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, docrefs, 'documentreference/fhir_documentreferences', batch)
-
-    def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None:
-        self._write_records(job, observations, 'symptom/fhir_symptoms', batch)
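After this refactor, a concrete format only needs to implement write_records; the store_* helpers on AthenaFormat dispatch each resource type to it. A minimal hypothetical subclass (illustration only, not part of this PR) just to show the contract:

    import pandas

    from cumulus.formats.athena import AthenaFormat


    class PrintFormat(AthenaFormat):
        """Toy format that only reports what it would write."""

        def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
            print(f'would write {len(df)} rows to {dbname} (batch {batch:03})')
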
133 changes: 133 additions & 0 deletions cumulus/formats/deltalake.py
@@ -0,0 +1,133 @@
"""
An implementation of Format that writes to a Delta Lake.
See https://delta.io/
"""

import logging

import delta
import pandas
import pyspark
from pyspark.sql.utils import AnalysisException

from cumulus import store

from .athena import AthenaFormat


# This class would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code
# uses and expects (in terms of filesystem writing).
# Spark uses hadoop and both are written in Java, so we need to write some glue code and also convert S3 args.
# There is a different Delta Lake implementation (deltalake on pypi) based off native Rust code and which talks to
# Pandas & fsspec by default. But it is missing some critical features as of this writing (like merges):
# - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850

class IcebergFormat(AthenaFormat):
"""
Stores data in an iceberg data lake.
"""
def __init__(self, root: store.Root):
super().__init__(root)
# Package link: https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-extensions-3.3
self.spark = pyspark.sql.SparkSession.builder \
.appName('cumulus-etl') \
.config('spark.driver.memory', '2g') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-3.3_2.12:1.0.0,'
'org.apache.iceberg:iceberg-spark-extensions-3.3_2.12:1.0.0') \
.config('spark.sql.catalog.cumulus', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.cumulus.type', 'hadoop') \
.config('spark.sql.catalog.cumulus.warehouse', root.path) \
.config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog') \
.config('spark.sql.defaultCatalog', 'cumulus') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.getOrCreate()
#self._configure_fs()

def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a delta lake"""
job.attempt += len(df)
full_path = self.root.joinpath(dbname).replace('s3://', 's3a://') # hadoop uses the s3a: scheme instead of s3:

try:
updates = self.spark.createDataFrame(df)
updates.createOrReplaceTempView('updates')

try:
#_table = self.spark.table(dbname).alias('table')
self.spark.sql(
f'MERGE INTO {dbname} table '
'USING updates '
'ON table.id = updates.id '
'WHEN MATCHED THEN UPDATE SET * '
'WHEN NOT MATCHED THEN INSERT * '
)
except AnalysisException as exc:
print(exc)
# table does not exist yet, let's make an initial version
updates.writeTo(dbname).create()

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')


class DeltaLakeFormat(AthenaFormat):
"""
Stores data in a delta lake.
"""
def __init__(self, root: store.Root):
super().__init__(root)
builder = pyspark.sql.SparkSession.builder \
.appName('cumulus-etl') \
.config('spark.driver.memory', '2g') \
.config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
.config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
self.spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()
self._configure_fs()

def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
"""Writes the whole dataframe to a delta lake"""
job.attempt += len(df)
full_path = self.root.joinpath(dbname).replace('s3://', 's3a://') # hadoop uses the s3a scheme (same as s3)

try:
updates = self.spark.createDataFrame(df)

try:
table = delta.DeltaTable.forPath(self.spark, full_path)
# if batch == 0:
# table.vacuum()
# TODO: why does this keep deleting and recreating a single row...?
table.alias('table') \
.merge(source=updates.alias('updates'), condition='table.id = updates.id') \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
except AnalysisException:
# table does not exist yet, let's make an initial version
updates.write.format('delta').save(full_path)
table = delta.DeltaTable.forPath(self.spark, full_path)

# Now generate a manifest file for Athena's benefit.
# Otherwise, Athena would scan all parquet files, even ones holding removed data.
# https://docs.delta.io/latest/delta-utility.html#generate-a-manifest-file
table.generate('symlink_format_manifest')

job.success += len(df)
job.success_rate(1)
except Exception: # pylint: disable=broad-except
logging.exception('Could not process data records')

def _configure_fs(self):
"""Tell spark/hadoop how to talk to S3 for us"""
fsspec_options = self.root.fsspec_options()
self.spark.conf.set('fs.s3a.sse.enabled', 'true')
self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
if kms_key:
self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
if region_name:
self.spark.conf.set('fs.s3a.endpoint.region', region_name)
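The upsert behaviour this file adds can be exercised roughly as follows. This is only a sketch: it assumes store.Root can wrap a plain local path, that fsspec_options() returns an empty dict outside of S3, and it uses FakeJob as a trivial stand-in for the ETL job object:

    import pandas

    from cumulus import store
    from cumulus.formats.deltalake import DeltaLakeFormat


    class FakeJob:
        """Minimal stand-in for the ETL job object used by write_records."""
        attempt = 0
        success = 0

        def success_rate(self, value):
            pass


    root = store.Root('/tmp/delta-lake')  # assumption: Root accepts a local path
    output = DeltaLakeFormat(root)
    job = FakeJob()

    # The first call creates the table; the second merges on `id`, so the row
    # with id 'a' is updated in place rather than duplicated.
    output.write_records(job, pandas.DataFrame([{'id': 'a', 'value': 1}]), 'patient', batch=0)
    output.write_records(job, pandas.DataFrame([{'id': 'a', 'value': 2}, {'id': 'b', 'value': 1}]), 'patient', batch=1)
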
4 changes: 2 additions & 2 deletions cumulus/formats/ndjson.py
@@ -2,10 +2,10 @@

import pandas

-from .athena import AthenaFormat
+from .athena import AthenaBatchedFileFormat


-class NdjsonFormat(AthenaFormat):
+class NdjsonFormat(AthenaBatchedFileFormat):
    """Stores output files in a few flat ndjson files"""

    @property
4 changes: 2 additions & 2 deletions cumulus/formats/parquet.py
@@ -2,10 +2,10 @@

import pandas

-from .athena import AthenaFormat
+from .athena import AthenaBatchedFileFormat


-class ParquetFormat(AthenaFormat):
+class ParquetFormat(AthenaBatchedFileFormat):
    """Stores output files in a few flat parquet files"""

    @property
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -3,11 +3,13 @@ name = "cumulus"
requires-python = ">= 3.7"
dependencies = [
    "ctakesclient >= 1.3",
+    "delta-spark >= 2.1",
    # This branch includes some jwt fixes we need (and will hopefully be in mainline in future)
    "fhirclient @ git+https://github.com/mikix/client-py.git@mikix/oauth2-jwt",
    "jwcrypto",
    "pandas",
    "pyarrow",
+    "pyiceberg[glue] @ git+https://github.com/apache/iceberg.git#subdirectory=python",
    "s3fs",
]
authors = [