Skip to content

Commit

Permalink
Refactor data elements and splitter for polymorphism
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Aug 17, 2024
1 parent a1d11de commit fc480a3
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 124 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;

public abstract class AbstractHollowTypeDataElements {

public int maxOrdinal;
public GapEncodedVariableLengthIntegerReader encodedAdditions;
public GapEncodedVariableLengthIntegerReader encodedRemovals;
public final ArraySegmentRecycler memoryRecycler;
public final MemoryMode memoryMode;

public AbstractHollowTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

public abstract class AbstractHollowTypeDataElementsSplitter {
public final int numSplits;
public final int toMask;
public final int toOrdinalShift;
public final AbstractHollowTypeDataElements from;

public AbstractHollowTypeDataElements[] to;

public AbstractHollowTypeDataElementsSplitter(AbstractHollowTypeDataElements from, int numSplits) {
this.from = from;
this.numSplits = numSplits;
this.toMask = numSplits - 1;
this.toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);

if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
throw new IllegalStateException("Must split by power of 2");
}

if (from.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}
}

public AbstractHollowTypeDataElements[] split() {
for(int i=0;i<to.length;i++) {
to[i].maxOrdinal = -1;
}
populateStats();

copyRecords();

if (from.encodedRemovals != null) {
GapEncodedVariableLengthIntegerReader[] splitRemovals = from.encodedRemovals.split(numSplits);
for(int i=0;i<to.length;i++) {
to[i].encodedRemovals = splitRemovals[i];
}
}

return to;
}

public abstract void populateStats();

public abstract void copyRecords();


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.hollow.core.memory.encoding.VarInt;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElements;
import java.io.IOException;

