From fa131552ab5e857f801b91a58f97b8a09bf3eecc Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Tue, 11 Apr 2023 21:59:47 -0700 Subject: [PATCH] fix scalastyle issues (#127) --- .../scala/org/apache/spark/sql/pulsar/CachedConsumer.scala | 3 +++ .../org/apache/spark/sql/pulsar/CachedPulsarClient.scala | 1 + src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala | 1 + src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala | 3 ++- .../scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala | 4 ++-- .../scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala | 1 + 6 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/CachedConsumer.scala b/src/main/scala/org/apache/spark/sql/pulsar/CachedConsumer.scala index 42261f4b..d671d079 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/CachedConsumer.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/CachedConsumer.scala @@ -14,11 +14,14 @@ package org.apache.spark.sql.pulsar import java.util.concurrent.TimeUnit + import scala.util.{Failure, Success, Try} + import com.google.common.cache._ import org.apache.pulsar.client.api.{Consumer, PulsarClient, SubscriptionInitialPosition} import org.apache.pulsar.client.api.schema.GenericRecord import org.apache.pulsar.client.impl.schema.AutoConsumeSchema + import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging diff --git a/src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala b/src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala index 671eaad2..903cc431 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala @@ -23,6 +23,7 @@ import com.google.common.cache._ import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.apache.pulsar.client.api.PulsarClient import org.apache.pulsar.client.impl.PulsarClientImpl + import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.sql.pulsar.PulsarOptions._ diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala index 453ef923..18cc9e36 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala @@ -30,6 +30,7 @@ import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace import org.apache.pulsar.common.naming.TopicName import org.apache.pulsar.common.schema.SchemaInfo import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles + import org.apache.spark.internal.Logging import org.apache.spark.sql.pulsar.PulsarOptions._ import org.apache.spark.sql.types.StructType diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala index b0784327..ae22a1c9 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala @@ -19,8 +19,9 @@ import java.util.concurrent.TimeUnit import scala.util.control.NonFatal import org.apache.pulsar.client.api.{Producer, PulsarClientException, Schema} + import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SparkSession} +import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal} import org.apache.spark.sql.execution.QueryExecution diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala index 5f5f2b17..df49b869 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala @@ -99,8 +99,8 @@ private[pulsar] abstract class PulsarSourceRDDBase( case (_: BatchMessageIdImpl, _: BatchMessageIdImpl) => // we seek using a batch message id, we can read next directly in `getNext()` case (_: MessageIdImpl, cbmid: BatchMessageIdImpl) => - // we seek using a message id, this is supposed to be read by previous task since it's - // inclusive for the last batch (start, end], so we skip this batch + // we seek using a message id, this is supposed to be read by previous task since + // it's inclusive for the last batch (start, end], so we skip this batch val newStart = new MessageIdImpl( cbmid.getLedgerId, cbmid.getEntryId + 1, diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala index 0395563c..4f5ee2db 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala @@ -19,6 +19,7 @@ import java.util.function.BiConsumer import scala.collection.mutable import org.apache.pulsar.client.api.{MessageId, Producer} + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} import org.apache.spark.sql.types._