diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java index fa03605eb..7ca63fd9f 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java @@ -337,30 +337,25 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro if(!filter.includes(typeName)) { HollowListTypeReadState.discardSnapshot(in, numShards); } else { - populateTypeStateSnapshot(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema, numShards)); + populateTypeStateSnapshotWithNumShards(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema), numShards); } } else if(schema instanceof HollowSetSchema) { if(!filter.includes(typeName)) { HollowSetTypeReadState.discardSnapshot(in, numShards); } else { - populateTypeStateSnapshot(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema, numShards)); + populateTypeStateSnapshotWithNumShards(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema), numShards); } } else if(schema instanceof HollowMapSchema) { if(!filter.includes(typeName)) { HollowMapTypeReadState.discardSnapshot(in, numShards); } else { - populateTypeStateSnapshot(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema, numShards)); + populateTypeStateSnapshotWithNumShards(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema), numShards); } } return typeName; } - private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState typeState) throws IOException { - stateEngine.addTypeState(typeState); - typeState.readSnapshot(in, stateEngine.getMemoryRecycler()); - } - private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException { if (numShards<=0 || ((numShards&(numShards-1))!=0)) { throw new IllegalArgumentException("Number of shards must be a power of 2!"); diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java index f75e9dfa3..c8cd80c83 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadState.java @@ -121,8 +121,6 @@ public BitSet getPreviousOrdinals() { */ public abstract int maxOrdinal(); - public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException; - public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException; public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException; diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListDeltaHistoricalStateCreator.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListDeltaHistoricalStateCreator.java index ee378698c..3c4427305 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListDeltaHistoricalStateCreator.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListDeltaHistoricalStateCreator.java @@ -81,8 +81,7 @@ public IntMap getOrdinalMapping() { } public HollowListTypeReadState createHistoricalTypeReadState() { - HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema(), 1); - historicalTypeState.setCurrentData(historicalDataElements); + HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(typeState.getSchema(), historicalDataElements); return historicalTypeState; } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java index 167dc0669..b1b17af89 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java @@ -20,6 +20,7 @@ import com.netflix.hollow.api.sampling.HollowListSampler; import com.netflix.hollow.api.sampling.HollowSampler; import com.netflix.hollow.api.sampling.HollowSamplingDirector; +import com.netflix.hollow.core.memory.HollowUnsafeHandle; import com.netflix.hollow.core.memory.MemoryMode; import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader; import com.netflix.hollow.core.memory.encoding.VarInt; @@ -49,115 +50,79 @@ public class HollowListTypeReadState extends HollowCollectionTypeReadState implements HollowListTypeDataAccess { private final HollowListSampler sampler; - final int shardNumberMask; - private final int shardOrdinalShift; - final HollowListTypeReadStateShard shards[]; - private int maxOrdinal; - // SNAP: TODO: move shards holder and shards volatile here - class ListTypeShardsHolder extends ShardsHolder { - @Override - public HollowTypeReadStateShard[] getShards() { - throw new UnsupportedOperationException("Not implemented yet"); - } - - @Override - public int getShardNumberMask() { - throw new UnsupportedOperationException("Not implemented yet"); - } - } + volatile HollowListTypeShardsHolder shardsVolatile; @Override public ShardsHolder getShardsVolatile() { - throw new UnsupportedOperationException("Not implemented yet"); + return shardsVolatile; } @Override - public HollowTypeDataElements[] createTypeDataElements(int len) { - throw new UnsupportedOperationException("Not implemented yet"); + public void updateShardsVolatile(HollowTypeReadStateShard[] shards) { + this.shardsVolatile = new HollowListTypeShardsHolder(shards); } @Override - public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) { - // schema unused - throw new UnsupportedOperationException("Not implemented yet"); + public HollowTypeDataElements[] createTypeDataElements(int len) { + return new HollowListTypeDataElements[len]; } @Override - public void updateShardsVolatile(HollowTypeReadStateShard[] shards) { - throw new UnsupportedOperationException("Not implemented yet"); - } - - public HollowListTypeReadState(HollowReadStateEngine stateEngine, HollowListSchema schema, int numShards) { - this(stateEngine, MemoryMode.ON_HEAP, schema, numShards); + public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) { + return new HollowListTypeReadStateShard((HollowListTypeDataElements) dataElements, shardOrdinalShift); } - public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema, int numShards) { + public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema) { super(stateEngine, memoryMode, schema); this.sampler = new HollowListSampler(schema.getName(), DisabledSamplingDirector.INSTANCE); - this.shardNumberMask = numShards - 1; - this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); - - if(numShards < 1 || 1 << shardOrdinalShift != numShards) - throw new IllegalArgumentException("Number of shards must be a power of 2!"); - - HollowListTypeReadStateShard shards[] = new HollowListTypeReadStateShard[numShards]; - for(int i=0;i 1) + if(numShards > 1) maxOrdinal = VarInt.readVInt(in); - - for(int i=0;i 1) + if(shardsVolatile.shards.length > 1) maxOrdinal = VarInt.readVInt(in); - for(int i=0; i> shardOrdinalShift, listIndex); + + HollowListTypeShardsHolder shardsHolder; + HollowListTypeReadStateShard shard; + int shardOrdinal; + int elementOrdinal; + long startElement; + long endElement; + + do { + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + shardOrdinal = ordinal >> shard.shardOrdinalShift; + + startElement = shard.dataElements.getStartElement(shardOrdinal); + endElement = shard.dataElements.getEndElement(shardOrdinal); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + long elementIndex = startElement + listIndex; + + if(elementIndex >= endElement) + throw new ArrayIndexOutOfBoundsException("Array index out of bounds: " + listIndex + ", list size: " + (endElement - startElement)); + + elementOrdinal = (int)shard.dataElements.elementData.getElementValue(elementIndex * shard.dataElements.bitsPerElement, shard.dataElements.bitsPerElement); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + return elementOrdinal; } @Override public int size(int ordinal) { sampler.recordSize(); - return shards[ordinal & shardNumberMask].size(ordinal >> shardOrdinalShift); + + HollowListTypeShardsHolder shardsHolder; + HollowListTypeReadStateShard shard; + int size; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + + size = shard.size(ordinal >> shard.shardOrdinalShift); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + return size; } @Override @@ -231,6 +237,13 @@ public HollowOrdinalIterator ordinalIterator(int ordinal) { return new HollowListOrdinalIterator(ordinal, this); } + private boolean readWasUnsafe(HollowListTypeShardsHolder shardsHolder, int ordinal, HollowListTypeReadStateShard shard) { + HollowUnsafeHandle.getUnsafe().loadFence(); + HollowListTypeShardsHolder currShardsHolder = shardsVolatile; + return shardsHolder != currShardsHolder + && (shard != currShardsHolder.shards[ordinal & currShardsHolder.shardNumberMask]); + } + @Override public HollowSampler getSampler() { return sampler; @@ -254,39 +267,40 @@ public void ignoreUpdateThreadForSampling(Thread t) { @Override protected void invalidate() { stateListeners = EMPTY_LISTENERS; - for(int i=0;i 1) - throw new UnsupportedOperationException("Cannot directly set data on sharded type state"); - shards[0].setCurrentData(data); - maxOrdinal = data.maxOrdinal; + final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards; + HollowListTypeDataElements[] elements = new HollowListTypeDataElements[shards.length]; + for (int i=0;i= endElement) - throw new ArrayIndexOutOfBoundsException("Array index out of bounds: " + listIndex + ", list size: " + (endElement - startElement)); - - elementOrdinal = (int)currentData.elementData.getElementValue(elementIndex * currentData.bitsPerElement, currentData.bitsPerElement); - } while(readWasUnsafe(currentData)); - - return elementOrdinal; - } - - public int size(int ordinal) { - HollowListTypeDataElements currentData; - int size; - - do { - currentData = this.currentDataVolatile; - long startElement = currentData.getStartElement(ordinal); - long endElement = currentData.getEndElement(ordinal); - size = (int)(endElement - startElement); - } while(readWasUnsafe(currentData)); - - return size; + @Override + public HollowListTypeDataElements getDataElements() { + return dataElements; } - void invalidate() { - setCurrentData(null); + @Override + public int getShardOrdinalShift() { + return shardOrdinalShift; } - HollowListTypeDataElements currentDataElements() { - return currentDataVolatile; + public HollowListTypeReadStateShard(HollowListTypeDataElements dataElements, int shardOrdinalShift) { + this.shardOrdinalShift = shardOrdinalShift; + this.dataElements = dataElements; } - private boolean readWasUnsafe(HollowListTypeDataElements data) { - HollowUnsafeHandle.getUnsafe().loadFence(); - return data != currentDataVolatile; - } + public int size(int ordinal) { + long startElement = dataElements.getStartElement(ordinal); + long endElement = dataElements.getEndElement(ordinal); + int size = (int)(endElement - startElement); - void setCurrentData(HollowListTypeDataElements data) { - this.currentDataVolatile = data; + return size; } - protected void applyToChecksum(HollowChecksum checksum, BitSet populatedOrdinals, int shardNumber, int numShards) { + protected void applyShardToChecksum(HollowChecksum checksum, BitSet populatedOrdinals, int shardNumber, int numShards) { int ordinal = populatedOrdinals.nextSetBit(shardNumber); while(ordinal != ORDINAL_NONE) { if((ordinal & (numShards - 1)) == shardNumber) { @@ -104,22 +71,31 @@ protected void applyToChecksum(HollowChecksum checksum, BitSet populatedOrdinals } } + private int getElementOrdinal(int ordinal, int listIndex) { + long startElement = dataElements.getStartElement(ordinal); + long endElement = dataElements.getEndElement(ordinal); + long elementIndex = startElement + listIndex; + if(elementIndex >= endElement) + throw new ArrayIndexOutOfBoundsException("Array index out of bounds: " + listIndex + ", list size: " + (endElement - startElement)); + + int elementOrdinal = (int)dataElements.elementData.getElementValue(elementIndex * dataElements.bitsPerElement, dataElements.bitsPerElement); + return elementOrdinal; + } + public long getApproximateHeapFootprintInBytes() { - HollowListTypeDataElements currentData = currentDataVolatile; - long requiredListPointerBits = ((long)currentData.maxOrdinal + 1) * currentData.bitsPerListPointer; - long requiredElementBits = currentData.totalNumberOfElements * currentData.bitsPerElement; + long requiredListPointerBits = ((long)dataElements.maxOrdinal + 1) * dataElements.bitsPerListPointer; + long requiredElementBits = dataElements.totalNumberOfElements * dataElements.bitsPerElement; long requiredBits = requiredListPointerBits + requiredElementBits; return requiredBits / 8; } public long getApproximateHoleCostInBytes(BitSet populatedOrdinals, int shardNumber, int numShards) { - HollowListTypeDataElements currentData = currentDataVolatile; long holeBits = 0; int holeOrdinal = populatedOrdinals.nextClearBit(0); - while(holeOrdinal <= currentData.maxOrdinal) { + while(holeOrdinal <= dataElements.maxOrdinal) { if((holeOrdinal & (numShards - 1)) == shardNumber) - holeBits += currentData.bitsPerListPointer; + holeBits += dataElements.bitsPerListPointer; holeOrdinal = populatedOrdinals.nextClearBit(holeOrdinal + 1); } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapDeltaHistoricalStateCreator.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapDeltaHistoricalStateCreator.java index b8e8ef763..523af972e 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapDeltaHistoricalStateCreator.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapDeltaHistoricalStateCreator.java @@ -82,8 +82,7 @@ public IntMap getOrdinalMapping() { } public HollowMapTypeReadState createHistoricalTypeReadState() { - HollowMapTypeReadState historicalTypeState = new HollowMapTypeReadState(null, typeState.getSchema(), 1); - historicalTypeState.setCurrentData(historicalDataElements); + HollowMapTypeReadState historicalTypeState = new HollowMapTypeReadState(typeState.getSchema(), historicalDataElements); return historicalTypeState; } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java index 78227dd58..7fcc25593 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java @@ -61,117 +61,81 @@ public class HollowMapTypeReadState extends HollowTypeReadState implements Hollo private final HollowMapSampler sampler; - private final int shardNumberMask; - private final int shardOrdinalShift; - final HollowMapTypeReadStateShard shards[]; - private HollowPrimaryKeyValueDeriver keyDeriver; private int maxOrdinal; - class MapTypeShardsHolder extends ShardsHolder { - @Override - public HollowTypeReadStateShard[] getShards() { - throw new UnsupportedOperationException("Not implemented yet"); - } - - @Override - public int getShardNumberMask() { - throw new UnsupportedOperationException("Not implemented yet"); - } - } + volatile HollowMapTypeShardsHolder shardsVolatile; @Override public ShardsHolder getShardsVolatile() { - throw new UnsupportedOperationException("Not implemented yet"); + return shardsVolatile; } @Override - public HollowTypeDataElements[] createTypeDataElements(int len) { - throw new UnsupportedOperationException("Not implemented yet"); + public void updateShardsVolatile(HollowTypeReadStateShard[] shards) { + this.shardsVolatile = new HollowMapTypeShardsHolder(shards); } @Override - public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) { - // schema unused - throw new UnsupportedOperationException("Not implemented yet"); + public HollowTypeDataElements[] createTypeDataElements(int len) { + return new HollowMapTypeDataElements[len]; } @Override - public void updateShardsVolatile(HollowTypeReadStateShard[] shards) { - throw new UnsupportedOperationException("Not implemented yet"); - } - - public HollowMapTypeReadState(HollowReadStateEngine stateEngine, HollowMapSchema schema, int numShards) { - this(stateEngine, MemoryMode.ON_HEAP, schema, numShards); + public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) { + return new HollowMapTypeReadStateShard((HollowMapTypeDataElements) dataElements, shardOrdinalShift); } - public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowMapSchema schema, int numShards) { + public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowMapSchema schema) { super(stateEngine, memoryMode, schema); this.sampler = new HollowMapSampler(schema.getName(), DisabledSamplingDirector.INSTANCE); - this.shardNumberMask = numShards - 1; - this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); - - if(numShards < 1 || 1 << shardOrdinalShift != numShards) - throw new IllegalArgumentException("Number of shards must be a power of 2!"); - - HollowMapTypeReadStateShard shards[] = new HollowMapTypeReadStateShard[numShards]; - for(int i=0; i 1) + if(numShards > 1) maxOrdinal = VarInt.readVInt(in); - - for(int i=0; i 1) + if(shardsVolatile.shards.length > 1) maxOrdinal = VarInt.readVInt(in); - for(int i=0; i 1) diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetDeltaHistoricalStateCreator.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetDeltaHistoricalStateCreator.java index 64e2a86e7..2d740f644 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetDeltaHistoricalStateCreator.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetDeltaHistoricalStateCreator.java @@ -82,8 +82,7 @@ public IntMap getOrdinalMapping() { } public HollowSetTypeReadState createHistoricalTypeReadState() { - HollowSetTypeReadState historicalTypeState = new HollowSetTypeReadState(null, typeState.getSchema(), 1); - historicalTypeState.setCurrentData(historicalDataElements); + HollowSetTypeReadState historicalTypeState = new HollowSetTypeReadState(typeState.getSchema(), historicalDataElements); return historicalTypeState; } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadState.java index 01d317c78..7712ad9e2 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadState.java @@ -25,8 +25,10 @@ import com.netflix.hollow.api.sampling.HollowSetSampler; import com.netflix.hollow.core.index.FieldPaths; import com.netflix.hollow.core.index.key.HollowPrimaryKeyValueDeriver; +import com.netflix.hollow.core.memory.HollowUnsafeHandle; import com.netflix.hollow.core.memory.MemoryMode; import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader; +import com.netflix.hollow.core.memory.encoding.HashCodes; import com.netflix.hollow.core.memory.encoding.VarInt; import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler; import com.netflix.hollow.core.read.HollowBlobInput; @@ -37,12 +39,14 @@ import com.netflix.hollow.core.read.engine.HollowTypeReadState; import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard; import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener; +import com.netflix.hollow.core.read.engine.SetMapKeyHasher; import com.netflix.hollow.core.read.engine.ShardsHolder; import com.netflix.hollow.core.read.engine.SnapshotPopulatedOrdinalsReader; import com.netflix.hollow.core.read.filter.HollowFilterConfig; import com.netflix.hollow.core.read.iterator.EmptyOrdinalIterator; import com.netflix.hollow.core.read.iterator.HollowOrdinalIterator; import com.netflix.hollow.core.read.iterator.HollowSetOrdinalIterator; +import com.netflix.hollow.core.schema.HollowMapSchema; import com.netflix.hollow.core.schema.HollowObjectSchema.FieldType; import com.netflix.hollow.core.schema.HollowSchema; import com.netflix.hollow.core.schema.HollowSetSchema; @@ -60,117 +64,81 @@ public class HollowSetTypeReadState extends HollowCollectionTypeReadState implem private final HollowSetSampler sampler; - private final int shardNumberMask; - private final int shardOrdinalShift; - final HollowSetTypeReadStateShard shards[]; - - private HollowPrimaryKeyValueDeriver keyDeriver; - - private int maxOrdinal; - class SetTypeShardsHolder extends ShardsHolder { - @Override - public HollowTypeReadStateShard[] getShards() { - throw new UnsupportedOperationException("Not implemented yet"); - } + private int maxOrdinal; - @Override - public int getShardNumberMask() { - throw new UnsupportedOperationException("Not implemented yet"); - } - } + private volatile HollowPrimaryKeyValueDeriver keyDeriver; + volatile HollowSetTypeShardsHolder shardsVolatile; @Override public ShardsHolder getShardsVolatile() { - throw new UnsupportedOperationException("Not implemented yet"); + return shardsVolatile; } @Override - public HollowTypeDataElements[] createTypeDataElements(int len) { - throw new UnsupportedOperationException("Not implemented yet"); + public void updateShardsVolatile(HollowTypeReadStateShard[] shards) { + this.shardsVolatile = new HollowSetTypeShardsHolder(shards); } @Override - public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) { - // schema unused - throw new UnsupportedOperationException("Not implemented yet"); + public HollowTypeDataElements[] createTypeDataElements(int len) { + return new HollowSetTypeDataElements[len]; } @Override - public void updateShardsVolatile(HollowTypeReadStateShard[] shards) { - throw new UnsupportedOperationException("Not implemented yet"); - } - - public HollowSetTypeReadState(HollowReadStateEngine stateEngine, HollowSetSchema schema, int numShards) { - this(stateEngine, MemoryMode.ON_HEAP, schema, numShards); + public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) { + return new HollowSetTypeReadStateShard((HollowSetTypeDataElements)dataElements, shardOrdinalShift); } - public HollowSetTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowSetSchema schema, int numShards) { + public HollowSetTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowMapSchema schema) { super(stateEngine, memoryMode, schema); this.sampler = new HollowSetSampler(schema.getName(), DisabledSamplingDirector.INSTANCE); - this.shardNumberMask = numShards - 1; - this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); - - if(numShards < 1 || 1 << shardOrdinalShift != numShards) - throw new IllegalArgumentException("Number of shards must be a power of 2!"); - - HollowSetTypeReadStateShard shards[] = new HollowSetTypeReadStateShard[numShards]; - for(int i=0;i 1) + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + if(numShards > 1) maxOrdinal = VarInt.readVInt(in); - - for(int i=0;i 1) + if(shardsVolatile.shards.length > 1) maxOrdinal = VarInt.readVInt(in); - for(int i=0;i> shardOrdinalShift); + + HollowSetTypeShardsHolder shardsHolder; + HollowSetTypeReadStateShard shard; + int size; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + + size = shard.size(ordinal >> shard.shardOrdinalShift); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + return size; } @Override @@ -234,28 +217,122 @@ public boolean contains(int ordinal, int value) { @Override public boolean contains(int ordinal, int value, int hashCode) { sampler.recordGet(); - return shards[ordinal & shardNumberMask].contains(ordinal >> shardOrdinalShift, value, hashCode); + HollowSetTypeShardsHolder shardsHolder; + HollowSetTypeReadStateShard shard; + int shardOrdinal; + boolean foundData; + + threadsafe: + do { + long startBucket; + long endBucket; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + shardOrdinal = ordinal >> shard.shardOrdinalShift; + + startBucket = shard.dataElements.getStartBucket(shardOrdinal); + endBucket = shard.dataElements.getEndBucket(shardOrdinal); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + hashCode = HashCodes.hashInt(hashCode); + long bucket = startBucket + (hashCode & (endBucket - startBucket - 1)); + int bucketOrdinal = shard.dataElements.getBucketValue(bucket); + + while(bucketOrdinal != shard.dataElements.emptyBucketValue) { + if(bucketOrdinal == value) { + foundData = true; + continue threadsafe; + } + bucket++; + if(bucket == endBucket) + bucket = startBucket; + bucketOrdinal = shard.dataElements.getBucketValue(bucket); + } + + foundData = false; + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + return foundData; } @Override public int findElement(int ordinal, Object... hashKey) { + HollowSetTypeShardsHolder shardsHolder; + HollowSetTypeReadStateShard shard; + int shardOrdinal; + sampler.recordGet(); if(keyDeriver == null) return ORDINAL_NONE; - + FieldType[] fieldTypes = keyDeriver.getFieldTypes(); if(hashKey.length != fieldTypes.length) return ORDINAL_NONE; - return shards[ordinal & shardNumberMask].findElement(ordinal >> shardOrdinalShift, hashKey); + int hashCode = SetMapKeyHasher.hash(hashKey, keyDeriver.getFieldTypes()); + + threadsafe: + do { + long startBucket; + long endBucket; + + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask]; + shardOrdinal = ordinal >> shard.shardOrdinalShift; + + startBucket = shard.dataElements.getStartBucket(shardOrdinal); + endBucket = shard.dataElements.getEndBucket(shardOrdinal); + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + long bucket = startBucket + (hashCode & (endBucket - startBucket - 1)); + int bucketOrdinal = shard.dataElements.getBucketValue(bucket); + + while(bucketOrdinal != shard.dataElements.emptyBucketValue) { + if(readWasUnsafe(shardsHolder, ordinal, shard)) + continue threadsafe; + + if(keyDeriver.keyMatches(bucketOrdinal, hashKey)) + return bucketOrdinal; + + bucket++; + if(bucket == endBucket) + bucket = startBucket; + bucketOrdinal = shard.dataElements.getBucketValue(bucket); + } + + } while(readWasUnsafe(shardsHolder, ordinal, shard)); + + return ORDINAL_NONE; } @Override public int relativeBucketValue(int setOrdinal, int bucketIndex) { - return shards[setOrdinal & shardNumberMask].relativeBucketValue(setOrdinal >> shardOrdinalShift, bucketIndex); + HollowSetTypeShardsHolder shardsHolder; + HollowSetTypeReadStateShard shard; + int value; + + do { + long startBucket; + do { + shardsHolder = this.shardsVolatile; + shard = shardsHolder.shards[setOrdinal & shardsHolder.shardNumberMask]; + + startBucket = shard.dataElements.getStartBucket(setOrdinal >> shard.shardOrdinalShift); + } while (readWasUnsafe(shardsHolder, setOrdinal, shard)); + + value = shard.dataElements.getBucketValue(startBucket + bucketIndex); + + if(value == shard.dataElements.emptyBucketValue) + value = ORDINAL_NONE; + } while (readWasUnsafe(shardsHolder, setOrdinal, shard)); + + return value; } @Override @@ -274,6 +351,13 @@ public HollowOrdinalIterator ordinalIterator(int ordinal) { return new HollowSetOrdinalIterator(ordinal, this); } + private boolean readWasUnsafe(HollowSetTypeShardsHolder shardsHolder, int ordinal, HollowSetTypeReadStateShard shard) { + HollowUnsafeHandle.getUnsafe().loadFence(); + HollowSetTypeShardsHolder currShardsHolder = shardsVolatile; + return shardsHolder != currShardsHolder + && (shard != currShardsHolder.shards[ordinal & currShardsHolder.shardNumberMask]); + } + @Override public HollowSetSchema getSchema() { return (HollowSetSchema)schema; @@ -302,41 +386,42 @@ public void ignoreUpdateThreadForSampling(Thread t) { @Override protected void invalidate() { stateListeners = EMPTY_LISTENERS; - for(int i=0;i 1) - throw new UnsupportedOperationException("Cannot directly set data on sharded type state"); - shards[0].setCurrentData(data); - maxOrdinal = data.maxOrdinal; + final HollowSetTypeReadStateShard[] shards = this.shardsVolatile.shards; + HollowSetTypeDataElements[] elements = new HollowSetTypeDataElements[shards.length]; + for (int i=0;i