diff --git a/logbat/src/main/java/info/logbat/common/event/EventConsumer.java b/logbat/src/main/java/info/logbat/common/event/EventConsumer.java new file mode 100644 index 0000000..1c57daa --- /dev/null +++ b/logbat/src/main/java/info/logbat/common/event/EventConsumer.java @@ -0,0 +1,8 @@ +package info.logbat.common.event; + +import java.util.List; + +public interface EventConsumer { + + List consume(); +} diff --git a/logbat/src/main/java/info/logbat/common/event/EventProducer.java b/logbat/src/main/java/info/logbat/common/event/EventProducer.java new file mode 100644 index 0000000..7512970 --- /dev/null +++ b/logbat/src/main/java/info/logbat/common/event/EventProducer.java @@ -0,0 +1,8 @@ +package info.logbat.common.event; + +import java.util.List; + +public interface EventProducer { + + void produce(List data); +} diff --git a/logbat/src/main/java/info/logbat/domain/log/application/LogService.java b/logbat/src/main/java/info/logbat/domain/log/application/LogService.java index cfae356..7438fc6 100644 --- a/logbat/src/main/java/info/logbat/domain/log/application/LogService.java +++ b/logbat/src/main/java/info/logbat/domain/log/application/LogService.java @@ -1,10 +1,9 @@ package info.logbat.domain.log.application; +import info.logbat.common.event.EventProducer; import info.logbat.domain.log.domain.Log; import info.logbat.domain.log.presentation.payload.request.CreateLogRequest; -import info.logbat.domain.log.repository.LogRepository; import info.logbat.domain.project.application.AppService; - import java.util.ArrayList; import java.util.List; import lombok.RequiredArgsConstructor; @@ -16,7 +15,7 @@ @RequiredArgsConstructor public class LogService { - private final LogRepository logRepository; + private final EventProducer producer; private final AppService appService; public void saveLogs(String appKey, List requests) { @@ -25,12 +24,11 @@ public void saveLogs(String appKey, List requests) { requests.forEach(request -> { try { logs.add(request.toEntity(appId)); - } - catch (Exception e) { + } catch (Exception e) { log.error("Failed to convert request to entity: {}", request, e); } }); - logRepository.saveAll(logs); + producer.produce(logs); } } diff --git a/logbat/src/main/java/info/logbat/domain/log/flatter/LogRequestFlatter.java b/logbat/src/main/java/info/logbat/domain/log/flatter/LogRequestFlatter.java new file mode 100644 index 0000000..20d92e8 --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/flatter/LogRequestFlatter.java @@ -0,0 +1,21 @@ +package info.logbat.domain.log.flatter; + +import info.logbat.common.event.EventProducer; +import info.logbat.domain.log.domain.Log; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class LogRequestFlatter { + + private final EventProducer eventProducer; + private final ExecutorService executor = Executors.newFixedThreadPool(1); + + public void flatten(List logs) { + executor.submit(() -> eventProducer.produce(logs)); + } +} diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java b/logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java new file mode 100644 index 0000000..4253bc4 --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java @@ -0,0 +1,102 @@ +package info.logbat.domain.log.queue; + +import info.logbat.common.event.EventConsumer; +import info.logbat.common.event.EventProducer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.LockSupport; +import org.springframework.beans.factory.annotation.Value; + +/** + * 로깅 데이터 전달 목적의 Thread-Safe Queue 구현체로, 생산과 소비 작업을 모두 지원합니다. 이 클래스는 단일 스레드 환경에서 작동하도록 설계되었으며, 효율적인 + * 대량 작업을 허용합니다. + *

+ * 이 큐는 생산자가 데이터를 추가하고, 소비자가 데이터를 꺼내는 방식으로 동작합니다. 소비자는 큐에 충분한 데이터가 쌓일 때까지 대기하다가, 큐에 데이터가 충분히 쌓이면 + * 일괄적으로 데이터를 꺼내서 처리합니다. 이 때, 일괄 처리 크기는 생성자를 통해 지정할 수 있습니다. 만약 큐에 충분한 데이터가 쌓이지 않은 상태에서 소비자가 데이터를 꺼내려 + * 할 때, 소비자 스레드는 일정 시간 동안 대기하다가 큐에 데이터가 추가되면 깨어나서 데이터를 꺼냅니다. 이 때 대기 시간은 생성자를 통해 지정할 수 있습니다. + *

+ * + * @param 이 큐에 저장되는 요소의 타입 + */ +public class LogQueue implements EventProducer, EventConsumer { + + // T 타입의 요소를 저장하는 list + private final LinkedList queue; + // 소비자 스레드가 대기하는 시간 (나노초 단위) + private final long timeoutNanos; + // 일괄 처리 크기 + private final int bulkSize; + // 소비자 스레드, volatile로 선언하여 가시성 보장 + private volatile Thread consumerThread; + + + /** + * 지정된 타임아웃과 벌크 크기로 새 LogQueue를 생성합니다. + * + * @param timeoutMillis 큐가 비어있을 때 소비자가 대기하는 시간(밀리초) + * @param bulkSize 단일 작업에서 소비될 수 있는 최대 요소 수 + */ + public LogQueue(@Value("${jdbc.async.timeout}") Long timeoutMillis, + @Value("${jdbc.async.bulk-size}") Integer bulkSize) { + this.queue = new LinkedList<>(); + this.timeoutNanos = timeoutMillis * 1_000_000; // Convert to nanoseconds + this.bulkSize = bulkSize; + } + + /** + * 큐에서 요소를 일괄적으로 꺼내서 반환합니다. 큐에 충분한 요소가 쌓이지 않은 경우, 소비자 스레드는 일정 시간 동안 대기합니다. + *

+ * 이 메서드는 단일 스레드 환경에서만 호출해야 합니다. 만약 다중 스레드 환경에서 호출하면 예상치 못한 결과가 발생할 수 있습니다. 이 메서드는 큐에 충분한 요소가 쌓일 + * 때까지 대기하다가, 큐에 요소가 쌓이면 일괄적으로 요소를 꺼내서 반환합니다. 만약 큐에 충분한 요소가 쌓이지 않은 경우, 일정 시간 동안 대기하다가 큐에 요소가 + * 추가되면 깨어나서 요소를 꺼냅니다. 이 때 대기 시간은 생성자를 통해 지정할 수 있습니다. 이 메서드는 큐에 쌓인 요소를 꺼내서 반환하는 것이 목적이므로, 큐에 요소를 + * 추가하는 작업은 {@link #produce(List)} 메서드를 사용해야 합니다. + *

+ * + * @return 큐에서 꺼낸 요소의 리스트 (최대 {@link #bulkSize}개) + */ + @Override + public List consume() { + List result = new ArrayList<>(bulkSize); + + if (queue.size() >= bulkSize) { + for (int i = 0; i < bulkSize; i++) { + result.add(queue.poll()); + } + return result; + } + + consumerThread = Thread.currentThread(); + + do { + LockSupport.parkNanos(timeoutNanos); + } while (queue.isEmpty()); + + for (int i = 0; i < bulkSize; i++) { + result.add(queue.poll()); + if (queue.isEmpty()) { + break; + } + } + + consumerThread = null; + return result; + } + + /** + * 큐에 요소를 추가합니다. 큐에 요소가 추가되면, 소비자 스레드를 깨워서 요소를 꺼내도록 합니다. + *

+ * 이 메서드는 단일 스레드 환경에서만 호출해야 합니다. 만약 다중 스레드 환경에서 호출하면 예상치 못한 결과가 발생할 수 있습니다. 이 메서드는 큐에 요소를 추가하는 + * 것이 목적이므로, 큐에서 요소를 꺼내는 작업은 {@link #consume()} 메서드를 사용해야 합니다. + *

+ * + * @param data 큐에 추가할 요소의 리스트 + */ + @Override + public void produce(List data) { + queue.addAll(data); + if (consumerThread != null && queue.size() >= bulkSize) { + LockSupport.unpark(consumerThread); + } + } +} \ No newline at end of file diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java b/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java new file mode 100644 index 0000000..97f3488 --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java @@ -0,0 +1,84 @@ +package info.logbat.domain.log.queue; + +import info.logbat.common.event.EventConsumer; +import info.logbat.common.event.EventProducer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +@Scope("prototype") +@Component +public class ReentrantLogQueue implements EventProducer, EventConsumer { + + private final LinkedList queue = new LinkedList<>(); + private final long timeout; + private final int bulkSize; + private final ReentrantLock bulkLock = new ReentrantLock(); + private final Condition bulkCondition = bulkLock.newCondition(); + + public ReentrantLogQueue(@Value("${jdbc.async.timeout}") Long timeout, + @Value("${jdbc.async.bulk-size}") Integer bulkSize) { + this.timeout = timeout; + this.bulkSize = bulkSize; + } + + /* + * Consumer should be one thread + */ + @Override + public List consume() { + List result = new ArrayList<>(); + + try { + bulkLock.lockInterruptibly(); + // Case1: Full Flush + if (queue.size() >= bulkSize) { + for (int i = 0; i < bulkSize; i++) { + result.add(queue.poll()); + } + return result; + } + // Else Case: Blocking + // Blocked while Queue is Not Empty + do { + bulkCondition.await(timeout, TimeUnit.MILLISECONDS); + } while (queue.isEmpty()); + + // Bulk Size 만큼 꺼내서 반환 + for (int i = 0; i < bulkSize; i++) { + result.add(queue.poll()); + if (queue.isEmpty()) { + break; + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + bulkLock.unlock(); + } + return result; + } + + /* + * Producer should be one thread + */ + @Override + public void produce(List data) { + bulkLock.lock(); + try { + queue.addAll(data); + if (queue.size() >= bulkSize) { + bulkCondition.signal(); + } + } finally { + bulkLock.unlock(); + } + } + +} diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkLogQueue.java b/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkLogQueue.java new file mode 100644 index 0000000..3bcafad --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkLogQueue.java @@ -0,0 +1,53 @@ +package info.logbat.domain.log.queue; + +import info.logbat.common.event.EventConsumer; +import info.logbat.common.event.EventProducer; +import java.util.ArrayList; +import java.util.List; +import org.springframework.beans.factory.annotation.Value; + +public class SingleLinkLogQueue implements EventProducer, EventConsumer { + + private final SingleLinkedList queue = new SingleLinkedList<>(); + private final long timeout; + private final int bulkSize; + + public SingleLinkLogQueue(@Value("${jdbc.async.timeout}") Long timeout, + @Value("${jdbc.async.bulk-size}") Integer bulkSize) { + this.timeout = timeout; + this.bulkSize = bulkSize; + } + + /* + * Consumer should be one thread + */ + @Override + public List consume() { + List result = new ArrayList<>(bulkSize); + + final long endTime = System.currentTimeMillis() + timeout; + + while (result.isEmpty()) { + T data = queue.poll(); + if (data != null) { + result.add(data); + } + if (result.size() >= bulkSize) { + break; + } + if (System.currentTimeMillis() >= endTime) { + break; + } + } + + return result; + } + + /* + * Producer should be one thread + */ + @Override + public void produce(List data) { + queue.addAll(data); + } +} diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkedList.java b/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkedList.java new file mode 100644 index 0000000..92a6650 --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkedList.java @@ -0,0 +1,66 @@ +package info.logbat.domain.log.queue; + +public class SingleLinkedList { + + Node first; + Node lastHolder; + + public SingleLinkedList() { + lastHolder = new Node<>(null, null); + first = lastHolder; + } + + private void linkLast(E e) { + Node newLastHolder = new Node<>(null, null); + + // 1. add next node + lastHolder.next = newLastHolder; + // 2. set item + lastHolder.item = e; + + lastHolder = newLastHolder; + } + + private E unlinkFirst() { + // 1. get first element + E element = first.item; + + // first == lastHolder + if (first.item == null) { + return null; + } + + // 2. get next node + // if element is not null, next node should not be null + Node next = first.next; + first.item = null; + first.next = null; // help GC + first = next; + return element; + } + + + public boolean isEmpty() { + return first.item == null; + } + + public E poll() { + return unlinkFirst(); + } + + public void addAll(Iterable c) { + for (E e : c) { + linkLast(e); + } + } + + private static class Node { + volatile E item; + volatile Node next; + + Node(E element, Node next) { + this.item = element; + this.next = next; + } + } +} diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java index f8f984f..9df8714 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java @@ -1,45 +1,36 @@ package info.logbat.domain.log.repository; import com.zaxxer.hikari.HikariDataSource; +import info.logbat.common.event.EventConsumer; import info.logbat.domain.log.domain.Log; -import java.util.ArrayList; import java.util.List; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Component; - /** - * 비동기식 로그 처리를 담당하는 것은 AsyncLogProcessor입니다. 리더-팔로워 패턴을 사용하여 로그 항목을 처리하고 대량으로 저장합니다. 리더는 로그 큐에서 로그 - * 항목을 가져와 팔로워 스레드 풀에 전달합니다. 팔로워는 로그 항목을 대량으로 저장합니다. + * 비동기적으로 로그를 처리하는 클래스입니다. 이 클래스는 로그를 저장하는 비동기 작업을 수행하며, 이를 위해 별도의 스레드 풀을 사용합니다. */ @Slf4j -@Component public class AsyncLogProcessor { - private final LinkedBlockingQueue logQueue = new LinkedBlockingQueue<>(); + // 로그 저장 작업을 수행하는 스레드 풀 private final ExecutorService followerExecutor; - private final long defaultTimeout; - private final int defaultBulkSize; + // 로그를 소비하는 EventConsumer 객체 + private final EventConsumer eventConsumer; /** - * 지정된 시간 제한, 일괄 크기 및 JdbcTemplate을 사용하여 AsyncLogProcessor를 구축합니다. + * 지정된 Consumer 객체와 JdbcTemplate을 사용하여 새 AsyncLogProcessor를 생성합니다. HikariDataSource의 최대 풀 크기의 + * 50%를 사용하여 스레드 풀을 초기화합니다. * - * @param timeout 팔로워 스레드가 대기하는 시간 제한 - * @param bulkSize 팔로워 스레드가 한 번에 처리하는 로그 항목 수 - * @param jdbcTemplate JdbcTemplate + * @param eventConsumer 로그를 소비하는 Consumer 객체 + * @param jdbcTemplate JdbcTemplate 객체 */ - public AsyncLogProcessor(@Value("${jdbc.async.timeout}") Long timeout, - @Value("${jdbc.async.bulk-size}") Integer bulkSize, + public AsyncLogProcessor(EventConsumer eventConsumer, JdbcTemplate jdbcTemplate) { DataSource dataSource = jdbcTemplate.getDataSource(); if (!(dataSource instanceof HikariDataSource)) { @@ -48,78 +39,39 @@ public AsyncLogProcessor(@Value("${jdbc.async.timeout}") Long timeout, int poolSize = ((HikariDataSource) dataSource).getMaximumPoolSize(); log.debug("Creating AsyncLogProcessor with pool size: {}", poolSize); + this.eventConsumer = eventConsumer; // use 50% of the pool size for the follower thread pool this.followerExecutor = Executors.newFixedThreadPool(poolSize * 5 / 10); - this.defaultTimeout = Objects.requireNonNullElse(timeout, 2000L); - this.defaultBulkSize = Objects.requireNonNullElse(bulkSize, 100); } /** - * 비동기식 로그 처리를 시작합니다. 리더 스레드를 시작하고 로그 저장 함수를 전달합니다. + * 로그 처리를 초기화하고 시작합니다. 리더 태스크를 비동기적으로 실행합니다. * - * @param saveFunction 로그 저장 함수 + * @param saveFunction 로그를 저장하는 함수 */ public void init(Consumer> saveFunction) { CompletableFuture.runAsync(() -> leaderTask(saveFunction)); } /** - * 로그를 제출합니다. - *

- * 로그 큐에 로그를 추가합니다. + * 리더 태스크를 실행하는 private 메서드입니다. 로그를 소비하고 팔로워 스레드 풀에 저장 작업을 제출합니다. * - * @param log 로그 + * @param saveFunction 로그를 저장하는 함수 */ - public void submitLog(Log log) { - // Queue 크기를 제한할 거면 offer 사용하도록 변경 - logQueue.add(log); + private void leaderTask(Consumer> saveFunction) { + while (!Thread.currentThread().isInterrupted()) { + List logs = eventConsumer.consume(); + followerExecutor.execute(() -> saveFunction.accept(logs)); + } } - /** - * 로그 리스트를 제출합니다. - * - * @param logs 로그 리스트 - */ - public void submitLogs(List logs) { - logQueue.addAll(logs); + @Deprecated(forRemoval = true) + public void submitLog(Log log) { + throw new UnsupportedOperationException("This method is deprecated"); } - /** - * 리더 스레드를 시작합니다. 로그를 저장하는 함수를 전달합니다. - *

- * 리더 스레드는 로그 큐에서 로그 항목을 가져와 팔로워 스레드 풀에 전달합니다. 팔로워는 로그 항목을 대량으로 저장합니다. 리더 스레드는 종료되지 않는 한 계속해서 - * 로그를 처리합니다. 리더 스레드가 종료되면 팔로워 스레드 풀도 종료됩니다. - * - * @param saveFunction 로그 저장 함수 - */ - private void leaderTask(Consumer> saveFunction) { - while (!Thread.currentThread().isInterrupted()) { - try { - final Log log = logQueue.poll(defaultTimeout, TimeUnit.MILLISECONDS); - /* - * Log가 천천히 들어오는 경우 Timeout에 한 번씩 저장 - * - * Log가 높은 부하로 들어오는 경우 Bulk Size만큼 한 번에 저장 - * - * Timeout동안 들어온 Log가 없는 경우 다음 반복문 cycle 수행 - */ - if (log == null) { - continue; - } - List logs = new ArrayList<>(); - logs.add(log); - //drainTo는 Queue에 있는 요소를 maxElements만큼 꺼내서 Collection에 담아준다. - logQueue.drainTo(logs, defaultBulkSize - 1); - // Follower Thread Pool에 저장 요청 - followerExecutor.execute(() -> saveFunction.accept(logs)); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Leader thread was interrupted. Exiting.", e); - break; - } catch (Exception e) { - log.error("Unexpected error in leader thread", e); - } - } + @Deprecated(forRemoval = true) + public void submitLogs(List logs) { + throw new UnsupportedOperationException("This method is deprecated"); } } \ No newline at end of file diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java index 9e4c5ee..44a8967 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java @@ -20,25 +20,25 @@ public class AsyncLogRepository implements LogRepository { private final JdbcTemplate jdbcTemplate; - private final AsyncLogProcessor asyncLogProcessor; + private final AsyncMultiProcessor asyncMultiProcessor; private static final Long DEFAULT_RETURNS = 0L; @PostConstruct public void init() { log.info("AsyncLogRepository is initialized."); - asyncLogProcessor.init(this::saveLogsToDatabase); + asyncMultiProcessor.init(this::saveLogsToDatabase); } + @Deprecated @Override public long save(Log log) { - asyncLogProcessor.submitLog(log); return DEFAULT_RETURNS; } + @Deprecated @Override public List saveAll(List logs) { - asyncLogProcessor.submitLogs(logs); return logs; } diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java new file mode 100644 index 0000000..4c063d1 --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java @@ -0,0 +1,82 @@ +package info.logbat.domain.log.repository; + +import com.zaxxer.hikari.HikariDataSource; +import info.logbat.common.event.EventProducer; +import info.logbat.domain.log.queue.ReentrantLogQueue; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import javax.sql.DataSource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Primary +@Component +public class AsyncMultiProcessor implements EventProducer { + + private final List> queues; + private final List flatterExecutors; + private Consumer> saveFunction; + private final int queueCount; + private final ObjectProvider> objectProvider; + + public AsyncMultiProcessor(@Value("${queue.count:3}") int queueCount, + @Value("${jdbc.async.timeout:5000}") Long timeout, + @Value("${jdbc.async.bulk-size:3000}") Integer bulkSize, JdbcTemplate jdbcTemplate, + ObjectProvider> objectProvider) { + this.queueCount = queueCount; + this.queues = new ArrayList<>(queueCount); + this.flatterExecutors = new ArrayList<>(queueCount); + this.objectProvider = objectProvider; + int poolSize = getPoolSize(jdbcTemplate); + setup(queueCount, timeout, bulkSize, poolSize); + } + + public void init(Consumer> saveFunction) { + this.saveFunction = saveFunction; + } + + @Override + public void produce(List data) { + if (data.isEmpty()) { + return; + } + int selectedQueue = ThreadLocalRandom.current().nextInt(queueCount); + flatterExecutors.get(selectedQueue).execute(() -> queues.get(selectedQueue).produce(data)); + } + + private void setup(int queueCount, Long timeout, Integer bulkSize, int poolSize) { + ReentrantLogQueue queue = objectProvider.getObject(timeout, bulkSize); + for (int i = 0; i < queueCount; i++) { + queues.add(queue); + flatterExecutors.add(Executors.newSingleThreadExecutor()); + } + CompletableFuture.runAsync(() -> leaderTask(queue, Executors.newFixedThreadPool(poolSize))); + } + + private void leaderTask(ReentrantLogQueue queue, ExecutorService follower) { + while (!Thread.currentThread().isInterrupted()) { + List element = queue.consume(); + follower.execute(() -> saveFunction.accept(element)); + } + } + + private static int getPoolSize(JdbcTemplate jdbcTemplate) { + DataSource dataSource = jdbcTemplate.getDataSource(); + if (!(dataSource instanceof HikariDataSource)) { + throw new IllegalArgumentException("DataSource is null"); + } + int poolSize = ((HikariDataSource) dataSource).getMaximumPoolSize(); + log.debug("Creating AsyncLogProcessor with pool size: {}", poolSize); + return poolSize * 5 / 10; + } +} diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/LogRepository.java b/logbat/src/main/java/info/logbat/domain/log/repository/LogRepository.java index 1e9cba6..e69cb35 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/LogRepository.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/LogRepository.java @@ -8,6 +8,7 @@ public interface LogRepository { long save(Log log); + @Deprecated(forRemoval = true) List saveAll(List logs); Optional findById(Long logId); diff --git a/logbat/src/test/java/info/logbat/domain/log/queue/LogQueueTest.java b/logbat/src/test/java/info/logbat/domain/log/queue/LogQueueTest.java new file mode 100644 index 0000000..2a626bf --- /dev/null +++ b/logbat/src/test/java/info/logbat/domain/log/queue/LogQueueTest.java @@ -0,0 +1,49 @@ +package info.logbat.domain.log.queue; + +import java.util.List; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class LogQueueTest { + + // private final ReentrantLogQueue logQueue = new ReentrantLogQueue<>(2000L, 10); + private final LogQueue logQueue = new LogQueue<>(2000L, 10); + + @DisplayName("") + @Test + void test() throws InterruptedException { + // given + Thread thread = new Thread(() -> { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + logQueue.produce( + List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21)); + }); + thread.start(); + + Thread thread2 = new Thread(() -> { + try { + Thread.sleep(6000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + logQueue.produce( + List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21)); + }); + thread2.start(); + + while (true) { + List consume = logQueue.consume(); + System.out.println("consume = " + consume); + } + + // when + + // then + + } + +} \ No newline at end of file diff --git a/logbat/src/test/java/info/logbat/domain/log/repository/AsyncLogProcessorTest.java b/logbat/src/test/java/info/logbat/domain/log/repository/AsyncLogProcessorTest.java deleted file mode 100644 index 246fe90..0000000 --- a/logbat/src/test/java/info/logbat/domain/log/repository/AsyncLogProcessorTest.java +++ /dev/null @@ -1,80 +0,0 @@ -package info.logbat.domain.log.repository; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import com.zaxxer.hikari.HikariDataSource; -import info.logbat.domain.log.domain.Log; -import java.time.LocalDateTime; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import org.springframework.jdbc.core.JdbcTemplate; - -@DisplayName("AsyncLogProcessor는") -class AsyncLogProcessorTest { - - private final JdbcTemplate jdbcTemplate = Mockito.mock(JdbcTemplate.class); - private final HikariDataSource hikariDataSource = Mockito.mock(HikariDataSource.class); - private final Long expectedLogId = 1L; - private final LocalDateTime expectedLogTimestamp = LocalDateTime.of(2021, 1, 1, 0, 0, 0); - private final Log expectedLog = new Log(1L, expectedLogId, 0, "Test log", expectedLogTimestamp); - - private AsyncLogProcessor asyncLogProcessor; - private AtomicInteger processedLogCount; - private CountDownLatch latch; - - @BeforeEach - void setUp() { - // HikariDataSource 모킹 - when(jdbcTemplate.getDataSource()).thenReturn(hikariDataSource); - when(hikariDataSource.getMaximumPoolSize()).thenReturn(10); // 원하는 풀 사이즈 설정 - - asyncLogProcessor = new AsyncLogProcessor(2000L, 100, jdbcTemplate); - processedLogCount = new AtomicInteger(0); - } - - @Test - @DisplayName("단일 로그를 처리할 수 있다.") - void testSubmitSingleLog() throws InterruptedException { - // Arrange - latch = new CountDownLatch(1); - asyncLogProcessor.init(logs -> { - processedLogCount.addAndGet(logs.size()); - latch.countDown(); - }); - - // Act - asyncLogProcessor.submitLog(expectedLog); - - // Assert - assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(processedLogCount.get()) - .isEqualTo(1); - } - - @DisplayName("벌크 로그를 처리할 수 있다.") - @Test - void testSubmitBulkLogs() throws InterruptedException { - // Arrange - int logCount = 150; // DEFAULT_BULK_SIZE(100)보다 큰 값 - latch = new CountDownLatch(2); // 최소 2번의 처리를 기대 - asyncLogProcessor.init(logs -> { - processedLogCount.addAndGet(logs.size()); - latch.countDown(); - }); - // Act - for (int i = 0; i < logCount; i++) { - asyncLogProcessor.submitLog( - new Log((long) i, expectedLogId, 0, "Test log " + i, expectedLogTimestamp)); - } - // Assert - assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(processedLogCount.get()).isEqualTo(logCount); - } - -}