diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala index b9ee1de5a9c..91b12531613 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.actions.{Action, CheckpointMetadata, Metadata, SidecarFile, SingleAction} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.storage.LogStore @@ -37,6 +38,7 @@ import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID} import org.apache.hadoop.mapreduce.{Job, TaskType} import org.apache.spark.TaskContext +import org.apache.spark.internal.MDC import org.apache.spark.paths.SparkPath import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute @@ -274,7 +276,7 @@ trait Checkpoints extends DeltaLogging { opType, data = Map("exception" -> e.getMessage(), "stackTrace" -> e.getStackTrace()) ) - logWarning(s"Error when writing checkpoint-related files", e) + logWarning("Error when writing checkpoint-related files", e) val throwError = Utils.isTesting || spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED) if (throwError) throw e @@ -379,8 +381,9 @@ trait Checkpoints extends DeltaLogging { case _: FileNotFoundException => None case NonFatal(e) if tries < 3 => - logWarning(s"Failed to parse $LAST_CHECKPOINT. This may happen if there was an error " + - "during read operation, or a file appears to be partial. Sleeping and trying again.", e) + logWarning(log"Failed to parse ${MDC(DeltaLogKeys.PATH, LAST_CHECKPOINT)}. " + + log"This may happen if there was an error during read operation, " + + log"or a file appears to be partial. Sleeping and trying again.", e) Thread.sleep(1000) loadMetadataFromFile(tries + 1) case NonFatal(e) => @@ -390,7 +393,8 @@ trait Checkpoints extends DeltaLogging { data = Map("exception" -> Utils.exceptionString(e)) ) - logWarning(s"$LAST_CHECKPOINT is corrupted. Will search the checkpoint files directly", e) + logWarning(log"${MDC(DeltaLogKeys.PATH, LAST_CHECKPOINT)} is corrupted. " + + log"Will search the checkpoint files directly", e) // Hit a partial file. This could happen on Azure as overwriting _last_checkpoint file is // not atomic. We will try to list all files to find the latest checkpoint and restore // LastCheckpointInfo from it. @@ -460,7 +464,7 @@ trait Checkpoints extends DeltaLogging { // available checkpoint. 
.filterNot(cv => cv.version < 0 || cv.version == CheckpointInstance.MaxValue.version) .getOrElse { - logInfo(s"Try to find Delta last complete checkpoint") + logInfo("Try to find Delta last complete checkpoint") eventData("listingFromZero") = true.toString return findLastCompleteCheckpoint() } @@ -469,7 +473,8 @@ trait Checkpoints extends DeltaLogging { eventData("upperBoundCheckpointType") = upperBoundCv.format.name var iterations: Long = 0L var numFilesScanned: Long = 0L - logInfo(s"Try to find Delta last complete checkpoint before version ${upperBoundCv.version}") + logInfo(log"Try to find Delta last complete checkpoint before version " + + log"${MDC(DeltaLogKeys.VERSION, upperBoundCv.version)}") var listingEndVersion = upperBoundCv.version // Do a backward listing from the upperBoundCv version. We list in chunks of 1000 versions. @@ -512,12 +517,14 @@ trait Checkpoints extends DeltaLogging { getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version)) eventData("numFilesScanned") = numFilesScanned.toString if (lastCheckpoint.isDefined) { - logInfo(s"Delta checkpoint is found at version ${lastCheckpoint.get.version}") + logInfo(log"Delta checkpoint is found at version " + + log"${MDC(DeltaLogKeys.VERSION, lastCheckpoint.get.version)}") return lastCheckpoint } listingEndVersion = listingEndVersion - 1000 } - logInfo(s"No checkpoint found for Delta table before version ${upperBoundCv.version}") + logInfo(log"No checkpoint found for Delta table before version " + + log"${MDC(DeltaLogKeys.VERSION, upperBoundCv.version)}") None } @@ -1054,7 +1061,8 @@ object Checkpoints try { fs.delete(tempPath, false) } catch { case NonFatal(e) => - logWarning(s"Error while deleting the temporary checkpoint part file $tempPath", e) + logWarning(log"Error while deleting the temporary checkpoint part file " + + log"${MDC(DeltaLogKeys.PATH, tempPath)}", e) } } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala index 1a352c99626..67f46a32e90 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import org.apache.spark.sql.delta.actions._ +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.stats.FileSizeHistogram @@ -34,6 +35,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.fs.Path +import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.util.Utils @@ -99,12 +101,14 @@ trait RecordChecksum extends DeltaLogging { eventData("operationSucceeded") = true } catch { case NonFatal(e) => - logWarning(s"Failed to write the checksum for version: $version", e) + logWarning(log"Failed to write the checksum for version: " + + log"${MDC(DeltaLogKeys.VERSION, version)}", e) stream.cancel() } } catch { case NonFatal(e) => - logWarning(s"Failed to write the checksum for version: $version", e) + logWarning(log"Failed to write the checksum for version: " + + log"${MDC(DeltaLogKeys.VERSION, version)}", e) } recordDeltaEvent( deltaLog, diff --git 
a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala index 83cc97a4786..fffc919270a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala @@ -24,12 +24,14 @@ import scala.collection.mutable import org.apache.spark.sql.delta.RowId.RowTrackingMetadataDomain import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.coordinatedcommits.UpdatedActions +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.DeltaSparkPlanUtils.CheckDeterministicOptions import org.apache.spark.sql.delta.util.FileNames import org.apache.hadoop.fs.FileStatus +import org.apache.spark.internal.{MDC, MessageWithContext} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, Or} import org.apache.spark.sql.types.StructType @@ -653,13 +655,16 @@ private[delta] class ConflictChecker( protected def logMetrics(): Unit = { val totalTimeTakenMs = System.currentTimeMillis() - startTimeMs val timingStr = timingStats.keys.toSeq.sorted.map(k => s"$k=${timingStats(k)}").mkString(",") - logInfo(s"[$logPrefix] Timing stats against $winningCommitVersion " + - s"[$timingStr, totalTimeTakenMs: $totalTimeTakenMs]") + logInfo(log"[" + logPrefix + log"] Timing stats against " + + log"${MDC(DeltaLogKeys.VERSION, winningCommitVersion)} " + + log"[${MDC(DeltaLogKeys.TIME_STATS, timingStr)}, totalTimeTakenMs: " + + log"${MDC(DeltaLogKeys.TIME_MS, totalTimeTakenMs)}]") } - protected lazy val logPrefix: String = { + protected lazy val logPrefix: MessageWithContext = { def truncate(uuid: String): String = uuid.split("-").head - s"[tableId=${truncate(initialCurrentTransactionInfo.readSnapshot.metadata.id)}," + - s"txnId=${truncate(initialCurrentTransactionInfo.txnId)}] " + log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, + truncate(initialCurrentTransactionInfo.readSnapshot.metadata.id))}," + + log"txnId=${MDC(DeltaLogKeys.TXN_ID, truncate(initialCurrentTransactionInfo.txnId))}] " } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala index 77c0ff6431f..e9a31e0a057 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala @@ -26,6 +26,7 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Futu import scala.concurrent.duration.Duration import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker, JobInfo, NotebookInfo} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.storage.LogStore @@ -37,6 +38,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.SparkEnv +import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} @@ -626,8 +628,10 @@ object DeltaHistoryManager extends DeltaLogging { val prevTimestamp = 
commits(i).getTimestamp assert(commits(i).getVersion < commits(i + 1).getVersion, "Unordered commits provided.") if (prevTimestamp >= commits(i + 1).getTimestamp) { - logWarning(s"Found Delta commit ${commits(i).getVersion} with a timestamp $prevTimestamp " + - s"which is greater than the next commit timestamp ${commits(i + 1).getTimestamp}.") + logWarning(log"Found Delta commit ${MDC(DeltaLogKeys.VERSION, commits(i).getVersion)} " + + log"with a timestamp ${MDC(DeltaLogKeys.TIMESTAMP, prevTimestamp)} " + + log"which is greater than the next commit timestamp " + + log"${MDC(DeltaLogKeys.TIMESTAMP2, commits(i + 1).getTimestamp)}.") commits(i + 1) = commits(i + 1).withTimestamp(prevTimestamp + 1).asInstanceOf[T] } i += 1 diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLogFileIndex.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLogFileIndex.scala index de111aadd3b..ed368f0f253 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLogFileIndex.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLogFileIndex.scala @@ -16,10 +16,11 @@ package org.apache.spark.sql.delta +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.util.FileNames import org.apache.hadoop.fs._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LoggingShims, MDC} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, PartitionDirectory} @@ -39,7 +40,7 @@ case class DeltaLogFileIndex private ( format: FileFormat, files: Array[FileStatus]) extends FileIndex - with Logging { + with LoggingShims { import DeltaLogFileIndex._ @@ -81,7 +82,7 @@ case class DeltaLogFileIndex private ( override def toString: String = s"DeltaLogFileIndex($format, numFilesInSegment: ${files.size}, totalFileSize: $sizeInBytes)" - logInfo(s"Created $this") + logInfo(log"Created ${MDC(DeltaLogKeys.FILE_INDEX, this)}") } object DeltaLogFileIndex { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala index 67b01e55c73..9beab0fdee9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.DeltaParquetFileFormat._ import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, Protocol} import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAllRowsFilter, KeepMarkedRowsFilter} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.schema.SchemaMergingUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.hadoop.conf.Configuration @@ -32,6 +33,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.parquet.hadoop.util.ContextUtil +import org.apache.spark.internal.{LoggingShims, MDC} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField @@ -59,7 +61,8 @@ case class DeltaParquetFileFormat( optimizationsEnabled: Boolean = true, tablePath: Option[String] = None, isCDCRead: Boolean = false) - 
extends ParquetFileFormat { + extends ParquetFileFormat + with LoggingShims { // Validate either we have all arguments for DV enabled read or none of them. if (hasTablePath) { SparkSession.getActiveSession.map { session => @@ -513,7 +516,7 @@ case class DeltaParquetFileFormat( case AlwaysTrue() => Some(AlwaysTrue()) case AlwaysFalse() => Some(AlwaysFalse()) case _ => - logError(s"Failed to translate filter $filter") + logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}") None } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTableIdentifier.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTableIdentifier.scala index cc96b07e4bb..ff323d31ef4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTableIdentifier.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTableIdentifier.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSourceUtils import org.apache.hadoop.fs.Path -import org.apache.spark.internal.Logging +import org.apache.spark.internal.MDC import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier // scalastyle:on import.ordering.noEmptyLine @@ -82,8 +83,8 @@ object DeltaTableIdentifier extends DeltaLogging { catalog.databaseExists(identifier.database.get) && catalog.tableExists(identifier) } catch { case e: AnalysisException if gluePermissionError(e) => - logWarning("Received an access denied error from Glue. Will check to see if this " + - s"identifier ($identifier) is path based.", e) + logWarning(log"Received an access denied error from Glue. Will check to see if this " + + log"identifier (${MDC(DeltaLogKeys.TABLE_NAME, identifier)}) is path based.", e) false } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/FileMetadataMaterializationTracker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/FileMetadataMaterializationTracker.scala index 8b2272717d7..73fcf503c3f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/FileMetadataMaterializationTracker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/FileMetadataMaterializationTracker.scala @@ -20,9 +20,10 @@ import java.util.concurrent.Semaphore import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.sql.delta.FileMetadataMaterializationTracker.TaskLevelPermitAllocator +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LoggingShims, MDC} /** * An instance of this class tracks and controls the materialization usage of a single command @@ -40,7 +41,7 @@ import org.apache.spark.internal.Logging * Accessed by the thread materializing files and by the thread releasing resources after execution. 
* */ -class FileMetadataMaterializationTracker extends Logging { +class FileMetadataMaterializationTracker extends LoggingShims { /** The number of permits allocated from the global file materialization semaphore */ @volatile private var numPermitsFromSemaphore: Int = 0 @@ -79,7 +80,8 @@ class FileMetadataMaterializationTracker extends Logging { val startTime = System.currentTimeMillis() FileMetadataMaterializationTracker.overAllocationLock.acquire(1) val waitTime = System.currentTimeMillis() - startTime - logInfo(s"Acquired over allocation lock for this query in $waitTime ms") + logInfo(log"Acquired over allocation lock for this query in " + + log"${MDC(DeltaLogKeys.TIME_MS, waitTime)} ms") materializationMetrics.overAllocWaitTimeMs += waitTime materializationMetrics.overAllocWaitCount += 1 materializationMetrics.overAllocFilesMaterializedCount += 1 diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala index 3910e37bcea..515e0be1850 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala @@ -18,9 +18,11 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, Protocol} import org.apache.spark.sql.delta.commands.DeletionVectorUtils +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.spark.internal.MDC import org.apache.spark.sql.types._ /** @@ -163,14 +165,16 @@ case class IcebergCompat( // Update Protocol and Metadata if necessary val protocolResult = if (tblFeatureUpdates.nonEmpty) { - logInfo(s"[tableId=$tableId] IcebergCompatV1 auto-supporting table features: " + - s"${tblFeatureUpdates.map(_.name)}") + logInfo(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, tableId)}] " + + log"IcebergCompatV1 auto-supporting table features: " + + log"${MDC(DeltaLogKeys.TABLE_FEATURES, tblFeatureUpdates.map(_.name))}") Some(newestProtocol.merge(tblFeatureUpdates.map(Protocol.forTableFeature).toSeq: _*)) } else None val metadataResult = if (tblPropertyUpdates.nonEmpty) { - logInfo(s"[tableId=$tableId] IcebergCompatV1 auto-setting table properties: " + - s"$tblPropertyUpdates") + logInfo(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, tableId)}] " + + log"IcebergCompatV1 auto-setting table properties: " + + log"${MDC(DeltaLogKeys.TBL_PROPERTIES, tblPropertyUpdates)}") val newConfiguration = newestMetadata.configuration ++ tblPropertyUpdates.toMap var tmpNewMetadata = newestMetadata.copy(configuration = newConfiguration) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala index 8b0610188d3..6de70ef4eb4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala @@ -23,12 +23,15 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, MINUTE, TruncationGranularity} import org.apache.spark.sql.delta.actions.{Action, Metadata} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.FileNames import 
org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, listingPrefix, CheckpointFile, DeltaFile, UnbackfilledDeltaFile} import org.apache.commons.lang3.time.DateUtils import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.spark.internal.MDC + private[delta] object TruncationGranularity extends Enumeration { type TruncationGranularity = Value val DAY, HOUR, MINUTE = Value @@ -68,7 +71,8 @@ trait MetadataCleanup extends DeltaLogging { val fileCutOffTime = truncateDate(clock.getTimeMillis() - retentionMillis, cutoffTruncationGranularity).getTime val formattedDate = fileCutOffTime.toGMTString - logInfo(s"Starting the deletion of log files older than $formattedDate") + logInfo(log"Starting the deletion of log files older than " + + log"${MDC(DeltaLogKeys.DATE, formattedDate)}") val fs = logPath.getFileSystem(newDeltaHadoopConf()) var numDeleted = 0 @@ -81,7 +85,8 @@ trait MetadataCleanup extends DeltaLogging { // compat-checkpoint available. val v2CompatCheckpointMetrics = new V2CompatCheckpointMetrics createSinglePartCheckpointForBackwardCompat(snapshotToCleanup, v2CompatCheckpointMetrics) - logInfo(s"Compatibility checkpoint creation metrics: $v2CompatCheckpointMetrics") + logInfo(log"Compatibility checkpoint creation metrics: " + + log"${MDC(DeltaLogKeys.METRICS, v2CompatCheckpointMetrics)}") } var wasCheckpointDeleted = false var maxBackfilledVersionDeleted = -1L @@ -121,10 +126,11 @@ trait MetadataCleanup extends DeltaLogging { snapshotToCleanup, fileCutOffTime.getTime, sidecarDeletionMetrics) - logInfo(s"Sidecar deletion metrics: $sidecarDeletionMetrics") + logInfo(log"Sidecar deletion metrics: ${MDC(DeltaLogKeys.METRICS, sidecarDeletionMetrics)}") } - logInfo(s"Deleted $numDeleted log files and $numDeletedUnbackfilled unbackfilled commit " + - s"files older than $formattedDate") + logInfo(log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, numDeleted)} log files and " + + log"${MDC(DeltaLogKeys.NUM_FILES2, numDeletedUnbackfilled)} unbackfilled commit " + + log"files older than ${MDC(DeltaLogKeys.DATE, formattedDate)}") } } @@ -296,10 +302,10 @@ trait MetadataCleanup extends DeltaLogging { .collect { case file if file.getModificationTime < retentionTimestamp => file.getPath } .filterNot(path => activeSidecarFiles.contains(path.getName)) val sidecarDeletionStartTimeMs = System.currentTimeMillis() - logInfo(s"Starting the deletion of unreferenced sidecar files") + logInfo("Starting the deletion of unreferenced sidecar files") val count = deleteMultiple(fs, sidecarFilesToDelete) - logInfo(s"Deleted $count sidecar files") + logInfo(log"Deleted ${MDC(DeltaLogKeys.COUNT, count)} sidecar files") metrics.numSidecarFilesDeleted = count val endTimeMs = System.currentTimeMillis() metrics.identifyAndDeleteSidecarsTimeTakenMs = diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 88dab997532..c564479bc36 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.delta.coordinatedcommits._ import org.apache.spark.sql.delta.files._ import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, HudiConverterHook, IcebergConverterHook, PostCommitHook, UpdateCatalogFactory} import org.apache.spark.sql.delta.implicits.addFileEncoder +import org.apache.spark.sql.delta.logging.DeltaLogKeys import 
org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -48,6 +49,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.SparkException +import org.apache.spark.internal.{MDC, MessageWithContext} import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions._ @@ -643,7 +645,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite protocol, oldMetadata = snapshot.metadata, newMetadataTmp) assertMetadata(newMetadataTmp) - logInfo(s"Updated metadata from ${newMetadata.getOrElse("-")} to $newMetadataTmp") + logInfo(log"Updated metadata from " + + log"${MDC(DeltaLogKeys.METADATA_OLD, newMetadata.getOrElse("-"))} to " + + log"${MDC(DeltaLogKeys.METADATA_NEW, newMetadataTmp)}") newMetadata = Some(newMetadataTmp) } @@ -1209,7 +1213,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite val (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo) = doCommitRetryIteratively(firstAttemptVersion, currentTransactionInfo, isolationLevelToUse) - logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}") + logInfo(log"Committed delta #${MDC(DeltaLogKeys.VERSION, commitVersion)} to " + + log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)}") (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo.actions) } catch { case e: DeltaConcurrentModificationException => @@ -1565,17 +1570,20 @@ trait OptimisticTransactionImpl extends TransactionalWrite // FS -> CC conversion val (commitCoordinatorName, commitCoordinatorConf) = CoordinatedCommitsUtils.getCoordinatedCommitsConfs(finalMetadata) - logInfo(s"Table ${deltaLog.logPath} transitioning from file-system based table to " + - s"coordinated-commits table: [commit-coordinator: $commitCoordinatorName, " + - s"conf: $commitCoordinatorConf]") + logInfo(log"Table ${MDC(DeltaLogKeys.PATH, deltaLog.logPath)} transitioning from " + + log"file-system based table to coordinated-commits table: " + + log"[commit-coordinator: ${MDC(DeltaLogKeys.COORDINATOR_NAME, commitCoordinatorName)}" + + log", conf: ${MDC(DeltaLogKeys.COORDINATOR_CONF, commitCoordinatorConf)}]") newCoordinatedCommitsTableConf = Some(newCommitCoordinatorClient.registerTable( deltaLog.logPath, readVersion, finalMetadata, protocol)) case (None, Some(readCommitCoordinatorClient)) => // CC -> FS conversion val (newOwnerName, newOwnerConf) = CoordinatedCommitsUtils.getCoordinatedCommitsConfs(snapshot.metadata) - logInfo(s"Table ${deltaLog.logPath} transitioning from coordinated-commits table to " + - s"file-system table: [commit-coordinator: $newOwnerName, conf: $newOwnerConf]") + logInfo(log"Table ${MDC(DeltaLogKeys.PATH, deltaLog.logPath)} transitioning from " + + log"coordinated-commits table to file-system table: " + + log"[commit-coordinator: ${MDC(DeltaLogKeys.COORDINATOR_NAME, newOwnerName)}, " + + log"conf: ${MDC(DeltaLogKeys.COORDINATOR_CONF, newOwnerConf)}]") case (Some(newCommitCoordinatorClient), Some(readCommitCoordinatorClient)) if !readCommitCoordinatorClient.semanticsEquals(newCommitCoordinatorClient) => // CC1 -> CC2 conversion is not allowed. 
@@ -1615,7 +1623,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite throw DeltaErrors.invalidCommittedVersion(attemptVersion, currentSnapshot.version) } - logInfo(s"Committed delta #$attemptVersion to ${deltaLog.logPath}. Wrote $commitSize actions.") + logInfo(log"Committed delta #${MDC(DeltaLogKeys.VERSION, attemptVersion)} to " + + log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)}. Wrote " + + log"${MDC(DeltaLogKeys.NUM_ACTIONS, commitSize)} actions.") deltaLog.checkpoint(currentSnapshot) currentSnapshot @@ -2014,8 +2024,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite isolationLevel: IsolationLevel): Snapshot = { val actions = currentTransactionInfo.finalActionsToCommit logInfo( - s"Attempting to commit version $attemptVersion with ${actions.size} actions with " + - s"$isolationLevel isolation level") + log"Attempting to commit version ${MDC(DeltaLogKeys.VERSION, attemptVersion)} with " + + log"${MDC(DeltaLogKeys.NUM_ACTIONS, actions.size)} actions with " + + log"${MDC(DeltaLogKeys.ISOLATION_LEVEL, isolationLevel)} isolation level") if (readVersion > -1 && metadata.id != snapshot.metadata.id) { val msg = s"Change in the table id detected in txn. Table id for txn on table at " + @@ -2270,8 +2281,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite assert(mismatch.isEmpty, s"Expected ${mismatch.map(_._1).mkString(",")} but got ${mismatch.map(_._2).mkString(",")}") - val logPrefixStr = s"[attempt $attemptNumber]" - val txnDetailsLogStr = { + val logPrefix = log"[attempt ${MDC(DeltaLogKeys.ATTEMPT, attemptNumber)}] " + val txnDetailsLog = { var adds = 0L var removes = 0L currentTransactionInfo.actions.foreach { @@ -2279,12 +2290,17 @@ trait OptimisticTransactionImpl extends TransactionalWrite case _: RemoveFile => removes += 1 case _ => } - s"$adds adds, $removes removes, ${readPredicates.size} read predicates, " + - s"${readFiles.size} read files" + log"${MDC(DeltaLogKeys.NUM_ACTIONS, adds)} adds, " + + log"${MDC(DeltaLogKeys.NUM_ACTIONS2, removes)} removes, " + + log"${MDC(DeltaLogKeys.NUM_PREDICATES, readPredicates.size)} read predicates, " + + log"${MDC(DeltaLogKeys.NUM_FILES, readFiles.size)} read files" } - logInfo(s"$logPrefixStr Checking for conflicts with versions " + - s"[$checkVersion, $nextAttemptVersion) with current txn having $txnDetailsLogStr") + logInfo(logPrefix + + log"Checking for conflicts with versions " + + log"[${MDC(DeltaLogKeys.VERSION, checkVersion)}, " + + log"${MDC(DeltaLogKeys.VERSION2, nextAttemptVersion)}) " + + log"with current txn having " + txnDetailsLog) var updatedCurrentTransactionInfo = currentTransactionInfo (checkVersion until nextAttemptVersion) @@ -2294,13 +2310,19 @@ trait OptimisticTransactionImpl extends TransactionalWrite updatedCurrentTransactionInfo, otherCommitFileStatus, commitIsolationLevel) - logInfo(s"$logPrefixStr No conflicts in version $otherCommitVersion, " + - s"${clock.getTimeMillis() - commitAttemptStartTimeMillis} ms since start") + logInfo(logPrefix + + log"No conflicts in version ${MDC(DeltaLogKeys.VERSION, otherCommitVersion)}, " + + log"${MDC(DeltaLogKeys.DURATION, + clock.getTimeMillis() - commitAttemptStartTimeMillis)} ms since start") } - logInfo(s"$logPrefixStr No conflicts with versions [$checkVersion, $nextAttemptVersion) " + - s"with current txn having $txnDetailsLogStr, " + - s"${clock.getTimeMillis() - commitAttemptStartTimeMillis} ms since start") + logInfo(logPrefix + + log"No conflicts with versions " + + log"[${MDC(DeltaLogKeys.VERSION, checkVersion)}, " + + 
log"${MDC(DeltaLogKeys.VERSION2, nextAttemptVersion)}) " + + log"with current txn having " + txnDetailsLog + + log"${MDC(DeltaLogKeys.TIME_MS, clock.getTimeMillis() - commitAttemptStartTimeMillis)} " + + log"ms since start") (nextAttemptVersion, updatedCurrentTransactionInfo) } } @@ -2369,8 +2391,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite hook.run(spark, this, version, postCommitSnapshot, committedActions) } catch { case NonFatal(e) => - logWarning(s"Error when executing post-commit hook ${hook.name} " + - s"for commit $version", e) + logWarning(log"Error when executing post-commit hook " + + log"${MDC(DeltaLogKeys.HOOK_NAME, hook.name)} " + + log"for commit ${MDC(DeltaLogKeys.VERSION, version)}", e) recordDeltaEvent(deltaLog, "delta.commit.hook.failure", data = Map( "hook" -> hook.name, "version" -> version, @@ -2383,28 +2406,29 @@ trait OptimisticTransactionImpl extends TransactionalWrite private[delta] def unregisterPostCommitHooksWhere(predicate: PostCommitHook => Boolean): Unit = postCommitHooks --= postCommitHooks.filter(predicate) - protected lazy val logPrefix: String = { + protected lazy val logPrefix: MessageWithContext = { def truncate(uuid: String): String = uuid.split("-").head - s"[tableId=${truncate(snapshot.metadata.id)},txnId=${truncate(txnId)}] " + log"[tableId=${MDC(DeltaLogKeys.METADATA_ID, truncate(snapshot.metadata.id))}," + + log"txnId=${MDC(DeltaLogKeys.TXN_ID, truncate(txnId))}] " } - override def logInfo(msg: => String): Unit = { + def logInfo(msg: MessageWithContext): Unit = { super.logInfo(logPrefix + msg) } - override def logWarning(msg: => String): Unit = { + def logWarning(msg: MessageWithContext): Unit = { super.logWarning(logPrefix + msg) } - override def logWarning(msg: => String, throwable: Throwable): Unit = { + def logWarning(msg: MessageWithContext, throwable: Throwable): Unit = { super.logWarning(logPrefix + msg, throwable) } - override def logError(msg: => String): Unit = { + def logError(msg: MessageWithContext): Unit = { super.logError(logPrefix + msg) } - override def logError(msg: => String, throwable: Throwable): Unit = { + def logError(msg: MessageWithContext, throwable: Throwable): Unit = { super.logError(logPrefix + msg, throwable) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala index 55091892489..9e10bc4dd72 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala @@ -36,10 +36,10 @@ trait ProvidesUniFormConverters { self: DeltaLog => constructor.newInstance(spark) } catch { case e: ClassNotFoundException => - logError(s"Failed to find Iceberg converter class", e) + logError("Failed to find Iceberg converter class", e) throw DeltaErrors.icebergClassMissing(spark.sparkContext.getConf, e) case e: InvocationTargetException => - logError(s"Got error when creating an Iceberg converter", e) + logError("Got error when creating an Iceberg converter", e) // The better error is within the cause throw ExceptionUtils.getRootCause(e) } @@ -51,10 +51,10 @@ trait ProvidesUniFormConverters { self: DeltaLog => constructor.newInstance(spark) } catch { case e: ClassNotFoundException => - logError(s"Failed to find Hudi converter class", e) + logError("Failed to find Hudi converter class", e) throw DeltaErrors.hudiClassMissing(spark.sparkContext.getConf, e) case e: InvocationTargetException => - 
logError(s"Got error when creating an Hudi converter", e) + logError("Got error when creating an Hudi converter", e) // The better error is within the cause throw ExceptionUtils.getRootCause(e) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 7f8026784c2..4b114f5014a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -22,6 +22,7 @@ import scala.collection.mutable import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.actions.Action.logSchema import org.apache.spark.sql.delta.coordinatedcommits.{CommitCoordinatorClient, CommitCoordinatorProvider, CoordinatedCommitsUtils, TableCommitCoordinatorClient} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -34,6 +35,7 @@ import org.apache.spark.sql.delta.util.StateCache import org.apache.spark.sql.util.ScalaExtensions._ import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.internal.{MDC, MessageWithContext} import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StructType @@ -506,31 +508,32 @@ class Snapshot( spark.createDataFrame(spark.sparkContext.emptyRDD[Row], logSchema) - override def logInfo(msg: => String): Unit = { - super.logInfo(s"[tableId=${deltaLog.tableId}] " + msg) + def logInfo(msg: MessageWithContext): Unit = { + super.logInfo(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, deltaLog.tableId)}] " + msg) } - override def logWarning(msg: => String): Unit = { - super.logWarning(s"[tableId=${deltaLog.tableId}] " + msg) + def logWarning(msg: MessageWithContext): Unit = { + super.logWarning(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, deltaLog.tableId)}] " + msg) } - override def logWarning(msg: => String, throwable: Throwable): Unit = { - super.logWarning(s"[tableId=${deltaLog.tableId}] " + msg, throwable) + def logWarning(msg: MessageWithContext, throwable: Throwable): Unit = { + super.logWarning(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, deltaLog.tableId)}] " + msg, + throwable) } - override def logError(msg: => String): Unit = { - super.logError(s"[tableId=${deltaLog.tableId}] " + msg) + def logError(msg: MessageWithContext): Unit = { + super.logError(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, deltaLog.tableId)}] " + msg) } - override def logError(msg: => String, throwable: Throwable): Unit = { - super.logError(s"[tableId=${deltaLog.tableId}] " + msg, throwable) + def logError(msg: MessageWithContext, throwable: Throwable): Unit = { + super.logError(log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, deltaLog.tableId)}] " + msg, throwable) } override def toString: String = s"${getClass.getSimpleName}(path=$path, version=$version, metadata=$metadata, " + s"logSegment=$logSegment, checksumOpt=$checksumOpt)" - logInfo(s"Created snapshot $this") + logInfo(log"Created snapshot ${MDC(DeltaLogKeys.SNAPSHOT, this)}") init() } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index 806d502189a..551e50d0af7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -30,6 +30,7 @@ import 
scala.util.control.NonFatal import com.databricks.spark.util.TagDefinitions.TAG_ASYNC import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.delta.coordinatedcommits._ +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.FileNames._ import org.apache.spark.sql.delta.util.JsonUtils @@ -38,6 +39,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession import org.apache.spark.util.{ThreadUtils, Utils} @@ -181,8 +183,8 @@ trait SnapshotManagement { self: DeltaLog => } else { // If the thread pool is full, we should not submit more tasks to it. Instead, we should // run the task in the current thread. - logInfo("Getting un-backfilled commits from commit coordinator in the same " + - s"thread for table $dataPath") + logInfo(log"Getting un-backfilled commits from commit coordinator in the same " + + log"thread for table ${MDC(DeltaLogKeys.PATH, dataPath)}") recordDeltaEvent( this, CoordinatedCommitsUsageLogs.COMMIT_COORDINATOR_LISTING_THREADPOOL_FULL) CompletableFuture.completedFuture(getCommitsTask(isAsyncRequest = false)) @@ -670,9 +672,10 @@ trait SnapshotManagement { self: DeltaLog => tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient], checksumOpt: Option[VersionChecksum]): Snapshot = { val startingFrom = if (!initSegment.checkpointProvider.isEmpty) { - s" starting from checkpoint version ${initSegment.checkpointProvider.version}." - } else "." - logInfo(s"Loading version ${initSegment.version}$startingFrom") + log" starting from checkpoint version " + + log"${MDC(DeltaLogKeys.START_VERSION, initSegment.checkpointProvider.version)}." + } else log"." + logInfo(log"Loading version ${MDC(DeltaLogKeys.VERSION, initSegment.version)}" + startingFrom) createSnapshotFromGivenOrEquivalentLogSegment( initSegment, tableCommitCoordinatorClientOpt) { segment => new Snapshot( @@ -738,7 +741,8 @@ trait SnapshotManagement { self: DeltaLog => verifyDeltaVersions(spark, deltaVersions, Some(cp.version + 1), Some(snapshotVersion)) } catch { case NonFatal(e) => - logWarning(s"Failed to find a valid LogSegment for $snapshotVersion", e) + logWarning(log"Failed to find a valid LogSegment for " + + log"${MDC(DeltaLogKeys.VERSION, snapshotVersion)}", e) return None } Some(LogSegment( @@ -763,7 +767,8 @@ trait SnapshotManagement { self: DeltaLog => verifyDeltaVersions(spark, deltaVersions, Some(0), Some(snapshotVersion)) } catch { case NonFatal(e) => - logWarning(s"Failed to find a valid LogSegment for $snapshotVersion", e) + logWarning(log"Failed to find a valid LogSegment for " + + log"${MDC(DeltaLogKeys.VERSION, snapshotVersion)}", e) return None } Some(LogSegment( @@ -819,7 +824,7 @@ trait SnapshotManagement { self: DeltaLog => tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt ).getOrElse { // This shouldn't be possible right after a commit - logError(s"No delta log found for the Delta table at $logPath") + logError(log"No delta log found for the Delta table at ${MDC(DeltaLogKeys.PATH, logPath)}") throw DeltaErrors.emptyDirectoryException(logPath.toString) } } @@ -849,8 +854,8 @@ trait SnapshotManagement { self: DeltaLog => if (firstError == null) { firstError = e } - logWarning(s"Failed to create a snapshot from log segment: $segment. 
" + - s"Trying a different checkpoint.", e) + logWarning(log"Failed to create a snapshot from log segment " + + log"${MDC(DeltaLogKeys.SEGMENT, segment)}. Trying a different checkpoint.", e) segment = getLogSegmentWithMaxExclusiveCheckpointVersion( segment.version, segment.checkpointProvider.version, @@ -860,7 +865,8 @@ trait SnapshotManagement { self: DeltaLog => } attempt += 1 case e: SparkException if firstError != null => - logWarning(s"Failed to create a snapshot from log segment: $segment", e) + logWarning(log"Failed to create a snapshot from log segment " + + log"${MDC(DeltaLogKeys.SEGMENT, segment)}", e) throw firstError } } @@ -1095,11 +1101,11 @@ trait SnapshotManagement { self: DeltaLog => tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt, checksumOpt = None) previousSnapshotOpt.foreach(logMetadataTableIdChange(_, newSnapshot)) - logInfo(s"Updated snapshot to $newSnapshot") + logInfo(log"Updated snapshot to ${MDC(DeltaLogKeys.SNAPSHOT, newSnapshot)}") newSnapshot } }.getOrElse { - logInfo(s"Creating initial snapshot without metadata, because the directory is empty") + logInfo("Creating initial snapshot without metadata, because the directory is empty") new InitialSnapshot(logPath, this) } } @@ -1153,7 +1159,9 @@ trait SnapshotManagement { self: DeltaLog => newChecksumOpt: Option[VersionChecksum], tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient], committedVersion: Long): Snapshot = { - logInfo(s"Creating a new snapshot v${initSegment.version} for commit version $committedVersion") + logInfo( + log"Creating a new snapshot v${MDC(DeltaLogKeys.VERSION, initSegment.version)} " + + log"for commit version ${MDC(DeltaLogKeys.VERSION2, committedVersion)}") createSnapshot( initSegment, tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt, @@ -1203,7 +1211,7 @@ trait SnapshotManagement { self: DeltaLog => previousSnapshot.tableCommitCoordinatorClientOpt, committedVersion) logMetadataTableIdChange(previousSnapshot, newSnapshot) - logInfo(s"Updated snapshot to $newSnapshot") + logInfo(log"Updated snapshot to ${MDC(DeltaLogKeys.SNAPSHOT, newSnapshot)}") installSnapshot(newSnapshot, updateTimestamp) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala index 153b434a104..f126c4ae361 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala @@ -18,9 +18,11 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol} import org.apache.spark.sql.delta.commands.WriteIntoDelta +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.types.{ArrayType, MapType, NullType} @@ -154,8 +156,8 @@ object UniversalFormat extends DeltaLogging { remainingSupportedFormats.mkString(",")) } - logInfo(s"[tableId=$tableId] IcebergCompat is being disabled. Auto-disabling " + - "Universal Format (Iceberg), too.") + logInfo(log"[${MDC(DeltaLogKeys.TABLE_ID, tableId)}] " + + log"IcebergCompat is being disabled. 
Auto-disabling Universal Format (Iceberg), too.") (None, Some(newestMetadata.copy(configuration = newConfiguration))) } else { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala index cbf8f2da680..a2789656054 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala @@ -25,9 +25,11 @@ import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol} import org.apache.spark.sql.delta.actions.Protocol.extractAutomaticallyEnabledFeatures import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.convert.{ConvertTargetTable, ConvertUtils} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.hadoop.fs.Path +import org.apache.spark.internal.MDC import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable @@ -88,7 +90,8 @@ case class CloneTableCommand( } /** Log clone command information */ - logInfo("Cloning " + sourceTable.description + s" to $targetPath") + logInfo(log"Cloning ${MDC(DeltaLogKeys.TABLE_DESC, sourceTable.description)} to " + + log"${MDC(DeltaLogKeys.PATH, targetPath)}") // scalastyle:off deltahadoopconfiguration val hdpConf = sparkSession.sessionState.newHadoopConf() diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala index ab2b08b8770..36544b91e4b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala @@ -28,11 +28,13 @@ import org.apache.spark.sql.delta.actions.{AddFile, Metadata} import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.VacuumCommand.{generateCandidateFileMap, getTouchedFile} import org.apache.spark.sql.delta.commands.convert.{ConvertTargetFileManifest, ConvertTargetTable, ConvertUtils} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.delta.util._ import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.internal.MDC import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{Analyzer, NoSuchTableException} @@ -297,13 +299,16 @@ abstract class ConvertToDeltaCommandBase( ConvertUtils.createAddFile( _, txn.deltaLog.dataPath, fs, conf, Some(partitionSchema), deltaPath.isDefined)) if (shouldCollectStats) { - logInfo(s"Collecting stats for a batch of ${batch.size} files; " + - s"finished $numFiles so far") + logInfo( + log"Collecting stats for a batch of ${MDC(DeltaLogKeys.NUM_FILES, batch.size)} files; " + + log"finished ${MDC(DeltaLogKeys.NUM_FILES2, numFiles)} so far" + ) numFiles += statsBatchSize performStatsCollection(spark, txn, adds) } else if (collectStats) { - logWarning(s"collectStats is set to true but ${DeltaSQLConf.DELTA_COLLECT_STATS.key}" + - s" is false. 
Skip statistics collection") + logWarning(log"collectStats is set to true but ${MDC(DeltaLogKeys.CONFIG, + DeltaSQLConf.DELTA_COLLECT_STATS.key)}" + + log" is false. Skip statistics collection") adds.toIterator } else { adds.toIterator diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala index 27b2f467302..6a341523e09 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala @@ -26,12 +26,14 @@ import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol} import org.apache.spark.sql.delta.actions.DomainMetadata import org.apache.spark.sql.delta.commands.DMLUtils.TaggedCommitData import org.apache.spark.sql.delta.hooks.{HudiConverterHook, IcebergConverterHook, UpdateCatalog, UpdateCatalogFactory} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.internal.MDC import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Attribute @@ -189,7 +191,8 @@ case class CreateDeltaTableCommand( tableWithLocation: CatalogTable): Unit = { // Note that someone may have dropped and recreated the table in a separate location in the // meantime... Unfortunately we can't do anything there at the moment, because Hive sucks. - logInfo(s"Table is path-based table: $tableByPath. Update catalog with mode: $operation") + logInfo(log"Table is path-based table: ${MDC(DeltaLogKeys.IS_PATH_TABLE, tableByPath)}. 
" + + log"Update catalog with mode: ${MDC(DeltaLogKeys.OPERATION, operation)}") val opStartTs = TimeUnit.NANOSECONDS.toMillis(txnUsedForCommit.txnStartTimeNs) val postCommitSnapshot = deltaLog.update(checkIfUpdatedSinceTs = Some(opStartTs)) val didNotChangeMetadata = txnUsedForCommit.metadata == txnUsedForCommit.snapshot.metadata diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala index 92983443843..801a569e727 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala @@ -25,11 +25,13 @@ import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaErrors, DeltaLog import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.catalog.{DeltaTableV2, IcebergTablePlaceHolder} import org.apache.spark.sql.delta.files.TahoeBatchFileIndex +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.delta.util.DeltaFileOperations import org.apache.hadoop.fs.Path +import org.apache.spark.internal.MDC import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, NoSuchTableException, ResolvedTable, UnresolvedAttribute, UnresolvedRelation} @@ -355,7 +357,8 @@ trait DeltaCommand extends DeltaLogging { for (version <- txnVersionOpt; appId <- txnAppIdOpt) { val currentVersion = txn.txnVersion(appId) if (currentVersion >= version) { - logInfo(s"Already completed batch $version in application $appId. This will be skipped.") + logInfo(log"Already completed batch ${MDC(DeltaLogKeys.VERSION, version)} in application " + + log"${MDC(DeltaLogKeys.APP_ID, appId)}. 
This will be skipped.") if (isFromSessionConf && sparkSession.sessionState.conf.getConf( DeltaSQLConf.DELTA_IDEMPOTENT_DML_AUTO_RESET_ENABLED)) { // if we got txnAppId and txnVersion from the session config, we reset the diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala index bcfad684056..b34ee562f9a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala @@ -27,12 +27,14 @@ import org.apache.spark.sql.delta.DeltaOperations.Operation import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, FileAction, RemoveFile} import org.apache.spark.sql.delta.commands.optimize._ import org.apache.spark.sql.delta.files.SQLMetricsReporting +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.BinPackingUtils import org.apache.spark.SparkContext import org.apache.spark.SparkContext.SPARK_JOB_GROUP_ID +import org.apache.spark.internal.MDC import org.apache.spark.sql.{AnalysisException, Encoders, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedTable} @@ -458,8 +460,9 @@ class OptimizeExecutor( true } else { val deleted = candidateSetOld -- candidateSetNew - logWarning(s"The following compacted files were deleted " + - s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.") + logWarning(log"The following compacted files were deleted " + + log"during checkpoint ${MDC(DeltaLogKeys.PATHS, deleted.mkString(","))}. 
" + + log"Aborting the compaction.") false } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala index 79964431b39..1ae994e1e21 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala @@ -24,9 +24,11 @@ import org.apache.spark.sql.delta.IcebergCompat.{getEnabledVersion, getIcebergCo import org.apache.spark.sql.delta.UniversalFormat.{icebergEnabled, ICEBERG_FORMAT} import org.apache.spark.sql.delta.actions.{AddFile, Protocol} import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.Utils.try_element_at +import org.apache.spark.internal.MDC import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.functions.col @@ -108,9 +110,11 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging { === icebergCompatVersion.toString) .count() val numOfAddFiles = snapshot.numOfFiles - logInfo(s"For table ${table.tableIdentifier} at version ${snapshot.version}, there are " + - s"$numOfAddFiles addFiles, and $numOfAddFilesWithTag addFiles with ICEBERG_COMPAT_VERSION=" + - s"$icebergCompatVersion tag.") + logInfo(log"For table ${MDC(DeltaLogKeys.TABLE_NAME, table.tableIdentifier)} " + + log"at version ${MDC(DeltaLogKeys.VERSION, snapshot.version)}, there are " + + log"${MDC(DeltaLogKeys.NUM_FILES, numOfAddFiles)} addFiles, and " + + log"${MDC(DeltaLogKeys.NUM_FILES2, numOfAddFilesWithTag)} addFiles with " + + log"ICEBERG_COMPAT_VERSION=${MDC(DeltaLogKeys.TAG, icebergCompatVersion)} tag.") (numOfAddFiles, numOfAddFilesWithTag) } @@ -151,8 +155,9 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging { val didUpdateIcebergCompatVersion = if (!currIcebergCompatVersionOpt.contains(targetIcebergCompatVersion)) { enableIcebergCompat(target, currIcebergCompatVersionOpt, targetVersionDeltaConfig) - logInfo(s"Update table ${target.tableIdentifier} to iceberg compat version = " + - s"$targetIcebergCompatVersion successfully.") + logInfo(log"Update table ${MDC(DeltaLogKeys.TABLE_NAME, target.tableIdentifier)} " + + log"to iceberg compat version = " + + log"${MDC(DeltaLogKeys.VERSION, targetIcebergCompatVersion)} successfully.") true } else { false @@ -166,8 +171,9 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging { // 1. The target iceberg compat version requires rewrite. // 2. 
Not all addFile have ICEBERG_COMPAT_VERSION=targetVersion tag val (metricsOpt, didRewrite) = if (versionChangeMayNeedRewrite && !allAddFilesHaveTag) { - logInfo(s"Reorg Table ${target.tableIdentifier} to iceberg compat version = " + - s"$targetIcebergCompatVersion need rewrite data files.") + logInfo(log"Reorg Table ${MDC(DeltaLogKeys.TABLE_NAME, target.tableIdentifier)} to " + + log"iceberg compat version = ${MDC(DeltaLogKeys.VERSION, targetIcebergCompatVersion)} " + + log"need rewrite data files.") val metrics = try { optimizeByReorg(sparkSession) } catch { @@ -175,8 +181,9 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging { throw DeltaErrors.icebergCompatDataFileRewriteFailedException( targetIcebergCompatVersion, e) } - logInfo(s"Rewrite table ${target.tableIdentifier} to iceberg compat version = " + - s"$targetIcebergCompatVersion successfully.") + logInfo(log"Rewrite table ${MDC(DeltaLogKeys.TABLE_NAME, target.tableIdentifier)} " + + log"to iceberg compat version = ${MDC(DeltaLogKeys.VERSION, + targetIcebergCompatVersion)} successfully.") (Some(metrics), true) } else { (None, false) @@ -198,8 +205,9 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging { val enableUniformConf = Map( DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.key -> ICEBERG_FORMAT) AlterTableSetPropertiesDeltaCommand(target, enableUniformConf).run(sparkSession) - logInfo(s"Enabling universal format with iceberg compat version = " + - s"$targetIcebergCompatVersion for table ${target.tableIdentifier} succeeded.") + logInfo(log"Enabling universal format with iceberg compat version = " + + log"${MDC(DeltaLogKeys.VERSION, targetIcebergCompatVersion)} for table " + + log"${MDC(DeltaLogKeys.TABLE_NAME, target.tableIdentifier)} succeeded.") } recordDeltaEvent(updatedSnapshot.deltaLog, "delta.upgradeUniform.success", data = Map( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala index 3a0e5ea805c..a90a4bab37e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.DeltaFileOperations import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive @@ -30,6 +31,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.MDC import org.apache.spark.paths.SparkPath import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession} import org.apache.spark.sql.execution.metric.SQLMetric @@ -218,8 +220,11 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { case Some(millis) => clock.getTimeMillis() - millis case _ => snapshot.minFileRetentionTimestamp } - logInfo(s"Starting garbage collection (dryRun = $dryRun) of untracked files older than " + - s"${new Date(deleteBeforeTimestamp).toGMTString} in $path") + logInfo(log"Starting garbage collection (dryRun = " + + log"${MDC(DeltaLogKeys.DRY_RUN, dryRun)}) 
of untracked " + + log"files older than ${MDC(DeltaLogKeys.DATE, + new Date(deleteBeforeTimestamp).toGMTString)} in " + + log"${MDC(DeltaLogKeys.PATH, path)}") val hadoopConf = spark.sparkContext.broadcast( new SerializableConfiguration(deltaHadoopConf)) val basePath = fs.makeQualified(path).toString @@ -343,8 +348,10 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { ) recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats) - logInfo(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " + - s"a total of $dirCounts directories that are safe to delete. Vacuum stats: $stats") + logInfo(log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles)} files " + + log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " + + log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories " + + log"that are safe to delete. Vacuum stats: ${MDC(DeltaLogKeys.STATS, stats)}") return diffFiles.map(f => stringToPath(f).toString).toDF("path") } @@ -383,8 +390,10 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { numPartitionColumns = partitionColumns.size) recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats) logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts)) - logInfo(s"Deleted $filesDeleted files ($sizeOfDataToDelete bytes) and directories in " + - s"a total of $dirCounts directories. Vacuum stats: $stats") + logInfo(log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, filesDeleted)} files " + + log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " + + log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories. " + + log"Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}") spark.createDataset(Seq(basePath)).toDF("path") @@ -450,8 +459,11 @@ trait VacuumCommandImpl extends DeltaCommand { sizeOfDataToDelete: Long, specifiedRetentionMillis: Option[Long], defaultRetentionMillis: Long): Unit = { - logInfo(s"Deleting untracked files and empty directories in $path. The amount of data to be " + - s"deleted is $sizeOfDataToDelete (in bytes)") + logInfo( + log"Deleting untracked files and empty directories in " + + log"${MDC(DeltaLogKeys.PATH, path)}. 
The amount " + + log"of data to be deleted is ${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} (in bytes)" + ) // We perform an empty commit in order to record information about the Vacuum if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index 3a3e3761647..5823ebc0bcc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -30,12 +30,14 @@ import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.stats.StatisticsCollection import org.apache.hadoop.fs.Path +import org.apache.spark.internal.MDC import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.CatalogUtils @@ -1021,8 +1023,8 @@ case class AlterTableAddConstraintDeltaCommand( throw a.copy(context = Array.empty) } - logInfo(s"Checking that $exprText is satisfied for existing data. " + - "This will require a full table scan.") + logInfo(log"Checking that ${MDC(DeltaLogKeys.EXPR, exprText)} " + + log"is satisfied for existing data. 
This will require a full table scan.") recordDeltaOperation( txn.snapshot.deltaLog, "delta.ddl.alter.addConstraint.checkExisting") { @@ -1099,7 +1101,8 @@ case class AlterTableClusterByDeltaCommand( val snapshot = deltaLog.update() if (clusteringColumns.isEmpty && !ClusteredTableUtils.isSupported(snapshot.protocol)) { - logInfo(s"Skipping ALTER TABLE CLUSTER BY NONE on a non-clustered table: ${table.name()}.") + logInfo(log"Skipping ALTER TABLE CLUSTER BY NONE on a non-clustered table: " + + log"${MDC(DeltaLogKeys.TABLE_NAME, table.name())}.") recordDeltaEvent( deltaLog, "delta.ddl.alter.clusterBy", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala index a0cd64ce445..fedd96f5a4b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala @@ -24,12 +24,14 @@ import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat} import org.apache.spark.sql.delta.files.{CdcAddFileIndex, TahoeChangeFileIndex, TahoeFileIndexWithSnapshotDescriptor, TahoeRemoveFileIndex} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSource, DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore import org.apache.spark.sql.util.ScalaExtensions.OptionExt +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.InternalRow @@ -300,8 +302,8 @@ trait CDCReaderImpl extends DeltaLogging { } logInfo( - s"startingVersion: ${startingVersion.version}, " + - s"endingVersion: ${endingVersionOpt.map(_.version)}") + log"startingVersion: ${MDC(DeltaLogKeys.START_VERSION, startingVersion.version)}, " + + log"endingVersion: ${MDC(DeltaLogKeys.END_VERSION, endingVersionOpt.map(_.version))}") val startingSnapshot = snapshotToUse.deltaLog.getSnapshotAt(startingVersion.version) val columnMappingEnabledAtStartingVersion = diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala index 1b9adfb3344..b6237d7f04c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala @@ -95,10 +95,10 @@ trait ConvertUtilsBase extends DeltaLogging { } } catch { case e: ClassNotFoundException => - logError(s"Failed to find Iceberg class", e) + logError("Failed to find Iceberg class", e) throw DeltaErrors.icebergClassMissing(spark.sparkContext.getConf, e) case e: InvocationTargetException => - logError(s"Got error when creating an Iceberg Converter", e) + logError("Got error when creating an Iceberg Converter", e) // The better error is within the cause throw ExceptionUtils.getRootCause(e) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala index 
6a6134dccca..b5078f182d3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -20,11 +20,13 @@ import scala.annotation.tailrec import scala.util.control.NonFatal import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.DeltaSparkPlanUtils import org.apache.spark.SparkException +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.FileSourceOptions @@ -110,15 +112,17 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils { attempt == spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_MAX_ATTEMPTS) handleExceptionDuringAttempt(ex, isLastAttempt, deltaLog) match { case RetryHandling.Retry => - logInfo(s"Retrying MERGE with materialized source. Attempt $attempt failed.") + logInfo(log"Retrying MERGE with materialized source. Attempt " + + log"${MDC(DeltaLogKeys.ATTEMPT, attempt)} failed.") doRetry = true attempt += 1 case RetryHandling.ExhaustedRetries => - logError(s"Exhausted retries after $attempt attempts in MERGE with" + - s" materialized source. Logging latest exception.", ex) + logError(log"Exhausted retries after ${MDC(DeltaLogKeys.ATTEMPT, attempt)}" + + log" attempts in MERGE with materialized source. Logging latest exception.", ex) throw DeltaErrors.sourceMaterializationFailedRepeatedlyInMerge case RetryHandling.RethrowException => - logError(s"Fatal error in MERGE with materialized source in attempt $attempt.", ex) + logError(log"Fatal error in MERGE with materialized source in " + + log"attempt ${MDC(DeltaLogKeys.ATTEMPT, attempt)}", ex) throw ex } } finally { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.scala index 542b8b5d048..83d7ecec32f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.scala @@ -23,18 +23,21 @@ import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.TransactionExecutionObserver import org.apache.spark.sql.delta.actions.CommitInfo import org.apache.spark.sql.delta.actions.Metadata +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.storage.LogStore import org.apache.spark.sql.delta.util.FileNames import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LoggingShims, MDC} /** * An abstract [[CommitCoordinatorClient]] which triggers backfills every n commits. * - every commit version which satisfies `commitVersion % batchSize == 0` will trigger a backfill. 
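A minimal sketch of the backfill contract stated in the scaladoc above, assuming a hypothetical helper name that is not part of this patch:

    // A commit triggers a backfill when its version is a multiple of batchSize;
    // a batchSize of 1 or less means every commit is backfilled eagerly.
    def shouldTriggerBackfill(commitVersion: Long, batchSize: Long): Boolean =
      batchSize <= 1 || commitVersion % batchSize == 0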
*/ -trait AbstractBatchBackfillingCommitCoordinatorClient extends CommitCoordinatorClient with Logging { +trait AbstractBatchBackfillingCommitCoordinatorClient + extends CommitCoordinatorClient + with LoggingShims { /** * Size of batch that should be backfilled. So every commit version which satisfies @@ -69,12 +72,15 @@ trait AbstractBatchBackfillingCommitCoordinatorClient extends CommitCoordinatorC throw CommitFailedException( retryable = false, conflict = false, message = "Commit version 0 must go via filesystem.") } - logInfo(s"Attempting to commit version $commitVersion on table $tablePath") + logInfo(log"Attempting to commit version " + + log"${MDC(DeltaLogKeys.VERSION, commitVersion)} on table " + + log"${MDC(DeltaLogKeys.PATH, tablePath)}") val fs = logPath.getFileSystem(hadoopConf) if (batchSize <= 1) { // Backfill until `commitVersion - 1` - logInfo(s"Making sure commits are backfilled until ${commitVersion - 1} version for" + - s" table ${tablePath.toString}") + logInfo(log"Making sure commits are backfilled until " + + log"${MDC(DeltaLogKeys.VERSION, commitVersion - 1)} version for" + + log" table ${MDC(DeltaLogKeys.PATH, tablePath.toString)}") backfillToVersion( logStore, hadoopConf, @@ -110,8 +116,9 @@ trait AbstractBatchBackfillingCommitCoordinatorClient extends CommitCoordinatorC val newCommit = commitResponse.getCommit.copy(fileStatus = targetFileStatus) commitResponse = commitResponse.copy(commit = newCommit) } else if (commitVersion % batchSize == 0 || mcToFsConversion) { - logInfo(s"Making sure commits are backfilled till $commitVersion version for" + - s"table ${tablePath.toString}") + logInfo(log"Making sure commits are backfilled till " + + log"${MDC(DeltaLogKeys.VERSION, commitVersion)} " + + log"version for table ${MDC(DeltaLogKeys.PATH, tablePath.toString)}") backfillToVersion( logStore, hadoopConf, @@ -119,7 +126,8 @@ trait AbstractBatchBackfillingCommitCoordinatorClient extends CommitCoordinatorC coordinatedCommitsTableConf, commitVersion) } - logInfo(s"Commit $commitVersion done successfully on table $tablePath") + logInfo(log"Commit ${MDC(DeltaLogKeys.VERSION, commitVersion)} done successfully on table " + + log"${MDC(DeltaLogKeys.PATH, tablePath)}") commitResponse } @@ -163,7 +171,8 @@ trait AbstractBatchBackfillingCommitCoordinatorClient extends CommitCoordinatorC version: Long, fileStatus: FileStatus): Unit = { val targetFile = FileNames.unsafeDeltaFile(logPath, version) - logInfo(s"Backfilling commit ${fileStatus.getPath} to ${targetFile.toString}") + logInfo(log"Backfilling commit ${MDC(DeltaLogKeys.PATH, fileStatus.getPath)} to " + + log"${MDC(DeltaLogKeys.PATH2, targetFile.toString)}") val commitContentIterator = logStore.readAsIterator(fileStatus, hadoopConf) try { logStore.write( @@ -174,7 +183,7 @@ trait AbstractBatchBackfillingCommitCoordinatorClient extends CommitCoordinatorC registerBackfill(logPath, version) } catch { case _: FileAlreadyExistsException => - logInfo(s"The backfilled file $targetFile already exists.") + logInfo(log"The backfilled file ${MDC(DeltaLogKeys.FILE_NAME, targetFile)} already exists.") } finally { commitContentIterator.close() } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala index 2ebcdd46850..cc0d0d540a7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala +++ 
b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala @@ -20,6 +20,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.delta.{CoordinatedCommitsTableFeature, DeltaConfig, DeltaConfigs, DeltaLog, Snapshot, SnapshotDescriptor} import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.storage.LogStore import org.apache.spark.sql.delta.util.FileNames @@ -27,6 +28,7 @@ import org.apache.spark.sql.delta.util.FileNames.{DeltaFile, UnbackfilledDeltaFi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession import org.apache.spark.util.Utils @@ -293,11 +295,12 @@ object CoordinatedCommitsUtils extends DeltaLogging { actionsIter, overwrite, hadoopConf) - logInfo(s"Delta file ${unbackfilledDeltaFile.getPath.toString} backfilled to path" + - s" ${backfilledFilePath.toString}.") + logInfo(log"Delta file ${MDC(DeltaLogKeys.PATH, unbackfilledDeltaFile.getPath.toString)} " + + log"backfilled to path ${MDC(DeltaLogKeys.PATH2, backfilledFilePath.toString)}.") } else { numAlreadyBackfilledFiles += 1 - logInfo(s"Delta file ${unbackfilledDeltaFile.getPath.toString} already backfilled.") + logInfo(log"Delta file ${MDC(DeltaLogKeys.PATH, unbackfilledDeltaFile.getPath.toString)} " + + log"already backfilled.") } } recordDeltaEvent( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryCommitCoordinator.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryCommitCoordinator.scala index 4b9859cb697..9596ccb6301 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryCommitCoordinator.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryCommitCoordinator.scala @@ -22,10 +22,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.mutable import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.storage.LogStore import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession class InMemoryCommitCoordinator(val batchSize: Long) @@ -133,7 +135,8 @@ class InMemoryCommitCoordinator(val batchSize: Long) tableData.commitsMap(commitVersion) = commit tableData.updateLastRatifiedCommit(commitVersion) - logInfo(s"Added commit file ${commitFile.getPath} to commit-coordinator.") + logInfo(log"Added commit file ${MDC(DeltaLogKeys.PATH, commitFile.getPath)} " + + log"to commit-coordinator.") CommitResponse(commit) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala index 2a73b8b852a..adef00f8551 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.files import java.util.{Date, UUID} import org.apache.spark.sql.delta.DeltaOptions +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.hadoop.conf.Configuration import 
org.apache.hadoop.fs.{FileAlreadyExistsException, Path} import org.apache.hadoop.mapreduce._ @@ -26,7 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LoggingShims, MDC} import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.sql.SparkSession @@ -51,7 +52,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} * values to data files. Specifically L123-126, L132, and L140 where it adds option * WRITE_PARTITION_COLUMNS */ -object DeltaFileFormatWriter extends Logging { +object DeltaFileFormatWriter extends LoggingShims { /** * A variable used in tests to check whether the output ordering of the query matches the @@ -295,18 +296,20 @@ object DeltaFileFormatWriter extends Logging { val ret = f val commitMsgs = ret.map(_.commitMsg) - logInfo(s"Start to commit write Job ${description.uuid}.") + logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.") val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) } - logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.") + logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. " + + log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.") processStats(description.statsTrackers, ret.map(_.summary.stats), duration) - logInfo(s"Finished processing stats for write job ${description.uuid}.") + logInfo(log"Finished processing stats for write job " + + log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.") // return a set of all the partition paths that were updated during this job ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty) } catch { case cause: Throwable => - logError(s"Aborting job ${description.uuid}.", cause) + logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause) committer.abortJob(job) throw cause } @@ -432,7 +435,7 @@ object DeltaFileFormatWriter extends Logging { })(catchBlock = { // If there is an error, abort the task dataWriter.abort() - logError(s"Job $jobId aborted.") + logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.") }, finallyBlock = { dataWriter.close() }) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaSourceSnapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaSourceSnapshot.scala index 316bd88a460..af112229250 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaSourceSnapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaSourceSnapshot.scala @@ -20,10 +20,12 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.delta.{DeltaLog, DeltaTableUtils, Snapshot} import org.apache.spark.sql.delta.actions.SingleAction +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.sources.IndexedFile import org.apache.spark.sql.delta.stats.DataSkippingReader import org.apache.spark.sql.delta.util.StateCache +import org.apache.spark.internal.MDC import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.functions._ @@ -50,7 +52,8 @@ class DeltaSourceSnapshot( val (part, data) = filters.partition { e => DeltaTableUtils.isPredicatePartitionColumnsOnly(e, 
partitionCols, spark) } - logInfo(s"Classified filters: partition: $part, data: $data") + logInfo(log"Classified filters: partition: ${MDC(DeltaLogKeys.PARTITION_FILTER, part)}, " + + log"data: ${MDC(DeltaLogKeys.DATA_FILTER, data)}") (part, data) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala index 44cecac5e7e..34dfa348e46 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala @@ -21,10 +21,12 @@ import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.commands.{DeltaOptimizeContext, OptimizeExecutor} import org.apache.spark.sql.delta.commands.optimize._ +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.stats.AutoCompactPartitionStats +import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Expression @@ -156,7 +158,7 @@ trait AutoCompactBase extends PostCommitHook with DeltaLogging { metrics } catch { case e: Throwable => - logError("Auto Compaction failed with: " + e.getMessage) + logError(log"Auto Compaction failed with: ${MDC(DeltaLogKeys.ERROR, e.getMessage)}") recordDeltaEvent( txn.deltaLog, opType = "delta.autoCompaction.error", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala index 771ceee9f81..a8ae40df99e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala @@ -22,12 +22,14 @@ import java.net.URI import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.commands.DeletionVectorUtils.isTableDVFree +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.storage.LogStore import org.apache.spark.sql.delta.util.DeltaFileOperations import org.apache.hadoop.fs.Path import org.apache.spark.SparkEnv +import org.apache.spark.internal.MDC import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils @@ -308,9 +310,9 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with }.collect().toSet } - logInfo(s"Generated manifest partitions for $deltaLogDataPath " + - s"[${newManifestPartitionRelativePaths.size}]:\n\t" + - newManifestPartitionRelativePaths.mkString("\n\t")) + logInfo(log"Generated manifest partitions for ${MDC(DeltaLogKeys.PATH, deltaLogDataPath)} " + + log"[${MDC(DeltaLogKeys.NUM_PARTITIONS, newManifestPartitionRelativePaths.size)}]:\n\t" + + log"${MDC(DeltaLogKeys.PATHS, newManifestPartitionRelativePaths.mkString("\n\t"))}") newManifestPartitionRelativePaths } @@ -334,8 +336,9 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with fs.delete(absPathToDelete, true) } - logInfo(s"Deleted manifest partitions [${partitionRelativePathsToDelete.size}]:\n\t" + - 
partitionRelativePathsToDelete.mkString("\n\t")) + logInfo(log"Deleted manifest partitions [" + + log"${MDC(DeltaLogKeys.NUM_FILES, partitionRelativePathsToDelete.size)}]:\n\t" + + log"${MDC(DeltaLogKeys.PATHS, partitionRelativePathsToDelete.mkString("\n\t"))}") } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala index dd8f295e829..6cccaf44a74 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala @@ -28,11 +28,13 @@ import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, Clus import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec import org.apache.spark.sql.delta.{DeltaConfigs, DeltaTableIdentifier, OptimisticTransactionImpl, Snapshot} import org.apache.spark.sql.delta.actions.{Action, Metadata} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.threads.DeltaThreadPool import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.spark.internal.MDC import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier @@ -196,8 +198,9 @@ trait UpdateCatalogBase extends PostCommitHook with DeltaLogging { "exceptionMsg" -> ExceptionUtils.getMessage(e), "stackTrace" -> ExceptionUtils.getStackTrace(e)) ) - logWarning(s"Failed to update the catalog for ${table.identifier} with the latest " + - s"table information.", e) + logWarning(log"Failed to update the catalog for " + + log"${MDC(DeltaLogKeys.TABLE_NAME, table.identifier)} with the latest " + + log"table information.", e) } } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala index 18555946527..fb26b7d00ed 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala @@ -45,9 +45,95 @@ import org.apache.spark.internal.LogKeyShims * should be defined here for standardization. 
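The keys added below feed the structured log"..." interpolator used throughout this patch. A minimal usage sketch, with value names that are illustrative rather than taken from the patch:

    // Each value is tagged with a DeltaLogKeys entry via MDC inside the interpolator.
    logInfo(log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, filesDeleted)} files under " +
      log"${MDC(DeltaLogKeys.PATH, tablePath)}")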
  */
 trait DeltaLogKeysBase {
+  case object APP_ID extends LogKeyShims
+  case object ATTEMPT extends LogKeyShims
+  case object BATCH_ID extends LogKeyShims
+  case object CLASS_NAME extends LogKeyShims
+  case object COLUMN_NAME extends LogKeyShims
+  case object COMPACTION_INFO_NEW extends LogKeyShims
+  case object COMPACTION_INFO_OLD extends LogKeyShims
+  case object CONFIG extends LogKeyShims
+  case object COORDINATOR_CONF extends LogKeyShims
+  case object COORDINATOR_NAME extends LogKeyShims
+  case object COUNT extends LogKeyShims
+  case object DATA_FILTER extends LogKeyShims
+  case object DATE extends LogKeyShims
+  case object DIR extends LogKeyShims
+  case object DRY_RUN extends LogKeyShims
+  case object DURATION extends LogKeyShims
+  case object END_INDEX extends LogKeyShims
+  case object END_OFFSET extends LogKeyShims
+  case object END_VERSION extends LogKeyShims
+  case object ERROR extends LogKeyShims
   case object EXECUTOR_ID extends LogKeyShims
+  case object EXPR extends LogKeyShims
+  case object FILE_ACTION_NEW extends LogKeyShims
+  case object FILE_ACTION_OLD extends LogKeyShims
+  case object FILE_INDEX extends LogKeyShims
+  case object FILE_NAME extends LogKeyShims
+  case object FILE_STATUS extends LogKeyShims
+  case object FILE_SYSTEM_SCHEME extends LogKeyShims
+  case object FILTER extends LogKeyShims
+  case object HOOK_NAME extends LogKeyShims
+  case object INDEX extends LogKeyShims
+  case object ISOLATION_LEVEL extends LogKeyShims
+  case object IS_INIT_SNAPSHOT extends LogKeyShims
+  case object IS_PATH_TABLE extends LogKeyShims
+  case object JOB_ID extends LogKeyShims
   case object MAX_SIZE extends LogKeyShims
+  case object MESSAGE extends LogKeyShims
+  case object METADATA_ID extends LogKeyShims
+  case object METADATA_NEW extends LogKeyShims
+  case object METADATA_OLD extends LogKeyShims
+  case object METRICS extends LogKeyShims
   case object MIN_SIZE extends LogKeyShims
+  case object NUM_ACTIONS extends LogKeyShims
+  case object NUM_ACTIONS2 extends LogKeyShims
+  case object NUM_BYTES extends LogKeyShims
+  case object NUM_DIRS extends LogKeyShims
+  case object NUM_FILES extends LogKeyShims
+  case object NUM_FILES2 extends LogKeyShims
+  case object NUM_PARTITIONS extends LogKeyShims
+  case object NUM_PREDICATES extends LogKeyShims
+  case object NUM_RECORDS extends LogKeyShims
+  case object NUM_RECORDS_ACTUAL extends LogKeyShims
+  case object NUM_RECORDS_EXPECTED extends LogKeyShims
+  case object OFFSET extends LogKeyShims
+  case object OPERATION extends LogKeyShims
+  case object OPERATION2 extends LogKeyShims
+  case object OP_NAME extends LogKeyShims
+  case object OP_TYPE extends LogKeyShims
+  case object PARTITION_FILTER extends LogKeyShims
+  case object PATH extends LogKeyShims
+  case object PATH2 extends LogKeyShims
+  case object PATHS extends LogKeyShims
+  case object PROTOCOL extends LogKeyShims
+  case object QUERY_ID extends LogKeyShims
+  case object RDD_ID extends LogKeyShims
+  case object SCHEMA extends LogKeyShims
+  case object SEGMENT extends LogKeyShims
+  case object SIZE extends LogKeyShims
+  case object SNAPSHOT extends LogKeyShims
+  case object START_INDEX extends LogKeyShims
+  case object START_VERSION extends LogKeyShims
+  case object STATS extends LogKeyShims
+  case object STATUS extends LogKeyShims
+  case object TABLE_DESC extends LogKeyShims
+  case object TABLE_FEATURES extends LogKeyShims
+  case object TABLE_ID extends LogKeyShims
+  case object TABLE_NAME extends LogKeyShims
+  case object TAG extends LogKeyShims
+  case object TBL_PROPERTIES extends LogKeyShims
+  case object THREAD_NAME extends LogKeyShims
+  case object TIMESTAMP extends LogKeyShims
+  case object TIMESTAMP2 extends LogKeyShims
+  case object TIME_MS extends LogKeyShims
+  case object TIME_STATS extends LogKeyShims
+  case object TXN_ID extends LogKeyShims
+  case object URI extends LogKeyShims
+  case object VACUUM_STATS extends LogKeyShims
+  case object VERSION extends LogKeyShims
+  case object VERSION2 extends LogKeyShims
 }
 
 object DeltaLogKeys extends DeltaLogKeysBase
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala b/spark/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala
index f14c0c425ac..dfa691807fb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala
@@ -111,7 +111,7 @@ trait OptimizeMetadataOnlyDeltaQuery extends Logging {
         Seq(InternalRow.fromSeq(rewrittenAggregationValues)))
       r
     } else {
-      logInfo(s"Query can't be optimized using metadata because stats are missing")
+      logInfo("Query can't be optimized using metadata because stats are missing")
       plan
     }
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
index 53503bb2cb2..dbfc6034af4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
@@ -21,9 +21,11 @@ import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{DomainMetadata, Metadata, Protocol}
 import org.apache.spark.sql.delta.constraints.Constraints
+import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.PartitionUtils
 
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.FileSourceGeneratedMetadataStructField
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -133,7 +135,7 @@ trait ImplicitMetadataOperation extends DeltaLogging {
       txn.updateMetadataForTableOverwrite(newMetadata)
     } else if (isNewSchema && canMergeSchema && !isNewPartitioning
     ) {
-      logInfo(s"New merged schema: ${mergedSchema.treeString}")
+      logInfo(log"New merged schema: ${MDC(DeltaLogKeys.SCHEMA, mergedSchema.treeString)}")
       recordDeltaEvent(txn.deltaLog, "delta.ddl.mergeSchema")
       if (rearrangeOnly) {
         throw DeltaErrors.unexpectedDataChangeException("Change the Delta table schema")
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
index 038a77050d2..87182fb8c8f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
@@ -24,12 +24,14 @@ import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMod
 import org.apache.spark.sql.delta.{RowCommitVersion, RowId}
 import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
+import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import
org.apache.spark.sql.delta.schema.SchemaMergingUtils._ import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.util.ScalaExtensions._ +import org.apache.spark.internal.MDC import org.apache.spark.sql._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} @@ -1396,7 +1398,8 @@ def normalizeColumnNamesInDataType( } } catch { case NonFatal(e) => - logWarning(s"Failed to log undefined types for table ${deltaLog.logPath}", e) + logWarning(log"Failed to log undefined types for table " + + log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)}", e) } } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala index c105eb9e94d..7a4406dab7e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala @@ -21,11 +21,13 @@ import java.util.concurrent.ConcurrentHashMap import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.DeltaOperations.StreamingUpdate import org.apache.spark.sql.delta.actions.{FileAction, SetTransaction} +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils} import org.apache.hadoop.fs.Path // scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.internal.MDC import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute @@ -86,8 +88,10 @@ case class DeltaSink( , op = streamingUpdate) } logInfo( - s"Committed transaction, batchId=${batchId}, duration=${durationMs} ms, " + - s"added ${newFiles.size} files, removed ${deletedFiles.size} files.") + log"Committed transaction, batchId=${MDC(DeltaLogKeys.BATCH_ID, batchId)}, " + + log"duration=${MDC(DeltaLogKeys.DURATION, durationMs)} ms, " + + log"added ${MDC(DeltaLogKeys.NUM_FILES, newFiles.size)} files, " + + log"removed ${MDC(DeltaLogKeys.NUM_FILES2, deletedFiles.size)} files.") val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq) } @@ -125,7 +129,8 @@ case class DeltaSink( val currentVersion = txn.txnVersion(queryId) if (currentVersion >= batchId) { - logInfo(s"Skipping already complete epoch $batchId, in query $queryId") + logInfo(log"Skipping already complete epoch ${MDC(DeltaLogKeys.BATCH_ID, batchId)}, " + + log"in query ${MDC(DeltaLogKeys.QUERY_ID, queryId)}") return false } @@ -141,8 +146,10 @@ case class DeltaSink( val totalSize = newFiles.map(_.getFileSize).sum val totalLogicalRecords = newFiles.map(_.numLogicalRecords.getOrElse(0L)).sum logInfo( - s"Wrote ${newFiles.size} files, with total size ${totalSize}, " + - s"${totalLogicalRecords} logical records, duration=${writeFilesTimeMs} ms.") + log"Wrote ${MDC(DeltaLogKeys.NUM_FILES, newFiles.size)} files, with total size " + + log"${MDC(DeltaLogKeys.NUM_BYTES, totalSize)}, " + + log"${MDC(DeltaLogKeys.NUM_RECORDS, totalLogicalRecords)} logical records, " + + log"duration=${MDC(DeltaLogKeys.DURATION, writeFilesTimeMs)} ms.") val info = DeltaOperations.StreamingUpdate(outputMode, queryId, batchId, options.userMetadata ) diff --git 
a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala index 593235a686b..879ea3c7b0a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.files.DeltaSourceSnapshot +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.storage.{ClosableIterator, SupportsRewinding} @@ -34,6 +35,7 @@ import org.apache.spark.sql.delta.storage.ClosableIterator._ import org.apache.spark.sql.delta.util.{DateTimeUtils, TimestampFormatter} import org.apache.hadoop.fs.FileStatus +import org.apache.spark.internal.MDC import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} @@ -189,13 +191,14 @@ trait DeltaSourceBase extends Source // Fallback for backward compat only, this should technically not be triggered .getOrElse { val config = snapshotAtSourceInit.metadata.configuration - logWarning(s"Using snapshot's table configuration: $config") + logWarning(log"Using snapshot's table configuration: " + + log"${MDC(DeltaLogKeys.CONFIG, config)}") config } ) val protocol: Protocol = customMetadata.protocol.getOrElse { val protocol = snapshotAtSourceInit.protocol - logWarning(s"Using snapshot's protocol: $protocol") + logWarning(log"Using snapshot's protocol: ${MDC(DeltaLogKeys.PROTOCOL, protocol)}") protocol } // The following are not important in stream reading @@ -257,7 +260,9 @@ trait DeltaSourceBase extends Source val offset = latestOffsetInternal(startOffsetOpt, ReadLimit.allAvailable()) lastOffsetForTriggerAvailableNow = offset lastOffsetForTriggerAvailableNow.foreach { lastOffset => - logInfo(s"lastOffset for Trigger.AvailableNow has set to ${lastOffset.json}") + + logInfo(log"lastOffset for Trigger.AvailableNow has set to " + + log"${MDC(DeltaLogKeys.OFFSET, lastOffset.json)}") } } @@ -326,9 +331,13 @@ trait DeltaSourceBase extends Source val (result, duration) = Utils.timeTakenMs { createDataFrame(filteredIndexedFiles) } - logInfo(s"Getting dataFrame for delta_log_path=${deltaLog.logPath} with " + - s"startVersion=$startVersion, startIndex=$startIndex, " + - s"isInitialSnapshot=$isInitialSnapshot, endOffset=$endOffset took timeMs=$duration ms") + logInfo(log"Getting dataFrame for delta_log_path=" + + log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)} with " + + log"startVersion=${MDC(DeltaLogKeys.START_VERSION, startVersion)}, " + + log"startIndex=${MDC(DeltaLogKeys.START_INDEX, startIndex)}, " + + log"isInitialSnapshot=${MDC(DeltaLogKeys.IS_INIT_SNAPSHOT, isInitialSnapshot)}, " + + log"endOffset=${MDC(DeltaLogKeys.END_INDEX, endOffset)} took timeMs=" + + log"${MDC(DeltaLogKeys.DURATION, duration)} ms") result } finally { fileActionsIter.close() @@ -731,7 +740,7 @@ case class DeltaSource( protected var initialState: DeltaSourceSnapshot = null protected var initialStateVersion: Long = -1L - logInfo(s"Filters being pushed down: $filters") + logInfo(log"Filters being pushed down: ${MDC(DeltaLogKeys.FILTER, filters)}") /** * Get the changes starting from (startVersion, 
startIndex). The start point should not be @@ -811,9 +820,12 @@ case class DeltaSource( } iter } - logInfo(s"Getting file changes for delta_log_path=${deltaLog.logPath} with " + - s"fromVersion=$fromVersion, fromIndex=$fromIndex, isInitialSnapshot=$isInitialSnapshot " + - s"took timeMs=$duration ms") + logInfo(log"Getting file changes for delta_log_path=" + + log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)} with " + + log"fromVersion=${MDC(DeltaLogKeys.START_VERSION, fromVersion)}, " + + log"fromIndex=${MDC(DeltaLogKeys.START_INDEX, fromIndex)}, " + + log"isInitialSnapshot=${MDC(DeltaLogKeys.IS_INIT_SNAPSHOT, isInitialSnapshot)} " + + log"took timeMs=${MDC(DeltaLogKeys.DURATION, duration)} ms") result } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala index cf8081057d7..0e66cf83d9f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.delta.sources import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.actions.DomainMetadata import org.apache.spark.sql.delta.commands.cdc.CDCReader +import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.internal.MDC import org.apache.spark.sql.DataFrame import org.apache.spark.util.Utils @@ -214,9 +216,13 @@ trait DeltaSourceCDCSupport { self: DeltaSource => isStreaming = true) .fileChangeDf } - logInfo(s"Getting CDC dataFrame for delta_log_path=${deltaLog.logPath} with " + - s"startVersion=$startVersion, startIndex=$startIndex, " + - s"isInitialSnapshot=$isInitialSnapshot, endOffset=$endOffset took timeMs=$duration ms") + logInfo(log"Getting CDC dataFrame for delta_log_path=" + + log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)} with " + + log"startVersion=${MDC(DeltaLogKeys.START_VERSION, startVersion)}, " + + log"startIndex=${MDC(DeltaLogKeys.START_INDEX, startIndex)}, " + + log"isInitialSnapshot=${MDC(DeltaLogKeys.IS_INIT_SNAPSHOT, isInitialSnapshot)}, " + + log"endOffset=${MDC(DeltaLogKeys.END_OFFSET, endOffset)} took timeMs=" + + log"${MDC(DeltaLogKeys.DURATION, duration)} ms") result } @@ -319,9 +325,12 @@ trait DeltaSourceCDCSupport { self: DeltaSource => } } - logInfo(s"Getting CDC file changes for delta_log_path=${deltaLog.logPath} with " + - s"fromVersion=$fromVersion, fromIndex=$fromIndex, isInitialSnapshot=$isInitialSnapshot " + - s"took timeMs=$duration ms") + logInfo(log"Getting CDC file changes for delta_log_path=" + + log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)} with " + + log"fromVersion=${MDC(DeltaLogKeys.START_VERSION, fromVersion)}, fromIndex=" + + log"${MDC(DeltaLogKeys.START_INDEX, fromIndex)}, " + + log"isInitialSnapshot=${MDC(DeltaLogKeys.IS_INIT_SNAPSHOT, isInitialSnapshot)} took timeMs=" + + log"${MDC(DeltaLogKeys.DURATION, duration)} ms") result } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/storage/DelegatingLogStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/storage/DelegatingLogStore.scala index e839f55246c..6a38bbf4b67 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/storage/DelegatingLogStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/storage/DelegatingLogStore.scala @@ -21,11 +21,13 @@ import java.util.Locale import scala.collection.mutable import org.apache.spark.sql.delta.DeltaErrors +import 
org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.SparkEnv +import org.apache.spark.internal.MDC /** @@ -72,7 +74,8 @@ class DelegatingLogStore(hadoopConf: Configuration) case lsa: LogStoreAdaptor => s"LogStoreAdapter(${lsa.logStoreImpl.getClass.getName})" case _ => logStore.getClass.getName } - logInfo(s"LogStore `$actualLogStoreClassName` is used for scheme `$scheme`") + logInfo(log"LogStore ${MDC(DeltaLogKeys.CLASS_NAME, actualLogStoreClassName)} " + + log"is used for scheme ${MDC(DeltaLogKeys.FILE_SYSTEM_SCHEME, scheme)}") logStore } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/streaming/SchemaTrackingLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/streaming/SchemaTrackingLog.scala index 3cb685f7726..7c838ecc09c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/streaming/SchemaTrackingLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/streaming/SchemaTrackingLog.scala @@ -22,9 +22,11 @@ import java.nio.charset.StandardCharsets._ import scala.io.{Source => IOSource} import scala.reflect.ClassTag +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.util.JsonUtils import com.fasterxml.jackson.annotation.JsonIgnore +import org.apache.spark.internal.{LoggingShims, MDC} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, MetadataVersionUtil} import org.apache.spark.sql.types.{DataType, StructType} @@ -101,7 +103,7 @@ class SchemaTrackingLog[T <: PartitionAndDataSchema: ClassTag: Manifest]( sparkSession: SparkSession, path: String, schemaSerializer: SchemaSerializer[T]) - extends HDFSMetadataLog[T](sparkSession, path) { + extends HDFSMetadataLog[T](sparkSession, path) with LoggingShims { import SchemaTrackingExceptions._ @@ -153,11 +155,12 @@ class SchemaTrackingLog[T <: PartitionAndDataSchema: ClassTag: Manifest]( */ def addSchemaToLog(newSchema: T): T = { // Write to schema log - logInfo(s"Writing a new metadata version $nextSeqNumToWrite in the metadata log") + logInfo(log"Writing a new metadata version " + + log"${MDC(DeltaLogKeys.VERSION, nextSeqNumToWrite)} in the metadata log") if (currentTrackedSchema.contains(newSchema)) { // Record a warning if schema has not changed - logWarning(s"Schema didn't change after schema evolution. " + - s"currentSchema = ${currentTrackedSchema}.") + logWarning(log"Schema didn't change after schema evolution. 
" + + log"currentSchema = ${MDC(DeltaLogKeys.SCHEMA, currentTrackedSchema)}.") return newSchema } // Similar to how MicrobatchExecution detects concurrent checkpoint updates diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala index 48946ae2235..bf1d4727fd7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.delta.{DeltaErrors, SerializableFileStatus} import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.storage.LogStore import org.apache.commons.io.IOUtils @@ -36,6 +37,7 @@ import org.apache.parquet.hadoop.{Footer, ParquetFileReader} import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.MDC import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream @@ -105,8 +107,9 @@ object DeltaFileOperations extends DeltaLogging { // ignore the error and just return `child` as is. child case e: IllegalArgumentException => - logError(s"Failed to relativize the path ($child) " + - s"with the base path ($basePath) and the file system URI (${fs.getUri})", e) + logError(log"Failed to relativize the path ${MDC(DeltaLogKeys.PATH, child)} " + + log"with the base path ${MDC(DeltaLogKeys.PATH2, basePath)} " + + log"and the file system URI ${MDC(DeltaLogKeys.URI, fs.getUri)}", e) throw DeltaErrors.failRelativizePath(child.toString) } } else { @@ -125,7 +128,8 @@ object DeltaFileOperations extends DeltaLogging { base: Int = 100, jitter: Int = 1000): Unit = { val sleepTime = Random.nextInt(jitter) + base - logWarning(s"Sleeping for $sleepTime ms to rate limit $opName", t) + logWarning(log"Sleeping for ${MDC(DeltaLogKeys.TIME_MS, sleepTime)} ms to rate limit " + + log"${MDC(DeltaLogKeys.OP_NAME, opName)}", t) Thread.sleep(sleepTime) } @@ -150,7 +154,7 @@ object DeltaFileOperations extends DeltaLogging { listAsDirectories: Boolean = true): Iterator[SerializableFileStatus] = { def list(dir: String, tries: Int): Iterator[SerializableFileStatus] = { - logInfo(s"Listing $dir") + logInfo(log"Listing ${MDC(DeltaLogKeys.DIR, dir)}") try { val path = if (listAsDirectories) new Path(dir, "\u0000") else new Path(dir + "\u0000") logStore.listFrom(path, hadoopConf) @@ -378,7 +382,7 @@ object DeltaFileOperations extends DeltaLogging { tempPath.getFileSystem(conf).delete(tempPath, false /* = recursive */) } catch { case NonFatal(e) => - logError(s"Failed to delete $tempPath", e) + logError(log"Failed to delete ${MDC(DeltaLogKeys.PATH, tempPath)}", e) } () // Make the compiler happy } @@ -403,7 +407,8 @@ object DeltaFileOperations extends DeltaLogging { conf, currentFile, SKIP_ROW_GROUPS))) } catch { case e: RuntimeException => if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) + logWarning(log"Skipped the footer in the corrupted file: " + + log"${MDC(DeltaLogKeys.FILE_STATUS, currentFile)}", e) None } else { throw DeltaErrors.failedReadFileFooter(currentFile.toString, e) diff 
--git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaProgressReporter.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaProgressReporter.scala index 0cab3f46616..b888feb5aa4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaProgressReporter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaProgressReporter.scala @@ -16,11 +16,13 @@ package org.apache.spark.sql.delta.util +import org.apache.spark.sql.delta.logging.DeltaLogKeys + import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LoggingShims, MDC} import org.apache.spark.sql.SparkSession -trait DeltaProgressReporter extends Logging { +trait DeltaProgressReporter extends LoggingShims { /** * Report a log to indicate some command is running. */ @@ -28,9 +30,10 @@ trait DeltaProgressReporter extends Logging { statusCode: String, defaultMessage: String, data: Map[String, Any] = Map.empty)(body: => T): T = { - logInfo(s"$statusCode: $defaultMessage") + logInfo(log"${MDC(DeltaLogKeys.STATUS, statusCode)}: " + + log"${MDC(DeltaLogKeys.MESSAGE, defaultMessage)}") val t = withJobDescription(defaultMessage)(body) - logInfo(s"$statusCode: Done") + logInfo(log"${MDC(DeltaLogKeys.STATUS, statusCode)}: Done") t } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/SparkThreadLocalForwardingThreadPoolExecutor.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/SparkThreadLocalForwardingThreadPoolExecutor.scala index dbb126ccaea..109606dab40 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/SparkThreadLocalForwardingThreadPoolExecutor.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/SparkThreadLocalForwardingThreadPoolExecutor.scala @@ -21,8 +21,10 @@ import java.util.concurrent._ import scala.collection.JavaConverters._ +import org.apache.spark.sql.delta.logging.DeltaLogKeys + import org.apache.spark.{SparkContext, TaskContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LoggingShims, MDC} import org.apache.spark.util.{Utils => SparkUtils} /** @@ -46,7 +48,7 @@ class SparkThreadLocalForwardingThreadPoolExecutor( } -trait SparkThreadLocalCapturingHelper extends Logging { +trait SparkThreadLocalCapturingHelper extends LoggingShims { // At the time of creating this instance we capture the task context and command context. val capturedTaskContext = TaskContext.get() val sparkContext = SparkContext.getActive @@ -75,7 +77,8 @@ trait SparkThreadLocalCapturingHelper extends Logging { body } catch { case t: Throwable => - logError(s"Exception in thread ${Thread.currentThread().getName}", t) + logError(log"Exception in thread " + + log"${MDC(DeltaLogKeys.THREAD_NAME, Thread.currentThread().getName)}", t) throw t } finally { TaskContext.setTaskContext(previousTaskContext)
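A minimal sketch of the capture-and-restore pattern this last hunk belongs to, assuming an illustrative method name; the captured context is taken when the helper is constructed:

    def runWithCaptured[T](body: => T): T = {
      val previousTaskContext = TaskContext.get()
      TaskContext.setTaskContext(capturedTaskContext)
      try {
        body
      } catch {
        case t: Throwable =>
          logError(log"Exception in thread " +
            log"${MDC(DeltaLogKeys.THREAD_NAME, Thread.currentThread().getName)}", t)
          throw t
      } finally {
        // Restore whatever context the pool thread had before this task ran.
        TaskContext.setTaskContext(previousTaskContext)
      }
    }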