Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Nov 7, 2024
1 parent 0c5130d commit 2af4c6e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
12 changes: 11 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

<!-- dependencies -->
<!-- latest version from apache pulsar -->
<pulsar.version>2.10.2</pulsar.version>
<pulsar.version>3.3.2</pulsar.version>
<scala.version>2.12.17</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<scalatest.version>3.2.14</scalatest.version>
Expand Down Expand Up @@ -147,6 +147,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.72</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
Expand Down Expand Up @@ -390,6 +395,7 @@
<include>org.bouncycastle*:*</include>
<include>org.lz4*:*</include>
<include>commons-io:commons-io:jar:*</include>
<include>io.opentelemetry:*</include> <!-- Add this -->
</includes>
</artifactSet>
<filters>
Expand All @@ -409,6 +415,10 @@
</filter>
</filters>
<relocations>
<relocation>
<pattern>io.opentelemetry</pattern>
<shadedPattern>org.apache.pulsar.shade.io.opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ import org.apache.pulsar.client.impl.conf.{
ProducerConfigurationData,
ReaderConfigurationData
}
import org.apache.pulsar.shade.com.fasterxml.jackson.annotation.JsonIgnore

object PulsarConfigurationUtils {

private def nonIgnoredFields[T: ClassTag] = {
classTag[T].runtimeClass.getDeclaredFields
.filter(f => !Modifier.isStatic(f.getModifiers))
.filter(f => f.getDeclaredAnnotation(classOf[JsonIgnore]) == null)
.map(_.getName)
}

Expand Down
10 changes: 7 additions & 3 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,10 @@ private[pulsar] case class PulsarHelper(
private def getTopics(topicsPattern: String): Seq[String] = {
val dest = TopicName.get(topicsPattern)
val allTopics: ju.List[String] = client.getLookup
.getTopicsUnderNamespace(dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL)
.get()
.getTopicsUnderNamespace(
// passing an empty topicsHash because we don't cache the GetTopicsResponse
dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL, topicsPattern, "")
.get().getTopics

val allNonPartitionedTopics: ju.List[String] = allTopics.asScala
.filter(t => !TopicName.get(t).isPartitioned)
Expand Down Expand Up @@ -345,7 +347,9 @@ private[pulsar] case class PulsarHelper(
while (waitList.nonEmpty) {
val topic = waitList.head
try {
client.getPartitionedTopicMetadata(topic).get()
// setting metadataAutoCreationEnabled to false, and useFallbackForNonPIP344Brokers
// to true to conform to non-breaking behavior.
client.getPartitionedTopicMetadata(topic, false, true).get()
waitList -= topic
} catch {
case NonFatal(_) =>
Expand Down

0 comments on commit 2af4c6e

Please sign in to comment.