Skip to content

Commit

Permalink
[Spark] Add trait for generic executor observer thread local storage (#…
Browse files Browse the repository at this point in the history
…3307)

## Description
Add generic trait for thread local execution observers to extend and
implement.

## How was this patch tested?
Unit tests
  • Loading branch information
leonwind-db authored Jun 25, 2024
1 parent 715f45e commit e054904
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class DeltaLog private(
def startTransaction(
catalogTableOpt: Option[CatalogTable],
snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = {
TransactionExecutionObserver.threadObserver.get().startingTransaction {
TransactionExecutionObserver.getObserver.startingTransaction {
new OptimisticTransaction(this, catalogTableOpt, snapshotOpt)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite

/** Contains the execution instrumentation set via thread-local. No-op by default. */
protected[delta] var executionObserver: TransactionExecutionObserver =
TransactionExecutionObserver.threadObserver.get()
TransactionExecutionObserver.getObserver

/**
* Stores the updated metadata (if any) that will result from this txn.
Expand Down Expand Up @@ -2140,7 +2140,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
// Get thread local observer for Fuzz testing purpose.
val executionObserver = TransactionExecutionObserver.threadObserver.get()
val executionObserver = TransactionExecutionObserver.getObserver
val commitFile = util.FileNames.unsafeDeltaFile(logPath, commitVersion)
val commitFileStatus =
doCommit(logStore, hadoopConf, logPath, commitFile, commitVersion, actions)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

trait ThreadStorageExecutionObserver[T <: ChainableExecutionObserver[T]] {
/** Thread-local observer instance loaded by [[T]] */
protected val threadObserver: ThreadLocal[T] = ThreadLocal.withInitial(() => initialValue)
protected def initialValue: T

def getObserver: T = threadObserver.get()

def setObserver(observer: T): Unit = threadObserver.set(observer)

/** Instrument all executions created and completed within `thunk` with `newObserver`. */
def withObserver[S](newObserver: T)(thunk: => S): S = {
val oldObserver = threadObserver.get()
threadObserver.set(newObserver)
try {
thunk
} finally {
// reset
threadObserver.set(oldObserver)
}
}

/** Update the current thread observer with its next one. */
def advanceToNextObserver(): Unit = threadObserver.get.advanceToNextThreadObserver()
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,37 +82,15 @@ trait TransactionExecutionObserver
def transactionAborted(): Unit

override def advanceToNextThreadObserver(): Unit = {
TransactionExecutionObserver.threadObserver.set(
TransactionExecutionObserver.setObserver(
nextObserver.getOrElse(NoOpTransactionExecutionObserver))
}
}

object TransactionExecutionObserver {
/** Thread-local observer instance loaded by [[DeltaLog]] and [[OptimisticTransaction]]. */
val threadObserver: ThreadLocal[TransactionExecutionObserver] =
ThreadLocal.withInitial(() => NoOpTransactionExecutionObserver)

/**
* Instrument all transactions created and completed within `thunk` with `observer`.
*
* *Note 1:* Closing over existing transactions with `thunk` will have no effect.
* *Note 2:* Do not leak transactions created within `thunk` via the return value.
* *Note 3:* Do not create threads with new transactions within `thunk`.
* The observer information is not copied to children threads automatically.
*
* If you need more flexible usage of [[TransactionExecutionObserver]] use
* `TransactionExecutionObserver.threadObserver.set()` instead.
*/
def withObserver[T](observer: TransactionExecutionObserver)(thunk: => T): T = {
val oldObserver = threadObserver.get()
threadObserver.set(observer)
try {
thunk
} finally {
// reset
threadObserver.set(oldObserver)
}
}
object TransactionExecutionObserver
extends ThreadStorageExecutionObserver[TransactionExecutionObserver] {
override protected val initialValue: TransactionExecutionObserver =
NoOpTransactionExecutionObserver
}

/** Default observer does nothing. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ trait AbstractBatchBackfillingCommitCoordinatorClient extends CommitCoordinatorC
commitVersion: Long,
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
val executionObserver = TransactionExecutionObserver.threadObserver.get()
val executionObserver = TransactionExecutionObserver.getObserver
val tablePath = CoordinatedCommitsUtils.getTablePath(logPath)
if (commitVersion == 0) {
throw CommitFailedException(
Expand Down

0 comments on commit e054904

Please sign in to comment.