From 37f8c47503a9e866adaccb688f53fc56f83f0388 Mon Sep 17 00:00:00 2001 From: Bernhard Date: Thu, 11 Jul 2024 12:13:35 +0200 Subject: [PATCH 1/2] add reverse converter --- .../scala/flatgraph/convert/Convert.scala | 456 ++++++++++++++---- 1 file changed, 349 insertions(+), 107 deletions(-) diff --git a/odb-convert/src/main/scala/flatgraph/convert/Convert.scala b/odb-convert/src/main/scala/flatgraph/convert/Convert.scala index dee17981..28908294 100644 --- a/odb-convert/src/main/scala/flatgraph/convert/Convert.scala +++ b/odb-convert/src/main/scala/flatgraph/convert/Convert.scala @@ -1,8 +1,10 @@ package flatgraph.convert -import flatgraph.{Edge, storage} +import flatgraph.misc.ISeq +import flatgraph.{AccessHelpers, Accessors, Edge, GNode, storage} import flatgraph.storage.{Keys, Manifest, Serialization, StorageType} -import org.msgpack.core.MessagePack +import org.msgpack.core.{MessageBufferPacker, MessagePack} +import overflowdb.Graph import overflowdb.storage.{OdbStorage, ValueTypes} import java.io.{ByteArrayOutputStream, File} @@ -13,27 +15,60 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable object Convert { + val ANON_EDGE_PROPERTY = "EDGE_PROPERTY" def main(args: Array[String]): Unit = { - if (args.length < 2) { - System.err.println("Usage: convert [inputfile] [outputfile]") - System.err.println("Error: missing input and/or output file - exiting.") + if (args.length == 2) { + convertOdbToFlatgraph(overflowDbFile = Paths.get(args(0)), outputFile = Paths.get(args(1))) + } else if (args.length == 3 && args(0) == "-r") { + convertFlatgraphToOdb(fgFile = Paths.get(args(1)), outputFile = Paths.get(args(2))) } else { - apply(overflowDbFile = Paths.get(args(0)), outputFile = Paths.get(args(1))) + System.err.println("Usage: convert [inputfileOdb] [outputfileFlatGraph]") + System.err.println("Usage: convert -r [inputfileFlatGraph] [outputfileOdb]") + System.err.println(s"""Seen: ${args.mkString(",")}""") + System.err.println("Error: missing input and/or output file - exiting.") } } - def apply(overflowDbFile: Path, outputFile: Path): Unit = { + def convertOdbToFlatgraph(overflowDbFile: Path, outputFile: Path): Unit = { val storage = overflowdb.storage.OdbStorage.createWithSpecificLocation(overflowDbFile.toFile, new overflowdb.util.StringInterner) val (nodes, strings) = readOdb(storage) writeData(outputFile.toFile, nodes, strings) } - private class NodeRefTmp(val legacyId: Long) { + def convertFlatgraphToOdb(fgFile: Path, outputFile: Path, debug: Boolean = false): Unit = { + val fg = flatgraph.storage.Deserialization.readGraph(fgFile, None, false) + + val dst = outputFile.toFile + if (dst.exists()) dst.delete() + dst.createNewFile() + val storage = overflowdb.storage.OdbStorage.createWithSpecificLocation(dst, new overflowdb.util.StringInterner) + for (node <- fg.allNodes if !AccessHelpers.isDeleted(node)) { + val bytes = packNode(node, storage) + if (debug) { + val legacyIdToNewId = mutable.HashMap[Long, NodeRefTmp]() + val stringInterner = mutable.LinkedHashMap[String, StringRef]() + val byLabel = mutable.LinkedHashMap[String, NodeStuff]() + try { + readNode(legacyIdToNewId, stringInterner, node.id(), bytes, byLabel, storage) + } catch { + case exc: Throwable => + println(s"fuck ${node.seq()} / ${node.nodeKind} / ${node.label()} / ${node.id()}") + println(exc) + throw exc + } + } + storage.persist(node.id(), packNode(node, storage)) + } + storage.close() + + } + + class NodeRefTmp(val legacyId: Long) { var newId: Long = -1L } - private class StringRef(val idx: Int, val string: String) + class StringRef(val idx: Int, val string: String) private object NodeStuff { val NODEPROPERTY = "p" @@ -43,7 +78,7 @@ object Convert { val legacyId = "legacyId" } - private class NodeStuff(val label: String, val kind: Int) { + class NodeStuff(val label: String, val kind: Int) { var nextId: Int = 0 val quantities = mutable.HashMap[(String, String), mutable.ArrayBuffer[Int]]() val values = mutable.HashMap[(String, String), mutable.ArrayBuffer[Any]]() @@ -172,121 +207,328 @@ object Convert { storage.StorageType.Ref, items.asInstanceOf[mutable.ArrayBuffer[NodeRefTmp]].map { ref => if (ref == null) 0x0000ffffffffffffL else ref.newId }.toArray ) - case Some(other) => throw new AssertionError(s"unexpected item found: $other of type ${other.getClass}") + case Some(other) => throw new AssertionError(s"unexpected item found: other of type ${other.getClass}") } } - private def readOdb(storage: overflowdb.storage.OdbStorage): (Array[NodeStuff], Array[String]) = { - val legacyIdToNewId = mutable.HashMap[Long, NodeRefTmp]() - val stringInterner = mutable.LinkedHashMap[String, StringRef]() - val byLabel = mutable.LinkedHashMap[String, NodeStuff]() - val iter = storage.allNodes().iterator - while (iter.hasNext) { - val e = iter.next() - val legacyId = e.getKey - val bytes = e.getValue - val unpacker = MessagePack.newDefaultUnpacker(bytes) - val legacyId2 = unpacker.unpackLong() - assert(legacyId2 == legacyId) - - val label = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) - val sz = byLabel.size - val nodeStuff = byLabel.getOrElseUpdate(label, new NodeStuff(label, sz)) - val ref = legacyIdToNewId.getOrElseUpdate(legacyId, new NodeRefTmp(legacyId)) - ref.newId = nodeStuff.nextId.toLong + (nodeStuff.kind.toLong << 32) - nodeStuff.nextId += 1 - nodeStuff.addX(NodeStuff.NODEPROPERTY, NodeStuff.legacyId, legacyId) - for (_ <- Range(0, unpacker.unpackMapHeader())) { - val key = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) - for (v <- unpackValue(unpacker.unpackValue().asArrayValue())) { - nodeStuff.addX(NodeStuff.NODEPROPERTY, key, v) - } + def packNode(node: GNode, storage: OdbStorage): Array[Byte] = { + val packer = MessagePack.newDefaultBufferPacker() + val graph = node.graph + val schema = graph.schema + packer.packLong(node.id()) + // println(s"node id: ${node.id()}") + packer.packInt(storage.lookupOrCreateStringToIntMapping(schema.getNodeLabel(node.nodeKind))) + // println(s"label: ${storage.lookupOrCreateStringToIntMapping(schema.getNodeLabel(node.nodeKind))}") + + var nprops = 0 + for (propertyId <- Range(0, schema.getNumberOfPropertyKinds) if Accessors.getNodeProperty(node, propertyId).nonEmpty) { + nprops += 1 + } + packer.packMapHeader(nprops) + // println(s"map header: ${nprops}") + for (propertyId <- Range(0, schema.getNumberOfPropertyKinds)) { + val values = Accessors.getNodeProperty(graph, node.nodeKind, propertyId, node.seq()) + val rawVals = graph.properties(schema.propertyOffsetArrayIndex(node.nodeKind, propertyId) + 1) + packProperty(rawVals, values, packer, storage.lookupOrCreateStringToIntMapping(schema.getPropertyLabel(node.nodeKind, propertyId))) + } + + for (inOut <- List(1, 0)) { + var edgeTypeCount = 0 + for (edgeId <- Range(0, schema.getNumberOfEdgeKinds)) { + val edges = if (inOut == 0) Accessors.getEdgesIn(node, edgeId) else Accessors.getEdgesOut(node, edgeId) + if (edges.nonEmpty) edgeTypeCount += 1 } - for (inout <- List(NodeStuff.NEIGHBOR_OUT, NodeStuff.NEIGHBOR_IN)) { - val edgeTypeCount = unpacker.unpackInt() - for (_ <- Range(0, edgeTypeCount)) { - val key = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) - val edgeCount = unpacker.unpackInt() - for (_ <- Range(0, edgeCount)) { - val adjacentId = unpacker.unpackLong - val adjacentNode = legacyIdToNewId.getOrElseUpdate(adjacentId, new NodeRefTmp(adjacentId)) - val (pkey, pval) = unpacker.unpackMapHeader() match { - case 0 => (null, null) // no property - case 1 => - val pkey = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) - val pvals = unpackValue(unpacker.unpackValue().asArrayValue()) - if (pvals.length == 0) (null, null) - else if (pvals.length == 1) (pkey, pvals.head) - else ??? - case _ => ??? // we only support one property! - } - nodeStuff.addX(inout, key, adjacentNode, pkey, pval) + packer.packInt(edgeTypeCount) + for (edgeId <- Range(0, schema.getNumberOfEdgeKinds)) { + val edges = if (inOut == 0) Accessors.getEdgesIn(node, edgeId) else Accessors.getEdgesOut(node, edgeId) + if (edges.nonEmpty) { + packer.packInt(storage.lookupOrCreateStringToIntMapping(schema.getEdgeLabel(node.nodeKind, edgeId))) + packer.packInt(edges.size) + for (edge <- edges) { + packer.packLong(if (inOut == 0) edge.src.id() else edge.dst.id()) + if (edge.property != null) { + packer.packMapHeader(1) + packer.packInt(storage.lookupOrCreateStringToIntMapping(ANON_EDGE_PROPERTY)) + packer.packArrayHeader(2) + edge.property match { + case v: java.lang.Boolean => + packer.packByte(ValueTypes.BOOLEAN.id) + packer.packBoolean(v) + case v: java.lang.Byte => + packer.packByte(ValueTypes.BYTE.id) + packer.packByte(v) + case v: java.lang.Short => + packer.packByte(ValueTypes.SHORT.id) + packer.packShort(v) + case v: java.lang.Integer => + packer.packByte(ValueTypes.INTEGER.id) + packer.packInt(v) + case v: java.lang.Long => + packer.packByte(ValueTypes.LONG.id) + packer.packLong(v) + case v: java.lang.Float => + packer.packByte(ValueTypes.FLOAT.id) + packer.packFloat(v) + case v: java.lang.Double => + packer.packByte(ValueTypes.DOUBLE.id) + packer.packDouble(v) + case v: java.lang.String => + packer.packByte(ValueTypes.STRING.id) + packer.packString(v) + case v: GNode => + packer.packByte(ValueTypes.NODE_REF.id) + packer.packLong(v.id()) + } + } else packer.packMapHeader(0) } } } } + packer.toByteArray + } - def unpackValue( - valueOrPair: org.msgpack.value.Value, - res: mutable.ArrayBuffer[Any] = mutable.ArrayBuffer[Any](), - typId: Option[ValueTypes] = None - ): mutable.ArrayBuffer[Any] = { - val (typId_, v) = typId match { - case None => - val iter = valueOrPair.asArrayValue().iterator() - (ValueTypes.lookup(iter.next.asIntegerValue.asByte), iter.next) - case Some(t) => (t, valueOrPair) - } - typId_ match { - case ValueTypes.UNKNOWN => res.addOne(null) // this is the encoding for null strings and noderefs. - case ValueTypes.BOOLEAN => res.addOne(v.asBooleanValue().getBoolean) - case ValueTypes.BYTE => res.addOne(v.asIntegerValue.asByte) - case ValueTypes.SHORT => res.addOne(v.asIntegerValue.asShort) - case ValueTypes.INTEGER => res.addOne(v.asIntegerValue.asInt) - case ValueTypes.LONG => res.addOne(v.asIntegerValue.asLong) - case ValueTypes.FLOAT => res.addOne(v.asFloatValue.toFloat) - case ValueTypes.DOUBLE => res.addOne(v.asFloatValue.toDouble) - case ValueTypes.STRING => - val s = v.asStringValue().asString() - res.addOne(stringInterner.getOrElseUpdate(s, new StringRef(stringInterner.size, s))) - case ValueTypes.NODE_REF => - val legacyId = v.asIntegerValue.asLong - res.addOne(legacyIdToNewId.get(legacyId) match { - case null => - val tmp = new NodeRefTmp(legacyId) - legacyIdToNewId.put(legacyId, tmp) - res - case exists => exists - }) - case ValueTypes.LIST | ValueTypes.ARRAY_OBJECT => - val iter = v.asArrayValue().iterator() - while (iter.hasNext) { - unpackValue(iter.next().asArrayValue(), res) + def packProperty(rawVals: Any, values: ISeq[Any], packer: MessageBufferPacker, storageId: Int): Unit = { + if (values.nonEmpty) { + packer.packInt(storageId) + // println(s"storage id: ${storageId}") + packer.packArrayHeader(2) + // println(s"array header: 2") + // println(s""""""") + + rawVals match { + case null => ??? + case _: Array[Boolean] => + val vv = values.asInstanceOf[ISeq[Boolean]] + if (vv.size == 1) { + packer.packByte(ValueTypes.BOOLEAN.id) + packer.packBoolean(vv.head) + } else { + packer.packByte(ValueTypes.ARRAY_BOOL.id) + packer.packArrayHeader(vv.size) + vv.foreach(packer.packBoolean) } - case ValueTypes.ARRAY_BOOL | ValueTypes.ARRAY_BYTE | ValueTypes.ARRAY_SHORT | ValueTypes.ARRAY_INT | ValueTypes.ARRAY_LONG | - ValueTypes.ARRAY_FLOAT | ValueTypes.ARRAY_DOUBLE => - val elementType = typId_ match { - case ValueTypes.ARRAY_BOOL => ValueTypes.BOOLEAN - case ValueTypes.ARRAY_BYTE => ValueTypes.BYTE - case ValueTypes.ARRAY_SHORT => ValueTypes.SHORT - case ValueTypes.ARRAY_INT => ValueTypes.INTEGER - case ValueTypes.ARRAY_LONG => ValueTypes.LONG - case ValueTypes.ARRAY_FLOAT => ValueTypes.FLOAT - case ValueTypes.ARRAY_DOUBLE => ValueTypes.DOUBLE - case _ => ??? + case _: Array[Byte] => + val vv = values.asInstanceOf[ISeq[Byte]] + if (vv.size == 1) { + packer.packByte(ValueTypes.BYTE.id) + packer.packByte(vv.head) + } else { + packer.packByte(ValueTypes.ARRAY_BYTE.id) + packer.packArrayHeader(vv.size) + vv.foreach(packer.packByte) } - val iter = v.asArrayValue().iterator() - while (iter.hasNext) { - unpackValue(iter.next().asArrayValue(), res, Some(elementType)) + case _: Array[Short] => + val vv = values.asInstanceOf[ISeq[Short]] + if (vv.size == 1) { + packer.packByte(ValueTypes.SHORT.id) + packer.packShort(vv.head) + } else { + packer.packByte(ValueTypes.ARRAY_SHORT.id) + packer.packArrayHeader(vv.size) + vv.foreach(packer.packShort) } + case _: Array[Int] => + val vv = values.asInstanceOf[ISeq[Int]] + if (vv.size == 1) { + packer.packByte(ValueTypes.INTEGER.id) + packer.packInt(vv.head) + } else { + packer.packByte(ValueTypes.ARRAY_INT.id) + packer.packArrayHeader(vv.size) + vv.foreach(packer.packInt) + } + case _: Array[Long] => + val vv = values.asInstanceOf[ISeq[Long]] + if (vv.size == 1) { + packer.packByte(ValueTypes.LONG.id) + packer.packLong(vv.head) + } else { + packer.packByte(ValueTypes.ARRAY_LONG.id) + packer.packArrayHeader(vv.size) + vv.foreach(packer.packLong) + } + case _: Array[Float] => + val vv = values.asInstanceOf[ISeq[Float]] + if (vv.size == 1) { + packer.packByte(ValueTypes.FLOAT.id) + packer.packFloat(vv.head) + } else { + packer.packByte(ValueTypes.ARRAY_FLOAT.id) + packer.packArrayHeader(vv.size) + vv.foreach(packer.packFloat) + } + case _: Array[Double] => + val vv = values.asInstanceOf[ISeq[Double]] + if (vv.size == 1) { + packer.packByte(ValueTypes.DOUBLE.id) + packer.packDouble(vv.head) + } else { + packer.packByte(ValueTypes.ARRAY_DOUBLE.id) + packer.packArrayHeader(vv.size) + vv.foreach(packer.packDouble) + } + case _: Array[String] | _: Array[GNode] => + if (values.size == 1) packTypedRefValue(packer, values.head, true) + else { + packer.packByte(ValueTypes.ARRAY_OBJECT.id) + packer.packArrayHeader(values.size) + values.foreach(packTypedRefValue(packer, _, false)) + } + + } - case _ => ??? + } + } + + def packTypedRefValue(packer: MessageBufferPacker, v: Any, skipHeader: Boolean): Unit = { + if (!skipHeader) packer.packArrayHeader(2) + v match { + case null => + packer.packByte(ValueTypes.UNKNOWN.id) + packer.packNil() + case str: String => + packer.packByte(ValueTypes.STRING.id) + packer.packString(str) + case gn: GNode => + packer.packByte(ValueTypes.NODE_REF.id) + packer.packLong(gn.id()) + } + } + + def readNode( + legacyIdToNewId: mutable.HashMap[Long, NodeRefTmp], + stringInterner: mutable.LinkedHashMap[String, StringRef], + legacyId: Long, + bytes: Array[Byte], + byLabel: mutable.LinkedHashMap[String, NodeStuff], + storage: OdbStorage + ): Unit = { + val unpacker = MessagePack.newDefaultUnpacker(bytes) + val legacyId2 = unpacker.unpackLong() +// println(s"unpack node id: ${legacyId2}") + assert(legacyId2 == legacyId) + + val label = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) + // println(s"unpack node type: ${label}") + + val sz = byLabel.size + val nodeStuff = byLabel.getOrElseUpdate(label, new NodeStuff(label, sz)) + val ref = legacyIdToNewId.getOrElseUpdate(legacyId, new NodeRefTmp(legacyId)) + ref.newId = nodeStuff.nextId.toLong + (nodeStuff.kind.toLong << 32) + nodeStuff.nextId += 1 + // nodeStuff.addX(NodeStuff.NODEPROPERTY, NodeStuff.legacyId, legacyId) + val nprops = unpacker.unpackMapHeader() + // println(s"unpack map header: ${nprops}") + for (_ <- Range(0, nprops)) { + val key = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) + // println(s"unpack property: ${key}") + for (v <- unpackValue(legacyIdToNewId, stringInterner, unpacker.unpackValue().asArrayValue())) { + nodeStuff.addX(NodeStuff.NODEPROPERTY, key, v) } + } + for (inout <- List(NodeStuff.NEIGHBOR_OUT, NodeStuff.NEIGHBOR_IN)) { + val edgeTypeCount = unpacker.unpackInt() + for (_ <- Range(0, edgeTypeCount)) { + val key = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) + val edgeCount = unpacker.unpackInt() + for (_ <- Range(0, edgeCount)) { + val adjacentId = unpacker.unpackLong + val adjacentNode = legacyIdToNewId.getOrElseUpdate(adjacentId, new NodeRefTmp(adjacentId)) + val (pkey, pval) = unpacker.unpackMapHeader() match { + case 0 => (null, null) // no property + case 1 => + val pkey = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) + val pvals = unpackValue(legacyIdToNewId, stringInterner, unpacker.unpackValue().asArrayValue()) + if (pvals.length == 0) (null, null) + else if (pvals.length == 1) (pkey, pvals.head) + else ??? + case _ => ??? // we only support one property! + } + nodeStuff.addX(inout, key, adjacentNode, pkey, pval) + } + } + } + + } - res + private def readOdb(storage: overflowdb.storage.OdbStorage): (Array[NodeStuff], Array[String]) = { + val legacyIdToNewId = mutable.HashMap[Long, NodeRefTmp]() + val stringInterner = mutable.LinkedHashMap[String, StringRef]() + val byLabel = mutable.LinkedHashMap[String, NodeStuff]() + val iter = storage.allNodes().iterator + while (iter.hasNext) { + val e = iter.next() + val legacyId = e.getKey + val bytes = e.getValue + readNode(legacyIdToNewId, stringInterner, legacyId, bytes, byLabel, storage) } + byLabel.valuesIterator.foreach { _.pad() } (byLabel.valuesIterator.toArray, stringInterner.keysIterator.toArray) } + def unpackValue( + legacyIdToNewId: mutable.HashMap[Long, NodeRefTmp], + stringInterner: mutable.LinkedHashMap[String, StringRef], + valueOrPair: org.msgpack.value.Value, + res: mutable.ArrayBuffer[Any] = mutable.ArrayBuffer[Any](), + typId: Option[ValueTypes] = None + ): mutable.ArrayBuffer[Any] = { + val (typId_, v) = typId match { + case None => + val iter = valueOrPair.asArrayValue().iterator() + val first = iter.next.asIntegerValue().asByte() + val remainder = iter.next() + (ValueTypes.lookup(first), remainder) + case Some(t) => (t, valueOrPair) + } + typId_ match { + case ValueTypes.UNKNOWN => res.addOne(null) // this is the encoding for null strings and noderefs. + case ValueTypes.BOOLEAN => res.addOne(v.asBooleanValue().getBoolean) + case ValueTypes.BYTE => res.addOne(v.asIntegerValue.asByte) + case ValueTypes.SHORT => res.addOne(v.asIntegerValue.asShort) + case ValueTypes.INTEGER => res.addOne(v.asIntegerValue.asInt) + case ValueTypes.LONG => res.addOne(v.asIntegerValue.asLong) + case ValueTypes.FLOAT => res.addOne(v.asFloatValue.toFloat) + case ValueTypes.DOUBLE => res.addOne(v.asFloatValue.toDouble) + case ValueTypes.STRING => + val s = v.asStringValue().asString() + res.addOne(stringInterner.getOrElseUpdate(s, new StringRef(stringInterner.size, s))) + case ValueTypes.NODE_REF => + val legacyId = v.asIntegerValue.asLong + res.addOne(legacyIdToNewId.get(legacyId) match { + case None => + val tmp = new NodeRefTmp(legacyId) + legacyIdToNewId.put(legacyId, tmp) + tmp + case Some(exists) => exists + }) + case ValueTypes.LIST | ValueTypes.ARRAY_OBJECT => + val iter = v.asArrayValue().iterator() + while (iter.hasNext) { + unpackValue(legacyIdToNewId, stringInterner, iter.next().asArrayValue(), res) + } + case ValueTypes.ARRAY_BOOL | ValueTypes.ARRAY_BYTE | ValueTypes.ARRAY_SHORT | ValueTypes.ARRAY_INT | ValueTypes.ARRAY_LONG | + ValueTypes.ARRAY_FLOAT | ValueTypes.ARRAY_DOUBLE => + val elementType = typId_ match { + case ValueTypes.ARRAY_BOOL => ValueTypes.BOOLEAN + case ValueTypes.ARRAY_BYTE => ValueTypes.BYTE + case ValueTypes.ARRAY_SHORT => ValueTypes.SHORT + case ValueTypes.ARRAY_INT => ValueTypes.INTEGER + case ValueTypes.ARRAY_LONG => ValueTypes.LONG + case ValueTypes.ARRAY_FLOAT => ValueTypes.FLOAT + case ValueTypes.ARRAY_DOUBLE => ValueTypes.DOUBLE + case _ => ??? + } + val iter = v.asArrayValue().iterator() + while (iter.hasNext) { + unpackValue(legacyIdToNewId, stringInterner, iter.next().asArrayValue(), res, Some(elementType)) + } + + case _ => ??? + } + + res + } + } From d8ffc17d523a08eba645863202e916d1c4ba5e07 Mon Sep 17 00:00:00 2001 From: Bernhard Date: Thu, 11 Jul 2024 13:58:37 +0200 Subject: [PATCH 2/2] remove swear words, add verbose/debug option --- .../flatgraph/storage/Deserialization.scala | 8 +-- .../scala/flatgraph/convert/Convert.scala | 64 ++++++++++--------- 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/flatgraph/storage/Deserialization.scala b/core/src/main/scala/flatgraph/storage/Deserialization.scala index d84131b4..25b9c045 100644 --- a/core/src/main/scala/flatgraph/storage/Deserialization.scala +++ b/core/src/main/scala/flatgraph/storage/Deserialization.scala @@ -17,7 +17,7 @@ object Deserialization { val fileChannel = new java.io.RandomAccessFile(storagePath.toAbsolutePath.toFile, "r").getChannel try { // fixme: Use convenience methods from schema to translate string->id. Fix after we get strict schema checking. - val manifest = readManifest(fileChannel) + val manifest = GraphItem.read(readManifest(fileChannel)) val pool = readPool(manifest, fileChannel) val schema = schemaMaybe.getOrElse(freeSchemaFromManifest(manifest)) val storagePathMaybe = @@ -140,7 +140,7 @@ object Deserialization { } } - private def readManifest(channel: FileChannel): GraphItem = { + def readManifest(channel: FileChannel): ujson.Value = { if (channel.size() < HeaderSize) throw new DeserializationException(s"corrupt file, expected at least $HeaderSize bytes, but only found ${channel.size()}") @@ -162,8 +162,8 @@ object Deserialization { readBytes += channel.read(manifestBytes, readBytes + manifestOffset) } manifestBytes.flip() - val jsonObj = ujson.read(manifestBytes) - GraphItem.read(jsonObj) + ujson.read(manifestBytes) + } private def readPool(manifest: GraphItem, fileChannel: FileChannel): Array[String] = { diff --git a/odb-convert/src/main/scala/flatgraph/convert/Convert.scala b/odb-convert/src/main/scala/flatgraph/convert/Convert.scala index 28908294..99a9708f 100644 --- a/odb-convert/src/main/scala/flatgraph/convert/Convert.scala +++ b/odb-convert/src/main/scala/flatgraph/convert/Convert.scala @@ -13,30 +13,48 @@ import java.nio.file.{Path, Paths} import java.nio.{ByteBuffer, ByteOrder} import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable +import scala.util.Using object Convert { val ANON_EDGE_PROPERTY = "EDGE_PROPERTY" def main(args: Array[String]): Unit = { - if (args.length == 2) { - convertOdbToFlatgraph(overflowDbFile = Paths.get(args(0)), outputFile = Paths.get(args(1))) - } else if (args.length == 3 && args(0) == "-r") { - convertFlatgraphToOdb(fgFile = Paths.get(args(1)), outputFile = Paths.get(args(2))) - } else { - System.err.println("Usage: convert [inputfileOdb] [outputfileFlatGraph]") - System.err.println("Usage: convert -r [inputfileFlatGraph] [outputfileOdb]") - System.err.println(s"""Seen: ${args.mkString(",")}""") + var dstFile: Path = null + var srcFile: Path = null + var reverse = false + var verbose = false + var tooMany = false + for (arg <- args) { + if (arg == "-r") reverse = true + else if (arg == "--verbose") verbose = true + else if (srcFile == null) srcFile = Paths.get(arg) + else if (dstFile == null) dstFile = Paths.get(arg) + else tooMany = true + } + if (tooMany || dstFile == null || srcFile == null) { + System.err.println("Usage: convert [--verbose] [inputfileOdb] [outputfileFlatGraph]") + System.err.println("Usage: convert -r [--verbose] [inputfileFlatGraph] [outputfileOdb]") System.err.println("Error: missing input and/or output file - exiting.") + } else if (reverse) { + convertFlatgraphToOdb(srcFile, dstFile, verbose = verbose, debug = true) + } else { + convertOdbToFlatgraph(srcFile, dstFile, verbose = verbose) } } - def convertOdbToFlatgraph(overflowDbFile: Path, outputFile: Path): Unit = { + def convertOdbToFlatgraph(overflowDbFile: Path, outputFile: Path, verbose: Boolean = false): Unit = { val storage = overflowdb.storage.OdbStorage.createWithSpecificLocation(overflowDbFile.toFile, new overflowdb.util.StringInterner) val (nodes, strings) = readOdb(storage) - writeData(outputFile.toFile, nodes, strings) + writeData(outputFile.toFile, nodes, strings, verbose = verbose) } - def convertFlatgraphToOdb(fgFile: Path, outputFile: Path, debug: Boolean = false): Unit = { + def convertFlatgraphToOdb(fgFile: Path, outputFile: Path, debug: Boolean = false, verbose: Boolean = false): Unit = { + if (verbose) { + Using(new java.io.RandomAccessFile(fgFile.toAbsolutePath.toFile, "r").getChannel) { channel => + val manifest = flatgraph.storage.Deserialization.readManifest(channel) + println(manifest.render(indent = 2)) + } + } val fg = flatgraph.storage.Deserialization.readGraph(fgFile, None, false) val dst = outputFile.toFile @@ -53,7 +71,7 @@ object Convert { readNode(legacyIdToNewId, stringInterner, node.id(), bytes, byLabel, storage) } catch { case exc: Throwable => - println(s"fuck ${node.seq()} / ${node.nodeKind} / ${node.label()} / ${node.id()}") + println(s"Inconsistency encountered ${node.seq()} / ${node.nodeKind} / ${node.label()} / ${node.id()}") println(exc) throw exc } @@ -61,7 +79,6 @@ object Convert { storage.persist(node.id(), packNode(node, storage)) } storage.close() - } class NodeRefTmp(val legacyId: Long) { @@ -107,7 +124,7 @@ object Convert { } } - private def writeData(filename: File, nodeStuff: Array[NodeStuff], strings: Array[String]): Unit = { + private def writeData(filename: File, nodeStuff: Array[NodeStuff], strings: Array[String], verbose: Boolean = false): Unit = { val fileAbsolute = filename.getAbsoluteFile val filePtr = new AtomicLong(16) if (!fileAbsolute.exists()) { @@ -175,15 +192,14 @@ object Convert { } val manifest = new Manifest.GraphItem(nodes.toArray, edges.toArray, properties.toArray, poolLensStored, poolBytesStored) val manifestObj = Manifest.GraphItem.write(manifest) - val buf = ByteBuffer.wrap(manifestObj.render().getBytes(StandardCharsets.UTF_8)) + if (verbose) { + println(manifestObj.render(indent = 2)) + } + val buf = ByteBuffer.wrap(manifestObj.render().getBytes(StandardCharsets.UTF_8)) while (buf.hasRemaining()) { pos += fileChannel.write(buf, pos) } fileChannel.truncate(pos) - - // tmp debug - // println(manifestObj.render(indent = 4)) - } finally { fileChannel.close() } } @@ -216,16 +232,13 @@ object Convert { val graph = node.graph val schema = graph.schema packer.packLong(node.id()) - // println(s"node id: ${node.id()}") packer.packInt(storage.lookupOrCreateStringToIntMapping(schema.getNodeLabel(node.nodeKind))) - // println(s"label: ${storage.lookupOrCreateStringToIntMapping(schema.getNodeLabel(node.nodeKind))}") var nprops = 0 for (propertyId <- Range(0, schema.getNumberOfPropertyKinds) if Accessors.getNodeProperty(node, propertyId).nonEmpty) { nprops += 1 } packer.packMapHeader(nprops) - // println(s"map header: ${nprops}") for (propertyId <- Range(0, schema.getNumberOfPropertyKinds)) { val values = Accessors.getNodeProperty(graph, node.nodeKind, propertyId, node.seq()) val rawVals = graph.properties(schema.propertyOffsetArrayIndex(node.nodeKind, propertyId) + 1) @@ -290,10 +303,7 @@ object Convert { def packProperty(rawVals: Any, values: ISeq[Any], packer: MessageBufferPacker, storageId: Int): Unit = { if (values.nonEmpty) { packer.packInt(storageId) - // println(s"storage id: ${storageId}") packer.packArrayHeader(2) - // println(s"array header: 2") - // println(s""""""") rawVals match { case null => ??? @@ -405,11 +415,9 @@ object Convert { ): Unit = { val unpacker = MessagePack.newDefaultUnpacker(bytes) val legacyId2 = unpacker.unpackLong() -// println(s"unpack node id: ${legacyId2}") assert(legacyId2 == legacyId) val label = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) - // println(s"unpack node type: ${label}") val sz = byLabel.size val nodeStuff = byLabel.getOrElseUpdate(label, new NodeStuff(label, sz)) @@ -418,10 +426,8 @@ object Convert { nodeStuff.nextId += 1 // nodeStuff.addX(NodeStuff.NODEPROPERTY, NodeStuff.legacyId, legacyId) val nprops = unpacker.unpackMapHeader() - // println(s"unpack map header: ${nprops}") for (_ <- Range(0, nprops)) { val key = storage.reverseLookupStringToIntMapping(unpacker.unpackInt()) - // println(s"unpack property: ${key}") for (v <- unpackValue(legacyIdToNewId, stringInterner, unpacker.unpackValue().asArrayValue())) { nodeStuff.addX(NodeStuff.NODEPROPERTY, key, v) }