Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev To Main Last Week #142

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package info.logbat.common.event;

import java.util.List;

public interface EventConsumer<T> {

List<T> consume();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package info.logbat.common.event;

import java.util.List;

public interface EventProducer<T> {

void produce(List<T> data);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package info.logbat.domain.log.application;

import info.logbat.common.event.EventProducer;
import info.logbat.domain.log.domain.Log;
import info.logbat.domain.log.presentation.payload.request.CreateLogRequest;
import info.logbat.domain.log.repository.LogRepository;
import info.logbat.domain.project.application.AppService;

import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
Expand All @@ -16,7 +15,7 @@
@RequiredArgsConstructor
public class LogService {

private final LogRepository logRepository;
private final EventProducer<Log> producer;
private final AppService appService;

public void saveLogs(String appKey, List<CreateLogRequest> requests) {
Expand All @@ -25,12 +24,11 @@ public void saveLogs(String appKey, List<CreateLogRequest> requests) {
requests.forEach(request -> {
try {
logs.add(request.toEntity(appId));
}
catch (Exception e) {
} catch (Exception e) {
log.error("Failed to convert request to entity: {}", request, e);
}
});
logRepository.saveAll(logs);
producer.produce(logs);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package info.logbat.domain.log.flatter;

import info.logbat.common.event.EventProducer;
import info.logbat.domain.log.domain.Log;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class LogRequestFlatter {

private final EventProducer<Log> eventProducer;
private final ExecutorService executor = Executors.newFixedThreadPool(1);

public void flatten(List<Log> logs) {
executor.submit(() -> eventProducer.produce(logs));
}
}
102 changes: 102 additions & 0 deletions logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package info.logbat.domain.log.queue;

import info.logbat.common.event.EventConsumer;
import info.logbat.common.event.EventProducer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import org.springframework.beans.factory.annotation.Value;

/**
* 로깅 데이터 전달 목적의 Thread-Safe Queue 구현체로, 생산과 소비 작업을 모두 지원합니다. 이 클래스는 단일 스레드 환경에서 작동하도록 설계되었으며, 효율적인
* 대량 작업을 허용합니다.
* <p>
* 이 큐는 생산자가 데이터를 추가하고, 소비자가 데이터를 꺼내는 방식으로 동작합니다. 소비자는 큐에 충분한 데이터가 쌓일 때까지 대기하다가, 큐에 데이터가 충분히 쌓이면
* 일괄적으로 데이터를 꺼내서 처리합니다. 이 때, 일괄 처리 크기는 생성자를 통해 지정할 수 있습니다. 만약 큐에 충분한 데이터가 쌓이지 않은 상태에서 소비자가 데이터를 꺼내려
* 할 때, 소비자 스레드는 일정 시간 동안 대기하다가 큐에 데이터가 추가되면 깨어나서 데이터를 꺼냅니다. 이 때 대기 시간은 생성자를 통해 지정할 수 있습니다.
* </p>
*
* @param <T> 이 큐에 저장되는 요소의 타입
*/
public class LogQueue<T> implements EventProducer<T>, EventConsumer<T> {

// T 타입의 요소를 저장하는 list
private final LinkedList<T> queue;
// 소비자 스레드가 대기하는 시간 (나노초 단위)
private final long timeoutNanos;
// 일괄 처리 크기
private final int bulkSize;
// 소비자 스레드, volatile로 선언하여 가시성 보장
private volatile Thread consumerThread;


/**
* 지정된 타임아웃과 벌크 크기로 새 LogQueue를 생성합니다.
*
* @param timeoutMillis 큐가 비어있을 때 소비자가 대기하는 시간(밀리초)
* @param bulkSize 단일 작업에서 소비될 수 있는 최대 요소 수
*/
public LogQueue(@Value("${jdbc.async.timeout}") Long timeoutMillis,
@Value("${jdbc.async.bulk-size}") Integer bulkSize) {
this.queue = new LinkedList<>();
this.timeoutNanos = timeoutMillis * 1_000_000; // Convert to nanoseconds
this.bulkSize = bulkSize;
}

/**
* 큐에서 요소를 일괄적으로 꺼내서 반환합니다. 큐에 충분한 요소가 쌓이지 않은 경우, 소비자 스레드는 일정 시간 동안 대기합니다.
* <p>
* 이 메서드는 단일 스레드 환경에서만 호출해야 합니다. 만약 다중 스레드 환경에서 호출하면 예상치 못한 결과가 발생할 수 있습니다. 이 메서드는 큐에 충분한 요소가 쌓일
* 때까지 대기하다가, 큐에 요소가 쌓이면 일괄적으로 요소를 꺼내서 반환합니다. 만약 큐에 충분한 요소가 쌓이지 않은 경우, 일정 시간 동안 대기하다가 큐에 요소가
* 추가되면 깨어나서 요소를 꺼냅니다. 이 때 대기 시간은 생성자를 통해 지정할 수 있습니다. 이 메서드는 큐에 쌓인 요소를 꺼내서 반환하는 것이 목적이므로, 큐에 요소를
* 추가하는 작업은 {@link #produce(List)} 메서드를 사용해야 합니다.
* </p>
*
* @return 큐에서 꺼낸 요소의 리스트 (최대 {@link #bulkSize}개)
*/
@Override
public List<T> consume() {
List<T> result = new ArrayList<>(bulkSize);

if (queue.size() >= bulkSize) {
for (int i = 0; i < bulkSize; i++) {
result.add(queue.poll());
}
return result;
}

consumerThread = Thread.currentThread();

do {
LockSupport.parkNanos(timeoutNanos);
} while (queue.isEmpty());

for (int i = 0; i < bulkSize; i++) {
result.add(queue.poll());
if (queue.isEmpty()) {
break;
}
}

consumerThread = null;
return result;
}

/**
* 큐에 요소를 추가합니다. 큐에 요소가 추가되면, 소비자 스레드를 깨워서 요소를 꺼내도록 합니다.
* <p>
* 이 메서드는 단일 스레드 환경에서만 호출해야 합니다. 만약 다중 스레드 환경에서 호출하면 예상치 못한 결과가 발생할 수 있습니다. 이 메서드는 큐에 요소를 추가하는
* 것이 목적이므로, 큐에서 요소를 꺼내는 작업은 {@link #consume()} 메서드를 사용해야 합니다.
* </p>
*
* @param data 큐에 추가할 요소의 리스트
*/
@Override
public void produce(List<T> data) {
queue.addAll(data);
if (consumerThread != null && queue.size() >= bulkSize) {
LockSupport.unpark(consumerThread);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package info.logbat.domain.log.queue;

import info.logbat.common.event.EventConsumer;
import info.logbat.common.event.EventProducer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Component
public class ReentrantLogQueue<T> implements EventProducer<T>, EventConsumer<T> {

private final LinkedList<T> queue = new LinkedList<>();
private final long timeout;
private final int bulkSize;
private final ReentrantLock bulkLock = new ReentrantLock();
private final Condition bulkCondition = bulkLock.newCondition();

public ReentrantLogQueue(@Value("${jdbc.async.timeout}") Long timeout,
@Value("${jdbc.async.bulk-size}") Integer bulkSize) {
this.timeout = timeout;
this.bulkSize = bulkSize;
}

/*
* Consumer should be one thread
*/
@Override
public List<T> consume() {
List<T> result = new ArrayList<>();

try {
bulkLock.lockInterruptibly();
// Case1: Full Flush
if (queue.size() >= bulkSize) {
for (int i = 0; i < bulkSize; i++) {
result.add(queue.poll());
}
return result;
}
// Else Case: Blocking
// Blocked while Queue is Not Empty
do {
bulkCondition.await(timeout, TimeUnit.MILLISECONDS);
} while (queue.isEmpty());

// Bulk Size 만큼 꺼내서 반환
for (int i = 0; i < bulkSize; i++) {
result.add(queue.poll());
if (queue.isEmpty()) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
bulkLock.unlock();
}
return result;
}

/*
* Producer should be one thread
*/
@Override
public void produce(List<T> data) {
bulkLock.lock();
try {
queue.addAll(data);
if (queue.size() >= bulkSize) {
bulkCondition.signal();
}
} finally {
bulkLock.unlock();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package info.logbat.domain.log.queue;

import info.logbat.common.event.EventConsumer;
import info.logbat.common.event.EventProducer;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Value;

public class SingleLinkLogQueue<T> implements EventProducer<T>, EventConsumer<T> {

private final SingleLinkedList<T> queue = new SingleLinkedList<>();
private final long timeout;
private final int bulkSize;

public SingleLinkLogQueue(@Value("${jdbc.async.timeout}") Long timeout,
@Value("${jdbc.async.bulk-size}") Integer bulkSize) {
this.timeout = timeout;
this.bulkSize = bulkSize;
}

/*
* Consumer should be one thread
*/
@Override
public List<T> consume() {
List<T> result = new ArrayList<>(bulkSize);

final long endTime = System.currentTimeMillis() + timeout;

while (result.isEmpty()) {
T data = queue.poll();
if (data != null) {
result.add(data);
}
if (result.size() >= bulkSize) {
break;
}
if (System.currentTimeMillis() >= endTime) {
break;
}
}

return result;
}

/*
* Producer should be one thread
*/
@Override
public void produce(List<T> data) {
queue.addAll(data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package info.logbat.domain.log.queue;

public class SingleLinkedList<E> {

Node<E> first;
Node<E> lastHolder;

public SingleLinkedList() {
lastHolder = new Node<>(null, null);
first = lastHolder;
}

private void linkLast(E e) {
Node<E> newLastHolder = new Node<>(null, null);

// 1. add next node
lastHolder.next = newLastHolder;
// 2. set item
lastHolder.item = e;

lastHolder = newLastHolder;
}

private E unlinkFirst() {
// 1. get first element
E element = first.item;

// first == lastHolder
if (first.item == null) {
return null;
}

// 2. get next node
// if element is not null, next node should not be null
Node<E> next = first.next;
first.item = null;
first.next = null; // help GC
first = next;
return element;
}


public boolean isEmpty() {
return first.item == null;
}

public E poll() {
return unlinkFirst();
}

public void addAll(Iterable<? extends E> c) {
for (E e : c) {
linkLast(e);
}
}

private static class Node<E> {
volatile E item;
volatile Node<E> next;

Node(E element, Node<E> next) {
this.item = element;
this.next = next;
}
}
}
Loading
Loading