Skip to content

Commit

Permalink
parquet (feature): Use JSON type for complex nested objects by default
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial committed Jan 18, 2024
1 parent 1ee3560 commit c278123
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,17 @@ object Value {
def isEmpty: Boolean = entries.isEmpty
def nonEmpty: Boolean = entries.nonEmpty
override def toJson: String = {
s"{${entries.map(x => s"${x._1.toJson}:${x._2.toJson}").mkString(",")}}"
entries
.map { kv =>
kv._1 match {
case StringValue(s) => s"${kv._1.toJson}:${kv._2.toJson}"
case _ =>
// JSON requires Map key must be a quoted UTF-8 string
val jsonKey = new StringBuilder()
appendJsonString(jsonKey, kv._1.toJson)
s"${jsonKey.result()}:${kv._2.toJson}"
}
}.mkString("{", ",", "}")
}
override def valueType: ValueType = ValueType.MAP
override def writeTo(packer: Packer): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package wvlet.airframe.parquet
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter, RecordMaterializer}
import org.apache.parquet.schema.{GroupType, MessageType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.LogicalTypeAnnotation.stringType
import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType}
import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.codec.PrimitiveCodec.ValueCodec
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport

import scala.jdk.CollectionConverters.*

object ParquetRecordReader extends LogSupport {
Expand Down Expand Up @@ -56,7 +57,15 @@ object ParquetRecordReader extends LogSupport {
}
private class MsgPackConverter(fieldName: String, holder: RecordBuilder) extends PrimitiveConverter {
override def addBinary(value: Binary): Unit = {
holder.add(fieldName, ValueCodec.fromMsgPack(value.getBytes))
val v = ValueCodec.fromMsgPack(value.getBytes)
holder.add(fieldName, v)
}
}
private class JsonConverter(fieldName: String, holder: RecordBuilder) extends PrimitiveConverter {
override def addBinary(value: Binary): Unit = {
val json = value.toStringUsingUTF8
warn(s"read json: ${json}")
holder.add(fieldName, json)
}
}

Expand Down Expand Up @@ -84,8 +93,10 @@ class ParquetRecordReader[A](
case PrimitiveTypeName.BOOLEAN => new BooleanConverter(f.getName, recordBuilder)
case PrimitiveTypeName.FLOAT => new FloatConverter(f.getName, recordBuilder)
case PrimitiveTypeName.DOUBLE => new DoubleConverter(f.getName, recordBuilder)
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType =>
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType() =>
new StringConverter(f.getName, recordBuilder)
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == jsonType() =>
new JsonConverter(f.getName, recordBuilder)
case PrimitiveTypeName.BINARY =>
new MsgPackConverter(f.getName, recordBuilder)
case _ => ???
Expand Down Expand Up @@ -130,6 +141,7 @@ class ParquetRecordReader[A](

def currentRecord: A = {
val m = recordBuilder.toMap
warn(s"currentRecord: ${m}")
codec.fromMap(m).asInstanceOf[A]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
*/
package wvlet.airframe.parquet

import org.apache.parquet.schema.LogicalTypeAnnotation.stringType
import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, PrimitiveType, Type, Types}
import org.apache.parquet.schema.Types.{MapBuilder, PrimitiveBuilder}
import wvlet.airframe.msgpack.spi.MsgPack
import wvlet.airframe.surface.Primitive.PrimitiveSurface
import wvlet.airframe.surface.{
Alias,
ArraySurface,
OptionSurface,
Parameter,
Expand All @@ -30,11 +31,12 @@ import wvlet.airframe.surface.{
Surface
}
import wvlet.airframe.ulid.ULID
import wvlet.log.LogSupport

import java.util.UUID
import scala.jdk.CollectionConverters.*

object ParquetSchema {
object ParquetSchema extends LogSupport {

// Convert surface into primitive
private def toParquetPrimitiveTypeName(s: PrimitiveSurface): PrimitiveTypeName = {
Expand Down Expand Up @@ -77,12 +79,18 @@ object ParquetSchema {
toParquetPrimitive(p, rep)
case o: OptionSurface =>
buildParquetType(o.elementSurface, Some(Repetition.OPTIONAL))
case s: Surface if s == Surface.of[MsgPack] =>
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL))
case s: Surface if s == Surface.of[wvlet.airframe.json.Json] =>
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)).as(jsonType())
case s: Surface if classOf[wvlet.airframe.msgpack.spi.Value].isAssignableFrom(s.rawType) =>
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL))
case s: Surface if s.isSeq || s.isArray =>
val elementSurface = s.typeArgs(0)
buildParquetType(elementSurface, Some(Repetition.REPEATED))
case m: Surface if m.isMap =>
// Encode Map[_, _] type as Binary and make it optional as Map can be empty
Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL)
// Encode Map[_, _] type as Json and make it optional as Map can be empty
Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).as(jsonType())
// case m: Surface if m.isMap =>
// val keySurface = m.typeArgs(0)
// val valueSurface = m.typeArgs(1)
Expand All @@ -101,8 +109,8 @@ object ParquetSchema {
}
groupType
case s: Surface =>
// Use MsgPack for other types
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL))
// Use JSON for other types
Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)).as(jsonType())
}
}

Expand All @@ -123,6 +131,8 @@ object ParquetSchema {
Primitive.Boolean
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType() =>
Primitive.String
case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == jsonType() =>
Primitive.String
case _ =>
Surface.of[MsgPack]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package wvlet.airframe.parquet

import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.parquet.schema.LogicalTypeAnnotation.stringType
import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.{MessageType, Type}
import org.apache.parquet.schema.Type.Repetition
import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.codec.{JSONCodec, MessageCodec}
import wvlet.airframe.codec.PrimitiveCodec.{
BooleanCodec,
DoubleCodec,
Expand All @@ -29,6 +29,7 @@ import wvlet.airframe.codec.PrimitiveCodec.{
ValueCodec
}
import wvlet.airframe.msgpack.spi.MsgPack
import wvlet.airframe.msgpack.spi.Value.StringValue
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport

Expand Down Expand Up @@ -103,12 +104,20 @@ object ParquetWriteCodec extends LogSupport {
recordConsumer.addDouble(DoubleCodec.fromMsgPack(msgpack))
}
}
case PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == stringType =>
case PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == stringType() =>
new PrimitiveParquetCodec(codec) {
override protected def writeValue(recordConsumer: RecordConsumer, msgpack: MsgPack): Unit = {
recordConsumer.addBinary(Binary.fromString(StringCodec.fromMsgPack(msgpack)))
}
}
case PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == jsonType() =>
new PrimitiveParquetCodec(codec) {
override protected def writeValue(recordConsumer: RecordConsumer, msgpack: MsgPack): Unit = {
val json = ValueCodec.fromMsgPack(msgpack).toJson.stripPrefix("\"").stripSuffix("\"")
warn(s"write json: ${json}")
recordConsumer.addBinary(Binary.fromString(json))
}
}
case _ =>
// For the other primitive type values
new PrimitiveParquetCodec(codec) {
Expand Down

0 comments on commit c278123

Please sign in to comment.