[SPARK-47592][CORE] Connector module: Migrate logError with variables to structured logging framework

### What changes were proposed in this pull request?
This PR migrates `logError` calls with variables in the `Connector` module to the structured logging framework.
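
Below is a minimal sketch of the migration pattern this PR applies. The `SessionCleaner` class, method, and message text are hypothetical; only the `Logging`, `MDC`, `LogKey`, and `log` interpolator usage mirrors the diff further down.

```scala
// Hypothetical example class; only the Logging / MDC / LogKey usage mirrors this PR.
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.SESSION_ID

class SessionCleaner extends Logging {
  def reportFailure(sessionId: String, cause: Throwable): Unit = {
    // Before: plain string interpolation; the variable is only part of the message text.
    // logError(s"Failed to clean up session $sessionId.", cause)

    // After: the `log` interpolator plus MDC tags the value with a LogKey, so a
    // structured log layout can emit it as a separate field in addition to the text.
    logError(log"Failed to clean up session ${MDC(SESSION_ID, sessionId)}.", cause)
  }
}
```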

### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GitHub Actions (GA).

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#45877 from panbingkun/SPARK-47593.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
panbingkun authored and gengliangwang committed Apr 6, 2024
1 parent 42dc815 commit 7385f19
Showing 7 changed files with 35 additions and 16 deletions.
@@ -49,6 +49,7 @@ object LogKey extends Enumeration {
val EXECUTOR_STATE = Value
val EXIT_CODE = Value
val FAILURES = Value
val GROUP_ID = Value
val HOST = Value
val JOB_ID = Value
val JOIN_CONDITION = Value
@@ -69,6 +70,7 @@
val OBJECT_ID = Value
val OLD_BLOCK_MANAGER_ID = Value
val OPTIMIZER_CLASS_NAME = Value
val OP_TYPE = Value
val PARTITION_ID = Value
val PATH = Value
val PATHS = Value
@@ -81,10 +83,13 @@
val REDUCE_ID = Value
val REMOTE_ADDRESS = Value
val RETRY_COUNT = Value
val RETRY_INTERVAL = Value
val RPC_ADDRESS = Value
val RULE_BATCH_NAME = Value
val RULE_NAME = Value
val RULE_NUMBER_OF_RUNS = Value
val SESSION_ID = Value
val SHARD_ID = Value
val SHUFFLE_BLOCK_INFO = Value
val SHUFFLE_ID = Value
val SHUFFLE_MERGE_ID = Value
@@ -105,6 +110,7 @@
val TOTAL_EFFECTIVE_TIME = Value
val TOTAL_TIME = Value
val URI = Value
val USER_ID = Value
val USER_NAME = Value
val WATERMARK_CONSTRAINT = Value
val WORKER_URL = Value
@@ -38,7 +38,8 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkEnv, SparkException, SparkThrowable}
import org.apache.spark.api.python.PythonException
import org.apache.spark.connect.proto.FetchErrorDetailsResponse
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{OP_TYPE, SESSION_ID, USER_ID}
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.service.{ExecuteEventsManager, SessionHolder, SessionKey, SparkConnectService}
import org.apache.spark.sql.internal.SQLConf
@@ -295,8 +296,8 @@ private[connect] object ErrorUtils extends Logging {
} else {
// Other errors are server RPC errors, return them as ERROR.
logError(
s"Spark Connect RPC error " +
s"during: $opType. UserId: $userId. SessionId: $sessionId.",
log"Spark Connect RPC error during: ${MDC(OP_TYPE, opType)}. " +
log"UserId: ${MDC(USER_ID, userId)}. SessionId: ${MDC(SESSION_ID, sessionId)}.",
original)
}

@@ -26,6 +26,8 @@ import scala.util.{Failure, Success, Try}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.connect.proto
import org.apache.spark.internal.LogKey.PATH
import org.apache.spark.internal.MDC
import org.apache.spark.sql.catalyst.{catalog, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer, FunctionRegistry, Resolver, TableFunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -174,7 +176,7 @@ class ProtoToParsedPlanTestSuite
val relativePath = inputFilePath.relativize(file)
val fileName = relativePath.getFileName.toString
if (!fileName.endsWith(".proto.bin")) {
logError(s"Skipping $fileName")
logError(log"Skipping ${MDC(PATH, fileName)}")
return
}
val name = fileName.stripSuffix(".proto.bin")
@@ -36,6 +36,8 @@ import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.SpanSugar._

import org.apache.spark.internal.LogKey.CLASS_NAME
import org.apache.spark.internal.MDC
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.{DockerUtils, Utils}
@@ -221,7 +223,8 @@ abstract class DockerJDBCIntegrationSuite
}
} catch {
case NonFatal(e) =>
logError(s"Failed to initialize Docker container for ${this.getClass.getName}", e)
logError(log"Failed to initialize Docker container for " +
log"${MDC(CLASS_NAME, this.getClass.getName)}", e)
try {
afterAll()
} finally {
@@ -24,7 +24,8 @@ import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkContext
import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.GROUP_ID
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
@@ -192,7 +193,8 @@ object KafkaUtils extends Logging {
// driver and executor should be in different consumer groups
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId) {
logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
logError(log"${MDC(GROUP_ID, ConsumerConfig.GROUP_ID_CONFIG)} is null, " +
log"you should probably set it")
}
val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
@@ -22,7 +22,8 @@ import scala.util.control.NonFatal

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{SHARD_ID, WORKER_URL}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, SystemClock}
@@ -72,8 +73,9 @@ private[kinesis] class KinesisCheckpointer(
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
} catch {
case NonFatal(e) =>
logError(s"Exception: WorkerId $workerId encountered an exception while checkpointing" +
s"to finish reading a shard of $shardId.", e)
logError(log"Exception: WorkerId ${MDC(WORKER_URL, workerId)} encountered an " +
log"exception while checkpointing to finish reading a shard of " +
log"${MDC(SHARD_ID, shardId)}.", e)
// Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor
throw e
}
@@ -26,7 +26,8 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{RETRY_INTERVAL, SHARD_ID, WORKER_URL}

/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
@@ -89,8 +90,9 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
* This will potentially cause records since the last checkpoint to be processed
* more than once.
*/
logError(s"Exception: WorkerId $workerId encountered and exception while storing " +
s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
logError(log"Exception: WorkerId ${MDC(WORKER_URL, workerId)} encountered and " +
log"exception while storing or checkpointing a batch for workerId " +
log"${MDC(WORKER_URL, workerId)} and shardId ${MDC(SHARD_ID, shardId)}.", e)

/* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
throw e
@@ -166,15 +168,16 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
if numRetriesLeft > 1 =>
val backOffMillis = Random.nextInt(maxBackOffMillis)
Thread.sleep(backOffMillis)
logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
logError(log"Retryable Exception: Random " +
log"backOffMillis=${MDC(RETRY_INTERVAL, backOffMillis)}", e)
retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
/* Throw: Shutdown has been requested by the Kinesis Client Library. */
case _: ShutdownException =>
logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
throw e
/* Throw: Non-retryable exception has occurred with the Kinesis Client Library */
case _: InvalidStateException =>
logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" +
logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" +
s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e)
throw e
/* Throw: Unexpected exception has occurred */
