Skip to content

Commit

Permalink
Merge pull request #345 from FISCO-BCOS/release-2.0.5
Browse files Browse the repository at this point in the history
Async can initialize the thread pool by constructor
  • Loading branch information
bxq2011hust authored Jul 12, 2019
2 parents 255e90b + 7691b87 commit 3c9ad2c
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/fisco/bcos/channel/client/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ public void onReceiveTransactionMessage(ChannelHandlerContext ctx, BcosMessage m

public String newSeq() {
String seq = UUID.randomUUID().toString().replaceAll("-", "");
logger.info("New Seq" + seq);
logger.debug("New Seq" + seq);
return seq;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public <T extends Response> T send(Request request, Class<T> responseType) throw
channelService.sendEthereumMessage(
bcosRequest, request.getTransactionSucCallback());
}
logger.info(
logger.debug(
"bcos request, seq:{}, method:{}", bcosRequest.getMessageID(), request.getMethod());
logger.debug(
"bcos request:{} {}",
Expand Down
34 changes: 28 additions & 6 deletions src/main/java/org/fisco/bcos/web3j/utils/Async.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@

import java.util.concurrent.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Async task facilitation. */
public class Async {
private static final ExecutorService executor = Executors.newFixedThreadPool(web3AsyncPoolSize);

static {
Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown(executor)));
}

static Logger logger = LoggerFactory.getLogger(Async.class);

private static Executor executor;

public static <T> CompletableFuture<T> run(Callable<T> callable) {
public static <T> CompletableFuture<T> run(Callable<T> callable) {

if (null == executor) {
logger.info(" default set setExeutor , pool size is {}", web3AsyncPoolSize);
setExeutor(Executors.newFixedThreadPool(web3AsyncPoolSize), true);
}

CompletableFuture<T> result = new CompletableFuture<>();
CompletableFuture.runAsync(
() -> {
Expand Down Expand Up @@ -67,4 +75,18 @@ private static void shutdown(ExecutorService executorService) {
Thread.currentThread().interrupt();
}
}

public static synchronized void setExeutor(Executor pool, boolean setIfNull) {
if(null == Async.executor && setIfNull) {
Async.executor = pool;
logger.info(" set setExeutor because executor null, executor is {}", pool.toString());
} else if(!setIfNull) {
Async.executor = pool;
logger.info(" set setExeutor even executor already exist, executor is {}", pool.toString());
}
}

public Async(Executor pool) {
setExeutor(pool, true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package org.fisco.bcos.channel.test.contract;

import com.google.common.util.concurrent.RateLimiter;

import java.math.BigInteger;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.fisco.bcos.channel.client.Service;
import org.fisco.bcos.web3j.crypto.Credentials;
import org.fisco.bcos.web3j.protocol.Web3j;
import org.fisco.bcos.web3j.protocol.channel.ChannelEthereumService;
import org.fisco.bcos.web3j.protocol.core.methods.response.TransactionReceipt;
import org.fisco.bcos.web3j.utils.Async;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class PerformanceOkDSync {
private static Logger logger = LoggerFactory.getLogger(PerformanceOkDSync.class);
private static AtomicInteger sended = new AtomicInteger(0);

public static void main(String[] args) throws Exception {
try {
String groupId = args[3];
ApplicationContext context =
new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
Service service = context.getBean(Service.class);
service.setGroupId(Integer.parseInt(groupId));
service.run();

System.out.println("Start test...");
System.out.println(
"===================================================================");

ChannelEthereumService channelEthereumService = new ChannelEthereumService();
channelEthereumService.setChannelService(service);

if(args.length > 4) {
Integer threadPoolSize = Integer.parseInt(args[4]);
Async async = new Async(Executors.newFixedThreadPool(threadPoolSize));
System.out.println(" === thread pool size = " + threadPoolSize);
}

ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(500);
Web3j web3 =
Web3j.build(
channelEthereumService,
15 * 100,
scheduledExecutorService,
Integer.parseInt(groupId));

Credentials credentials =
Credentials.create(
"b83261efa42895c38c6c2364ca878f43e77f3cddbc922bf57d0d48070f79feb6");

BigInteger gasPrice = new BigInteger("30000000");
BigInteger gasLimit = new BigInteger("30000000");

String command = args[0];
Integer count = 0;
Integer qps = 0;

switch (command) {
case "trans":
count = Integer.parseInt(args[1]);
qps = Integer.parseInt(args[2]);

break;
default:
System.out.println("Args: <trans> <Total> <QPS> <GroupID> <ThreadPoolSize>");
}

ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(200);
threadPool.setMaxPoolSize(500);
threadPool.setQueueCapacity(count);

threadPool.initialize();

System.out.println("Deploying contract...");
OkD ok = OkD.deploy(web3, credentials, gasPrice, gasLimit).send();

PerformanceCollector collector = new PerformanceCollector();
collector.setTotal(count);

RateLimiter limiter = RateLimiter.create(qps);
Integer area = count / 10;
final Integer total = count;

Random random = new Random(System.currentTimeMillis());

System.out.println("Start test,total:" + count);
for (Integer i = 0; i < count; ++i) {
threadPool.execute(
new Runnable() {
@Override
public void run() {
limiter.acquire();
PerformanceOkCallback callback = new PerformanceOkCallback();
callback.setCollector(collector);
try {
TransactionReceipt receipt = ok.trans(
String.valueOf(random.nextLong()),
new BigInteger("1")).sendAsync().get();
callback.onResponse(receipt);
} catch (Exception e) {
TransactionReceipt receipt = new TransactionReceipt();
receipt.setStatus("-1");

callback.onResponse(receipt);
logger.error("Error sending:", e);
}

int current = sended.incrementAndGet();

if (current >= area && ((current % area) == 0)) {
System.out.println(
"Already sended: "
+ current
+ "/"
+ total
+ " transactions");
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
}

0 comments on commit 3c9ad2c

Please sign in to comment.