Uniform Iceberg
jaceklaskowski committed Sep 29, 2024
1 parent b98f72a commit 6ec9b47
Showing 5 changed files with 88 additions and 8 deletions.
26 changes: 26 additions & 0 deletions docs/DeltaFileFormatWriter.md
@@ -0,0 +1,26 @@
# DeltaFileFormatWriter

## Write Data Out { #write }

```scala
write(
sparkSession: SparkSession,
plan: SparkPlan,
fileFormat: FileFormat,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
hadoopConf: Configuration,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
statsTrackers: Seq[WriteJobStatsTracker],
options: Map[String, String],
numStaticPartitionCols: Int = 0): Set[String]
```

`write`...FIXME

---

`write` is used when:

* `TransactionalWrite` is requested to [write data out](TransactionalWrite.md#writeFiles)
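
The following is a minimal, illustrative sketch of how a caller (e.g. [TransactionalWrite](TransactionalWrite.md#writeFiles)) might invoke `write`. All local values (`spark`, `physicalPlan`, `fileFormat`, `committer`, `outputSpec`, `hadoopConf`, `partitioningColumns`, `statsTrackers`, `options`) are assumed to have been prepared by the caller; this is not the actual Delta source.

```scala
// Illustrative sketch only: all local values are assumed to be in scope,
// prepared the way TransactionalWrite.writeFiles prepares them.
val newFiles: Set[String] = DeltaFileFormatWriter.write(
  sparkSession = spark,
  plan = physicalPlan,       // e.g. DeltaInvariantCheckerExec
  fileFormat = fileFormat,   // the delta table's FileFormat
  committer = committer,     // a DelayedCommitProtocol
  outputSpec = outputSpec,
  hadoopConf = hadoopConf,
  partitionColumns = partitioningColumns,
  bucketSpec = None,         // no bucketing support
  statsTrackers = statsTrackers,
  options = options)
```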
7 changes: 7 additions & 0 deletions docs/DeltaOptimizedWriterExec.md
@@ -0,0 +1,7 @@
---
title: DeltaOptimizedWriterExec
---

# DeltaOptimizedWriterExec Physical Operator

`DeltaOptimizedWriterExec` is...FIXME
35 changes: 27 additions & 8 deletions docs/TransactionalWrite.md
@@ -70,7 +70,7 @@ hasWritten: Boolean = false

`hasWritten` is initially `false` and changes to `true` after [data is written out](#writeFiles).

## Writing Data Out { #writeFiles }
## Write Data Out { #writeFiles }

```scala
writeFiles(
@@ -81,6 +81,11 @@ writeFiles(
writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
additionalConstraints: Seq[Constraint]): Seq[FileAction] // (4)!
writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction]
writeFiles(
data: Dataset[_],
@@ -90,6 +95,7 @@ writeFiles(
1. Uses no [Constraint](constraints/Constraint.md)s
2. Uses no write-related [DeltaOptions](spark-connector/DeltaOptions.md)
3. Uses no [Constraint](constraints/Constraint.md)s
4. `isOptimize` disabled

`writeFiles` writes the given `data` (as a `Dataset`) to a [delta table](#deltaLog) and returns [AddFile](AddFile.md)s with [AddCDCFile](AddCDCFile.md)s (from the [DelayedCommitProtocol](#writeFiles-committer)).
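
As a quick, hedged illustration (the names `txn` and `df` are assumed to be an `OptimisticTransaction` and a `DataFrame` already in scope), a write-and-commit round trip might look as follows:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.DeltaOperations
import org.apache.spark.sql.delta.actions.FileAction

// Sketch: write the DataFrame out and commit the resulting file actions.
// txn: OptimisticTransaction (a TransactionalWrite), df: DataFrame.
val actions: Seq[FileAction] = txn.writeFiles(df)
txn.commit(actions, DeltaOperations.Write(SaveMode.Append))
```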

@@ -103,6 +109,7 @@ writeFiles(
* `OptimizeExecutor` is requested to [runOptimizeBinJob](commands/optimize/OptimizeExecutor.md#runOptimizeBinJob)
* `UpdateCommand` is requested to [rewriteFiles](commands/update/UpdateCommand.md#rewriteFiles)
* `DeltaSink` is requested to [add a streaming micro-batch](spark-connector/DeltaSink.md#addBatch)
* `RemoveColumnMappingCommand` is requested to [write data out](commands/alter/RemoveColumnMappingCommand.md#writeData)

---

@@ -169,7 +176,16 @@ Even though it is so early, `writeFiles` turns the [hasWritten](#hasWritten) flag
!!! note
    The `DeltaInvariantCheckerExec` physical operator is later used as the physical plan for the [data to be written out](#writeFiles-FileFormatWriter).

### Step 6.3 BasicWriteJobStatsTracker { #writeFiles-statsTrackers }
### Step 6.3 DeltaOptimizedWriterExec { #writeFiles-DeltaOptimizedWriterExec }

`writeFiles` creates a [DeltaOptimizedWriterExec](DeltaOptimizedWriterExec.md) physical operator as the parent of the [DeltaInvariantCheckerExec](#writeFiles-physicalPlan) unary physical operator when all of the following hold true:

1. `isOptimize` is disabled (`false`)
1. [shouldOptimizeWrite](#shouldOptimizeWrite)

Otherwise, `writeFiles` leaves the [DeltaInvariantCheckerExec](#writeFiles-physicalPlan) unary physical operator intact.
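
A sketch of that decision, modeled on the description above (names follow the Delta source but treat this as illustrative, not authoritative):

```scala
// Sketch: wrap the invariant checker with DeltaOptimizedWriterExec
// only for non-OPTIMIZE writes with optimized writes enabled.
val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val physicalPlan =
  if (!isOptimize && shouldOptimizeWrite(writeOptions, spark.sessionState.conf)) {
    DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
  } else {
    checkInvariants
  }
```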

### Step 6.4 BasicWriteJobStatsTracker { #writeFiles-statsTrackers }

`writeFiles` may or may not create a `BasicWriteJobStatsTracker` ([Spark SQL]({{ book.spark_sql }}/connectors/BasicWriteJobStatsTracker/)) based on the [history.metricsEnabled](configuration-properties/index.md#history.metricsEnabled) configuration property.

@@ -183,23 +199,26 @@ Metric Name | UI Description
`numParts` | number of dynamic part
`jobCommitTime` | job commit time
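
The conditional registration could be sketched as follows (a hedged sketch, assuming a mutable `statsTrackers` buffer and a `hadoopConf` in scope):

```scala
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.util.SerializableConfiguration

// Sketch: register BasicWriteJobStatsTracker only when
// history.metricsEnabled is on.
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
  val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
    new SerializableConfiguration(hadoopConf),
    BasicWriteJobStatsTracker.metrics)
  statsTrackers.append(basicWriteJobStatsTracker)
}
```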

### Step 6.4 Write Options { #writeFiles-options }
### Step 6.5 Write Options { #writeFiles-options }

`writeFiles` filters out all but the following [write options](spark-connector/DeltaOptions.md) from the given `writeOptions`, if specified:

* [compression](spark-connector/DeltaOptions.md#COMPRESSION)
* [maxRecordsPerFile](spark-connector/DeltaOptions.md#MAX_RECORDS_PER_FILE)
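
A sketch of the filtering (assuming `writeOptions: Option[DeltaOptions]`; key matching is case-insensitive):

```scala
import org.apache.spark.sql.delta.DeltaOptions

// Sketch: keep only compression and maxRecordsPerFile from the writer options.
val options: Map[String, String] = writeOptions.map { dopts =>
  dopts.options.filterKeys { key =>
    key.equalsIgnoreCase(DeltaOptions.COMPRESSION) ||
      key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE)
  }.toMap
}.getOrElse(Map.empty)
```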

`writeFiles` adds one Uniform (Iceberg compatibility)-specific write option:

Option | Value
-|-
[writePartitionColumns](spark-connector/DeltaOptions.md#WRITE_PARTITION_COLUMNS) | [isAnyEnabled](uniform/IcebergCompat.md#isAnyEnabled)

### Step 6.5 FileFormatWriter { #writeFiles-FileFormatWriter }
### Step 6.6 FileFormatWriter { #writeFiles-FileFormatWriter }

As the very last step within the scope of the [new execution ID](#writeFiles-deltaTransactionalWrite), `writeFiles` writes out the data (using [Spark SQL]({{ book.spark_sql }}/connectors/FileFormatWriter/#write) infrastructure).
As the very last step within the scope of the [new execution ID](#writeFiles-deltaTransactionalWrite), `writeFiles` [writes out the data](DeltaFileFormatWriter.md#write).

??? tip "Logging"
    Enable `ALL` logging level for [org.apache.spark.sql.execution.datasources.FileFormatWriter]({{ book.spark_sql }}/connectors/FileFormatWriter#logging) logger to see what happens inside.

`writeFiles` uses the following (among others):

* [DeltaInvariantCheckerExec](#writeFiles-physicalPlan) as the physical plan
* [DeltaInvariantCheckerExec](#writeFiles-physicalPlan) (possibly with [DeltaOptimizedWriterExec](#writeFiles-DeltaOptimizedWriterExec) parent) as the physical plan
* The [partitioning columns](#writeFiles-partitioningColumns)
* No bucketing
* [DeltaJobStatisticsTracker](#writeFiles-optionalStatsTracker) and [BasicWriteJobStatsTracker](#writeFiles-statsTrackers)
26 changes: 26 additions & 0 deletions docs/uniform/IcebergCompat.md
@@ -0,0 +1,26 @@
# IcebergCompat

## knownVersions { #knownVersions }

`IcebergCompat` defines the known Iceberg Compatibility table properties and their versions:

Table Property | ID
-|-
[delta.enableIcebergCompatV1](../table-properties/DeltaConfigs.md#ICEBERG_COMPAT_V1_ENABLED) | 1
[delta.enableIcebergCompatV2](../table-properties/DeltaConfigs.md#ICEBERG_COMPAT_V2_ENABLED) | 2
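
A sketch of how such a mapping could look (illustrative only; the property objects live in `DeltaConfigs`, and the exact types are an assumption):

```scala
// Sketch: table property -> IcebergCompat version.
val knownVersions: Seq[(DeltaConfig[Option[Boolean]], Int)] = Seq(
  DeltaConfigs.ICEBERG_COMPAT_V1_ENABLED -> 1,
  DeltaConfigs.ICEBERG_COMPAT_V2_ENABLED -> 2)
```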

## isAnyEnabled { #isAnyEnabled }

```scala
isAnyEnabled(
metadata: Metadata): Boolean
```

`isAnyEnabled` checks whether any of the [known](#knownVersions) Iceberg Compatibility table properties is enabled (`true`) in the given [Metadata](../Metadata.md).
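
A sketch of the check (assuming the [knownVersions](#knownVersions) mapping above; an absent property counts as disabled):

```scala
// Sketch: is any known IcebergCompat property explicitly set to true?
def isAnyEnabled(metadata: Metadata): Boolean =
  knownVersions.exists { case (config, _) =>
    config.fromMetaData(metadata).getOrElse(false)
  }
```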

---

`isAnyEnabled` is used when:

* `UniversalFormat` is requested to [enforceIcebergInvariantsAndDependencies](UniversalFormat.md#enforceIcebergInvariantsAndDependencies)
* `TransactionalWrite` is requested to [write data out](../TransactionalWrite.md#writeFiles)
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -327,8 +327,10 @@ nav:
- ColumnWithDefaultExprUtils: ColumnWithDefaultExprUtils.md
- ConflictChecker: ConflictChecker.md
- CurrentTransactionInfo: CurrentTransactionInfo.md
- DeltaFileFormatWriter: DeltaFileFormatWriter.md
- DeltaFileOperations: DeltaFileOperations.md
- DeltaJobStatisticsTracker: DeltaJobStatisticsTracker.md
- DeltaOptimizedWriterExec: DeltaOptimizedWriterExec.md
- DeltaRelation: DeltaRelation.md
- DeltaTableOperations: DeltaTableOperations.md
- DeltaTableUtils: DeltaTableUtils.md
