Skip to content

Commit

Permalink
Refactored list and set type read state and read state shard
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Sep 5, 2024
1 parent 3d0e7f0 commit 6293306
Show file tree
Hide file tree
Showing 14 changed files with 420 additions and 492 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,30 +337,25 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
if(!filter.includes(typeName)) {
HollowListTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema), numShards);
}
} else if(schema instanceof HollowSetSchema) {
if(!filter.includes(typeName)) {
HollowSetTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema), numShards);
}
} else if(schema instanceof HollowMapSchema) {
if(!filter.includes(typeName)) {
HollowMapTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema), numShards);
}
}

return typeName;
}

private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState typeState) throws IOException {
stateEngine.addTypeState(typeState);
typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
}

private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
if (numShards<=0 || ((numShards&(numShards-1))!=0)) {
throw new IllegalArgumentException("Number of shards must be a power of 2!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ public BitSet getPreviousOrdinals() {
*/
public abstract int maxOrdinal();

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException;

public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ public IntMap getOrdinalMapping() {
}

public HollowListTypeReadState createHistoricalTypeReadState() {
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema(), 1);
historicalTypeState.setCurrentData(historicalDataElements);
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(typeState.getSchema(), historicalDataElements);
return historicalTypeState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.hollow.api.sampling.HollowListSampler;
import com.netflix.hollow.api.sampling.HollowSampler;
import com.netflix.hollow.api.sampling.HollowSamplingDirector;
import com.netflix.hollow.core.memory.HollowUnsafeHandle;
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.memory.encoding.VarInt;
Expand Down Expand Up @@ -49,115 +50,79 @@
public class HollowListTypeReadState extends HollowCollectionTypeReadState implements HollowListTypeDataAccess {
private final HollowListSampler sampler;

final int shardNumberMask;
private final int shardOrdinalShift;
final HollowListTypeReadStateShard shards[];

private int maxOrdinal;

// SNAP: TODO: move shards holder and shards volatile here
class ListTypeShardsHolder extends ShardsHolder {
@Override
public HollowTypeReadStateShard[] getShards() {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public int getShardNumberMask() {
throw new UnsupportedOperationException("Not implemented yet");
}
}
volatile HollowListTypeShardsHolder shardsVolatile;

@Override
public ShardsHolder getShardsVolatile() {
throw new UnsupportedOperationException("Not implemented yet");
return shardsVolatile;
}

@Override
public HollowTypeDataElements[] createTypeDataElements(int len) {
throw new UnsupportedOperationException("Not implemented yet");
public void updateShardsVolatile(HollowTypeReadStateShard[] shards) {
this.shardsVolatile = new HollowListTypeShardsHolder(shards);
}

@Override
public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) {
// schema unused
throw new UnsupportedOperationException("Not implemented yet");
public HollowTypeDataElements[] createTypeDataElements(int len) {
return new HollowListTypeDataElements[len];
}

@Override
public void updateShardsVolatile(HollowTypeReadStateShard[] shards) {
throw new UnsupportedOperationException("Not implemented yet");
}

public HollowListTypeReadState(HollowReadStateEngine stateEngine, HollowListSchema schema, int numShards) {
this(stateEngine, MemoryMode.ON_HEAP, schema, numShards);
public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) {
return new HollowListTypeReadStateShard((HollowListTypeDataElements) dataElements, shardOrdinalShift);
}

public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema, int numShards) {
public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema) {
super(stateEngine, memoryMode, schema);
this.sampler = new HollowListSampler(schema.getName(), DisabledSamplingDirector.INSTANCE);
this.shardNumberMask = numShards - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);

if(numShards < 1 || 1 << shardOrdinalShift != numShards)
throw new IllegalArgumentException("Number of shards must be a power of 2!");

HollowListTypeReadStateShard shards[] = new HollowListTypeReadStateShard[numShards];
for(int i=0;i<shards.length;i++)
shards[i] = new HollowListTypeReadStateShard();

this.shards = shards;
this.shardsVolatile = null;
}

HollowListTypeReadState(MemoryMode memoryMode, HollowListSchema schema, HollowListTypeReadStateShard[] shards) {
super(null, memoryMode, schema);
public HollowListTypeReadState(HollowListSchema schema, HollowListTypeDataElements dataElements) {
super(null, MemoryMode.ON_HEAP, schema);
this.sampler = new HollowListSampler(schema.getName(), DisabledSamplingDirector.INSTANCE);
int numShards = shards.length;
this.shardNumberMask = numShards - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);

if(numShards < 1 || 1 << shardOrdinalShift != numShards)
throw new IllegalArgumentException("Number of shards must be a power of 2!");

this.shards = shards;
HollowListTypeReadStateShard newShard = new HollowListTypeReadStateShard(dataElements, 0);
this.shardsVolatile = new HollowListTypeShardsHolder(new HollowListTypeReadStateShard[] {newShard});
this.maxOrdinal = dataElements.maxOrdinal;
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
if(numShards > 1)
maxOrdinal = VarInt.readVInt(in);

for(int i=0;i<shards.length;i++) {
HollowListTypeDataElements snapshotData = new HollowListTypeDataElements(memoryMode, memoryRecycler);
snapshotData.readSnapshot(in);
shards[i].setCurrentData(snapshotData);

HollowListTypeReadStateShard[] newShards = new HollowListTypeReadStateShard[numShards];
int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
for(int i=0; i<numShards; i++) {
HollowListTypeDataElements shardDataElements = new HollowListTypeDataElements(memoryMode, memoryRecycler);
shardDataElements.readSnapshot(in);
newShards[i] = new HollowListTypeReadStateShard(shardDataElements, shardOrdinalShift);
}

if(shards.length == 1)
maxOrdinal = shards[0].currentDataElements().maxOrdinal;

shardsVolatile = new HollowListTypeShardsHolder(newShards);

if(shardsVolatile.shards.length == 1)
maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal;

SnapshotPopulatedOrdinalsReader.readOrdinals(in, stateListeners);
}

@Override
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException {
if(shards.length > 1)
if(shardsVolatile.shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

for(int i=0; i<shards.length; i++) {
for(int i=0; i<shardsVolatile.shards.length; i++) {
HollowListTypeDataElements deltaData = new HollowListTypeDataElements(memoryMode, memoryRecycler);
deltaData.readDelta(in);
if(stateEngine.isSkipTypeShardUpdateWithNoAdditions() && deltaData.encodedAdditions.isEmpty()) {

if(!deltaData.encodedRemovals.isEmpty())
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shards.length);
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shardsVolatile.shards.length);

HollowListTypeDataElements currentData = shards[i].currentDataElements();
HollowListTypeDataElements currentData = shardsVolatile.shards[i].dataElements;
GapEncodedVariableLengthIntegerReader oldRemovals = currentData.encodedRemovals == null ? GapEncodedVariableLengthIntegerReader.EMPTY_READER : currentData.encodedRemovals;
if(oldRemovals.isEmpty()) {
currentData.encodedRemovals = deltaData.encodedRemovals;
Expand All @@ -173,19 +138,22 @@ public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecy
deltaData.encodedAdditions.destroy();
} else {
HollowListTypeDataElements nextData = new HollowListTypeDataElements(memoryMode, memoryRecycler);
HollowListTypeDataElements oldData = shards[i].currentDataElements();
HollowListTypeDataElements oldData = shardsVolatile.shards[i].dataElements;
nextData.applyDelta(oldData, deltaData);
shards[i].setCurrentData(nextData);
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shards.length);

HollowListTypeReadStateShard newShard = new HollowListTypeReadStateShard(nextData, shardsVolatile.shards[i].shardOrdinalShift);
shardsVolatile = new HollowListTypeShardsHolder(shardsVolatile.shards, newShard, i);

notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shardsVolatile.shards.length);
deltaData.encodedAdditions.destroy();
oldData.destroy();
}
deltaData.destroy();
stateEngine.getMemoryRecycler().swap();
}

if(shards.length == 1)
maxOrdinal = shards[0].currentDataElements().maxOrdinal;
if(shardsVolatile.shards.length == 1)
maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal;
}

public static void discardSnapshot(HollowBlobInput in, int numShards) throws IOException {
Expand Down Expand Up @@ -215,13 +183,51 @@ public int maxOrdinal() {
@Override
public int getElementOrdinal(int ordinal, int listIndex) {
sampler.recordGet();
return shards[ordinal & shardNumberMask].getElementOrdinal(ordinal >> shardOrdinalShift, listIndex);

HollowListTypeShardsHolder shardsHolder;
HollowListTypeReadStateShard shard;
int shardOrdinal;
int elementOrdinal;
long startElement;
long endElement;

do {

do {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
shardOrdinal = ordinal >> shard.shardOrdinalShift;

startElement = shard.dataElements.getStartElement(shardOrdinal);
endElement = shard.dataElements.getEndElement(shardOrdinal);
} while(readWasUnsafe(shardsHolder, ordinal, shard));

long elementIndex = startElement + listIndex;

if(elementIndex >= endElement)
throw new ArrayIndexOutOfBoundsException("Array index out of bounds: " + listIndex + ", list size: " + (endElement - startElement));

elementOrdinal = (int)shard.dataElements.elementData.getElementValue(elementIndex * shard.dataElements.bitsPerElement, shard.dataElements.bitsPerElement);
} while(readWasUnsafe(shardsHolder, ordinal, shard));

return elementOrdinal;
}

@Override
public int size(int ordinal) {
sampler.recordSize();
return shards[ordinal & shardNumberMask].size(ordinal >> shardOrdinalShift);

HollowListTypeShardsHolder shardsHolder;
HollowListTypeReadStateShard shard;
int size;

do {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];

size = shard.size(ordinal >> shard.shardOrdinalShift);
} while(readWasUnsafe(shardsHolder, ordinal, shard));
return size;
}

@Override
Expand All @@ -231,6 +237,13 @@ public HollowOrdinalIterator ordinalIterator(int ordinal) {
return new HollowListOrdinalIterator(ordinal, this);
}

private boolean readWasUnsafe(HollowListTypeShardsHolder shardsHolder, int ordinal, HollowListTypeReadStateShard shard) {
HollowUnsafeHandle.getUnsafe().loadFence();
HollowListTypeShardsHolder currShardsHolder = shardsVolatile;
return shardsHolder != currShardsHolder
&& (shard != currShardsHolder.shards[ordinal & currShardsHolder.shardNumberMask]);
}

@Override
public HollowSampler getSampler() {
return sampler;
Expand All @@ -254,39 +267,40 @@ public void ignoreUpdateThreadForSampling(Thread t) {
@Override
protected void invalidate() {
stateListeners = EMPTY_LISTENERS;
for(int i=0;i<shards.length;i++)
shards[i].invalidate();
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
int numShards = shards.length;
HollowListTypeReadStateShard[] newShards = new HollowListTypeReadStateShard[numShards];
for (int i=0;i<numShards;i++) {
newShards[i] = new HollowListTypeReadStateShard(null, shards[i].shardOrdinalShift);
}
this.shardsVolatile = new HollowListTypeShardsHolder(newShards);
}

HollowListTypeDataElements[] currentDataElements() {
HollowListTypeDataElements currentDataElements[] = new HollowListTypeDataElements[shards.length];

for(int i=0; i<shards.length; i++)
currentDataElements[i] = shards[i].currentDataElements();

return currentDataElements;
}

void setCurrentData(HollowListTypeDataElements data) {
if(shards.length > 1)
throw new UnsupportedOperationException("Cannot directly set data on sharded type state");
shards[0].setCurrentData(data);
maxOrdinal = data.maxOrdinal;
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
HollowListTypeDataElements[] elements = new HollowListTypeDataElements[shards.length];
for (int i=0;i<shards.length;i++) {
elements[i] = shards[i].dataElements;
}
return elements;
}

@Override
protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema) {
final HollowListTypeShardsHolder shardsHolder = this.shardsVolatile;
final HollowListTypeReadStateShard[] shards = shardsHolder.shards;
if(!getSchema().equals(withSchema))
throw new IllegalArgumentException("HollowListTypeReadState cannot calculate checksum with unequal schemas: " + getSchema().getName());

BitSet populatedOrdinals = getListener(PopulatedOrdinalListener.class).getPopulatedOrdinals();

for(int i=0; i<shards.length; i++)
shards[i].applyToChecksum(checksum, populatedOrdinals, i, shards.length);
shards[i].applyShardToChecksum(checksum, populatedOrdinals, i, shards.length);
}

@Override
public long getApproximateHeapFootprintInBytes() {
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
long totalApproximateHeapFootprintInBytes = 0;

for(int i=0; i<shards.length; i++)
Expand All @@ -297,6 +311,7 @@ public long getApproximateHeapFootprintInBytes() {

@Override
public long getApproximateHoleCostInBytes() {
final HollowListTypeReadStateShard[] shards = this.shardsVolatile.shards;
long totalApproximateHoleCostInBytes = 0;

BitSet populatedOrdinals = getPopulatedOrdinals();
Expand All @@ -309,6 +324,6 @@ public long getApproximateHoleCostInBytes() {

@Override
public int numShards() {
return shards.length;
} // SNAP: TODO: move to abstract?
return this.shardsVolatile.shards.length;
}
}
Loading

0 comments on commit 6293306

Please sign in to comment.