Skip to content

Commit

Permalink
TransactionalWrite and WriteIntoDeltaLikes
Browse files Browse the repository at this point in the history
  • Loading branch information
jaceklaskowski committed Sep 30, 2024
1 parent 680356d commit 9507770
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 11 deletions.
7 changes: 3 additions & 4 deletions docs/TransactionalWrite.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,13 @@ writeFiles(

`writeFiles` is used when:

* `WriteIntoDelta` is requested to [writeAndReturnCommitData](commands/WriteIntoDelta.md#writeAndReturnCommitData)
* `DeleteCommand` is requested to [rewriteFiles](commands/delete/DeleteCommand.md#rewriteFiles)
* `MergeIntoCommandBase` is requested to [writeFiles](commands/merge/MergeIntoCommandBase.md#writeFiles)
* `UpdateCommand` is requested to [rewriteFiles](commands/update/UpdateCommand.md#rewriteFiles)
* `WriteIntoDelta` is requested to [writeAndReturnCommitData](commands/WriteIntoDelta.md#writeAndReturnCommitData), [writeFiles](commands/WriteIntoDelta.md#writeFiles)
* `DeltaSink` is requested to [add a streaming micro-batch](spark-connector/DeltaSink.md#addBatch)
* `MergeIntoCommandBase` is requested to [write data out](commands/merge/MergeIntoCommandBase.md#writeFiles)
* `OptimizeExecutor` is requested to [runOptimizeBinJob](commands/optimize/OptimizeExecutor.md#runOptimizeBinJob)
* `RemoveColumnMappingCommand` is requested to [write data out](commands/alter/RemoveColumnMappingCommand.md#writeData)
* `UpdateCommand` is requested to [rewriteFiles](commands/update/UpdateCommand.md#rewriteFiles)
* `WriteIntoDelta` is requested to [writeAndReturnCommitData](commands/WriteIntoDelta.md#writeAndReturnCommitData) (and [writeFiles](commands/WriteIntoDelta.md#writeFiles))

---

Expand Down
33 changes: 31 additions & 2 deletions docs/commands/WriteIntoDelta.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ val writeCmd = WriteIntoDelta(
writeCmd.run(spark)
```

## <span id="canOverwriteSchema"> canOverwriteSchema
## canOverwriteSchema { #canOverwriteSchema }

??? note "ImplicitMetadataOperation"

Expand All @@ -166,14 +166,43 @@ writeCmd.run(spark)
1. This `WriteIntoDelta` is [overwrite](#isOverwriteOperation) operation
1. [replaceWhere](../spark-connector/DeltaWriteOptions.md#replaceWhere) option is not defined (in the [DeltaOptions](#options))

## <span id="isOverwriteOperation"> isOverwriteOperation
## isOverwriteOperation { #isOverwriteOperation }

```scala
isOverwriteOperation: Boolean
```

`isOverwriteOperation` is `true` for the [SaveMode](#mode) to be `SaveMode.Overwrite`.

---

`isOverwriteOperation` is used when:

* `WriteIntoDelta` is requested for the [canOverwriteSchema](#canOverwriteSchema) and to [write](#write)

## writeAndReturnCommitData { #writeAndReturnCommitData }

??? note "WriteIntoDeltaLike"

```scala
writeAndReturnCommitData(
txn: OptimisticTransaction,
sparkSession: SparkSession,
clusterBySpecOpt: Option[ClusterBySpec] = None,
isTableReplace: Boolean = false): TaggedCommitData[Action]
```

`writeAndReturnCommitData` is part of the [WriteIntoDeltaLike](WriteIntoDeltaLike.md#writeAndReturnCommitData) abstraction.

`writeAndReturnCommitData`...FIXME

### writeFiles { #writeFiles }

```scala
writeFiles(
txn: OptimisticTransaction,
data: DataFrame,
options: DeltaOptions): Seq[FileAction]
```

`writeFiles`...FIXME
36 changes: 35 additions & 1 deletion docs/commands/WriteIntoDeltaLike.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
# WriteIntoDeltaLike

`WriteIntoDeltaLike` is...FIXME
`WriteIntoDeltaLike` is an [abstraction](#contract) of [commands](#implementations) that can write data out into delta tables.

## Contract (Subset)

### writeAndReturnCommitData { #writeAndReturnCommitData }

```scala
writeAndReturnCommitData(
txn: OptimisticTransaction,
sparkSession: SparkSession,
clusterBySpecOpt: Option[ClusterBySpec] = None,
isTableReplace: Boolean = false): TaggedCommitData[Action]
```

Used when:

* `CreateDeltaTableCommand` is requested to [handleCreateTableAsSelect](create-table/CreateDeltaTableCommand.md#handleCreateTableAsSelect)
* `WriteIntoDelta` is [executed](WriteIntoDelta.md#run)
* `WriteIntoDeltaLike` is requested to [write data out](#write)

## Implementations

* [WriteIntoDelta](WriteIntoDelta.md)

## Write Data Out { #write }

```scala
write(
txn: OptimisticTransaction,
sparkSession: SparkSession,
clusterBySpecOpt: Option[ClusterBySpec] = None,
isTableReplace: Boolean = false): Seq[Action]
```

`write` [writeAndReturnCommitData](#writeAndReturnCommitData) and returns the [Action](../Action.md)s.
8 changes: 4 additions & 4 deletions docs/commands/merge/MergeIntoCommandBase.md
Original file line number Diff line number Diff line change
Expand Up @@ -464,14 +464,14 @@ Usage | Metric Name | valueToReturn
------|-------------|--------------
`ClassicMergeExecutor` to [find files to rewrite](ClassicMergeExecutor.md#findTouchedFiles) | [numSourceRows](#numSourceRows) | `true`
`ClassicMergeExecutor` to [write out merge changes](ClassicMergeExecutor.md#writeAllChanges) | [numSourceRowsInSecondScan](#numSourceRowsInSecondScan) | `true`
| [numTargetRowsCopied](#numTargetRowsCopied) | `false`
&nbsp; | [numTargetRowsCopied](#numTargetRowsCopied) | `false`
`InsertOnlyMergeExecutor` to [write out inserts](InsertOnlyMergeExecutor.md#writeOnlyInserts) | [numSourceRows](#numSourceRows) or [numSourceRowsInSecondScan](#numSourceRowsInSecondScan) | `true`
`InsertOnlyMergeExecutor` to [generateInsertsOnlyOutputCols](InsertOnlyMergeExecutor.md#generateInsertsOnlyOutputCols) | [numTargetRowsInserted](#numTargetRowsInserted) | `false`
`MergeOutputGeneration` to [generateAllActionExprs](MergeOutputGeneration.md#generateAllActionExprs) | [numTargetRowsUpdated](#numTargetRowsUpdated) | `false`
| [numTargetRowsDeleted](#numTargetRowsDeleted) | `true`
| [numTargetRowsInserted](#numTargetRowsInserted) | `false`
&nbsp; | [numTargetRowsDeleted](#numTargetRowsDeleted) | `true`
&nbsp; | [numTargetRowsInserted](#numTargetRowsInserted) | `false`

## Writing Data(Frame) Out to Delta Table { #writeFiles }
## Write Data(Frame) Out to Delta Table { #writeFiles }

```scala
writeFiles(
Expand Down

0 comments on commit 9507770

Please sign in to comment.