Skip to content

Commit

Permalink
Merge pull request #3 from yu199195/master
Browse files Browse the repository at this point in the history
fork raincat
  • Loading branch information
ChaosCoffee authored Aug 23, 2018
2 parents 954efad + 3cc1de1 commit 95485c4
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public void signal() {
public void await() {
try {
lock.lock();
condition.await();
if(!isNotify()) {
condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.enums.CompensationActionEnum;
import com.raincat.common.holder.LogUtil;
import com.raincat.core.concurrent.threadpool.TxTransactionThreadFactory;
import com.raincat.core.disruptor.event.TxTransactionEvent;
import com.raincat.core.disruptor.factory.TxTransactionEventFactory;
import com.raincat.core.disruptor.handler.TxTransactionEventHandler;
Expand All @@ -37,6 +38,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -47,13 +52,20 @@
@Component
public class TxTransactionEventPublisher implements DisposableBean {

/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(TxTransactionEventPublisher.class);

private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1;

private Executor executor;

private Disruptor<TxTransactionEvent> disruptor;

private final TxTransactionEventHandler txTransactionEventHandler;

@Autowired
private TxTransactionEventHandler txTransactionEventHandler;
public TxTransactionEventPublisher(TxTransactionEventHandler txTransactionEventHandler) {
this.txTransactionEventHandler = txTransactionEventHandler;
}

/**
* disruptor start.
Expand All @@ -69,20 +81,24 @@ public void start(final int bufferSize) {
disruptor.setDefaultExceptionHandler(new ExceptionHandler<TxTransactionEvent>() {
@Override
public void handleEventException(Throwable ex, long sequence, TxTransactionEvent event) {
LogUtil.error(LOGGER,()-> "Disruptor handleEventException:"
+ event.getType() + event.getTransactionRecover().toString() );
LogUtil.error(LOGGER, () -> "Disruptor handleEventException:"
+ event.getType() + event.getTransactionRecover().toString());
}

@Override
public void handleOnStartException(Throwable ex) {
LogUtil.error(LOGGER,()-> "Disruptor start exception");
LogUtil.error(LOGGER, () -> "Disruptor start exception");
}

@Override
public void handleOnShutdownException(Throwable ex) {
LogUtil.error(LOGGER,()-> "Disruptor close Exception ");
LogUtil.error(LOGGER, () -> "Disruptor close Exception ");
}
});
executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
TxTransactionThreadFactory.create("raincat-log-disruptor", false),
new ThreadPoolExecutor.AbortPolicy());
disruptor.start();
}

Expand All @@ -93,8 +109,10 @@ public void handleOnShutdownException(Throwable ex) {
* @param type {@linkplain CompensationActionEnum}
*/
public void publishEvent(final TransactionRecover transactionRecover, final int type) {
final RingBuffer<TxTransactionEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(new TxTransactionEventTranslator(type), transactionRecover);
executor.execute(() -> {
final RingBuffer<TxTransactionEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(new TxTransactionEventTranslator(type), transactionRecover);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
<property name="txManagerUrl" value="http://192.168.1.109:8761"/>
<property name="serializer" value="kryo"/>
<property name="nettySerializer" value="kryo"/>
<property name="blockingQueueType" value="Linked"/>
<property name="compensation" value="true"/>
<property name="compensationCacheType" value="db"/>
<property name="txDbConfig">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
<property name="txManagerUrl" value="http://192.168.1.109:8761"/>
<property name="serializer" value="kryo"/>
<property name="nettySerializer" value="kryo"/>
<property name="blockingQueueType" value="Linked"/>
<property name="compensationCacheType" value="db"/>
<property name="compensation" value="true"/>
<property name="txDbConfig">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
<property name="txManagerUrl" value="http://192.168.1.109:8761"/>
<property name="serializer" value="kryo"/>
<property name="nettySerializer" value="kryo"/>
<property name="blockingQueueType" value="Linked"/>
<property name="compensationCacheType" value="db"/>
<property name="compensation" value="true"/>
<property name="txDbConfig">
Expand Down

0 comments on commit 95485c4

Please sign in to comment.