update scala version to 2.13 (#171)
* update scala version to 2.13

* fix scalastyle check

* update
nlu90 authored Nov 30, 2023
1 parent 2c35364 commit 804fe6d
Showing 15 changed files with 43 additions and 41 deletions.
15 changes: 7 additions & 8 deletions pom.xml
@@ -23,8 +23,8 @@
   </parent>
 
   <groupId>io.streamnative.connectors</groupId>
-  <artifactId>pulsar-spark-connector_2.12</artifactId>
-  <version>3.4.0-SNAPSHOT</version>
+  <artifactId>pulsar-spark-connector_2.13</artifactId>
+  <version>3.4.1-SNAPSHOT</version>
   <name>StreamNative :: Pulsar Spark Connector</name>
   <url>https://pulsar.apache.org</url>
   <inceptionYear>2019</inceptionYear>
@@ -66,11 +66,11 @@

     <!-- dependencies -->
     <!-- latest version from apache pulsar -->
-    <pulsar.version>2.10.2</pulsar.version>
-    <scala.version>2.12.17</scala.version>
-    <scala.binary.version>2.12</scala.binary.version>
+    <pulsar.version>2.10.5</pulsar.version>
+    <scala.version>2.13.12</scala.version>
+    <scala.binary.version>2.13</scala.binary.version>
     <scalatest.version>3.2.14</scalatest.version>
-    <spark.version>3.4.0</spark.version>
+    <spark.version>3.4.1</spark.version>
     <commons-io.version>2.11.0</commons-io.version>
     <testcontainers.version>1.18.3</testcontainers.version>
@@ -83,7 +83,7 @@
     <maven-shade-plugin.version>3.4.0</maven-shade-plugin.version>
     <maven-source-plugin.version>3.2.1</maven-source-plugin.version>
     <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
-    <scala-maven-plugin.version>4.7.2</scala-maven-plugin.version>
+    <scala-maven-plugin.version>4.8.1</scala-maven-plugin.version>
     <scalatest-maven-plugin.version>2.2.0</scalatest-maven-plugin.version>
     <scalastyle-maven-plugin.version>1.0.0</scalastyle-maven-plugin.version>
     <nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
@@ -329,7 +329,6 @@
             <arg>-deprecation</arg>
             <arg>-feature</arg>
             <arg>-explaintypes</arg>
-            <arg>-Yno-adapted-args</arg>
           </args>
           <jvmArgs>
             <jvmArg>-Xms1024m</jvmArg>
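Note: the deleted -Yno-adapted-args option does not exist in the Scala 2.13 compiler, so leaving it in place would fail the build with an unknown-option error. A minimal sketch of the argument-list adaptation the 2.12 flag used to reject (illustrative code, not from this repository):

    // Auto-tupling: the compiler adapts the two arguments into a single Tuple2.
    def printPair(pair: (Int, Int)): Unit = println(pair)

    printPair((1, 2)) // explicit tuple, accepted everywhere
    printPair(1, 2)   // auto-tupled: an error under 2.12's -Yno-adapted-args,
                      // reported with a warning by the 2.13 compiler itself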
@@ -16,7 +16,7 @@ package org.apache.spark.sql.pulsar
 import java.{util => ju}
 import java.util.concurrent.{ExecutionException, TimeUnit}
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import com.google.common.cache._
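Note: most of the source changes in this commit are the same mechanical migration. scala.collection.JavaConverters is deprecated as of Scala 2.13 in favor of scala.jdk.CollectionConverters, which exposes the identical asScala/asJava extension methods. A minimal sketch:

    import scala.jdk.CollectionConverters._ // 2.13 home of the Java interop converters

    val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b")
    val buffer = javaList.asScala // mutable.Buffer wrapper, same as before
    val back   = buffer.asJava    // and back to a java.util.List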
@@ -15,7 +15,7 @@ package org.apache.spark.sql.pulsar

 import java.{util => ju}
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 
 import org.apache.spark.internal.Logging
 
@@ -19,8 +19,8 @@ import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 import java.util.Date
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 import org.apache.pulsar.client.api.Message
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord
11 changes: 7 additions & 4 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
@@ -58,7 +58,7 @@ private[pulsar] case class PulsarHelper(
     extends Closeable
     with Logging {
 
-  import scala.collection.JavaConverters._
+  import scala.jdk.CollectionConverters._
 
   protected var client: PulsarClientImpl =
     PulsarClientFactory.getOrCreate(
@@ -177,7 +177,7 @@
   }
 
   def getAndCheckCompatible(schema: Option[StructType]): StructType = {
-    val si = getPulsarSchema
+    val si = getPulsarSchema()
     val inferredSchema = SchemaUtils.pulsarSourceSchema(si)
     require(
       schema.isEmpty || inferredSchema == schema.get,
@@ -288,11 +288,13 @@
       case Some((TopicSingle, value)) =>
         TopicName.get(value).toString :: Nil
       case Some((TopicMulti, value)) =>
-        value.split(",").map(_.trim).filter(_.nonEmpty).map(TopicName.get(_).toString)
+        value.split(",").toIndexedSeq.map(_.trim).filter(_.nonEmpty).map(TopicName.get(_).toString)
       case Some((TopicPattern, value)) =>
         getTopics(value)
       case None =>
         throw new RuntimeException("Failed to get topics from configurations")
+      case _ =>
+        throw new RuntimeException("Unrecognized topics configuration")
     }
 
     waitForTopicIfNeeded()
@@ -334,6 +336,7 @@
     val shortenedTopicsPattern = Pattern.compile(topicsPattern.split("\\:\\/\\/")(1))
     allTopics.asScala
       .map(TopicName.get(_).toString)
+      .toSeq
       .filter(tp => shortenedTopicsPattern.matcher(tp.split("\\:\\/\\/")(1)).matches())
   }
 
@@ -532,7 +535,7 @@ class PulsarAdmissionControlHelper(adminUrl: String, conf: ju.Map[String, Object
   private lazy val pulsarAdmin =
     PulsarAdmin.builder().serviceHttpUrl(adminUrl).loadConf(conf).build()
 
-  import scala.collection.JavaConverters._
+  import scala.jdk.CollectionConverters._
 
   def latestOffsetForTopicPartition(topicPartition: String,
     startMessageId: MessageId,
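Note: the new .toIndexedSeq and .toSeq calls in PulsarHelper follow from 2.13 redefining scala.Seq as scala.collection.immutable.Seq. An Array (for example from String.split) now reaches a Seq parameter only through a deprecated copying implicit, so the conversion is written out, which also satisfies the scalastyle check mentioned in the commit message. A sketch of the pattern; firstTopic is an illustrative method, not part of the connector:

    // Under 2.13, Seq[String] means immutable.Seq[String].
    def firstTopic(topics: Seq[String]): String = topics.head

    val parsed: Array[String] = "t1,t2".split(",").map(_.trim)
    firstTopic(parsed.toIndexedSeq) // explicit copy instead of the deprecated implicit one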
@@ -16,7 +16,7 @@ package org.apache.spark.sql.pulsar
 import java.nio.ByteBuffer
 import java.util
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 
 import org.apache.pulsar.client.api.schema.Field
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord
8 changes: 4 additions & 4 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
@@ -17,8 +17,8 @@ package org.apache.spark.sql.pulsar
 import java.{util => ju}
 import java.util.Optional
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 
 import org.apache.pulsar.client.admin.PulsarAdmin
 import org.apache.pulsar.client.api.MessageId
@@ -76,9 +76,9 @@ private[pulsar] class PulsarSource(
     accumulator
   }
 
-  private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema
+  private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema()
 
-  override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)
+  override def schema: StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)
 
   override def getOffset: Option[Offset] = {
     throw new UnsupportedOperationException(
@@ -196,7 +196,7 @@
"GetBatch generating RDD of offset range: " +
offsetRanges.sortBy(_.topic).mkString(", "))

sqlContext.internalCreateDataFrame(rdd.setName("pulsar"), schema(), isStreaming = true)
sqlContext.internalCreateDataFrame(rdd.setName("pulsar"), schema, isStreaming = true)
}

override def commit(end: Offset): Unit = {
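Note: the parenthesis changes in PulsarSource reflect 2.13's stricter arity rules. Calling an empty-paren method without parentheses (auto-application, as in the old bare getPulsarSchema) is deprecated, and 2.13 also warns when an override's parentheses differ from the overridden declaration, which is why schema() becomes schema. A minimal sketch, with a made-up trait standing in for Spark's Source:

    trait StreamSource { def schema: String } // parameterless in the parent

    class MySource extends StreamSource {
      override def schema: String = "s" // parens kept consistent with the parent
    }

    def getPulsarSchema(): String = "avro"
    val si = getPulsarSchema() // bare getPulsarSchema draws a 2.13 deprecation warning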
@@ -16,8 +16,8 @@ package org.apache.spark.sql.pulsar
 import java.{util => ju}
 import java.util.function.BiConsumer
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 
 import org.apache.pulsar.client.api.{MessageId, Producer}
 
8 changes: 4 additions & 4 deletions src/main/scala/org/apache/spark/sql/pulsar/SchemaUtils.scala
@@ -16,8 +16,8 @@ package org.apache.spark.sql.pulsar
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.nio.charset.StandardCharsets
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._
 
 import org.apache.pulsar.client.api.{Schema => PSchema}
 import org.apache.pulsar.client.api.schema.{GenericRecord, GenericSchema}
@@ -132,7 +132,7 @@ private[pulsar] object SchemaUtils {
       mainSchema += StructField("value", t, nullable = typeNullable.nullable)
     }
     mainSchema ++= metaDataFields
-    StructType(mainSchema)
+    StructType(mainSchema.toArray)
   }
 
   def si2SqlType(si: SchemaInfo): TypeNullable = {
@@ -200,7 +200,7 @@
         StructField(f.name, typeNullable.dataType, typeNullable.nullable)
       }
 
-      TypeNullable(StructType(fields), nullable = false)
+      TypeNullable(StructType(fields.toArray), nullable = false)
 
     case ARRAY =>
       val typeNullable: TypeNullable =
@@ -242,7 +242,7 @@
StructField(s"member$i", TypeNullable.dataType, nullable = true)
}

TypeNullable(StructType(fields), nullable = false)
TypeNullable(StructType(fields.toArray), nullable = false)
}
}
case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
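Note: the .toArray calls in SchemaUtils are the builder-side face of the immutable-Seq change: a mutable.ListBuffer no longer conforms to the Seq[StructField] that StructType.apply expects, and Spark's StructType also accepts an Array[StructField] directly. A short sketch:

    import scala.collection.mutable.ListBuffer
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val fields = ListBuffer(StructField("value", StringType, nullable = true))
    val schema = StructType(fields.toArray) // fields.toSeq would also compile under 2.13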
@@ -22,7 +22,7 @@ class ConnectorInitializationSuite extends SparkFunSuite {
   // This test will fail if an exception is thrown during initialization of
   // the Pulsar Spark connector.
   test("connector initialization test") {
-    val spark = SparkSession.builder
+    val spark = SparkSession.builder()
       .appName("connector initialization test")
       .master("local")
       .getOrCreate()
@@ -33,10 +33,10 @@
.option("service.url", "pulsar://testurl") // value not important, but must be set
.option("admin.url", "http://testurl") // value not important, but must be set
.option("topic", "testtopic") // value not important, but must be set
.load
.load()
} catch {
case e: java.lang.ExceptionInInitializerError =>
fail(e.getException + " was thrown during connector initialization")
fail(s"${e.getException} was thrown during connector initialization")
case e: java.lang.Throwable => logInfo(e.getMessage)
} finally {
spark.stop()
@@ -15,7 +15,7 @@ package org.apache.spark.sql.pulsar

 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 
 import org.apache.pulsar.client.admin.PulsarAdmin
 import org.apache.spark.SparkException
@@ -62,7 +62,7 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
       .option(ServiceUrlOptionKey, serviceUrl)
       .option(TopicSingle, topic)
 
-    testStream(reader.load)(makeSureGetOffsetCalled, StopStream, StartStream(), StopStream)
+    testStream(reader.load())(makeSureGetOffsetCalled, StopStream, StartStream(), StopStream)
   }
 
   test("input row metrics") {
@@ -210,9 +210,9 @@

     val windowedAggregation = pulsar
       .withWatermark("__publishTime", "10 seconds")
-      .groupBy(window($"__publishTime", "5 seconds") as 'window)
-      .agg(count("*") as 'count)
-      .select($"window".getField("start") as 'window, $"count")
+      .groupBy(window($"__publishTime", "5 seconds") as Symbol("window"))
+      .agg(count("*") as Symbol("count"))
+      .select($"window".getField("start") as Symbol("window"), $"count")
 
     val query = windowedAggregation.writeStream
       .format("memory")
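Note: the 'window and 'count rewrites address the Symbol-literal deprecation: the quote syntax is deprecated in 2.13 and removed in Scala 3, so Symbol("name") is the portable spelling. A sketch:

    val colName = Symbol("window") // was: 'window
    assert(colName.name == "window")
    // In the Spark DSL, string aliases such as .as("window") avoid Symbols entirely.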
@@ -569,7 +569,7 @@ class PulsarSinkSuite extends StreamTest with PulsarTest with SharedSparkSession
     try {
       val fieldTypes: Array[DataType] = Array(BinaryType)
       val converter = UnsafeProjection.create(fieldTypes)
-      val row = new SpecificInternalRow(fieldTypes)
+      val row = new SpecificInternalRow(fieldTypes.toIndexedSeq)
       row.update(0, data)
       val iter = Seq.fill(1000)(converter.apply(row)).iterator
       writeTask.execute(iter)
@@ -208,7 +208,7 @@ abstract class PulsarSourceSuiteBase extends PulsarSourceTest {
       .start()
     eventually(timeout(streamingTimeout)) {
       assert(
-        spark.table("pulsarColumnTypes").count == 1,
+        spark.table("pulsarColumnTypes").count() == 1,
         s"Unexpected results: ${spark.table("pulsarColumnTypes").collectAsList()}")
     }
     val row = spark.table("pulsarColumnTypes").head()
10 changes: 5 additions & 5 deletions src/test/scala/org/apache/spark/sql/pulsar/PulsarTest.scala
@@ -19,7 +19,7 @@ import java.nio.charset.StandardCharsets.UTF_8
 import java.time.{Clock, Duration}
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
@@ -108,13 +108,13 @@ trait PulsarTest extends BeforeAndAfterAll with BeforeAndAfterEach {
       val tps = admin.namespaces().getTopics("public/default").asScala
       tps.map { tp =>
         (tp, PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(tp)))
-      }
+      }.toSeq
     }
   }
 
   /** Java-friendly function for sending messages to the Pulsar */
   def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
-    sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+    sendMessages(topic, Map(messageToFreq.asScala.view.mapValues(_.intValue()).toSeq: _*))
   }
 
   /** Send the messages to the Pulsar */
@@ -154,7 +154,7 @@
       producer.close()
       client.close()
     }
-    offsets
+    offsets.toSeq
   }
 
   /** Send the array of messages to the Pulsar using specified partition */
@@ -185,7 +185,7 @@
       producer.close()
       client.close()
     }
-    offsets
+    offsets.toSeq
   }
 
   def sendTypedMessages[T: ClassTag](
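Note: two further 2.13 collection changes appear in PulsarTest. Map#mapValues moved to the lazy .view (the strict method is deprecated), and builder results such as an ArrayBuffer of offsets need .toSeq to satisfy an immutable Seq return type. A sketch of the mapValues half:

    val freq = Map("a" -> 1, "b" -> 2)
    // 2.12 spelling, deprecated in 2.13: freq.mapValues(_ * 2)
    val doubled: Map[String, Int] = freq.view.mapValues(_ * 2).toMap // materialize the view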
@@ -18,7 +18,7 @@ import java.util
 import java.util.Calendar
 
 import scala.beans.BeanProperty
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 
 object SchemaData {
 
Expand Down
