Skip to content

Commit

Permalink
Resharding logic refactor, move out of read state and into resharding…
Browse files Browse the repository at this point in the history
… strategy
  • Loading branch information
Sunjeet committed Sep 4, 2024
1 parent d4df8da commit 3d0e7f0
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
int numShards = readNumShards(in);
HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState);
if (reshardingStrategy.shouldReshard(typeState.numShards(), numShards)) {
reshardingStrategy.reshard(typeState, numShards);
}
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
} else {
discardDelta(in, schema, numShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
*/
public abstract class HollowCollectionTypeReadState extends HollowTypeReadState implements HollowCollectionTypeDataAccess {

public HollowCollectionTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowSchema schema, HollowTypeReshardingStrategy reshardingStrategy) {
super(stateEngine, memoryMode, schema, reshardingStrategy);
public HollowCollectionTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowSchema schema) {
super(stateEngine, memoryMode, schema);
}

public abstract int size(int ordinal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,11 @@ public abstract class HollowTypeReadState implements HollowTypeDataAccess {
protected final HollowSchema schema;
protected HollowTypeStateListener[] stateListeners;

private final HollowTypeReshardingStrategy reshardingStrategy;

public HollowTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowSchema schema, HollowTypeReshardingStrategy reshardingStrategy) {
public HollowTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowSchema schema) {
this.stateEngine = stateEngine;
this.memoryMode = memoryMode;
this.schema = schema;
this.stateListeners = EMPTY_LISTENERS;
this.reshardingStrategy = reshardingStrategy;
}

/**
Expand Down Expand Up @@ -213,165 +210,7 @@ public long getApproximateShardSizeInBytes() {

public abstract HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift);

protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
return currNumShards!=0 && deltaNumShards!=0 && currNumShards!=deltaNumShards;
}

/**
* Reshards this type state to the desired shard count using O(shard size) space while supporting concurrent reads
* into the underlying data elements.
*
* @param newNumShards The desired number of shards
*/
public void reshard(int newNumShards) {
int prevNumShards = getShardsVolatile().getShards().length; // SNAP: TODO: or .shards.length
int shardingFactor = shardingFactor(prevNumShards, newNumShards);
HollowTypeDataElements[] newDataElements;
int[] shardOrdinalShifts;

try {
if (newNumShards > prevNumShards) { // split existing shards
// Step 1: Grow the number of shards. Each original shard will result in N child shards where N is the sharding factor.
// The child shards will reference into the existing data elements as-is, and reuse existing shardOrdinalShift.
// However since the shards array is resized, a read will map into the new shard index, as a result a subset of
// ordinals in each shard will be accessed. In the next "splitting" step, the data elements in these new shards
// will be filtered to only retain the subset of ordinals that are actually accessed.
//
// This is an atomic update to shardsVolatile: full construction happens-before the store to shardsVolatile,
// in other words a fully constructed object as visible to this thread will be visible to other threads that
// load the new shardsVolatile.
updateShardsVolatile(expandWithOriginalDataElements(getShardsVolatile(), shardingFactor));

// Step 2: Split each original data element into N child data elements where N is the sharding factor.
// Then update each of the N child shards with the respective split of data element, this will be
// sufficient to serve all reads into this shard. Once all child shards for a pre-split parent
// shard have been assigned the split data elements, the parent data elements can be discarded.
for (int i = 0; i < prevNumShards; i++) {
HollowTypeDataElements originalDataElements = getShardsVolatile().getShards()[i].getDataElements();

updateShardsVolatile(splitDataElementsForOneShard(getShardsVolatile(), i, prevNumShards, shardingFactor));

destroyOriginalDataElements(originalDataElements);
}
// Re-sharding done.
// shardsVolatile now contains newNumShards shards where each shard contains
// a split of original data elements.

} else { // join existing shards
// Step 1: Join N data elements to create one, where N is the sharding factor. Then update each of the
// N shards to reference the joined result, but with a new shardOrdinalShift.
// Reads will continue to reference the same shard index as before, but the new shardOrdinalShift
// will help these reads land at the right ordinal in the joined shard. When all N old shards
// corresponding to one new shard have been updated, the N pre-join data elements can be destroyed.
for (int i = 0; i < newNumShards; i++) {
HollowTypeDataElements destroyCandidates[] = joinCandidates(getShardsVolatile().getShards(), i, shardingFactor);

updateShardsVolatile(joinDataElementsForOneShard(getShardsVolatile(), i, shardingFactor)); // atomic update to shardsVolatile

for (int j = 0; j < shardingFactor; j++) {
destroyOriginalDataElements(destroyCandidates[j]);
}
}

// Step 2: Resize the shards array to only keep the first newNumShards shards.
newDataElements = createTypeDataElements(getShardsVolatile().getShards().length);
shardOrdinalShifts = new int[getShardsVolatile().getShards().length];
copyShardDataElements(getShardsVolatile(), newDataElements, shardOrdinalShifts);

HollowTypeReadStateShard[] newShards = Arrays.copyOfRange(getShardsVolatile().getShards(), 0, newNumShards);
updateShardsVolatile(newShards);

// Re-sharding done.
// shardsVolatile now contains newNumShards shards where each shard contains
// a join of original data elements.
}
} catch (Exception e) {
throw new RuntimeException("Error in re-sharding", e);
}
}

/**
* Given old and new numShards, this method returns the shard resizing multiplier.
*/
public static int shardingFactor(int oldNumShards, int newNumShards) {
if (newNumShards <= 0 || oldNumShards <= 0 || newNumShards == oldNumShards) {
throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards);
}

boolean isNewGreater = newNumShards > oldNumShards;
int dividend = isNewGreater ? newNumShards : oldNumShards;
int divisor = isNewGreater ? oldNumShards : newNumShards;

if (dividend % divisor != 0) {
throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards);
}
return dividend / divisor;
}

private void copyShardDataElements(ShardsHolder from, HollowTypeDataElements[] newDataElements, int[] shardOrdinalShifts) {
for (int i=0; i<from.getShards().length; i++) {
newDataElements[i] = from.getShards()[i].getDataElements();
shardOrdinalShifts[i] = from.getShards()[i].getShardOrdinalShift();
}
}

private HollowTypeDataElements[] joinCandidates(HollowTypeReadStateShard[] shards, int indexIntoShards, int shardingFactor) {
HollowTypeDataElements[] result = createTypeDataElements(shardingFactor);
int newNumShards = shards.length / shardingFactor;
for (int i=0; i<shardingFactor; i++) {
result[i] = shards[indexIntoShards + (newNumShards*i)].getDataElements();
}
return result;
}

public HollowTypeReadStateShard[] joinDataElementsForOneShard(ShardsHolder shardsHolder, int currentIndex, int shardingFactor) {
int newNumShards = shardsHolder.getShards().length / shardingFactor;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowTypeDataElements[] joinCandidates = joinCandidates(shardsHolder.getShards(), currentIndex, shardingFactor);

// SNAP: TODO: a better way
HollowTypeDataElementsJoiner joiner = reshardingStrategy.createDataElementsJoiner(joinCandidates);
HollowTypeDataElements joined = joiner.join();

HollowTypeReadStateShard[] newShards = Arrays.copyOf(shardsHolder.getShards(), shardsHolder.getShards().length);
for (int i=0; i<shardingFactor; i++) {
newShards[currentIndex + (newNumShards*i)] = createTypeReadStateShard(schema, joined, newShardOrdinalShift);
}

return newShards;
}

public HollowTypeReadStateShard[] expandWithOriginalDataElements(ShardsHolder shardsHolder, int shardingFactor) {
int prevNumShards = shardsHolder.getShards().length;
int newNumShards = prevNumShards * shardingFactor;
HollowTypeReadStateShard[] newShards = new HollowTypeReadStateShard[newNumShards];

for(int i=0; i<prevNumShards; i++) {
for (int j=0; j<shardingFactor; j++) {
newShards[i+(prevNumShards*j)] = shardsHolder.getShards()[i];
}
}
return newShards;
// return shardsHolder.getClass().getConstructor(HollowTypeReadStateShard[].class).newInstance((Object) newShards);
}

public HollowTypeReadStateShard[] splitDataElementsForOneShard(ShardsHolder shardsHolder, int currentIndex, int prevNumShards, int shardingFactor) {
int newNumShards = shardsHolder.getShards().length;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowTypeDataElements dataElementsToSplit = shardsHolder.getShards()[currentIndex].getDataElements();
HollowTypeDataElementsSplitter splitter = reshardingStrategy.createDataElementsSplitter(dataElementsToSplit, shardingFactor);
HollowTypeDataElements[] splits = splitter.split();

HollowTypeReadStateShard[] newShards = Arrays.copyOf(shardsHolder.getShards(), shardsHolder.getShards().length);
for (int i = 0; i < shardingFactor; i ++) {
newShards[currentIndex + (prevNumShards*i)] = createTypeReadStateShard(schema, splits[i], newShardOrdinalShift);
}
return newShards;
}

private void destroyOriginalDataElements(HollowTypeDataElements dataElements) {
public void destroyOriginalDataElements(HollowTypeDataElements dataElements) {
dataElements.destroy();
if (dataElements.encodedRemovals != null) {
dataElements.encodedRemovals.destroy();
Expand Down
Loading

0 comments on commit 3d0e7f0

Please sign in to comment.