Skip to content

Commit

Permalink
[SPARK-6431][Streaming][Kafka] Error message for partition metadata r…
Browse files Browse the repository at this point in the history
…equ...

...ests

The original reported problem was misdiagnosed; the topic just didn't exist yet.  Agreed upon solution was to improve error handling / message

Author: cody koeninger <[email protected]>

Closes apache#5454 from koeninger/spark-6431-master and squashes the following commits:

44300f8 [cody koeninger] [SPARK-6431][Streaming][Kafka] Error message for partition metadata requests
  • Loading branch information
koeninger authored and srowen committed Apr 12, 2015
1 parent ddc1743 commit 6ac8eea
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,17 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp: TopicMetadataResponse = consumer.send(req)
// error codes here indicate missing / just created topic,
// repeating on a different broker wont be useful
return Right(resp.topicsMetadata.toSet)
val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)

if (respErrs.isEmpty) {
return Right(resp.topicsMetadata.toSet)
} else {
respErrs.foreach { m =>
val cause = ErrorMapping.exceptionFor(m.errorCode)
val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
errs.append(new SparkException(msg, cause))
}
}
}
Left(errs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll {

val parts = kc.getPartitions(Set(topic)).right.get
assert(parts(topicAndPartition), "didn't get partitions")

val err = kc.getPartitions(Set(topic + "BAD"))
assert(err.isLeft, "getPartitions for a nonexistant topic should be an error")
}

test("leader offset apis") {
Expand Down

0 comments on commit 6ac8eea

Please sign in to comment.