fix: cleanup the current batch after flush crashed (#824)
### Motivation

In the existing logic, the variables `currentBatchSize` and `currentBatchBytes` are cleared whether the flush operation succeeds or fails. However, if an error such as an `OutOfMemoryError` (OOM) is thrown during the flush, it is caught here:
https://github.com/streamnative/pulsar-io-cloud-storage/blob/06d8c33afa7d813ffc2019ed4331a3883f9f9044/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java#L217-L221

When that happens, the batch is not cleaned up properly: the records have already been drained from the pending queue, but `currentBatchSize` and `currentBatchBytes` are never reset to 0, so the counters report a pending batch even though the queue is empty.
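
To make the failure mode concrete, here is a minimal standalone sketch of the pre-fix flow (the `FlushCrashDemo`, `upload`, and `unsafeFlush` names are hypothetical stand-ins modeled on the sink, not its actual code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FlushCrashDemo {

    static final AtomicLong currentBatchSize = new AtomicLong(0L);
    static final ArrayBlockingQueue<String> pendingFlushQueue = new ArrayBlockingQueue<>(10);

    // Stand-in for the blob upload; here it always crashes mid-flush.
    static void upload(List<String> records) {
        throw new OutOfMemoryError("simulated crash during flush");
    }

    static void unsafeFlush() {
        List<String> recordsToInsert = new ArrayList<>();
        pendingFlushQueue.drainTo(recordsToInsert);           // queue is emptied here
        upload(recordsToInsert);                              // crashes here...
        currentBatchSize.addAndGet(-recordsToInsert.size());  // ...so this never runs
    }

    public static void main(String[] args) throws InterruptedException {
        pendingFlushQueue.put("record-1");
        currentBatchSize.incrementAndGet();
        try {
            unsafeFlush();
        } catch (Throwable t) {
            // Mirrors the sink's catch-all: the error is swallowed and logged.
        }
        // The queue is empty, but the counter still claims one pending record.
        System.out.println("queue size       = " + pendingFlushQueue.size()); // 0
        System.out.println("currentBatchSize = " + currentBatchSize.get());   // 1
    }
}
```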

### Modifications

- The batch is now cleared before the actual flush operation takes place (see the sketch below).
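
In terms of the sketch above, the fix amounts to moving the counter updates ahead of the fallible upload:

```java
static void unsafeFlush() {
    List<String> recordsToInsert = new ArrayList<>();
    pendingFlushQueue.drainTo(recordsToInsert);

    // Fix: account for the drained records immediately, before anything
    // that can throw, so the counters always stay in sync with the queue.
    currentBatchSize.addAndGet(-recordsToInsert.size());

    upload(recordsToInsert); // may still crash, but the batch is already cleared
}
```

Because the counters are now decremented up front, neither the success path nor the failure handler needs to decrement them again, which is why the corresponding lines are deleted in the diff below. The two counters also become package-private so the new unit test can assert on them.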
RobertIndie authored Dec 6, 2023
1 parent 7418e11 commit f2a2a2f
Showing 2 changed files with 24 additions and 7 deletions.
**src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java**

```diff
@@ -80,8 +80,8 @@ public abstract class BlobStoreAbstractSink<V extends BlobStoreAbstractConfig> i
 
     private long maxBatchSize;
     private long maxBatchBytes;
-    private final AtomicLong currentBatchSize = new AtomicLong(0L);
-    private final AtomicLong currentBatchBytes = new AtomicLong(0L);
+    final AtomicLong currentBatchSize = new AtomicLong(0L);
+    final AtomicLong currentBatchBytes = new AtomicLong(0L);
     private ArrayBlockingQueue<Record<GenericRecord>> pendingFlushQueue;
     private final AtomicBoolean isFlushRunning = new AtomicBoolean(false);
     private SinkContext sinkContext;
@@ -198,7 +198,7 @@ public void write(Record<GenericRecord> record) throws Exception {
 
     private void flush() {
         if (log.isDebugEnabled()) {
-            log.debug("flush requested, pending: {} ({} bytes}, batchSize: {}, maxBatchBytes: {}",
+            log.debug("flush requested, pending: {} ({} bytes), batchSize: {}, maxBatchBytes: {}",
                     currentBatchSize.get(), currentBatchBytes.get(), maxBatchSize, maxBatchBytes);
         }
@@ -236,6 +236,8 @@ private void unsafeFlush() {
                 recordsToInsert.add(r);
             }
         }
+        currentBatchBytes.addAndGet(-1 * recordsToInsertBytes);
+        currentBatchSize.addAndGet(-1 * recordsToInsert.size());
         log.info("Flushing {} buffered records to blob store", recordsToInsert.size());
         if (log.isDebugEnabled()) {
             log.debug("buffered records {}", recordsToInsert);
@@ -282,8 +284,6 @@ private void unsafeFlush() {
             elapsedMs = System.currentTimeMillis() - elapsedMs;
             log.debug("Uploading blob {} elapsed time in ms: {}", filepath, elapsedMs);
             singleTopicRecordsToInsert.forEach(Record::ack);
-            currentBatchBytes.addAndGet(-1 * uploadBytes);
-            currentBatchSize.addAndGet(-1 * uploadSize);
             if (sinkContext != null) {
                 sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, singleTopicRecordsToInsert.size());
                 sinkContext.recordMetric(METRICS_LATEST_UPLOAD_ELAPSED_TIME, elapsedMs);
@@ -312,8 +312,6 @@ private void bulkHandleFailedRecords(List<Record<GenericRecord>> failedRecords)
         } else {
             failedRecords.forEach(Record::fail);
         }
-        currentBatchBytes.addAndGet(-1 * getBytesSum(failedRecords));
-        currentBatchSize.addAndGet(-1 * failedRecords.size());
         if (sinkContext != null) {
             sinkContext.recordMetric(METRICS_TOTAL_FAILURE, failedRecords.size());
         }
```
**src/test/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSinkTest.java**

```diff
@@ -48,6 +48,7 @@
 import org.apache.pulsar.io.jcloud.partitioner.Partitioner;
 import org.apache.pulsar.io.jcloud.writer.BlobWriter;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -160,6 +161,24 @@ public void repeatedlyFlushOnMaxBatchBytesTest() throws Exception {
         verifyRecordAck(100);
     }
 
+    @Test
+    public void testBatchCleanupWhenFlushCrashed() throws Exception {
+        this.config.put("pendingQueueSize", 1000);
+        this.config.put("batchTimeMs", 1000);
+        this.config.put("maxBatchBytes", 5 * PAYLOAD_BYTES);
+        this.config.put("batchSize", 1);
+
+        this.sink.open(this.config, this.mockSinkContext);
+        when(mockRecord.getSchema()).thenThrow(new OutOfMemoryError());
+        sendMockRecord(1);
+        await().atMost(Duration.ofSeconds(10)).untilAsserted(
+                () -> {
+                    Assert.assertEquals(0, this.sink.currentBatchBytes.get());
+                    Assert.assertEquals(0, this.sink.currentBatchSize.get());
+                }
+        );
+    }
+
     private void verifyRecordAck(int numberOfRecords) throws Exception {
         this.sink.open(this.config, this.mockSinkContext);
         sendMockRecord(numberOfRecords);
```
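The test provokes the crash by stubbing `mockRecord.getSchema()` to throw an `OutOfMemoryError` inside the flush path, then polls with Awaitility until both counters return to 0; the counters' new package-private visibility is what makes these assertions possible.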
