
Commit

serializers
Signed-off-by: Peter Alfonsi <[email protected]>
Peter Alfonsi committed Dec 2, 2024
1 parent 49a97a8 commit 8abbb50
Showing 2 changed files with 240 additions and 27 deletions.
@@ -8,6 +8,8 @@

package org.opensearch.cache.common.query;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
@@ -30,41 +32,45 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.RoaringDocIdSet;
import org.opensearch.cache.common.tier.TieredSpilloverCache;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.indices.IRCKeyWriteableSerializer;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.OpenSearchQueryCache;
import org.opensearch.search.aggregations.bucket.composite.CompositeKey;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;


// TODO: This is a proof of concept only! This is not an elegant or clean (or even really functional) implementation.

@@ -136,6 +142,8 @@ public class TieredQueryCache implements QueryCache, OpenSearchQueryCache {

private final RemovalListener<ICacheKey<CompositeKey>, CacheAndCount> removalListener;

private static final Logger logger = LogManager.getLogger(TieredQueryCache.class);

// Is there any need for locks? The underlying TSC is thread-safe. I think the need for locks in the original was due to the LeafCache impl.


@@ -303,7 +311,7 @@ public void collect(int doc) throws IOException {
}
},
null);
- return new CacheAndCount(new BitDocIdSet(bitSet, count[0]), count[0]);
return new CacheAndCount(new BitDocIdSet(bitSet, count[0]), count[0], maxDoc);
}

private static CacheAndCount cacheIntoRoaringDocIdSet(BulkScorer scorer, int maxDoc)
@@ -322,7 +330,7 @@ public void collect(int doc) throws IOException {
},
null);
RoaringDocIdSet cache = builder.build();
- return new CacheAndCount(cache, cache.cardinality());
return new CacheAndCount(cache, cache.cardinality(), maxDoc);
}

Predicate<CompositeKey> createTSCPolicy(CompositeKeySerializer serializer) {
@@ -606,12 +614,12 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
}
}

- class CompositeKey implements Accountable {
- final int leafCacheid;
static class CompositeKey implements Accountable {
final int leafCacheId;
final Query query;

CompositeKey(int leafCacheid, Query query) {
- this.leafCacheid = leafCacheid;
this.leafCacheId = leafCacheid;
this.query = query;
}

@@ -620,7 +628,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CompositeKey key = (CompositeKey) o;
- if (leafCacheid != key.leafCacheid) return false;
if (leafCacheId != key.leafCacheId) return false;
if (!query.equals(key.query)) return false; // TODO: Is it a valid assumption that queries correctly implement equals()?
return true;
}
@@ -658,16 +666,18 @@ ICacheKey<CompositeKey> getFinalKey(Query query) {

// Duplicated from LRUQC with no changes
protected static class CacheAndCount implements Accountable {
- protected static final CacheAndCount EMPTY = new CacheAndCount(DocIdSet.EMPTY, 0);
protected static final CacheAndCount EMPTY = new CacheAndCount(DocIdSet.EMPTY, 0, 0);

private static final long BASE_RAM_BYTES_USED =
RamUsageEstimator.shallowSizeOfInstance(CacheAndCount.class);
private final DocIdSet cache;
private final int count;
private final int maxDoc; // TODO: this value is needed for serialization here, but wasn't needed in LRUQC.

- public CacheAndCount(DocIdSet cache, int count) {
public CacheAndCount(DocIdSet cache, int count, int maxDoc) {
this.cache = cache;
this.count = count;
this.maxDoc = maxDoc;
}

public DocIdSetIterator iterator() throws IOException {
@@ -682,6 +692,35 @@ public int count() {
public long ramBytesUsed() {
return BASE_RAM_BYTES_USED + cache.ramBytesUsed();
}

@Override
public boolean equals(Object o) {
// TODO - looks like there's no good equals() for doc id sets, so this is probably only good to use in tests - it's slow!
if (o == null) return false;
if (o.getClass() != CacheAndCount.class) return false;
CacheAndCount other = (CacheAndCount) o;
if (this.count != other.count || this.maxDoc != other.maxDoc) return false;
// Compare the underlying doc ids one by one.
List<Integer> thisDocIds = new ArrayList<>();
List<Integer> otherDocIds = new ArrayList<>();
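// Note: each loop below also appends the terminating NO_MORE_DOCS sentinel to its list; since both lists get it, the comparison is unaffected.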
try {
DocIdSetIterator iterator = this.cache.iterator();
int nextDoc = -1;
while (nextDoc != NO_MORE_DOCS) {
nextDoc = iterator.nextDoc();
thisDocIds.add(nextDoc);
}
DocIdSetIterator otherIterator = other.cache.iterator();
nextDoc = -1;
while (nextDoc != NO_MORE_DOCS) {
nextDoc = otherIterator.nextDoc();
otherDocIds.add(nextDoc);
}
} catch (IOException e) {
throw new OpenSearchException("Error iterating through doc id set: ", e);
}
return thisDocIds.equals(otherDocIds);
}
}

class TSCRemovalListener implements RemovalListener<ICacheKey<CompositeKey>, CacheAndCount> {
@@ -693,7 +732,9 @@ public void onRemoval(RemovalNotification<ICacheKey<CompositeKey>, CacheAndCount

// Various serializers below

- class CompositeKeySerializer implements Serializer<CompositeKey, byte[]> {
// TODO: for this PoC, just use the OpenSearch serialization functions. If they are slow, we can address that later.

static class CompositeKeySerializer implements Serializer<CompositeKey, byte[]> {
final QuerySerializer serializer;

CompositeKeySerializer(QuerySerializer serializer) {
@@ -703,26 +744,48 @@ class CompositeKeySerializer implements Serializer<CompositeKey, byte[]> {
// TODO below
@Override
public byte[] serialize(CompositeKey object) {
- return new byte[0];
if (object == null) return null;
try {
byte[] serializedQuery = serializer.serialize(object.query);
BytesStreamOutput os = new BytesStreamOutput();
os.writeVInt(object.leafCacheId);
os.writeVInt(serializedQuery.length); // TODO: as in ICacheKeySerializer there seems to be an issue with writeBytes
os.writeBytes(serializedQuery);
return BytesReference.toBytes(os.bytes());
} catch (IOException e) {
logger.debug("Could not write CompositeKey to byte[]");
throw new OpenSearchException(e);
}
}

@Override
public CompositeKey deserialize(byte[] bytes) {
- return null;
if (bytes == null) return null;
try {
BytesStreamInput is = new BytesStreamInput(bytes, 0, bytes.length);
int id = is.readVInt();
int length = is.readVInt();
byte[] serializedQuery = new byte[length];
is.readBytes(serializedQuery, 0, length);
return new CompositeKey(id, serializer.deserialize(serializedQuery));
} catch (IOException e) {
logger.debug("Could not read CompositeKey from byte[]");
throw new OpenSearchException(e);
}
}

@Override
public boolean equals(CompositeKey object, byte[] bytes) {
- return false;
return Arrays.equals(serialize(object), bytes);
}

public boolean isAllowed(CompositeKey key) {
- // TODO - report yes for serializable, no for not serializable. Feed this into a policy into the TSC to control disk tier access.
// Report yes for serializable queries, no for non-serializable ones. Feed this into a TSC policy to control disk-tier access.
return serializer.isAllowed(key.query);
}
}
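
// A minimal round-trip sketch of the CompositeKey wire format above (vint leafCacheId, vint query length,
// then the serialized query bytes). This is a hypothetical illustration, not part of this commit:
// "someSupportedQuery" stands in for whatever query types QuerySerializer ends up handling, and
// QuerySerializer is assumed to have a no-arg constructor.
//
// CompositeKeySerializer keySerializer = new CompositeKeySerializer(new QuerySerializer());
// CompositeKey original = new CompositeKey(42, someSupportedQuery);
// byte[] bytes = keySerializer.serialize(original);
// CompositeKey roundTripped = keySerializer.deserialize(bytes);
// assert original.equals(roundTripped);
//
// The isAllowed() check above would then feed the TSC's disk-tier policy as a predicate, presumably
// something like the following (hypothetical; createTSCPolicy's body is not shown in this diff):
//
// Predicate<CompositeKey> policy = keySerializer::isAllowed;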

- class QuerySerializer implements Serializer<Query, byte[]> {
static class QuerySerializer implements Serializer<Query, byte[]> {
// TODO - will basically have to handle one query type at a time. Never TermQuery, though.

@Override
@@ -746,26 +809,111 @@ public boolean isAllowed(Query query) {
}
}
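
// The per-type dispatch the TODO above describes might look roughly like the sketch below. This is
// hypothetical: the actual serialize/deserialize bodies are elided from this diff, and PointRangeQuery
// is just an example of a query type that is worth caching (unlike TermQuery).
//
// static final byte POINT_RANGE_QUERY_BYTE = 0x01;
//
// private byte queryTypeByte(Query query) {
//     if (query instanceof PointRangeQuery) return POINT_RANGE_QUERY_BYTE;
//     throw new UnsupportedOperationException("Cannot serialize query type " + query.getClass());
// }
//
// public boolean isAllowed(Query query) {
//     try {
//         queryTypeByte(query);
//         return true;
//     } catch (UnsupportedOperationException e) {
//         return false;
//     }
// }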

- class CacheAndCountSerializer implements Serializer<CacheAndCount, byte[]> {
// TODO - this object looks pretty serializable, but not 100% sure
// TODO: I suspect the de/serialization of DocIdSet is going to be slow, and it may be so slow that it doesn't make sense to do it at all.

static class CacheAndCountSerializer implements Serializer<CacheAndCount, byte[]> {
// There are like... 8 different impls of DocIdSet, but I think we only use 2 possible ones in the cache:
// BitDocIdSet and RoaringDocIdSet. Hopefully we can just handle those two.
// Could possibly abuse the bits() interface DocIdSet exposes for this.
// Nope, RoaringDocIdSet doesn't implement it. Maybe the iterator, then.

static final byte BIT_DOC_ID_SET_BYTE = 0x01;
static final byte ROARING_DOC_ID_SET_BYTE = 0x02;

@Override
public byte[] serialize(CacheAndCount object) {
- return null;
if (object == null) return null;
try {
BytesStreamOutput os = new BytesStreamOutput();
os.writeVInt(object.count);
os.writeVInt(object.maxDoc);
serializeDocIdSet(object.cache, os);
return BytesReference.toBytes(os.bytes());
} catch (IOException e) {
logger.debug("Could not write CacheAndCount to byte[]");
throw new OpenSearchException(e);
}
}

@Override
public CacheAndCount deserialize(byte[] bytes) {
- return null;
if (bytes == null) return null;
try {
BytesStreamInput is = new BytesStreamInput(bytes, 0, bytes.length);
int count = is.readVInt();
int maxDoc = is.readVInt();
DocIdSet cache = deserializeDocIdSet(is, maxDoc);
return new CacheAndCount(cache, count, maxDoc);
} catch (IOException e) {
logger.debug("Could not read CacheAndCount from byte[]");
throw new OpenSearchException(e);
}
}

@Override
public boolean equals(CacheAndCount object, byte[] bytes) {
- return false;
// TODO: Is this ok for our purposes? Should we instead compare the underlying doc ids, which is more forgiving?
return Arrays.equals(serialize(object), bytes);
}

private void serializeDocIdSet(DocIdSet set, BytesStreamOutput os) {
final byte classByte;
if (set.getClass() == BitDocIdSet.class) {
classByte = BIT_DOC_ID_SET_BYTE;
} else if (set.getClass() == RoaringDocIdSet.class) {
classByte = ROARING_DOC_ID_SET_BYTE;
} else {
throw new UnsupportedOperationException("Cannot serialize DocIdSet implementation " + set.getClass());
}
os.writeByte(classByte);

// TODO: For now, just write each doc id as a vint until there are no more.
// TODO: overflow check?
try {
DocIdSetIterator iterator = set.iterator();
int nextDoc = -1;
while (nextDoc != NO_MORE_DOCS) {
nextDoc = iterator.nextDoc();
os.writeVInt(nextDoc); // The final iteration writes NO_MORE_DOCS itself, which serves as the terminator for the deserializer.
}
} catch (IOException e) {
throw new OpenSearchException("Error iterating through DocSetIdIterator", e);
}
}
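
// Worked example: for a BitDocIdSet containing docs {3, 7}, serializeDocIdSet writes the class byte
// 0x01 followed by the vints 3, 7, NO_MORE_DOCS (the loop's final iteration writes the terminator
// itself). serialize() has already written count and maxDoc as vints before this.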

private DocIdSet deserializeDocIdSet(BytesStreamInput is, int maxDoc) {
// TODO: This seems kinda gross...
// For BitDocIdSet, you provide it a BitSet, which I guess we set one bit at a time.
// For RoaringDocIdSet, we can either do the same thing, setting doc ids one by one in a RoaringDocIdSet.Builder,
// or we can provide it another DocIdSet to wrap, but I don't know if it handles this smartly or not.
try {
byte classByte = is.readByte();
if (classByte == BIT_DOC_ID_SET_BYTE) {
// TODO: There are several impls for the underlying BitSet... I imagine these have perf implications.
// For now, pick FixedBitSet semi-arbitrarily to return.
BitSet bitset = new FixedBitSet(maxDoc);
int nextDoc = is.readVInt();
int bitSetLength = 0;
while (nextDoc != NO_MORE_DOCS) {
bitset.set(nextDoc);
nextDoc = is.readVInt();
bitSetLength++;
}
return new BitDocIdSet(bitset, bitSetLength); // TODO: I *think* bitSetLength is the correct value for cost.
} else if (classByte == ROARING_DOC_ID_SET_BYTE) {
RoaringDocIdSet.Builder builder = new RoaringDocIdSet.Builder(maxDoc);
int nextDoc = is.readVInt();
while (nextDoc != NO_MORE_DOCS) {
builder.add(nextDoc);
nextDoc = is.readVInt();
}
return builder.build();
} else {
throw new UnsupportedOperationException("Unknown class byte " + classByte);
}
} catch (IOException e) {
throw new OpenSearchException("Error deserializing DocIdSet", e);
}
}
}
}
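
// A minimal sketch of exercising the CacheAndCount round trip, e.g. from a unit test
// (hypothetical; not part of this commit):
//
// CacheAndCountSerializer ser = new CacheAndCountSerializer();
// FixedBitSet bits = new FixedBitSet(100); // maxDoc = 100
// bits.set(3);
// bits.set(7);
// CacheAndCount original = new CacheAndCount(new BitDocIdSet(bits, 2), 2, 100);
// byte[] bytes = ser.serialize(original);
// CacheAndCount roundTripped = ser.deserialize(bytes);
// assert original.equals(roundTripped); // relies on the slow, test-only equals() above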