/**
Expand All @@ -31,30 +32,21 @@
* During a delta, the HollowListTypeReadState will create a new HollowListTypeDataElements and atomically swap
* with the existing one to make sure a consistent view of the data is always available.
*/
public class HollowListTypeDataElements {

int maxOrdinal;
public class HollowListTypeDataElements extends AbstractHollowTypeDataElements {

FixedLengthData listPointerData;
FixedLengthData elementData;

GapEncodedVariableLengthIntegerReader encodedAdditions;
GapEncodedVariableLengthIntegerReader encodedRemovals;

int bitsPerListPointer;
int bitsPerElement;
long totalNumberOfElements = 0;

final ArraySegmentRecycler memoryRecycler;
final MemoryMode memoryMode;

public HollowListTypeDataElements(ArraySegmentRecycler memoryRecycler) {
this(MemoryMode.ON_HEAP, memoryRecycler);
}

public HollowListTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
super(memoryMode, memoryRecycler);
}

void readSnapshot(HollowBlobInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,29 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsSplitter;

/**
* Split a {@code HollowListTypeDataElements} into multiple {@code HollowListTypeDataElements}s.
* Ordinals are remapped and corresponding data is copied over.
* The original data elements are not destroyed.
* {@code numSplits} must be a power of 2.
*/
public class HollowListTypeDataElementsSplitter {
public class HollowListTypeDataElementsSplitter extends AbstractHollowTypeDataElementsSplitter {

HollowListTypeDataElements[] split(HollowListTypeDataElements from, int numSplits) {
final int toMask = numSplits - 1;
final int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);

if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
throw new IllegalStateException("Must split by power of 2");
}

HollowListTypeDataElements[] to = new HollowListTypeDataElements[numSplits];
public HollowListTypeDataElementsSplitter(HollowListTypeDataElements from, int numSplits) {
super(from, numSplits);
this.to = new HollowListTypeDataElements[numSplits];
for(int i=0;i<to.length;i++) {
to[i] = new HollowListTypeDataElements(from.memoryMode, from.memoryRecycler);
to[i].maxOrdinal = -1;
}

if (from.encodedRemovals != null) {
GapEncodedVariableLengthIntegerReader[] splitRemovals = from.encodedRemovals.split(numSplits);
for(int i=0;i<to.length;i++) {
to[i].encodedRemovals = splitRemovals[i];
}
}
if (from.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}

populateStats(to, from, toMask, toOrdinalShift);

copyRecords(to, from, toMask, toOrdinalShift);

return to;
}

private void populateStats(HollowListTypeDataElements[] to, HollowListTypeDataElements from, int toMask, int toOrdinalShift) {
@Override
public void populateStats() {
HollowListTypeDataElements[] to = (HollowListTypeDataElements[])this.to;
HollowListTypeDataElements from = (HollowListTypeDataElements) this.from;

int numSplits = to.length;
long[] totalOfListSizes = new long[numSplits];

Expand Down Expand Up @@ -87,7 +66,11 @@ private void populateStats(HollowListTypeDataElements[] to, HollowListTypeDataEl
}
}

private void copyRecords(HollowListTypeDataElements[] to, HollowListTypeDataElements from, int toMask, int toOrdinalShift) {
@Override
public void copyRecords() {
HollowListTypeDataElements[] to = (HollowListTypeDataElements[])this.to;
HollowListTypeDataElements from = (HollowListTypeDataElements) this.from;

int numSplits = to.length;
long elementCounter[] = new long[numSplits];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.hollow.core.memory.encoding.VarInt;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElements;
import java.io.IOException;

/**
Expand All @@ -31,16 +32,11 @@
* During a delta, the HollowMapTypeReadState will create a new HollowMapTypeDataElements and atomically swap
* with the existing one to make sure a consistent view of the data is always available.
*/
public class HollowMapTypeDataElements {

int maxOrdinal;
public class HollowMapTypeDataElements extends AbstractHollowTypeDataElements {

FixedLengthData mapPointerAndSizeData;
FixedLengthData entryData;

GapEncodedVariableLengthIntegerReader encodedRemovals;
GapEncodedVariableLengthIntegerReader encodedAdditions;

int bitsPerMapPointer;
int bitsPerMapSizeValue;
int bitsPerFixedLengthMapPortion;
Expand All @@ -50,16 +46,12 @@ public class HollowMapTypeDataElements {
int emptyBucketKeyValue;
long totalNumberOfBuckets;

final ArraySegmentRecycler memoryRecycler;
final MemoryMode memoryMode;

public HollowMapTypeDataElements(ArraySegmentRecycler memoryRecycler) {
this(MemoryMode.ON_HEAP, memoryRecycler);
}

public HollowMapTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
super(memoryMode, memoryRecycler);
}

void readSnapshot(HollowBlobInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.netflix.hollow.core.memory.encoding.VarInt;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElements;
import com.netflix.hollow.core.schema.HollowObjectSchema;
import java.io.IOException;

Expand All @@ -35,18 +36,13 @@
* During a delta, the HollowObjectTypeReadState will create a new HollowObjectTypeDataElements and atomically swap
* with the existing one to make sure a consistent view of the data is always available.
*/
public class HollowObjectTypeDataElements {
public class HollowObjectTypeDataElements extends AbstractHollowTypeDataElements {

final HollowObjectSchema schema;

int maxOrdinal;
public final HollowObjectSchema schema;

FixedLengthData fixedLengthData;
final VariableLengthData varLengthData[];

GapEncodedVariableLengthIntegerReader encodedAdditions;
GapEncodedVariableLengthIntegerReader encodedRemovals;

final int bitsPerField[];
final int bitOffsetPerField[];
final long nullValueForField[];
Expand All @@ -55,21 +51,17 @@ public class HollowObjectTypeDataElements {
private int bitsPerUnfilteredField[];
private boolean unfilteredFieldIsIncluded[];

final ArraySegmentRecycler memoryRecycler;
final MemoryMode memoryMode;

public HollowObjectTypeDataElements(HollowObjectSchema schema, ArraySegmentRecycler memoryRecycler) {
this(schema, MemoryMode.ON_HEAP, memoryRecycler);
}

public HollowObjectTypeDataElements(HollowObjectSchema schema, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
super(memoryMode, memoryRecycler);
varLengthData = new VariableLengthData[schema.numFields()];
bitsPerField = new int[schema.numFields()];
bitOffsetPerField = new int[schema.numFields()];
nullValueForField = new long[schema.numFields()];
this.schema = schema;
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
}

void readSnapshot(HollowBlobInput in, HollowObjectSchema unfilteredSchema) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,31 @@

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.memory.VariableLengthDataFactory;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElements;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsSplitter;

/**
* Split a {@code HollowObjectTypeDataElements} into multiple {@code HollowObjectTypeDataElements}s.
* Ordinals are remapped and corresponding data is copied over.
* The original data elements are not destroyed.
* // SNAP: TODO: single-shot, split() not expected to be called repeatedly
* {@code numSplits} must be a power of 2.
*/
public class HollowObjectTypeDataElementsSplitter {
public class HollowObjectTypeDataElementsSplitter extends AbstractHollowTypeDataElementsSplitter {

HollowObjectTypeDataElements[] split(HollowObjectTypeDataElements from, int numSplits) {
final int toMask = numSplits - 1;
final int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
final long[][] currentWriteVarLengthDataPointers;

if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
throw new IllegalStateException("Must split by power of 2");
}

HollowObjectTypeDataElements[] to = new HollowObjectTypeDataElements[numSplits];
HollowObjectTypeDataElementsSplitter(HollowObjectTypeDataElements from, int numSplits) {
super(from, numSplits);
this.to = new HollowObjectTypeDataElements[numSplits];
for(int i=0;i<to.length;i++) {
to[i] = new HollowObjectTypeDataElements(from.schema, from.memoryMode, from.memoryRecycler);
to[i].maxOrdinal = -1;
}
currentWriteVarLengthDataPointers = new long[numSplits][from.schema.numFields()];

populateStats(to, from, toMask, toOrdinalShift);

if (from.encodedRemovals != null) {
GapEncodedVariableLengthIntegerReader[] splitRemovals = from.encodedRemovals.split(numSplits);
for(int i=0;i<to.length;i++) {
to[i].encodedRemovals = splitRemovals[i];
}
}
if (from.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}

for(int i=0;i<to.length;i++) {
to[i].fixedLengthData = FixedLengthDataFactory.get((long)to[i].bitsPerRecord * (to[i].maxOrdinal + 1), to[i].memoryMode, to[i].memoryRecycler);
for(int fieldIdx=0;fieldIdx<from.schema.numFields();fieldIdx++) {
if(from.varLengthData[fieldIdx] != null) {
to[i].varLengthData[fieldIdx] = VariableLengthDataFactory.get(from.memoryMode, from.memoryRecycler);
}
}
}

for(int i=0;i<=from.maxOrdinal;i++) {
int toIndex = i & toMask;
int toOrdinal = i >> toOrdinalShift;
copyRecord(to[toIndex], toOrdinal, from, i, currentWriteVarLengthDataPointers[toIndex]);
}
return to;
}

private void populateStats(HollowObjectTypeDataElements[] to, HollowObjectTypeDataElements from, int toMask, int toOrdinalShift) {
@Override
public void populateStats() {
HollowObjectTypeDataElements[] to = (HollowObjectTypeDataElements[])this.to;
HollowObjectTypeDataElements from = (HollowObjectTypeDataElements) this.from;

long[][] varLengthSizes = new long[to.length][from.schema.numFields()];

for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
Expand Down Expand Up @@ -93,4 +60,27 @@ private void populateStats(HollowObjectTypeDataElements[] to, HollowObjectTypeDa
}
}
}

@Override
public void copyRecords() {
HollowObjectTypeDataElements[] to = (HollowObjectTypeDataElements[])this.to;
HollowObjectTypeDataElements from = (HollowObjectTypeDataElements) this.from;

final long[][] currentWriteVarLengthDataPointers = new long[to.length][from.schema.numFields()];

for(int i=0;i<to.length;i++) {
to[i].fixedLengthData = FixedLengthDataFactory.get((long)to[i].bitsPerRecord * (to[i].maxOrdinal + 1), to[i].memoryMode, to[i].memoryRecycler);
for(int fieldIdx=0;fieldIdx<from.schema.numFields();fieldIdx++) {
if(from.varLengthData[fieldIdx] != null) {
to[i].varLengthData[fieldIdx] = VariableLengthDataFactory.get(from.memoryMode, from.memoryRecycler);
}
}
}

for(int i=0;i<=from.maxOrdinal;i++) {
int toIndex = i & toMask;
int toOrdinal = i >> toOrdinalShift;
copyRecord(to[toIndex], toOrdinal, from, i, currentWriteVarLengthDataPointers[toIndex]);
}
}
}
Loading

0 comments on commit fc480a3

Please sign in to comment.