Showing 9 changed files with 246 additions and 43 deletions.
@@ -1,5 +1,6 @@
"""Classes that know _how_ to write out results to the target folder"""

from .deltalake import DeltaLakeFormat, IcebergFormat
from .json_tree import JsonTreeFormat
from .ndjson import NdjsonFormat
from .parquet import ParquetFormat
@@ -0,0 +1,160 @@
""" | ||
An implementation of Format that writes to a Delta Lake. | ||
See https://delta.io/ | ||
""" | ||
|
||
import logging | ||
|
||
import delta | ||
import pandas | ||
import pyspark | ||
from pyspark.sql.utils import AnalysisException | ||
|
||
from cumulus import store | ||
|
||
from .athena import AthenaFormat | ||
|
||
# These classes would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code | ||
# uses and expects (in terms of filesystem writing). | ||
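# (That simpler pattern, for contrast, is just something like:
#     df.to_parquet(root.joinpath(dbname), storage_options=root.fsspec_options())
# with fsspec handling the S3 details that we instead configure by hand for spark below.)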

#####
# ** Delta Lake native python support **
# The `delta-spark` package provides a wrapper around pyspark.
#
# There is a first-party Delta Lake implementation (`deltalake`) built on native Rust code, which talks to
# Pandas & fsspec by default. But it is missing some critical features as of this writing (like merges):
# - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850
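#
# A rough sketch of how that native route might look once merge support lands; the `write_deltalake`
# call below is a best guess at that package's API and is not exercised anywhere in this module:
#     from deltalake import write_deltalake
#     write_deltalake(root.joinpath(dbname), df, mode='append')  # append-only, no MERGE equivalent yet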

#####
# ** Iceberg native python support **
# There is no pyspark wrapper; you just write pyspark code directly.
#
# There is a first-party Iceberg implementation (`pyiceberg`) in native Python, which talks to Pandas & fsspec.
# But it is also missing some critical features as of this writing (like upsert support) and is still early days.
# - https://github.com/apache/iceberg/blob/master/python/mkdocs/docs/index.md#metadata
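#
# For reference, the pyiceberg read side looks roughly like this (the exact calls are an assumption;
# nothing in this module uses pyiceberg):
#     from pyiceberg.catalog import load_catalog
#     catalog = load_catalog('cumulus', warehouse='s3://bucket/warehouse')
#     table = catalog.load_table('schema.tablename')
#     df = table.scan().to_pandas()  # reads work, but there is no upsert/merge equivalent yet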


class IcebergFormat(AthenaFormat):
    """
    Stores data in an Iceberg data lake.
    """
    def __init__(self, root: store.Root):
        super().__init__(root)
        root_path = root.path.replace('s3://', 's3a://')  # hadoop uses the s3a: scheme instead of s3:
        # Package links, just for reference:
        # - https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
        # - https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-3.3
        # - https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-extensions-3.3
        self.spark = pyspark.sql.SparkSession.builder \
            .appName('cumulus-etl') \
            .config('spark.driver.memory', '2g') \
            .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,'
                                           'org.apache.iceberg:iceberg-spark-3.3_2.12:1.1.0,'
                                           'org.apache.iceberg:iceberg-spark-extensions-3.3_2.12:1.1.0') \
            .config('spark.sql.catalog.cumulus', 'org.apache.iceberg.spark.SparkCatalog') \
            .config('spark.sql.catalog.cumulus.type', 'hadoop') \
            .config('spark.sql.catalog.cumulus.warehouse', root_path) \
            .config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog') \
            .config('spark.sql.defaultCatalog', 'cumulus') \
            .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
            .getOrCreate()
        self._configure_fs()

    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
        """Writes the whole dataframe to an Iceberg table"""
        job.attempt += len(df)

        try:
            updates = self.spark.createDataFrame(df)
            updates.createOrReplaceTempView('updates')

            try:
                # _table = self.spark.table(dbname).alias('table')
                self.spark.sql(
                    f'MERGE INTO {dbname} table '
                    'USING updates '
                    'ON table.id = updates.id '
                    'WHEN MATCHED THEN UPDATE SET * '
                    'WHEN NOT MATCHED THEN INSERT * '
                )
            except AnalysisException:
                # table does not exist yet, let's make an initial version
                updates.writeTo(dbname).create()

            job.success += len(df)
            job.success_rate(1)
        except Exception:  # pylint: disable=broad-except
            logging.exception('Could not process data records')

    def _configure_fs(self):
        """Tell spark/hadoop how to talk to S3 for us"""
        fsspec_options = self.root.fsspec_options()
        self.spark.conf.set('fs.s3a.sse.enabled', 'true')
        self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
        kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
        if kms_key:
            self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
        region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
        if region_name:
            self.spark.conf.set('fs.s3a.endpoint.region', region_name)


class DeltaLakeFormat(AthenaFormat):
    """
    Stores data in a delta lake.
    """
    def __init__(self, root: store.Root):
        super().__init__(root)
        builder = pyspark.sql.SparkSession.builder \
            .appName('cumulus-etl') \
            .config('spark.driver.memory', '2g') \
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
            .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
        self.spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()
        self._configure_fs()

    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
        """Writes the whole dataframe to a delta lake"""
        job.attempt += len(df)
        full_path = self.root.joinpath(dbname).replace('s3://', 's3a://')  # hadoop uses the s3a scheme (same as s3)

        try:
            updates = self.spark.createDataFrame(df)

            try:
                table = delta.DeltaTable.forPath(self.spark, full_path)
                # if batch == 0:
                #     table.vacuum()
                # TODO: why does this keep deleting and recreating a single row...?
                table.alias('table') \
                    .merge(source=updates.alias('updates'), condition='table.id = updates.id') \
                    .whenMatchedUpdateAll() \
                    .whenNotMatchedInsertAll() \
                    .execute()
            except AnalysisException:
                # table does not exist yet, let's make an initial version
                updates.write.format('delta').save(full_path)
                table = delta.DeltaTable.forPath(self.spark, full_path)

            # Now generate a manifest file for Athena's benefit.
            # Otherwise, Athena would scan all parquet files, even ones holding removed data.
            # https://docs.delta.io/latest/delta-utility.html#generate-a-manifest-file
            table.generate('symlink_format_manifest')
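            # (On the Athena side, the manifest gets consumed by an external table whose LOCATION points
            # at this table's _symlink_format_manifest/ folder. Roughly, following the Delta docs above;
            # this DDL is only a sketch for context, nothing in this module issues it:
            #   CREATE EXTERNAL TABLE ...
            #   ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
            #   STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
            #   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
            #   LOCATION '<delta table path>/_symlink_format_manifest/')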

            job.success += len(df)
            job.success_rate(1)
        except Exception:  # pylint: disable=broad-except
            logging.exception('Could not process data records')

    def _configure_fs(self):
        """Tell spark/hadoop how to talk to S3 for us"""
        fsspec_options = self.root.fsspec_options()
        self.spark.conf.set('fs.s3a.sse.enabled', 'true')
        self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
        kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
        if kms_key:
            self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
        region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
        if region_name:
            self.spark.conf.set('fs.s3a.endpoint.region', region_name)