Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: delta transitions in shared-memory mode #615

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1edaa13
Start adding delta support
Sunjeet May 20, 2023
9d98237
Continue adding delta support
Sunjeet May 20, 2023
0a73dd9
GapEncodedVariableLengthIntegerReader.java supports shared memory mode
Sunjeet May 20, 2023
798ad32
BlobByteBuffer supports putLong
Sunjeet May 20, 2023
67ee931
EncodedLongBuffer supports setElementValue
Sunjeet May 20, 2023
49429d4
EncodedLongBuffer supports testCopyBitRange
Sunjeet May 21, 2023
5dea324
EncodedLongBuffer supports increment/incrementMany
Sunjeet May 21, 2023
b3fe5e2
EncodedLongBuffer- add unit test for copy small bit range
Sunjeet May 21, 2023
359002d
EncodedLongBuffer implements clearElementValue
Sunjeet May 21, 2023
bf460ae
Run it up for simple data model
Sunjeet May 21, 2023
6fa9a07
Implement destroy for BlobByteBuffer, but don't invoke it yet
Sunjeet May 21, 2023
7d669a6
Refactor fixed length data provisioning
Sunjeet May 21, 2023
3446435
Cleanup before touching non object types
Sunjeet May 21, 2023
ae505e6
Support remaining fixed length types- list, set, map
Sunjeet May 21, 2023
55b655c
Shared memory mode delta transitions for variable length types
Sunjeet May 22, 2023
1bce619
Cleanup
Sunjeet May 22, 2023
770fac8
Bugfix for empty transition file being mmapped, other refactor/cleanup
Sunjeet Jun 10, 2023
46dfd6b
Gap encoded combined removed ordinals applicable conditionally- alway…
Sunjeet Jun 12, 2023
730cec9
Delta application performance- bulk copy fixed length data- but only …
Sunjeet Jun 13, 2023
a75f7a1
Delta target file- add schema name and shard num for diagnostic, disa…
Sunjeet Jun 17, 2023
f66b074
Take a stab at unmap/close
Sunjeet Jun 18, 2023
4a91f9b
Some minor fixes in file cleanup
Sunjeet Jun 18, 2023
37e9084
lifecycle staging files and cleanup
Sunjeet Jun 19, 2023
8770271
Delta transitions files under java.io.tmpdir
Sunjeet Jun 21, 2023
ab45e0d
Change unexpected read from exception to warn
Sunjeet Jun 21, 2023
ea9ef51
Fix for encoded long buffer setElementValue getElementValue for bytes…
Sunjeet Jun 22, 2023
5fabbd3
Trying removing explicit call to gc
Sunjeet Jun 22, 2023
013ca0f
Employ sun misc Cleaner for unmap - java8 only
Sunjeet Jun 22, 2023
5f91441
Fewer threads when computing history
Sunjeet Jun 23, 2023
6001a79
Patch allocated target file length
Sunjeet Jun 23, 2023
202cc43
Cleanup logs
Sunjeet Jun 23, 2023
ca84256
Temporarily disable cleaner
Sunjeet Jun 23, 2023
97a1030
Revert "Temporarily disable cleaner"
Sunjeet Jun 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Patch allocated target file length
Sunjeet committed Jun 23, 2023
commit 6001a795311bd2e50339567ec269a829d21f4474
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ public static FixedLengthData get(HollowBlobInput in, MemoryMode memoryMode, Arr
}
}

