Skip to content

Commit

Permalink
feat: use batches and reduce the threads
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelAHM committed Nov 6, 2024
1 parent f96822c commit 2820409
Showing 1 changed file with 27 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
Expand All @@ -46,6 +46,8 @@ public class SnapshotImporter {

private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray();

private static final int BATCH_SIZE = 100;

private static final int BUFFER_SIZE = 4096;

public SnapshotImporter(final NrtmRestClient nrtmRestClient,
Expand Down Expand Up @@ -87,6 +89,7 @@ public void importSnapshot(final String source, final UpdateNotificationFileResp
final Timer timer = new Timer();
printProgress(timer, processedCount);

final ExecutorService executorService = Executors.newFixedThreadPool(3);
decompressAndProcessRecords(
payload,
firstRecord -> {
Expand All @@ -102,13 +105,17 @@ public void importSnapshot(final String source, final UpdateNotificationFileResp
}
LOGGER.info("Processed first record");
},
remainingRecord -> {
try {
processObject(remainingRecord);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
processedCount.incrementAndGet();
remainingRecords -> {
executorService.submit(() -> {
for (String record : remainingRecords){
try {
processObject(record);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
});
processedCount.addAndGet(remainingRecords.length);
}
);

Expand Down Expand Up @@ -201,15 +208,15 @@ private static String decompress(final byte[] compressed) throws IOException {
}

public static void decompressAndProcessRecords(final byte[] compressed, Consumer<String> firstRecordProcessor,
Consumer<String> remainingRecordProcessor){

final ExecutorService executorService = Executors.newFixedThreadPool(4);
Consumer<String[]> remainingRecordProcessor){
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressed);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream, BUFFER_SIZE);
InputStreamReader reader = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(reader)) {

String line;
String[] batch = new String[BATCH_SIZE];
int batchIndex = 0;
boolean isFirstRecord = true;

while ((line = bufferedReader.readLine()) != null) {
Expand All @@ -222,23 +229,21 @@ public static void decompressAndProcessRecords(final byte[] compressed, Consumer
firstRecordProcessor.accept(record); // Process the first record
isFirstRecord = false;
} else {
executorService.submit(() -> remainingRecordProcessor.accept(record)); // Process remaining records
batch[batchIndex++] = record;
if (batchIndex == BATCH_SIZE) {
remainingRecordProcessor.accept(batch);
batch = new String[BATCH_SIZE];
batchIndex = 0;
}
}
}
if (batchIndex > 0) {
remainingRecordProcessor.accept(Arrays.copyOf(batch, batchIndex));
}

} catch (IOException e) {
e.printStackTrace();
} finally {
executorService.shutdown(); // Properly shutdown the executor
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}

}

private static String calculateSha256(final byte[] bytes) {
Expand Down

0 comments on commit 2820409

Please sign in to comment.