Skip to content

Commit

Permalink
SerializationException for bad deserialization
Browse files Browse the repository at this point in the history
Closes #1
  • Loading branch information
NeQuissimus committed May 14, 2018
1 parent 59ee16d commit 6772cb4
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ crossScalaVersions := Seq("2.11.12", "2.12.6")

organization := "com.nequissimus"
name := "circe-kafka"
version := "1.0.2-SNAPSHOT"
version := "1.0.2"

// https://tpolecat.github.io/2017/04/25/scalac-flags.html
val scalac212Options = Seq(
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/ImplicitSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.github.ghik.silencer.silent
import io.circe.{ Decoder, Encoder }
import io.circe.parser._

import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.{ Deserializer, Serde, Serdes, Serializer }

package object kafka {
Expand All @@ -21,7 +22,9 @@ package object kafka {
implicit def deserializer[T](implicit decoder: Decoder[T]): Deserializer[T] = new Deserializer[T] {
def close(): Unit = {}
@silent def configure(config: JMap[String, _], isKey: Boolean): Unit = {}
def deserialize(topic: String, data: Array[Byte]): T = decode[T](new String(data)).fold(_ => null.asInstanceOf[T], identity)
def deserialize(topic: String, data: Array[Byte]): T = if (data eq null) null.asInstanceOf[T] else {
decode[T](new String(data)).fold(error => throw new SerializationException(error), identity)
}
}

implicit def serde[T](implicit serializer: Serializer[T], deserializer: Deserializer[T]): Serde[T] = Serdes.serdeFrom(serializer, deserializer)
Expand Down

0 comments on commit 6772cb4

Please sign in to comment.