Skip to content

Commit

Permalink
Got rid of base64 encoding/decoding in ElastiKnnVectorFieldMapper (#37)
Browse files Browse the repository at this point in the history
Finally figured out how to store the ElastiKnnVectors without encoding them as base64 strings. The trick was to only read up to the given length, even though the given byte array might be longer.

This brings the Kosarak benchmark time (without caching) down to 52 seconds on my laptop. With caching is just over 30 seconds.
  • Loading branch information
alexklibisz authored Feb 15, 2020
1 parent fed9110 commit 9499527
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 47 deletions.
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Got rid of base64 encoding/decoding in ElastiKnnVectorFieldMapper. This improves ann-benchmarks performance by about 20%.
---
- Improved exact Jaccard performance by implementing a critical path in Java so that it uses primitive `int []` arrays instead of boxed integers in scala.
---
- Fixed performance regression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.apache.lucene.index._
import org.apache.lucene.search.{DocValuesFieldExistsQuery, Query, SortField}
import org.apache.lucene.util.BytesRef
import org.elasticsearch.common.settings.Settings
import com.klibisz.elastiknn.SparseBoolVector
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource
import org.elasticsearch.index.fielddata.plain.DocValuesIndexFieldData
import org.elasticsearch.index.mapper.FieldMapper.CopyTo
Expand All @@ -23,7 +22,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService
import org.elasticsearch.search.MultiValueMode
import scalapb_circe.JsonFormat

import scala.util.{Random, Success, Try, Failure}
import scala.util.{Failure, Success, Try}

/**
* Custom "elastiknn_vector" type which stores ElastiKnnVectors without modifying their order.
Expand Down Expand Up @@ -102,49 +101,47 @@ object ElastiKnnVectorFieldMapper {

private lazy val bdv: BinaryDocValues = DocValues.getBinary(reader, field)

final def getElastiKnnVector(docId: Int): Try[ElastiKnnVector] =
if (bdv.advanceExact(docId))
Success(ElastiKnnVector.parseBase64(bdv.binaryValue.utf8ToString))
else Failure(new IllegalStateException(s"Couldn't parse a valid ElastiKnnVector for document id: $docId"))
final def getElastiKnnVector(docId: Int): Try[ElastiKnnVector] = parseFromBinaryDocValues(bdv, docId)

override def getScriptValues: fielddata.ScriptDocValues[_] =
new ScriptDocValues(bdv)
override def getScriptValues: fielddata.ScriptDocValues[_] = new ScriptDocValues(bdv)

override def getBytesValues: fielddata.SortedBinaryDocValues =
throw new UnsupportedOperationException("String representation of doc values for elastiknn_vector fields is not supported")
throw new UnsupportedOperationException(s"String representation of doc values for $CONTENT_TYPE fields is not supported")

override def ramBytesUsed(): Long = 0L

override def close(): Unit = ()
}

def parseFromBinaryDocValues(binaryDocValues: BinaryDocValues, docId: Int): Try[ElastiKnnVector] = {
if (binaryDocValues.advanceExact(docId)) {
// Sometimes the bv.bytes array is longer than bv.length, so you have to call .take.
// It seems like the same array gets re-used between calls to the same shard.
val bv = binaryDocValues.binaryValue
Try(ElastiKnnVector.parseFrom(bv.bytes.take(bv.length)))
} else Failure(new IllegalStateException(s"Couldn't parse a valid $CONTENT_TYPE for document id: $docId"))
}

}

class ScriptDocValues(bdv: BinaryDocValues) extends fielddata.ScriptDocValues[Any] {

// The vector gets initialized _after_ the class is initialized (in the setNextDocId method).
// These vars will get defined in that method, and they're different for float and sparse bool vectors.
private var stored: Option[ElastiKnnVector.Vector] = None
// These vars will get defined when the vector is parsed; they're different for float and sparse bool vectors.
private var storedGet: Int => Any = identity[Int]
private var storedSize: Int = 0

// TODO: deduplicate this advanceExact and parsing logic with the logic in AtomicFieldData above.
override def setNextDocId(docId: Int): Unit =
if (bdv.advanceExact(docId)) {
// Spent a while figuring out if there's a way to decode this without converting to a string. Quite confused by
// whatever is happening in Lucene that modifies the bytes when they're stored.
stored = Some(ElastiKnnVector.parseBase64(bdv.binaryValue.utf8ToString).vector)
stored match {
case Some(ElastiKnnVector.Vector.FloatVector(v)) =>
storedGet = (i: Int) => v.values(i)
storedSize = v.values.length
case Some(ElastiKnnVector.Vector.SparseBoolVector(v)) =>
// Bit of a quirk, but calling .get(-1) will return the total number of indices.
storedGet = (i: Int) => if (i < 0) v.totalIndices else v.trueIndices(i)
storedSize = v.trueIndices.length
case _ => throw new IllegalStateException(s"Couldn't parse a valid vector, found: $stored")
}
} else None
ElastiKnnVectorFieldMapper.parseFromBinaryDocValues(bdv, docId).map(_.vector) match {
case Success(ElastiKnnVector.Vector.FloatVector(v)) =>
storedGet = (i: Int) => v.values(i)
storedSize = v.values.length
case Success(ElastiKnnVector.Vector.SparseBoolVector(v)) =>
// Bit of a quirk, but calling .get(-1) will return the total number of indices.
storedGet = (i: Int) => if (i < 0) v.totalIndices else v.trueIndices(i)
storedSize = v.trueIndices.length
case Success(ElastiKnnVector.Vector.Empty) => throw new IllegalStateException(s"Parsed an empty vector for docId $docId")
case Failure(t) => throw t
}

override def get(i: Int): Any = storedGet(i)

Expand All @@ -168,16 +165,12 @@ class ElastiKnnVectorFieldMapper(simpleName: String,
ElastiKnnVector(ElastiKnnVector.Vector.SparseBoolVector(sbv.sorted()))
case other => other
}
// IMPORTANT: for some reason if you just use the regular protobuf bytes (not base64) then you get an error like:
// "protocol message contained an invalid tag (zero)" when decoding. using base64 encoding fixes this.
val field = new BinaryDocValuesField(fieldType.name, new BytesRef(ekv.toBase64))
val field = new BinaryDocValuesField(fieldType.name, new BytesRef(ekv.toByteArray))
context.doc.addWithKey(fieldType.name, field)
}

override def parseCreateField(context: ParseContext, fields: util.List[IndexableField]): Unit =
throw new IllegalStateException("parse is implemented directly")

override def contentType(): String = {
ElastiKnnVectorFieldMapper.CONTENT_TYPE
}
override def contentType(): String = ElastiKnnVectorFieldMapper.CONTENT_TYPE
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ class ElastiKnnVectorFieldMapperSuite

implicit val rng: Random = new Random(0)

test("create a mapping with type elastiknn_vector") {
val indexName = "test-create-ekv-mapping"
for {
_ <- client.execute(deleteIndex(indexName))

createRes: Response[CreateIndexResponse] <- client.execute(createIndex(indexName))
_ = createRes.shouldBeSuccess

mappingRes <- client.execute(putMapping(Indexes(indexName)).fields(field))
_ <- mappingRes.shouldBeSuccess
} yield Succeeded
}
// test("create a mapping with type elastiknn_vector") {
// val indexName = "test-create-ekv-mapping"
// for {
// _ <- client.execute(deleteIndex(indexName))
//
// createRes: Response[CreateIndexResponse] <- client.execute(createIndex(indexName))
// _ = createRes.shouldBeSuccess
//
// mappingRes <- client.execute(putMapping(Indexes(indexName)).fields(field))
// _ <- mappingRes.shouldBeSuccess
// } yield Succeeded
// }

def index(ekvs: Seq[ElastiKnnVector], indexName: String): Future[Unit] =
for {
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0-PRE6
0.1.0-PRE7

0 comments on commit 9499527

Please sign in to comment.