From 2820409c74530395baba0945aa89715657b6929a Mon Sep 17 00:00:00 2001 From: mherran Date: Wed, 6 Nov 2024 14:45:27 +0100 Subject: [PATCH] feat: use batches and reduce the threads --- .../client/importer/SnapshotImporter.java | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotImporter.java b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotImporter.java index 24ac9e16dc..710c5e1b40 100644 --- a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotImporter.java +++ b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotImporter.java @@ -23,11 +23,11 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.zip.GZIPInputStream; @@ -46,6 +46,8 @@ public class SnapshotImporter { private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray(); + private static final int BATCH_SIZE = 100; + private static final int BUFFER_SIZE = 4096; public SnapshotImporter(final NrtmRestClient nrtmRestClient, @@ -87,6 +89,7 @@ public void importSnapshot(final String source, final UpdateNotificationFileResp final Timer timer = new Timer(); printProgress(timer, processedCount); + final ExecutorService executorService = Executors.newFixedThreadPool(3); decompressAndProcessRecords( payload, firstRecord -> { @@ -102,13 +105,17 @@ public void importSnapshot(final String source, final UpdateNotificationFileResp } LOGGER.info("Processed first record"); }, - remainingRecord -> { - try { - processObject(remainingRecord); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - processedCount.incrementAndGet(); + remainingRecords -> { + executorService.submit(() -> { + for (String record : remainingRecords){ + try { + processObject(record); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + }); + processedCount.addAndGet(remainingRecords.length); } ); @@ -201,15 +208,15 @@ private static String decompress(final byte[] compressed) throws IOException { } public static void decompressAndProcessRecords(final byte[] compressed, Consumer firstRecordProcessor, - Consumer remainingRecordProcessor){ - - final ExecutorService executorService = Executors.newFixedThreadPool(4); + Consumer remainingRecordProcessor){ try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressed); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream, BUFFER_SIZE); InputStreamReader reader = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8); BufferedReader bufferedReader = new BufferedReader(reader)) { String line; + String[] batch = new String[BATCH_SIZE]; + int batchIndex = 0; boolean isFirstRecord = true; while ((line = bufferedReader.readLine()) != null) { @@ -222,23 +229,21 @@ public static void decompressAndProcessRecords(final byte[] compressed, Consumer firstRecordProcessor.accept(record); // Process the first record isFirstRecord = false; } else { - executorService.submit(() -> remainingRecordProcessor.accept(record)); // Process remaining records + batch[batchIndex++] = record; + if (batchIndex == BATCH_SIZE) { + remainingRecordProcessor.accept(batch); + batch = new String[BATCH_SIZE]; + batchIndex = 0; + } } } + if (batchIndex > 0) { + remainingRecordProcessor.accept(Arrays.copyOf(batch, batchIndex)); + } } catch (IOException e) { e.printStackTrace(); - } finally { - executorService.shutdown(); // Properly shutdown the executor - try { - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - } } - } private static String calculateSha256(final byte[] bytes) {