update scala version to 2.13 #171

Merged: 3 commits, Nov 30, 2023
15 changes: 7 additions & 8 deletions pom.xml
@@ -23,8 +23,8 @@
</parent>

<groupId>io.streamnative.connectors</groupId>
-<artifactId>pulsar-spark-connector_2.12</artifactId>
-<version>3.4.0-SNAPSHOT</version>
+<artifactId>pulsar-spark-connector_2.13</artifactId>
+<version>3.4.1-SNAPSHOT</version>
<name>StreamNative :: Pulsar Spark Connector</name>
<url>https://pulsar.apache.org</url>
<inceptionYear>2019</inceptionYear>
@@ -66,11 +66,11 @@

<!-- dependencies -->
<!-- latest version from apache pulsar -->
-<pulsar.version>2.10.2</pulsar.version>
-<scala.version>2.12.17</scala.version>
-<scala.binary.version>2.12</scala.binary.version>
+<pulsar.version>2.10.5</pulsar.version>
+<scala.version>2.13.12</scala.version>
+<scala.binary.version>2.13</scala.binary.version>
<scalatest.version>3.2.14</scalatest.version>
-<spark.version>3.4.0</spark.version>
+<spark.version>3.4.1</spark.version>
<commons-io.version>2.11.0</commons-io.version>
<testcontainers.version>1.18.3</testcontainers.version>

@@ -83,7 +83,7 @@
<maven-shade-plugin.version>3.4.0</maven-shade-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
-<scala-maven-plugin.version>4.7.2</scala-maven-plugin.version>
+<scala-maven-plugin.version>4.8.1</scala-maven-plugin.version>
<scalatest-maven-plugin.version>2.2.0</scalatest-maven-plugin.version>
<scalastyle-maven-plugin.version>1.0.0</scalastyle-maven-plugin.version>
<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
@@ -329,7 +329,6 @@
<arg>-deprecation</arg>
<arg>-feature</arg>
<arg>-explaintypes</arg>
-<arg>-Yno-adapted-args</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
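Note on the dropped scalac flag: -Yno-adapted-args no longer exists in the 2.13 compiler (the related check now lives under -Xlint:adapted-args), so keeping it would break compilation. A minimal sketch, with hypothetical names, of the auto-tupling the flag used to forbid:

    object AdaptedArgs {
      def show(pair: (Int, Int)): String = pair.toString

      // 2.12 could silently adapt show(1, 2) into show((1, 2));
      // 2.13 reports this pattern via -Xlint:adapted-args instead.
      val ok: String = show((1, 2)) // explicit tuple works on both versions
    }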
@@ -16,7 +16,7 @@ package org.apache.spark.sql.pulsar
import java.{util => ju}
import java.util.concurrent.{ExecutionException, TimeUnit}

-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import com.google.common.cache._
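This import swap, repeated across the diff, tracks a standard-library deprecation: scala.collection.JavaConverters is deprecated since 2.13.0 in favor of scala.jdk.CollectionConverters, which provides the same asScala/asJava extension methods. A minimal sketch:

    import java.util.{List => JList}
    import scala.jdk.CollectionConverters._

    object Converters {
      // asScala/asJava behave exactly as under the old import.
      def sum(xs: JList[Integer]): Int = xs.asScala.map(_.intValue).sum
      def toJava(xs: Seq[String]): JList[String] = xs.asJava
    }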
@@ -15,7 +15,7 @@ package org.apache.spark.sql.pulsar

import java.{util => ju}

-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._

import org.apache.spark.internal.Logging

@@ -19,8 +19,8 @@ import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import java.util.Date

-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._

import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord
11 changes: 7 additions & 4 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
@@ -58,7 +58,7 @@ private[pulsar] case class PulsarHelper(
extends Closeable
with Logging {

-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._

protected var client: PulsarClientImpl =
PulsarClientFactory.getOrCreate(
@@ -177,7 +177,7 @@ private[pulsar] case class PulsarHelper(
}

def getAndCheckCompatible(schema: Option[StructType]): StructType = {
-val si = getPulsarSchema
+val si = getPulsarSchema()
val inferredSchema = SchemaUtils.pulsarSourceSchema(si)
require(
schema.isEmpty || inferredSchema == schema.get,
@@ -288,11 +288,13 @@ private[pulsar] case class PulsarHelper(
case Some((TopicSingle, value)) =>
TopicName.get(value).toString :: Nil
case Some((TopicMulti, value)) =>
-value.split(",").map(_.trim).filter(_.nonEmpty).map(TopicName.get(_).toString)
+value.split(",").toIndexedSeq.map(_.trim).filter(_.nonEmpty).map(TopicName.get(_).toString)
case Some((TopicPattern, value)) =>
getTopics(value)
case None =>
throw new RuntimeException("Failed to get topics from configurations")
+case _ =>
+throw new RuntimeException("Unrecognized topics configuration")
}

waitForTopicIfNeeded()
@@ -334,6 +336,7 @@ private[pulsar] case class PulsarHelper(
val shortenedTopicsPattern = Pattern.compile(topicsPattern.split("\\:\\/\\/")(1))
allTopics.asScala
.map(TopicName.get(_).toString)
+.toSeq
.filter(tp => shortenedTopicsPattern.matcher(tp.split("\\:\\/\\/")(1)).matches())
}

@@ -532,7 +535,7 @@ class PulsarAdmissionControlHelper(adminUrl: String, conf: ju.Map[String, Object
private lazy val pulsarAdmin =
PulsarAdmin.builder().serviceHttpUrl(adminUrl).loadConf(conf).build()

-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._

def latestOffsetForTopicPartition(topicPartition: String,
startMessageId: MessageId,
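The added .toIndexedSeq and .toSeq calls in this file follow from a 2.13 language change: scala.Seq is now an alias for immutable.Seq, so an Array (from String.split) or the mutable Buffer returned by asScala no longer satisfies a Seq result type without an explicit conversion; the new catch-all case presumably quiets 2.13's stricter exhaustiveness checking on the partial match. A minimal sketch, with hypothetical names:

    import scala.jdk.CollectionConverters._

    object SeqMigration {
      // Under 2.12 both bodies compiled without conversions, because
      // scala.Seq was collection.Seq and Array/Buffer conformed to it.
      def fromCsv(csv: String): Seq[String] =
        csv.split(",").toIndexedSeq.map(_.trim).filter(_.nonEmpty)

      def fromJava(xs: java.util.List[String]): Seq[String] =
        xs.asScala.toSeq // asScala yields a mutable.Buffer
    }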
@@ -16,7 +16,7 @@ package org.apache.spark.sql.pulsar
import java.nio.ByteBuffer
import java.util

-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._

import org.apache.pulsar.client.api.schema.Field
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord
@@ -17,8 +17,8 @@ package org.apache.spark.sql.pulsar
import java.{util => ju}
import java.util.Optional

-import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.jdk.CollectionConverters._

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.MessageId
@@ -76,9 +76,9 @@ private[pulsar] class PulsarSource(
accumulator
}

-private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema
+private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema()

-override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)
+override def schema: StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)

override def getOffset: Option[Offset] = {
throw new UnsupportedOperationException(
@@ -196,7 +196,7 @@ private[pulsar] class PulsarSource(
"GetBatch generating RDD of offset range: " +
offsetRanges.sortBy(_.topic).mkString(", "))

-sqlContext.internalCreateDataFrame(rdd.setName("pulsar"), schema(), isStreaming = true)
+sqlContext.internalCreateDataFrame(rdd.setName("pulsar"), schema, isStreaming = true)
}

override def commit(end: Offset): Unit = {
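The parentheses churn in this file (getPulsarSchema gains parens at call sites, schema loses them on the override) reflects 2.13's deprecation of auto-application: a method declared with an empty parameter list should be invoked with (), and an override is expected to match the shape of the member it implements; Spark's Source.schema is declared parameterless. A minimal sketch, with hypothetical names:

    object AutoApplication {
      def getSchema(): String = "schema" // declared with ()

      // 2.12 accepted bare `getSchema`; 2.13 deprecates dropping
      // the empty argument list at the call site.
      val s: String = getSchema()

      trait Source { def schema: String } // parameterless member

      class MySource extends Source {
        override def schema: String = s // shape matches the trait
      }
    }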
@@ -16,8 +16,8 @@ package org.apache.spark.sql.pulsar
import java.{util => ju}
import java.util.function.BiConsumer

-import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.jdk.CollectionConverters._

import org.apache.pulsar.client.api.{MessageId, Producer}

@@ -16,8 +16,8 @@ package org.apache.spark.sql.pulsar
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.nio.charset.StandardCharsets

-import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._

import org.apache.pulsar.client.api.{Schema => PSchema}
import org.apache.pulsar.client.api.schema.{GenericRecord, GenericSchema}
@@ -132,7 +132,7 @@ private[pulsar] object SchemaUtils {
mainSchema += StructField("value", t, nullable = typeNullable.nullable)
}
mainSchema ++= metaDataFields
-StructType(mainSchema)
+StructType(mainSchema.toArray)
}

def si2SqlType(si: SchemaInfo): TypeNullable = {
@@ -200,7 +200,7 @@ private[pulsar] object SchemaUtils {
StructField(f.name, typeNullable.dataType, typeNullable.nullable)
}

-TypeNullable(StructType(fields), nullable = false)
+TypeNullable(StructType(fields.toArray), nullable = false)

case ARRAY =>
val typeNullable: TypeNullable =
@@ -242,7 +242,7 @@
StructField(s"member$i", TypeNullable.dataType, nullable = true)
}

-TypeNullable(StructType(fields), nullable = false)
+TypeNullable(StructType(fields.toArray), nullable = false)
}
}
case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
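The .toArray calls are the same immutable-Seq story as in PulsarHelper: mainSchema and fields are mutable buffers, which under 2.13 no longer conform to the Seq[StructField] overload of StructType, so converting to Array targets StructType's primary Array[StructField] constructor instead. A minimal sketch, with hypothetical names:

    import org.apache.spark.sql.types._
    import scala.collection.mutable.ArrayBuffer

    object BuildSchema {
      def schemaFor(names: Seq[String]): StructType = {
        val fields = ArrayBuffer.empty[StructField]
        names.foreach(n => fields += StructField(n, StringType, nullable = true))
        // 2.12 accepted StructType(fields); 2.13 needs the copy.
        StructType(fields.toArray)
      }
    }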
@@ -22,7 +22,7 @@ class ConnectorInitializationSuite extends SparkFunSuite {
// This test will fail if an exception is thrown during initialization of
// the Pulsar Spark connector.
test("connector initialization test") {
-val spark = SparkSession.builder
+val spark = SparkSession.builder()
.appName("connector initialization test")
.master("local")
.getOrCreate()
@@ -33,10 +33,10 @@
.option("service.url", "pulsar://testurl") // value not important, but must be set
.option("admin.url", "http://testurl") // value not important, but must be set
.option("topic", "testtopic") // value not important, but must be set
-.load
+.load()
} catch {
case e: java.lang.ExceptionInInitializerError =>
-fail(e.getException + " was thrown during connector initialization")
+fail(s"${e.getException} was thrown during connector initialization")
case e: java.lang.Throwable => logInfo(e.getMessage)
} finally {
spark.stop()
@@ -15,7 +15,7 @@ package org.apache.spark.sql.pulsar

import java.util.concurrent.ConcurrentLinkedQueue

-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.spark.SparkException
@@ -62,7 +62,7 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
.option(ServiceUrlOptionKey, serviceUrl)
.option(TopicSingle, topic)

-testStream(reader.load)(makeSureGetOffsetCalled, StopStream, StartStream(), StopStream)
+testStream(reader.load())(makeSureGetOffsetCalled, StopStream, StartStream(), StopStream)
}

test("input row metrics") {
@@ -210,9 +210,9 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {

val windowedAggregation = pulsar
.withWatermark("__publishTime", "10 seconds")
-.groupBy(window($"__publishTime", "5 seconds") as 'window)
-.agg(count("*") as 'count)
-.select($"window".getField("start") as 'window, $"count")
+.groupBy(window($"__publishTime", "5 seconds") as Symbol("window"))
+.agg(count("*") as Symbol("count"))
+.select($"window".getField("start") as Symbol("window"), $"count")

val query = windowedAggregation.writeStream
.format("memory")
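Symbol literals such as 'window are deprecated in 2.13, hence the explicit Symbol("window") spelling. In Spark code a plain string alias is an equally valid escape, since Column.as accepts both. A minimal sketch, assuming an available SparkSession:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.count

    object SymbolLiterals {
      def demo(spark: SparkSession): Unit = {
        import spark.implicits._
        val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")

        // 2.12 style: df.groupBy($"key").agg(count("*") as 'n)
        df.groupBy($"key").agg(count("*") as Symbol("n")).show()
        df.groupBy($"key").agg(count("*") as "n").show() // string alias
      }
    }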
@@ -569,7 +569,7 @@ class PulsarSinkSuite extends StreamTest with PulsarTest with SharedSparkSession
try {
val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
-val row = new SpecificInternalRow(fieldTypes)
+val row = new SpecificInternalRow(fieldTypes.toIndexedSeq)
row.update(0, data)
val iter = Seq.fill(1000)(converter.apply(row)).iterator
writeTask.execute(iter)
@@ -208,7 +208,7 @@ abstract class PulsarSourceSuiteBase extends PulsarSourceTest {
.start()
eventually(timeout(streamingTimeout)) {
assert(
-spark.table("pulsarColumnTypes").count == 1,
+spark.table("pulsarColumnTypes").count() == 1,
s"Unexpected results: ${spark.table("pulsarColumnTypes").collectAsList()}")
}
val row = spark.table("pulsarColumnTypes").head()
10 changes: 5 additions & 5 deletions src/test/scala/org/apache/spark/sql/pulsar/PulsarTest.scala
@@ -19,7 +19,7 @@ import java.nio.charset.StandardCharsets.UTF_8
import java.time.{Clock, Duration}
import java.util.{Map => JMap}

-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.reflect.ClassTag

@@ -108,13 +108,13 @@ trait PulsarTest extends BeforeAndAfterAll with BeforeAndAfterEach {
val tps = admin.namespaces().getTopics("public/default").asScala
tps.map { tp =>
(tp, PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(tp)))
-}
+}.toSeq
}
}

/** Java-friendly function for sending messages to the Pulsar */
def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
-sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+sendMessages(topic, Map(messageToFreq.asScala.view.mapValues(_.intValue()).toSeq: _*))
}

/** Send the messages to the Pulsar */
Expand Down Expand Up @@ -154,7 +154,7 @@ trait PulsarTest extends BeforeAndAfterAll with BeforeAndAfterEach {
producer.close()
client.close()
}
-offsets
+offsets.toSeq
}

/** Send the array of messages to the Pulsar using specified partition */
Expand Down Expand Up @@ -185,7 +185,7 @@ trait PulsarTest extends BeforeAndAfterAll with BeforeAndAfterEach {
producer.close()
client.close()
}
-offsets
+offsets.toSeq
}

def sendTypedMessages[T: ClassTag](
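Map.mapValues is deprecated in 2.13 because it silently returned a lazy, unmemoized view; the replacement is the explicit .view.mapValues, whose MapView result is then materialized (here via .toSeq). A minimal sketch:

    object MapValuesMigration {
      def frequencies(raw: Map[String, java.lang.Integer]): Map[String, Int] =
        // 2.12: raw.mapValues(_.intValue())  (deprecated in 2.13)
        raw.view.mapValues(_.intValue()).toMap
    }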
@@ -18,7 +18,7 @@ import java.util
import java.util.Calendar

import scala.beans.BeanProperty
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._

object SchemaData {
