Skip to content

Commit

Permalink
Log allocates and more data.
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Oct 10, 2023
1 parent e39d6e2 commit 8605063
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public long getCurrentMemoryBytes() {
*
* @return the size of the allocated block, in bytes
*/
public synchronized long requestMemory() {
public synchronized long requestMemory(final String name) {
// todo(davin): what happens if the incoming record is larger than 30MB?
if (currentMemoryBytes.get() >= maxMemoryBytes) {
return 0L;
Expand All @@ -72,10 +72,11 @@ public synchronized long requestMemory() {
final var toAllocateBytes = Math.min(freeMem, BLOCK_SIZE_BYTES);
currentMemoryBytes.addAndGet(toAllocateBytes);

log.debug("Memory Requested: max: {}, allocated: {}, allocated in this request: {}",
log.info("Memory Requested: max: {}, allocated: {}, allocated in this request: {}, for {}",
FileUtils.byteCountToDisplaySize(maxMemoryBytes),
FileUtils.byteCountToDisplaySize(currentMemoryBytes.get()),
FileUtils.byteCountToDisplaySize(toAllocateBytes));
FileUtils.byteCountToDisplaySize(toAllocateBytes),
name);
return toAllocateBytes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ public void addRecord(final PartialAirbyteMessage message, final Integer sizeInB

private void handleRecord(final PartialAirbyteMessage message, final Integer sizeInBytes) {
final StreamDescriptor streamDescriptor = extractStateFromRecord(message);
final var queue = buffers.computeIfAbsent(streamDescriptor, _k -> new StreamAwareQueue(memoryManager.requestMemory()));
final var streamName = streamDescriptor.getNamespace() == null ? streamDescriptor.getName() : streamDescriptor.getNamespace() + " - " + streamDescriptor.getName();
final var queue = buffers.computeIfAbsent(streamDescriptor, _k -> new StreamAwareQueue(memoryManager.requestMemory(streamName + " init")));
final long stateId = stateManager.getStateIdAndIncrementCounter(streamDescriptor);

var addedToQueue = queue.offer(message, sizeInBytes, stateId);


int i = 0;
while (!addedToQueue) {
final var newlyAllocatedMemory = memoryManager.requestMemory();
final var newlyAllocatedMemory = memoryManager.requestMemory(streamName + " record");
if (newlyAllocatedMemory > 0) {
queue.addMaxMemory(newlyAllocatedMemory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ private void printQueueInfo() {
for (final var entry : buffers.entrySet()) {
final var queue = entry.getValue();
queueInfo.append(
String.format(" Queue name: %s, num records: %d, num bytes: %s",
entry.getKey().getName(), queue.size(), AirbyteFileUtils.byteCountToDisplaySize(queue.getCurrentMemoryUsage())))
String.format(" Queue name: %s, num records: %d, num bytes: %s, allocated bytes: %s",
entry.getKey().getName(), queue.size(), AirbyteFileUtils.byteCountToDisplaySize(queue.getCurrentMemoryUsage()),
AirbyteFileUtils.byteCountToDisplaySize(queue.getMaxMemoryUsage())))
.append(System.lineSeparator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class GlobalAsyncStateManager {

public GlobalAsyncStateManager(final GlobalMemoryManager memoryManager) {
this.memoryManager = memoryManager;
memoryAllocated = new AtomicLong(memoryManager.requestMemory());
memoryAllocated = new AtomicLong(memoryManager.requestMemory("init"));
memoryUsed = new AtomicLong();
}

Expand Down Expand Up @@ -286,7 +286,7 @@ private void closeState(final PartialAirbyteMessage message, final long sizeInBy
private void allocateMemoryToState(final long sizeInBytes) {
if (memoryAllocated.get() < memoryUsed.get() + sizeInBytes) {
while (memoryAllocated.get() < memoryUsed.get() + sizeInBytes) {
memoryAllocated.addAndGet(memoryManager.requestMemory());
memoryAllocated.addAndGet(memoryManager.requestMemory("state"));
try {
LOGGER.debug("Insufficient memory to store state message. Allocated: {}, Used: {}, Size of State Msg: {}, Needed: {}",
FileUtils.byteCountToDisplaySize(memoryAllocated.get()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ public class GlobalMemoryManagerTest {
void test() {
final GlobalMemoryManager mgr = new GlobalMemoryManager(15 * BYTES_MB);

assertEquals(10 * BYTES_MB, mgr.requestMemory());
assertEquals(5 * BYTES_MB, mgr.requestMemory());
assertEquals(0, mgr.requestMemory());
assertEquals(10 * BYTES_MB, mgr.requestMemory(""));
assertEquals(5 * BYTES_MB, mgr.requestMemory(""));
assertEquals(0, mgr.requestMemory(""));

mgr.free(10 * BYTES_MB);
assertEquals(10 * BYTES_MB, mgr.requestMemory());
assertEquals(10 * BYTES_MB, mgr.requestMemory(""));
mgr.free(16 * BYTES_MB);
assertEquals(10 * BYTES_MB, mgr.requestMemory());
assertEquals(10 * BYTES_MB, mgr.requestMemory(""));
}

}

0 comments on commit 8605063

Please sign in to comment.