[Spark] Migrate logging code to use Spark Structured Logging (#3293)
## Description
Migrate Delta's logging code to use Spark structured logging. Structured
logging will be introduced in Spark 4.0, as highlighted in the [preview
release of Spark 4.0 | Apache
Spark](https://spark.apache.org/news/spark-4.0.0-preview1.html). Note
that the feature is turned off by default, so the output log messages
remain unchanged.
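
For reference, the change follows the pattern below (a minimal sketch assembled from the diff rather than a verbatim excerpt; the class name and message text are illustrative, while `MDC`, the `log` interpolator, and the `DeltaLogKeys.*` keys are the pieces the PR actually uses):

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.sql.delta.logging.DeltaLogKeys

// Illustrative class only; Delta's real classes mix in Logging, LoggingShims,
// or DeltaLogging as appropriate.
class CheckpointParser extends Logging {
  def reportFailure(path: String, version: Long, e: Throwable): Unit = {
    // Before: plain string interpolation, no structured fields.
    logWarning(s"Failed to parse $path at version $version.", e)

    // After: the log"" interpolator plus MDC tags each interpolated value with
    // a typed log key. With structured logging disabled (the default), the
    // rendered message text stays the same as before.
    logWarning(log"Failed to parse ${MDC(DeltaLogKeys.PATH, path)} at version " +
      log"${MDC(DeltaLogKeys.VERSION, version)}.", e)
  }
}
```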

Resolves #3145 
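
The new `DeltaLogKeys` object referenced throughout the diff is not shown in this excerpt; assuming it mirrors Spark's own `LogKeys` convention, each key is declared as a `LogKey` case object, roughly:

```scala
package org.apache.spark.sql.delta.logging

import org.apache.spark.internal.LogKey

// Hypothetical sketch: the key names below all appear in the diff, but the
// actual DeltaLogKeys added by this PR may declare or organize them differently.
object DeltaLogKeys {
  case object FILE_INDEX extends LogKey
  case object FILTER extends LogKey
  case object PATH extends LogKey
  case object TABLE_ID extends LogKey
  case object TIMESTAMP extends LogKey
  case object TIMESTAMP2 extends LogKey
  case object TXN_ID extends LogKey
  case object VERSION extends LogKey
}
```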

## How was this patch tested?
Existing tests.
zedtang authored Jun 26, 2024
1 parent 8e3647a commit 2abc2a2
Showing 46 changed files with 499 additions and 213 deletions.
26 changes: 17 additions & 9 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.actions.{Action, CheckpointMetadata, Metadata, SidecarFile, SingleAction}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
@@ -37,6 +38,7 @@ import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
import org.apache.hadoop.mapreduce.{Job, TaskType}

import org.apache.spark.TaskContext
import org.apache.spark.internal.MDC
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -274,7 +276,7 @@ trait Checkpoints extends DeltaLogging
opType,
data = Map("exception" -> e.getMessage(), "stackTrace" -> e.getStackTrace())
)
logWarning(s"Error when writing checkpoint-related files", e)
logWarning("Error when writing checkpoint-related files", e)
val throwError = Utils.isTesting ||
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED)
if (throwError) throw e
@@ -379,8 +381,9 @@ trait Checkpoints extends DeltaLogging {
case _: FileNotFoundException =>
None
case NonFatal(e) if tries < 3 =>
logWarning(s"Failed to parse $LAST_CHECKPOINT. This may happen if there was an error " +
"during read operation, or a file appears to be partial. Sleeping and trying again.", e)
logWarning(log"Failed to parse ${MDC(DeltaLogKeys.PATH, LAST_CHECKPOINT)}. " +
log"This may happen if there was an error during read operation, " +
log"or a file appears to be partial. Sleeping and trying again.", e)
Thread.sleep(1000)
loadMetadataFromFile(tries + 1)
case NonFatal(e) =>
@@ -390,7 +393,8 @@
data = Map("exception" -> Utils.exceptionString(e))
)

logWarning(s"$LAST_CHECKPOINT is corrupted. Will search the checkpoint files directly", e)
logWarning(log"${MDC(DeltaLogKeys.PATH, LAST_CHECKPOINT)} is corrupted. " +
log"Will search the checkpoint files directly", e)
// Hit a partial file. This could happen on Azure as overwriting _last_checkpoint file is
// not atomic. We will try to list all files to find the latest checkpoint and restore
// LastCheckpointInfo from it.
@@ -460,7 +464,7 @@ trait Checkpoints extends DeltaLogging {
// available checkpoint.
.filterNot(cv => cv.version < 0 || cv.version == CheckpointInstance.MaxValue.version)
.getOrElse {
logInfo(s"Try to find Delta last complete checkpoint")
logInfo("Try to find Delta last complete checkpoint")
eventData("listingFromZero") = true.toString
return findLastCompleteCheckpoint()
}
@@ -469,7 +473,8 @@
eventData("upperBoundCheckpointType") = upperBoundCv.format.name
var iterations: Long = 0L
var numFilesScanned: Long = 0L
logInfo(s"Try to find Delta last complete checkpoint before version ${upperBoundCv.version}")
logInfo(log"Try to find Delta last complete checkpoint before version " +
log"${MDC(DeltaLogKeys.VERSION, upperBoundCv.version)}")
var listingEndVersion = upperBoundCv.version

// Do a backward listing from the upperBoundCv version. We list in chunks of 1000 versions.
@@ -512,12 +517,14 @@
getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version))
eventData("numFilesScanned") = numFilesScanned.toString
if (lastCheckpoint.isDefined) {
logInfo(s"Delta checkpoint is found at version ${lastCheckpoint.get.version}")
logInfo(log"Delta checkpoint is found at version " +
log"${MDC(DeltaLogKeys.VERSION, lastCheckpoint.get.version)}")
return lastCheckpoint
}
listingEndVersion = listingEndVersion - 1000
}
logInfo(s"No checkpoint found for Delta table before version ${upperBoundCv.version}")
logInfo(log"No checkpoint found for Delta table before version " +
log"${MDC(DeltaLogKeys.VERSION, upperBoundCv.version)}")
None
}

@@ -1054,7 +1061,8 @@ object Checkpoints
try {
fs.delete(tempPath, false)
} catch { case NonFatal(e) =>
logWarning(s"Error while deleting the temporary checkpoint part file $tempPath", e)
logWarning(log"Error while deleting the temporary checkpoint part file " +
log"${MDC(DeltaLogKeys.PATH, tempPath)}", e)
}
}
}
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.FileSizeHistogram
@@ -34,6 +35,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.util.Utils
@@ -99,12 +101,14 @@ trait RecordChecksum extends DeltaLogging {
eventData("operationSucceeded") = true
} catch {
case NonFatal(e) =>
logWarning(s"Failed to write the checksum for version: $version", e)
logWarning(log"Failed to write the checksum for version: " +
log"${MDC(DeltaLogKeys.VERSION, version)}", e)
stream.cancel()
}
} catch {
case NonFatal(e) =>
logWarning(s"Failed to write the checksum for version: $version", e)
logWarning(log"Failed to write the checksum for version: " +
log"${MDC(DeltaLogKeys.VERSION, version)}", e)
}
recordDeltaEvent(
deltaLog,
@@ -24,12 +24,14 @@ import scala.collection.mutable
import org.apache.spark.sql.delta.RowId.RowTrackingMetadataDomain
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.coordinatedcommits.UpdatedActions
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaSparkPlanUtils.CheckDeterministicOptions
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.internal.{MDC, MessageWithContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, Or}
import org.apache.spark.sql.types.StructType
@@ -653,13 +655,16 @@ private[delta] class ConflictChecker(
protected def logMetrics(): Unit = {
val totalTimeTakenMs = System.currentTimeMillis() - startTimeMs
val timingStr = timingStats.keys.toSeq.sorted.map(k => s"$k=${timingStats(k)}").mkString(",")
logInfo(s"[$logPrefix] Timing stats against $winningCommitVersion " +
s"[$timingStr, totalTimeTakenMs: $totalTimeTakenMs]")
logInfo(log"[" + logPrefix + log"] Timing stats against " +
log"${MDC(DeltaLogKeys.VERSION, winningCommitVersion)} " +
log"[${MDC(DeltaLogKeys.TIME_STATS, timingStr)}, totalTimeTakenMs: " +
log"${MDC(DeltaLogKeys.TIME_MS, totalTimeTakenMs)}]")
}

protected lazy val logPrefix: String = {
protected lazy val logPrefix: MessageWithContext = {
def truncate(uuid: String): String = uuid.split("-").head
s"[tableId=${truncate(initialCurrentTransactionInfo.readSnapshot.metadata.id)}," +
s"txnId=${truncate(initialCurrentTransactionInfo.txnId)}] "
log"[tableId=${MDC(DeltaLogKeys.TABLE_ID,
truncate(initialCurrentTransactionInfo.readSnapshot.metadata.id))}," +
log"txnId=${MDC(DeltaLogKeys.TXN_ID, truncate(initialCurrentTransactionInfo.txnId))}] "
}
}
@@ -26,6 +26,7 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Futu
import scala.concurrent.duration.Duration

import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker, JobInfo, NotebookInfo}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
@@ -37,6 +38,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkEnv
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -626,8 +628,10 @@ object DeltaHistoryManager extends DeltaLogging {
val prevTimestamp = commits(i).getTimestamp
assert(commits(i).getVersion < commits(i + 1).getVersion, "Unordered commits provided.")
if (prevTimestamp >= commits(i + 1).getTimestamp) {
logWarning(s"Found Delta commit ${commits(i).getVersion} with a timestamp $prevTimestamp " +
s"which is greater than the next commit timestamp ${commits(i + 1).getTimestamp}.")
logWarning(log"Found Delta commit ${MDC(DeltaLogKeys.VERSION, commits(i).getVersion)} " +
log"with a timestamp ${MDC(DeltaLogKeys.TIMESTAMP, prevTimestamp)} " +
log"which is greater than the next commit timestamp " +
log"${MDC(DeltaLogKeys.TIMESTAMP2, commits(i + 1).getTimestamp)}.")
commits(i + 1) = commits(i + 1).withTimestamp(prevTimestamp + 1).asInstanceOf[T]
}
i += 1
@@ -16,10 +16,11 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs._

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{LoggingShims, MDC}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, PartitionDirectory}
@@ -39,7 +40,7 @@ case class DeltaLogFileIndex private (
format: FileFormat,
files: Array[FileStatus])
extends FileIndex
with Logging {
with LoggingShims {

import DeltaLogFileIndex._

@@ -81,7 +82,7 @@ case class DeltaLogFileIndex private (
override def toString: String =
s"DeltaLogFileIndex($format, numFilesInSegment: ${files.size}, totalFileSize: $sizeInBytes)"

logInfo(s"Created $this")
logInfo(log"Created ${MDC(DeltaLogKeys.FILE_INDEX, this)}")
}

object DeltaLogFileIndex {
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, Protocol}
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAllRowsFilter, KeepMarkedRowsFilter}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.conf.Configuration
@@ -32,6 +33,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.util.ContextUtil

import org.apache.spark.internal.{LoggingShims, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
@@ -59,7 +61,8 @@ case class DeltaParquetFileFormat(
optimizationsEnabled: Boolean = true,
tablePath: Option[String] = None,
isCDCRead: Boolean = false)
extends ParquetFileFormat {
extends ParquetFileFormat
with LoggingShims {
// Validate either we have all arguments for DV enabled read or none of them.
if (hasTablePath) {
SparkSession.getActiveSession.map { session =>
@@ -513,7 +516,7 @@ case class DeltaParquetFileFormat(
case AlwaysTrue() => Some(AlwaysTrue())
case AlwaysFalse() => Some(AlwaysFalse())
case _ =>
logError(s"Failed to translate filter $filter")
logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}")
None
}
}
@@ -17,11 +17,12 @@
package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSourceUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.internal.MDC
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
// scalastyle:on import.ordering.noEmptyLine
@@ -82,8 +83,8 @@ object DeltaTableIdentifier extends DeltaLogging {
catalog.databaseExists(identifier.database.get) && catalog.tableExists(identifier)
} catch {
case e: AnalysisException if gluePermissionError(e) =>
logWarning("Received an access denied error from Glue. Will check to see if this " +
s"identifier ($identifier) is path based.", e)
logWarning(log"Received an access denied error from Glue. Will check to see if this " +
log"identifier (${MDC(DeltaLogKeys.TABLE_NAME, identifier)}) is path based.", e)
false
}
}
@@ -20,9 +20,10 @@ import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.sql.delta.FileMetadataMaterializationTracker.TaskLevelPermitAllocator
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{LoggingShims, MDC}

/**
* An instance of this class tracks and controls the materialization usage of a single command
@@ -40,7 +41,7 @@ import org.apache.spark.internal.Logging
* Accessed by the thread materializing files and by the thread releasing resources after execution.
*
*/
class FileMetadataMaterializationTracker extends Logging {
class FileMetadataMaterializationTracker extends LoggingShims {

/** The number of permits allocated from the global file materialization semaphore */
@volatile private var numPermitsFromSemaphore: Int = 0
@@ -79,7 +80,8 @@ class FileMetadataMaterializationTracker extends Logging {
val startTime = System.currentTimeMillis()
FileMetadataMaterializationTracker.overAllocationLock.acquire(1)
val waitTime = System.currentTimeMillis() - startTime
logInfo(s"Acquired over allocation lock for this query in $waitTime ms")
logInfo(log"Acquired over allocation lock for this query in " +
log"${MDC(DeltaLogKeys.TIME_MS, waitTime)} ms")
materializationMetrics.overAllocWaitTimeMs += waitTime
materializationMetrics.overAllocWaitCount += 1
materializationMetrics.overAllocFilesMaterializedCount += 1
@@ -18,9 +18,11 @@ package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, Protocol}
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils

import org.apache.spark.internal.MDC
import org.apache.spark.sql.types._

/**
@@ -163,14 +165,16 @@ case class IcebergCompat(

// Update Protocol and Metadata if necessary
val protocolResult = if (tblFeatureUpdates.nonEmpty) {
logInfo(s"[tableId=$tableId] IcebergCompatV1 auto-supporting table features: " +
s"${tblFeatureUpdates.map(_.name)}")
logInfo(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, tableId)}] " +
log"IcebergCompatV1 auto-supporting table features: " +
log"${MDC(DeltaLogKeys.TABLE_FEATURES, tblFeatureUpdates.map(_.name))}")
Some(newestProtocol.merge(tblFeatureUpdates.map(Protocol.forTableFeature).toSeq: _*))
} else None

val metadataResult = if (tblPropertyUpdates.nonEmpty) {
logInfo(s"[tableId=$tableId] IcebergCompatV1 auto-setting table properties: " +
s"$tblPropertyUpdates")
logInfo(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, tableId)}] " +
log"IcebergCompatV1 auto-setting table properties: " +
log"${MDC(DeltaLogKeys.TBL_PROPERTIES, tblPropertyUpdates)}")
val newConfiguration = newestMetadata.configuration ++ tblPropertyUpdates.toMap
var tmpNewMetadata = newestMetadata.copy(configuration = newConfiguration)
