Skip to content

Commit

Permalink
shard holders for other types
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Sep 4, 2024
1 parent 37a26dc commit 882de97
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,6 @@ public void reshard(int newNumShards) {

HollowTypeReadStateShard[] newShards = Arrays.copyOfRange(getShardsVolatile().getShards(), 0, newNumShards);
updateShardsVolatile(newShards);
// setShardsVolatile( // SNAP: TODO: constructor has HollowObjectTypeReadStateShard[] parameter
// getShardsVolatile().getClass().getConstructor(HollowTypeReadStateShard[].class).newInstance(
// newShards
// )
//); // SNAP: TODO: is this better addressed with a factory method in the interface, or by switching over to an abstract class?
// SNAP: TODO: can do generics with reflective constructor invocation here, or create an abstract method reassignShardsHolder(), or check instanceOf or assignableFrom

// Re-sharding done.
// shardsVolatile now contains newNumShards shards where each shard contains
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataE

@Override
public HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from) {
// SNAP: TODO: array casting?
return new HollowListTypeDataElementsJoiner((HollowListTypeDataElements[]) from);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.core.read.engine.ShardsHolder;

public class HollowListTypeShardsHolder extends ShardsHolder {
final HollowListTypeReadStateShard shards[];
final int shardNumberMask;

/**
* Thread safe construction of ShardHolder with given shards
* @param fromShards shards to be used
*/
public HollowListTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {
this.shards = new HollowListTypeReadStateShard[fromShards.length];
for (int i=0; i<fromShards.length; i++) {
this.shards[i] = (HollowListTypeReadStateShard) fromShards[i];
}
this.shardNumberMask = fromShards.length - 1;
}

/**
* Thread safe construction of a ShardHolder which has all the shards from {@code oldShards} except
* the shard at index {@code newShardIndex}, using the shard {@code newShard} at that index instead.
* @param oldShards original shards
* @param newShard a new shard
* @param newShardIndex index at which to place the new shard
*/
HollowListTypeShardsHolder(HollowListTypeReadStateShard[] oldShards, HollowListTypeReadStateShard newShard, int newShardIndex) {
int numShards = oldShards.length;
HollowListTypeReadStateShard[] shards = new HollowListTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
if (i == newShardIndex) {
shards[i] = newShard;
} else {
shards[i] = oldShards[i];
}
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
}

@Override
public HollowTypeReadStateShard[] getShards() {
throw new UnsupportedOperationException("Not implemented yet");
// return shards;
}

@Override
public int getShardNumberMask() {
return shardNumberMask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataE

@Override
public HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from) {
// SNAP: TODO: array casting?
return new HollowMapTypeDataElementsJoiner((HollowMapTypeDataElements[]) from);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.netflix.hollow.core.read.engine.map;

import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.core.read.engine.ShardsHolder;

public class HollowMapTypeShardsHolder extends ShardsHolder {
final HollowMapTypeReadStateShard shards[];
final int shardNumberMask;

/**
* Thread safe construction of ShardHolder with given shards
* @param fromShards shards to be used
*/
public HollowMapTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {
this.shards = new HollowMapTypeReadStateShard[fromShards.length];
for (int i=0; i<fromShards.length; i++) {
this.shards[i] = (HollowMapTypeReadStateShard) fromShards[i];
}
this.shardNumberMask = fromShards.length - 1;
}

/**
* Thread safe construction of a ShardHolder which has all the shards from {@code oldShards} except
* the shard at index {@code newShardIndex}, using the shard {@code newShard} at that index instead.
* @param oldShards original shards
* @param newShard a new shard
* @param newShardIndex index at which to place the new shard
*/
HollowMapTypeShardsHolder(HollowMapTypeReadStateShard[] oldShards, HollowMapTypeReadStateShard newShard, int newShardIndex) {
int numShards = oldShards.length;
HollowMapTypeReadStateShard[] shards = new HollowMapTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
if (i == newShardIndex) {
shards[i] = newShard;
} else {
shards[i] = oldShards[i];
}
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
}

@Override
public HollowTypeReadStateShard[] getShards() {
throw new UnsupportedOperationException("Not implemented yet");
// return shards;
}

@Override
public int getShardNumberMask() {
return shardNumberMask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class HollowObjectDeltaHistoricalStateCreator {
private final HollowObjectTypeDataElements historicalDataElements;

private HollowObjectTypeReadState typeState;
private ObjectTypeShardsHolder shardsHolder;
private HollowObjectTypeShardsHolder shardsHolder;
private RemovedOrdinalIterator iter;
private IntMap ordinalMapping;
private int nextOrdinal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ public class HollowObjectTypeReadState extends HollowTypeReadState implements Ho

private int maxOrdinal;

volatile ObjectTypeShardsHolder shardsVolatile;
volatile HollowObjectTypeShardsHolder shardsVolatile;

@Override
public ObjectTypeShardsHolder getShardsVolatile() {
public HollowObjectTypeShardsHolder getShardsVolatile() {
return shardsVolatile;
}

@Override
public void updateShardsVolatile(HollowTypeReadStateShard[] shards) {
this.shardsVolatile = new ObjectTypeShardsHolder(shards);
this.shardsVolatile = new HollowObjectTypeShardsHolder(shards);
}

@Override
Expand All @@ -90,7 +90,7 @@ public HollowObjectTypeReadState(HollowObjectSchema schema, HollowObjectTypeData
this.unfilteredSchema = schema;

HollowObjectTypeReadStateShard newShard = new HollowObjectTypeReadStateShard(schema, dataElements, 0);
this.shardsVolatile = new ObjectTypeShardsHolder(new HollowObjectTypeReadStateShard[] {newShard});
this.shardsVolatile = new HollowObjectTypeShardsHolder(new HollowObjectTypeReadStateShard[] {newShard});
this.maxOrdinal = dataElements.maxOrdinal;
}

Expand Down Expand Up @@ -121,7 +121,7 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
shardDataElements.readSnapshot(in, unfilteredSchema);
newShards[i] = new HollowObjectTypeReadStateShard(getSchema(), shardDataElements, shardOrdinalShift);
}
shardsVolatile = new ObjectTypeShardsHolder(newShards);
shardsVolatile = new HollowObjectTypeShardsHolder(newShards);

if(shardsVolatile.shards.length == 1)
maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal;
Expand Down Expand Up @@ -165,7 +165,7 @@ public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmen
nextData.applyDelta(oldData, deltaData);

HollowObjectTypeReadStateShard newShard = new HollowObjectTypeReadStateShard(getSchema(), nextData, shardsVolatile.shards[i].shardOrdinalShift);
shardsVolatile = new ObjectTypeShardsHolder(shardsVolatile.shards, newShard, i);
shardsVolatile = new HollowObjectTypeShardsHolder(shardsVolatile.shards, newShard, i);

notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shardsVolatile.shards.length);
deltaData.encodedAdditions.destroy();
Expand Down Expand Up @@ -197,7 +197,7 @@ public static void discardType(HollowBlobInput in, HollowObjectSchema schema, in
public boolean isNull(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
long fixedLengthValue;

Expand Down Expand Up @@ -225,7 +225,7 @@ public boolean isNull(int ordinal, int fieldIndex) {
public int readOrdinal(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
long refOrdinal;

Expand All @@ -244,7 +244,7 @@ public int readOrdinal(int ordinal, int fieldIndex) {
public int readInt(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
long value;

Expand All @@ -263,7 +263,7 @@ public int readInt(int ordinal, int fieldIndex) {
public float readFloat(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
int value;

Expand All @@ -282,7 +282,7 @@ public float readFloat(int ordinal, int fieldIndex) {
public double readDouble(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
long value;

Expand All @@ -301,7 +301,7 @@ public double readDouble(int ordinal, int fieldIndex) {
public long readLong(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
long value;

Expand All @@ -320,7 +320,7 @@ public long readLong(int ordinal, int fieldIndex) {
public Boolean readBoolean(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
long value;

Expand All @@ -339,7 +339,7 @@ public Boolean readBoolean(int ordinal, int fieldIndex) {
public byte[] readBytes(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
byte[] result;
int numBitsForField;
Expand Down Expand Up @@ -370,7 +370,7 @@ public byte[] readBytes(int ordinal, int fieldIndex) {
public String readString(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
String result;
int numBitsForField;
Expand Down Expand Up @@ -401,7 +401,7 @@ public String readString(int ordinal, int fieldIndex) {
public boolean isStringFieldEqual(int ordinal, int fieldIndex, String testValue) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
boolean result;
int numBitsForField;
Expand Down Expand Up @@ -432,7 +432,7 @@ public boolean isStringFieldEqual(int ordinal, int fieldIndex, String testValue)
public int findVarLengthFieldHashCode(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);

ObjectTypeShardsHolder shardsHolder;
HollowObjectTypeShardsHolder shardsHolder;
HollowObjectTypeReadStateShard shard;
int hashCode;
int numBitsForField;
Expand All @@ -459,7 +459,7 @@ public int findVarLengthFieldHashCode(int ordinal, int fieldIndex) {
return hashCode;
}

private boolean readWasUnsafe(ObjectTypeShardsHolder shardsHolder, int ordinal, HollowObjectTypeReadStateShard shard) {
private boolean readWasUnsafe(HollowObjectTypeShardsHolder shardsHolder, int ordinal, HollowObjectTypeReadStateShard shard) {
// Use a load (acquire) fence to constrain the compiler reordering prior plain loads so
// that they cannot "float down" below the volatile load of shardsVolatile.
// This ensures data is checked against current shard holder *after* optimistic calculations
Expand Down Expand Up @@ -487,7 +487,7 @@ private boolean readWasUnsafe(ObjectTypeShardsHolder shardsHolder, int ordinal,
// [Comment credit: Paul Sandoz]
//
HollowUnsafeHandle.getUnsafe().loadFence();
ObjectTypeShardsHolder currShardsHolder = shardsVolatile; // SNAP: TODO: cant cast here
HollowObjectTypeShardsHolder currShardsHolder = shardsVolatile;
// Validate against the underlying shard so that, during a delta application that involves re-sharding the worst
// case no. of times a read will be invalidated is 3: when shards are expanded or truncated, when a shard is affected
// by a split or join, and finally when delta is applied to a shard. If only shardsHolder was checked here, the
Expand Down Expand Up @@ -529,7 +529,7 @@ protected void invalidate() {
for (int i=0;i<numShards;i++) {
newShards[i] = new HollowObjectTypeReadStateShard(getSchema(), null, shards[i].shardOrdinalShift);
}
this.shardsVolatile = new ObjectTypeShardsHolder(newShards);
this.shardsVolatile = new HollowObjectTypeShardsHolder(newShards);
}

@Override
Expand Down Expand Up @@ -558,7 +558,7 @@ HollowObjectTypeDataElements[] currentDataElements() {

@Override
protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema) {
final ObjectTypeShardsHolder shardsHolder = this.shardsVolatile;
final HollowObjectTypeShardsHolder shardsHolder = this.shardsVolatile;
final HollowObjectTypeReadStateShard[] shards = shardsHolder.shards;
int shardNumberMask = shardsHolder.shardNumberMask;
if(!(withSchema instanceof HollowObjectSchema))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataE

@Override
public HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from) {
// SNAP: TODO: array casting?
return new HollowObjectTypeDataElementsJoiner((HollowObjectTypeDataElements[]) from);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,30 @@
import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.core.read.engine.ShardsHolder;

public class ObjectTypeShardsHolder extends ShardsHolder {
public class HollowObjectTypeShardsHolder extends ShardsHolder {
final HollowObjectTypeReadStateShard shards[];
final int shardNumberMask;

public ObjectTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {
/**
* Thread safe construction of ShardHolder with given shards
* @param fromShards shards to be used
*/
public HollowObjectTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {
this.shards = new HollowObjectTypeReadStateShard[fromShards.length];
for (int i=0; i<fromShards.length; i++) {
this.shards[i] = (HollowObjectTypeReadStateShard) fromShards[i];
}
this.shardNumberMask = fromShards.length - 1;
}

// SNAP: TODO: javadoc: supports thread-safe construction
ObjectTypeShardsHolder(HollowObjectTypeReadStateShard[] oldShards, HollowObjectTypeReadStateShard newShard, int newShardIndex) {
/**
* Thread safe construction of a ShardHolder which has all the shards from {@code oldShards} except
* the shard at index {@code newShardIndex}, using the shard {@code newShard} at that index instead.
* @param oldShards original shards
* @param newShard a new shard
* @param newShardIndex index at which to place the new shard
*/
HollowObjectTypeShardsHolder(HollowObjectTypeReadStateShard[] oldShards, HollowObjectTypeReadStateShard newShard, int newShardIndex) {
int numShards = oldShards.length;
HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataE

@Override
public HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from) {
// SNAP: TODO: array casting?
return new HollowSetTypeDataElementsJoiner((HollowSetTypeDataElements[]) from);
}
}
Loading

0 comments on commit 882de97

Please sign in to comment.