// allocate (for write)
// allocate (for write) // unused
public static FixedLengthData allocate(HollowBlobInput in,
MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler,
String fileName) throws IOException {
@@ -39,13 +39,13 @@ public static FixedLengthData allocate(HollowBlobInput in,

public static FixedLengthData allocate(long numBits, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler,
String fileName) throws IOException {
long numLongs = ((numBits - 1) >>> 6) + 1;
numLongs ++; // accommodate for reading a long starting at bit index within numLongs-1
long numBytes = numLongs << 3;
if (memoryMode.equals(MemoryMode.ON_HEAP)) {
return new FixedLengthElementArray(memoryRecycler, numBits);
} else {
File targetFile = provisionTargetFile(numBytes, fileName);
long numLongs = ((numBits - 1) >>> 6) + 1;
long numBytes = numLongs << 3;
// add Long.BYTES to provisioned file size to accommodate unaligned read starting offset in last long
File targetFile = provisionTargetFile(numBytes + Long.BYTES, fileName);
try (HollowBlobInput targetBlob = HollowBlobInput.randomAccess(targetFile, MAX_SINGLE_BUFFER_CAPACITY)) {
return EncodedLongBuffer.newFrom(targetBlob, numLongs, targetFile); // TODO: test with different single buffer capacities
}
@@ -65,7 +65,7 @@ public static void destroy(FixedLengthData fld, ArraySegmentRecycler memoryRecyc
if (fld instanceof FixedLengthElementArray) {
((FixedLengthElementArray) fld).destroy(memoryRecycler);
} else if (fld instanceof EncodedLongBuffer) {
LOG.info("SNAP: Destroy operation invoked on EncodedLongBuffer (FixedLengthData)");
// LOG.info("SNAP: Destroy operation invoked on EncodedLongBuffer (FixedLengthData)");
((EncodedLongBuffer) fld).destroy();
} else {
throw new UnsupportedOperationException("Unknown type");
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ public static void destroy(VariableLengthData vld) throws IOException {
if (vld instanceof SegmentedByteArray) {
((SegmentedByteArray) vld).destroy();
} else if (vld instanceof EncodedByteBuffer) {
LOG.info("SNAP: Destroy operation invoked on EncodedByteBuffer (VariableLengthData)");
// LOG.info("SNAP: Destroy operation invoked on EncodedByteBuffer (VariableLengthData)");
((EncodedByteBuffer) vld).destroy();
} else {
throw new UnsupportedOperationException("Unknown type");
@@ -87,6 +87,10 @@ public void orderedCopy(VariableLengthData src, long srcPos, long destPos, long
while (length > 0) {
int toReadBytes = (int) Math.min(length, (long) chunk.length);
int readBytes = encodedByteBuffer.getBytes(srcPos, toReadBytes, chunk);
if (readBytes == 0) {
throw new IllegalStateException(String.format("SNAP: 0 bytes read from encoded byte buffer, " +
"srcPos= %s, toReadBytes= %s, chunk.length=%s", srcPos, toReadBytes, chunk.length));
}
length = length - readBytes;
srcPos = srcPos + readBytes;

@@ -120,7 +124,7 @@ public VariableLengthData commit() throws IOException {
this.raf.seek(0);
try (HollowBlobInput hbi = HollowBlobInput.mmap(this.file, this.raf, MAX_SINGLE_BUFFER_CAPACITY, false)) {
byteBuffer.loadFrom(hbi, this.raf.length());
LOG.info("SNAP: Closing randomaccessfile because HollowBlobInput does not manage the lifecycle (will not close) for " + file);
// LOG.info("SNAP: Closing randomaccessfile because HollowBlobInput does not manage the lifecycle (will not close) for " + file);
this.raf.close();
return byteBuffer;
}
Original file line number Diff line number Diff line change
@@ -155,7 +155,7 @@ public byte getByte(long index) throws BufferUnderflowException {
}
else {
assert(index < capacity + Long.BYTES);
LOG.warning("SNAP: This is happening, not necessarily bad but test using unit test readUsingVariableLengthDataModes");
// LOG.warning("SNAP: This is happening, not necessarily bad but test using unit test readUsingVariableLengthDataModes");
// this situation occurs when read for bits near the end of the buffer requires reading a long value that
// extends past the buffer capacity by upto Long.BYTES bytes. To handle this case,
// return 0 for (index >= capacity - Long.BYTES && index < capacity )
@@ -171,7 +171,7 @@ public int getBytes(long index, long len, byte[] bytes, boolean restorePos) {
// extends past the buffer capacity by upto Long.BYTES bytes. To handle this case,
// return 0 for (index >= capacity - Long.BYTES && index < capacity )
// these zero bytes will be discarded anyway when the returned long value is shifted to get the queried bits
LOG.warning(String.format("Unexpected read past the end, index=%s, capacity=%s", index, capacity));
LOG.warning(String.format("Unexpected read past the end, index=%s, capacity=%s, len=%s", index, capacity, len));
}
int spineIndex = (int)(index >>> (shift));
ByteBuffer buf = spine[spineIndex];
@@ -293,7 +293,7 @@ public void unmapBlob() {
// count will sustain it from getting cleaned up, but cleanup will be promptly invoked on delta blob files after
// consumption and on per-shard per-type delta target files when it is superseded by another file in a future delta.
if (this.referenceCount.decrementAndGet() == 0) {
LOG.info("SNAP: Unmapping BlobByteBuffer because ref count has reached 0");
// LOG.info("SNAP: Unmapping BlobByteBuffer because ref count has reached 0");
for (int i = 0; i < spine.length; i++) {
ByteBuffer buf = spine[i];
if (buf != null) {
Original file line number Diff line number Diff line change
@@ -306,12 +306,12 @@ public long skipBytes(long n) throws IOException {
@Override
public void close() throws IOException {
if (input instanceof RandomAccessFile) {
LOG.info("SNAP: close called on BlobByteBuffer composing instance of RandomAccessFile");
// LOG.info("SNAP: close called on BlobByteBuffer composing instance of RandomAccessFile");
if (manageRafLifecycle) {
LOG.info("SNAP: HollowBlobInput manages the lifecycle of randomaccessfile " + file + ". Calling close.");
// LOG.info("SNAP: HollowBlobInput manages the lifecycle of randomaccessfile " + file + ". Calling close.");
((RandomAccessFile) input).close();
} else {
LOG.info("SNAP: HollowBlobInput does not manage the lifecycle (will not close) of randomaccessfile " + file + ". Won't close file.");
// LOG.info("SNAP: HollowBlobInput does not manage the lifecycle (will not close) of randomaccessfile " + file + ". Won't close file.");
}
if (buffer != null) {
buffer.unmapBlob();
Original file line number Diff line number Diff line change
@@ -24,6 +24,8 @@
import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener;
import com.netflix.hollow.core.util.IntMap;
import com.netflix.hollow.core.util.RemovedOrdinalIterator;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* This class contains the logic for extracting the removed records from an OBJECT type state
@@ -32,6 +34,7 @@
* Not intended for external consumption.
*/
public class HollowObjectDeltaHistoricalStateCreator {
private static final Logger LOG = Logger.getLogger(HollowObjectDeltaHistoricalStateCreator.class.getName());

private final HollowObjectTypeDataElements historicalDataElements;

@@ -159,8 +162,28 @@ private void copyRecord(int ordinal) {
long size = fromEndByte - fromStartByte;

historicalDataElements.fixedLengthData.setElementValue(((long)nextOrdinal * historicalDataElements.bitsPerRecord) + historicalDataElements.bitOffsetPerField[i], historicalDataElements.bitsPerField[i], currentWriteVarLengthDataPointers[i] + size);
historicalDataElements.varLengthData[i].copy(stateEngineDataElements[shard].varLengthData[i], fromStartByte, currentWriteVarLengthDataPointers[i], size);

try {
historicalDataElements.varLengthData[i].copy(stateEngineDataElements[shard].varLengthData[i], fromStartByte, currentWriteVarLengthDataPointers[i], size);
} catch (ArrayIndexOutOfBoundsException e) {
LOG.log(Level.SEVERE,
String.format("ArrayIndexOutOfBoundsException when building historical state: " +
"fieldName=%s, " +
"fieldType=%s, " +
"shard=%s, " +
"stateEngineDataElements[shard].varLengthData[i].length()=%s, " +
"fromStartByte=%s, " +
"size=%s, " +
"currentWriteVarLengthDataPointers[i]=%s, ",
historicalDataElements.schema.getFieldName(i),
historicalDataElements.schema.getFieldType(i),
shard,
stateEngineDataElements[shard].varLengthData[i].length(),
fromStartByte,
size,
currentWriteVarLengthDataPointers[i]),
e);
throw e;
}
currentWriteVarLengthDataPointers[i] += size;
}
}