From 03e903e7db2f34a3f70e47615975c89d88bebe1f Mon Sep 17 00:00:00 2001 From: halibobo1205 <82020050+halibobo1205@users.noreply.github.com> Date: Wed, 30 Aug 2023 21:49:54 +0800 Subject: [PATCH] feat(all):name thread pools (#5425) --- .../tron/core/vm/PrecompiledContracts.java | 12 ++++--- .../common/storage/metric/DbStatService.java | 16 +++------ .../tron/core/capsule/TransactionCapsule.java | 7 ++-- .../tron/core/db2/core/SnapshotManager.java | 33 ++++++++----------- .../common/es/ExecutorServiceManager.java | 30 ++++++++++++++++- .../org/tron/consensus/pbft/PbftManager.java | 14 +++++--- .../org/tron/common/backup/BackupManager.java | 11 +++++-- .../common/backup/socket/BackupServer.java | 4 ++- .../tron/core/config/args/DynamicArgs.java | 12 +++---- .../main/java/org/tron/core/db/Manager.java | 6 ++-- .../messagehandler/PbftDataSyncHandler.java | 16 ++++++--- .../TransactionsMsgHandler.java | 21 ++++++------ .../org/tron/core/net/peer/PeerManager.java | 9 +++-- .../tron/core/net/peer/PeerStatusCheck.java | 10 +++--- .../tron/core/net/service/adv/AdvService.java | 16 +++++---- .../effective/EffectiveCheckService.java | 16 +++------ .../service/fetchblock/FetchBlockService.java | 12 +++---- .../core/net/service/relay/RelayService.java | 8 +++-- .../service/statistics/TronStatsManager.java | 13 ++++---- .../core/net/service/sync/SyncService.java | 15 +++++---- .../org/tron/core/services/RpcApiService.java | 7 ++-- .../interfaceOnPBFT/RpcApiServiceOnPBFT.java | 7 ++-- .../RpcApiServiceOnSolidity.java | 7 ++-- .../core/services/jsonrpc/JsonRpcServlet.java | 9 ++--- .../services/jsonrpc/TronJsonRpcImpl.java | 21 +++++++++--- .../java/org/tron/core/trie/TrieImpl.java | 6 ++-- 26 files changed, 201 insertions(+), 137 deletions(-) diff --git a/actuator/src/main/java/org/tron/core/vm/PrecompiledContracts.java b/actuator/src/main/java/org/tron/core/vm/PrecompiledContracts.java index 9d4aef2bf36..be7b9423f5c 100644 --- a/actuator/src/main/java/org/tron/core/vm/PrecompiledContracts.java +++ b/actuator/src/main/java/org/tron/core/vm/PrecompiledContracts.java @@ -26,7 +26,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import lombok.AllArgsConstructor; @@ -46,6 +45,7 @@ import org.tron.common.crypto.zksnark.BN128G2; import org.tron.common.crypto.zksnark.Fp; import org.tron.common.crypto.zksnark.PairingCheck; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.runtime.ProgramResult; import org.tron.common.runtime.vm.DataWord; @@ -983,11 +983,13 @@ public Pair execute(byte[] rawData) { public static class BatchValidateSign extends PrecompiledContract { private static final ExecutorService workers; + private static final String workersName = "validate-sign-contract"; private static final int ENGERYPERSIGN = 1500; private static final int MAX_SIZE = 16; static { - workers = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2 + 1); + workers = ExecutorServiceManager.newFixedThreadPool(workersName, + Runtime.getRuntime().availableProcessors() / 2 + 1); } @Override @@ -1290,10 +1292,12 @@ public static class VerifyTransferProof extends VerifyProof { private static final Integer[] SIZE = {2080, 2368, 2464, 2752}; private static final ExecutorService workersInConstantCall; private static final ExecutorService workersInNonConstantCall; + private static final String constantCallName = "verify-transfer-constant-call"; + private static final String nonConstantCallName = "verify-transfer-non-constant-call"; static { - workersInConstantCall = Executors.newFixedThreadPool(5); - workersInNonConstantCall = Executors.newFixedThreadPool(5); + workersInConstantCall = ExecutorServiceManager.newFixedThreadPool(constantCallName, 5); + workersInNonConstantCall = ExecutorServiceManager.newFixedThreadPool(nonConstantCallName, 5); } @Override diff --git a/chainbase/src/main/java/org/tron/common/storage/metric/DbStatService.java b/chainbase/src/main/java/org/tron/common/storage/metric/DbStatService.java index 402ab087808..b6fa25d5901 100644 --- a/chainbase/src/main/java/org/tron/common/storage/metric/DbStatService.java +++ b/chainbase/src/main/java/org/tron/common/storage/metric/DbStatService.java @@ -1,11 +1,10 @@ package org.tron.common.storage.metric; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.prometheus.Metrics; import org.tron.core.db.common.DbSourceInter; import org.tron.core.db2.common.DB; @@ -13,10 +12,9 @@ @Slf4j(topic = "metrics") @Component public class DbStatService { - private static final ScheduledExecutorService statExecutor = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("db-stats-thread-%d").build()); - + private final String esName = "db-stats"; + private final ScheduledExecutorService statExecutor = + ExecutorServiceManager.newSingleThreadScheduledExecutor(esName); public void register(DB db) { if (Metrics.enabled()) { @@ -32,11 +30,7 @@ public void register(DbSourceInter db) { public void shutdown() { if (Metrics.enabled()) { - try { - statExecutor.shutdown(); - } catch (Exception e) { - logger.error("{}", e.getMessage()); - } + ExecutorServiceManager.shutdownAndAwaitTermination(statExecutor, esName); } } } diff --git a/chainbase/src/main/java/org/tron/core/capsule/TransactionCapsule.java b/chainbase/src/main/java/org/tron/core/capsule/TransactionCapsule.java index 1edb5b114fb..9598fd99a6b 100755 --- a/chainbase/src/main/java/org/tron/core/capsule/TransactionCapsule.java +++ b/chainbase/src/main/java/org/tron/core/capsule/TransactionCapsule.java @@ -33,7 +33,6 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; @@ -43,6 +42,7 @@ import org.tron.common.crypto.ECKey.ECDSASignature; import org.tron.common.crypto.SignInterface; import org.tron.common.crypto.SignUtils; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.overlay.message.Message; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.ByteArray; @@ -86,8 +86,9 @@ @Slf4j(topic = "capsule") public class TransactionCapsule implements ProtoCapsule { - private static final ExecutorService executorService = Executors - .newFixedThreadPool(CommonParameter.getInstance() + private static final String esName = "valid-contract-proto"; + private static final ExecutorService executorService = ExecutorServiceManager + .newFixedThreadPool(esName, CommonParameter.getInstance() .getValidContractProtoThreadNum()); private static final String OWNER_ADDRESS = "ownerAddress_"; diff --git a/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java b/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java index 96796b3c460..f5de96e5587 100644 --- a/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java +++ b/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java @@ -16,8 +16,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -25,12 +23,12 @@ import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.tron.common.error.TronDBException; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.storage.WriteOptionsWrapper; import org.tron.common.utils.FileUtil; @@ -76,6 +74,7 @@ public class SnapshotManager implements RevokingDatabase { private Map flushServices = new HashMap<>(); private ScheduledExecutorService pruneCheckpointThread = null; + private final String pruneName = "checkpoint-prune"; @Autowired @Setter @@ -95,7 +94,7 @@ public void init() { checkpointVersion = CommonParameter.getInstance().getStorage().getCheckpointVersion(); // prune checkpoint if (isV2Open()) { - pruneCheckpointThread = Executors.newSingleThreadScheduledExecutor(); + pruneCheckpointThread = ExecutorServiceManager.newSingleThreadScheduledExecutor(pruneName); pruneCheckpointThread.scheduleWithFixedDelay(() -> { try { if (!unChecked) { @@ -117,18 +116,6 @@ public void init() { exitThread.start(); } - @PreDestroy - public void close() { - try { - exitThread.interrupt(); - // help GC - exitThread = null; - flushServices.values().forEach(ExecutorService::shutdown); - } catch (Exception e) { - logger.warn("exitThread interrupt error", e); - } - } - public static String simpleDecode(byte[] bytes) { byte[] lengthBytes = Arrays.copyOf(bytes, 4); int length = Ints.fromByteArray(lengthBytes); @@ -177,7 +164,8 @@ public void add(IRevokingDB db) { Chainbase revokingDB = (Chainbase) db; dbs.add(revokingDB); flushServices.put(revokingDB.getDbName(), - MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())); + MoreExecutors.listeningDecorator(ExecutorServiceManager.newSingleThreadExecutor( + "flush-service-" + revokingDB.getDbName()))); } private void advance() { @@ -284,8 +272,15 @@ public synchronized void disable() { @Override public void shutdown() { - if (pruneCheckpointThread != null) { - pruneCheckpointThread.shutdown(); + ExecutorServiceManager.shutdownAndAwaitTermination(pruneCheckpointThread, pruneName); + flushServices.forEach((key, value) -> ExecutorServiceManager.shutdownAndAwaitTermination(value, + "flush-service-" + key)); + try { + exitThread.interrupt(); + // help GC + exitThread = null; + } catch (Exception e) { + logger.warn("exitThread interrupt error", e); } } diff --git a/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java b/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java index f1e60fdcfbc..196d44ba722 100644 --- a/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java +++ b/common/src/main/java/org/tron/common/es/ExecutorServiceManager.java @@ -1,12 +1,15 @@ package org.tron.common.es; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -@Slf4j(topic = "common") +@Slf4j(topic = "common-executor") public class ExecutorServiceManager { public static ExecutorService newSingleThreadExecutor(String name) { @@ -29,6 +32,31 @@ public static ScheduledExecutorService newSingleThreadScheduledExecutor(String n new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build()); } + public static ExecutorService newFixedThreadPool(String name, int fixThreads) { + return newFixedThreadPool(name, fixThreads, false); + } + + public static ExecutorService newFixedThreadPool(String name, int fixThreads, boolean isDaemon) { + return Executors.newFixedThreadPool(fixThreads, + new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(isDaemon).build()); + } + + public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, + String name) { + return newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + name, false); + } + + public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, + String name, boolean isDaemon) { + return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(isDaemon).build()); + } + public static void shutdownAndAwaitTermination(ExecutorService pool, String name) { if (pool == null) { return; diff --git a/consensus/src/main/java/org/tron/consensus/pbft/PbftManager.java b/consensus/src/main/java/org/tron/consensus/pbft/PbftManager.java index 2f42524e03e..9be925bdbc6 100644 --- a/consensus/src/main/java/org/tron/consensus/pbft/PbftManager.java +++ b/consensus/src/main/java/org/tron/consensus/pbft/PbftManager.java @@ -1,13 +1,14 @@ package org.tron.consensus.pbft; import com.google.protobuf.ByteString; +import java.io.Closeable; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.consensus.base.Param; import org.tron.consensus.base.Param.Miner; import org.tron.consensus.dpos.MaintenanceManager; @@ -18,7 +19,7 @@ @Slf4j(topic = "pbft") @Component -public class PbftManager { +public class PbftManager implements Closeable { @Autowired private PbftMessageHandle pbftMessageHandle; @@ -29,8 +30,8 @@ public class PbftManager { @Autowired private ChainBaseManager chainBaseManager; - private ExecutorService executorService = Executors.newFixedThreadPool(10, - r -> new Thread(r, "Pbft")); + private final String esName = "pbft-msg-manager"; + private ExecutorService executorService = ExecutorServiceManager.newFixedThreadPool(esName, 10); @PostConstruct public void init() { @@ -111,4 +112,9 @@ public boolean verifyMsg(PbftBaseMessage msg) { return witnessList.contains(ByteString.copyFrom(msg.getPublicKey())); } + @Override + public void close() { + ExecutorServiceManager.shutdownAndAwaitTermination(executorService, esName); + } + } \ No newline at end of file diff --git a/framework/src/main/java/org/tron/common/backup/BackupManager.java b/framework/src/main/java/org/tron/common/backup/BackupManager.java index ef304164f1a..0c4a3e60dfd 100644 --- a/framework/src/main/java/org/tron/common/backup/BackupManager.java +++ b/framework/src/main/java/org/tron/common/backup/BackupManager.java @@ -9,7 +9,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -19,6 +18,7 @@ import org.tron.common.backup.socket.EventHandler; import org.tron.common.backup.socket.MessageHandler; import org.tron.common.backup.socket.UdpEvent; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; @Slf4j(topic = "backup") @@ -39,7 +39,10 @@ public class BackupManager implements EventHandler { private Set members = new ConcurrentSet<>(); - private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private final String esName = "backup-manager"; + + private ScheduledExecutorService executorService = + ExecutorServiceManager.newSingleThreadScheduledExecutor(esName); private MessageHandler messageHandler; @@ -144,6 +147,10 @@ public void handleEvent(UdpEvent udpEvent) { } } + public void stop() { + ExecutorServiceManager.shutdownAndAwaitTermination(executorService, esName); + } + @Override public void channelActivated() { init(); diff --git a/framework/src/main/java/org/tron/common/backup/socket/BackupServer.java b/framework/src/main/java/org/tron/common/backup/socket/BackupServer.java index fa2c0947852..2acf1e12633 100644 --- a/framework/src/main/java/org/tron/common/backup/socket/BackupServer.java +++ b/framework/src/main/java/org/tron/common/backup/socket/BackupServer.java @@ -19,7 +19,7 @@ @Slf4j(topic = "backup") @Component -public class BackupServer { +public class BackupServer implements AutoCloseable { private CommonParameter commonParameter = CommonParameter.getInstance(); @@ -91,10 +91,12 @@ public void initChannel(NioDatagramChannel ch) } } + @Override public void close() { logger.info("Closing backup server..."); shutdown = true; ExecutorServiceManager.shutdownAndAwaitTermination(executor, name); + backupManager.stop(); if (channel != null) { try { channel.close().await(10, TimeUnit.SECONDS); diff --git a/framework/src/main/java/org/tron/core/config/args/DynamicArgs.java b/framework/src/main/java/org/tron/core/config/args/DynamicArgs.java index cbf167cb955..674ea0f74c6 100644 --- a/framework/src/main/java/org/tron/core/config/args/DynamicArgs.java +++ b/framework/src/main/java/org/tron/core/config/args/DynamicArgs.java @@ -7,12 +7,11 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.core.Constant; import org.tron.core.config.Configuration; @@ -26,10 +25,12 @@ public class DynamicArgs { private long lastModified = 0; - private ScheduledExecutorService reloadExecutor = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService reloadExecutor; + private final String esName = "dynamic-reload"; public void init() { if (parameter.isDynamicConfigEnable()) { + reloadExecutor = ExecutorServiceManager.newSingleThreadScheduledExecutor(esName); logger.info("Start the dynamic loading configuration service"); long checkInterval = parameter.getDynamicConfigCheckInterval(); File config = getConfigFile(); @@ -108,7 +109,6 @@ private void updateTrustNodes(Config config) { } public void close() { - logger.info("Closing the dynamic loading configuration service"); - reloadExecutor.shutdown(); + ExecutorServiceManager.shutdownAndAwaitTermination(reloadExecutor, esName); } -} \ No newline at end of file +} diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index c50e8c900a1..de515fefc90 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -29,7 +29,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -206,6 +205,7 @@ public class Manager { @Setter private MerkleContainer merkleContainer; private ExecutorService validateSignService; + private String validateSignName = "validate-sign"; private boolean isRunRePushThread = true; private boolean isRunTriggerCapsuleProcessThread = true; private BlockingQueue pushTransactionQueue = new LinkedBlockingQueue<>(); @@ -536,8 +536,8 @@ public void init() { logger.info("Lite node lowestNum: {}", chainBaseManager.getLowestBlockNum()); } revokingStore.enable(); - validateSignService = Executors - .newFixedThreadPool(Args.getInstance().getValidateSignThreadNum()); + validateSignService = ExecutorServiceManager + .newFixedThreadPool(validateSignName, Args.getInstance().getValidateSignThreadNum()); rePushEs = ExecutorServiceManager.newSingleThreadExecutor(rePushEsName, true); rePushEs.submit(rePushLoop); // add contract event listener for subscribing diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/PbftDataSyncHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/PbftDataSyncHandler.java index 238d131abe8..60f614632a4 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/PbftDataSyncHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/PbftDataSyncHandler.java @@ -4,6 +4,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import io.netty.util.internal.ConcurrentSet; +import java.io.Closeable; import java.security.SignatureException; import java.util.ArrayList; import java.util.List; @@ -12,12 +13,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.tron.common.crypto.ECKey; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.utils.ByteArray; import org.tron.common.utils.Sha256Hash; import org.tron.consensus.base.Param; @@ -34,12 +35,14 @@ @Slf4j(topic = "pbft-data-sync") @Service -public class PbftDataSyncHandler implements TronMsgHandler { +public class PbftDataSyncHandler implements TronMsgHandler, Closeable { private Map pbftCommitMessageCache = new ConcurrentHashMap<>(); - private ExecutorService executorService = Executors.newFixedThreadPool(19, - r -> new Thread(r, "valid-header-pbft-sign")); + private final String esName = "valid-header-pbft-sign"; + + private ExecutorService executorService = ExecutorServiceManager.newFixedThreadPool( + esName, 19); @Autowired private ChainBaseManager chainBaseManager; @@ -81,6 +84,11 @@ public void processPBFTCommitData(BlockCapsule block) { } } + @Override + public void close() { + ExecutorServiceManager.shutdownAndAwaitTermination(executorService, esName); + } + private void processPBFTCommitMessage(PbftCommitMessage pbftCommitMessage) { try { PbftSignDataStore pbftSignDataStore = chainBaseManager.getPbftSignDataStore(); diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index a0109c06060..665381b31a8 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -2,15 +2,14 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.core.config.args.Args; import org.tron.core.exception.P2pException; import org.tron.core.exception.P2pException.TypeEnum; @@ -42,19 +41,21 @@ public class TransactionsMsgHandler implements TronMsgHandler { private BlockingQueue queue = new LinkedBlockingQueue(); private int threadNum = Args.getInstance().getValidateSignThreadNum(); - private ExecutorService trxHandlePool = new ThreadPoolExecutor(threadNum, threadNum, 0L, - TimeUnit.MILLISECONDS, queue); - - private ScheduledExecutorService smartContractExecutor = Executors - .newSingleThreadScheduledExecutor(); + private final String trxEsName = "trx-msg-handler"; + private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor( + threadNum, threadNum, 0L, + TimeUnit.MILLISECONDS, queue, trxEsName); + private final String smartEsName = "contract-msg-handler"; + private final ScheduledExecutorService smartContractExecutor = ExecutorServiceManager + .newSingleThreadScheduledExecutor(smartEsName); public void init() { handleSmartContract(); } public void close() { - trxHandlePool.shutdown(); - smartContractExecutor.shutdown(); + ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); + ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName); } public boolean isBusy() { @@ -102,7 +103,7 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep private void handleSmartContract() { smartContractExecutor.scheduleWithFixedDelay(() -> { try { - while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE) { + while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE && smartContractQueue.size() > 0) { TrxEvent event = smartContractQueue.take(); trxHandlePool.submit(() -> handleTransaction(event.getPeer(), event.getMsg())); } diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerManager.java b/framework/src/main/java/org/tron/core/net/peer/PeerManager.java index 6817720dff5..a80101d4f3a 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerManager.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerManager.java @@ -5,13 +5,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.prometheus.MetricKeys; import org.tron.common.prometheus.MetricLabels; import org.tron.common.prometheus.Metrics; @@ -25,8 +25,11 @@ public class PeerManager { private static AtomicInteger passivePeersCount = new AtomicInteger(0); @Getter private static AtomicInteger activePeersCount = new AtomicInteger(0); + private static final String esName = "peer-manager"; + + private static ScheduledExecutorService executor = + ExecutorServiceManager.newSingleThreadScheduledExecutor(esName); - private static ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); private static long DISCONNECTION_TIME_OUT = 60_000; @@ -48,7 +51,7 @@ public static void close() { p.getChannel().close(); } } - executor.shutdownNow(); + ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName); } catch (Exception e) { logger.error("Peer manager shutdown failed", e); } diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerStatusCheck.java b/framework/src/main/java/org/tron/core/net/peer/PeerStatusCheck.java index 84cf59b9bd7..6ccbf6427a7 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerStatusCheck.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerStatusCheck.java @@ -1,11 +1,11 @@ package org.tron.core.net.peer; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.core.config.Parameter.NetConstants; import org.tron.core.net.TronNetDelegate; import org.tron.protos.Protocol.ReasonCode; @@ -17,8 +17,10 @@ public class PeerStatusCheck { @Autowired private TronNetDelegate tronNetDelegate; - private ScheduledExecutorService peerStatusCheckExecutor = Executors - .newSingleThreadScheduledExecutor(); + private final String name = "peer-status-check"; + + private ScheduledExecutorService peerStatusCheckExecutor = ExecutorServiceManager + .newSingleThreadScheduledExecutor(name); private int blockUpdateTimeout = 30_000; @@ -33,7 +35,7 @@ public void init() { } public void close() { - peerStatusCheckExecutor.shutdown(); + ExecutorServiceManager.shutdownAndAwaitTermination(peerStatusCheckExecutor, name); } public void statusCheck() { diff --git a/framework/src/main/java/org/tron/core/net/service/adv/AdvService.java b/framework/src/main/java/org/tron/core/net/service/adv/AdvService.java index 03668d01837..59477e5d6f7 100644 --- a/framework/src/main/java/org/tron/core/net/service/adv/AdvService.java +++ b/framework/src/main/java/org/tron/core/net/service/adv/AdvService.java @@ -6,7 +6,6 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; - import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -15,15 +14,14 @@ import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.overlay.message.Message; import org.tron.common.utils.Sha256Hash; import org.tron.common.utils.Time; @@ -73,9 +71,13 @@ public class AdvService { .maximumSize(MAX_BLOCK_CACHE_SIZE).expireAfterWrite(1, TimeUnit.MINUTES) .recordStats().build(); - private ScheduledExecutorService spreadExecutor = Executors.newSingleThreadScheduledExecutor(); + private final String spreadName = "adv-spread"; + private final String fetchName = "adv-fetch"; + private final ScheduledExecutorService spreadExecutor = ExecutorServiceManager + .newSingleThreadScheduledExecutor(spreadName); - private ScheduledExecutorService fetchExecutor = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService fetchExecutor = ExecutorServiceManager + .newSingleThreadScheduledExecutor(fetchName); @Getter private MessageCount trxCount = new MessageCount(); @@ -102,8 +104,8 @@ public void init() { } public void close() { - spreadExecutor.shutdown(); - fetchExecutor.shutdown(); + ExecutorServiceManager.shutdownAndAwaitTermination(spreadExecutor, spreadName); + ExecutorServiceManager.shutdownAndAwaitTermination(fetchExecutor, fetchName); } public synchronized void addInvToCache(Item item) { diff --git a/framework/src/main/java/org/tron/core/net/service/effective/EffectiveCheckService.java b/framework/src/main/java/org/tron/core/net/service/effective/EffectiveCheckService.java index 44fdc56f938..a4e89412bae 100644 --- a/framework/src/main/java/org/tron/core/net/service/effective/EffectiveCheckService.java +++ b/framework/src/main/java/org/tron/core/net/service/effective/EffectiveCheckService.java @@ -2,14 +2,12 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.net.InetSocketAddress; import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -18,6 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.core.config.args.Args; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.TronNetService; @@ -42,12 +41,13 @@ public class EffectiveCheckService { @Setter private volatile InetSocketAddress cur; private final AtomicInteger count = new AtomicInteger(0); - private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("effective-thread-%d").build()); + private final String esName = "effective-check"; + private ScheduledExecutorService executor; private long MAX_HANDSHAKE_TIME = 60_000; public void init() { if (isEffectiveCheck) { + executor = ExecutorServiceManager.newSingleThreadScheduledExecutor(esName); executor.scheduleWithFixedDelay(() -> { try { findEffectiveNode(); @@ -69,13 +69,7 @@ public void triggerNext() { } public void close() { - if (executor != null) { - try { - executor.shutdown(); - } catch (Exception e) { - logger.error("Exception in shutdown effective service worker, {}", e.getMessage()); - } - } + ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName); } public boolean isIsolateLand() { diff --git a/framework/src/main/java/org/tron/core/net/service/fetchblock/FetchBlockService.java b/framework/src/main/java/org/tron/core/net/service/fetchblock/FetchBlockService.java index 6a5b120a896..889f6f6e132 100644 --- a/framework/src/main/java/org/tron/core/net/service/fetchblock/FetchBlockService.java +++ b/framework/src/main/java/org/tron/core/net/service/fetchblock/FetchBlockService.java @@ -5,20 +5,17 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.Sha256Hash; import org.tron.core.ChainBaseManager; import org.tron.core.capsule.BlockCapsule; -import org.tron.core.config.Parameter; import org.tron.core.metrics.MetricsKey; import org.tron.core.metrics.MetricsUtil; import org.tron.core.net.TronNetDelegate; @@ -43,9 +40,10 @@ public class FetchBlockService { private static final double BLOCK_FETCH_LEFT_TIME_PERCENT = 0.5; + private final String esName = "fetch-block"; + private final ScheduledExecutorService fetchBlockWorkerExecutor = - new ScheduledThreadPoolExecutor(1, - new BasicThreadFactory.Builder().namingPattern("FetchBlockWorkerSchedule-").build()); + ExecutorServiceManager.newSingleThreadScheduledExecutor(esName); public void init() { fetchBlockWorkerExecutor.scheduleWithFixedDelay(() -> { @@ -58,7 +56,7 @@ public void init() { } public void close() { - fetchBlockWorkerExecutor.shutdown(); + ExecutorServiceManager.shutdownAndAwaitTermination(fetchBlockWorkerExecutor, esName); } public void fetchBlock(List sha256HashList, PeerConnection peer) { diff --git a/framework/src/main/java/org/tron/core/net/service/relay/RelayService.java b/framework/src/main/java/org/tron/core/net/service/relay/RelayService.java index 665255a6594..9f1b2ef3c37 100644 --- a/framework/src/main/java/org/tron/core/net/service/relay/RelayService.java +++ b/framework/src/main/java/org/tron/core/net/service/relay/RelayService.java @@ -6,7 +6,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -18,6 +17,7 @@ import org.tron.common.backup.BackupManager.BackupStatusEnum; import org.tron.common.crypto.SignInterface; import org.tron.common.crypto.SignUtils; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.ByteArray; import org.tron.common.utils.Sha256Hash; @@ -53,8 +53,10 @@ public class RelayService { private WitnessScheduleStore witnessScheduleStore; private BackupManager backupManager; + private final String esName = "relay-service"; - private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService executorService = ExecutorServiceManager + .newSingleThreadScheduledExecutor(esName); private CommonParameter parameter = Args.getInstance(); @@ -95,7 +97,7 @@ public void init() { } public void close() { - executorService.shutdown(); + ExecutorServiceManager.shutdownAndAwaitTermination(executorService, esName); } public void fillHelloMessage(HelloMessage message, Channel channel) { diff --git a/framework/src/main/java/org/tron/core/net/service/statistics/TronStatsManager.java b/framework/src/main/java/org/tron/core/net/service/statistics/TronStatsManager.java index da9b4b0f2d3..caac3f7f325 100644 --- a/framework/src/main/java/org/tron/core/net/service/statistics/TronStatsManager.java +++ b/framework/src/main/java/org/tron/core/net/service/statistics/TronStatsManager.java @@ -3,11 +3,11 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.net.InetAddress; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.prometheus.MetricKeys; import org.tron.common.prometheus.MetricLabels; import org.tron.common.prometheus.Metrics; @@ -27,7 +27,10 @@ public class TronStatsManager { private static Cache cache = CacheBuilder.newBuilder() .maximumSize(3000).recordStats().build(); - private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final String esName = "net-traffic-collector"; + + private ScheduledExecutorService executor = + ExecutorServiceManager.newSingleThreadScheduledExecutor(esName); public static NodeStatistics getNodeStatistics(InetAddress inetAddress) { NodeStatistics nodeStatistics = cache.getIfPresent(inetAddress); @@ -49,11 +52,7 @@ public void init() { } public void close() { - try { - executor.shutdownNow(); - } catch (Exception e) { - logger.error("Exception in shutdown traffic stats worker, {}", e.getMessage()); - } + ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName); } private void work() { diff --git a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java index eae134758cd..1e3e18441b9 100644 --- a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java +++ b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java @@ -11,13 +11,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.utils.Pair; import org.tron.core.capsule.BlockCapsule; import org.tron.core.capsule.BlockCapsule.BlockId; @@ -54,10 +54,13 @@ public class SyncService { .expireAfterWrite(blockCacheTimeout, TimeUnit.MINUTES).initialCapacity(10_000) .recordStats().build(); - private ScheduledExecutorService fetchExecutor = Executors.newSingleThreadScheduledExecutor(); + private final String fetchEsName = "sync-fetch-block"; + private final String handleEsName = "sync-handle-block"; + private final ScheduledExecutorService fetchExecutor = ExecutorServiceManager + .newSingleThreadScheduledExecutor(fetchEsName); - private ScheduledExecutorService blockHandleExecutor = Executors - .newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService blockHandleExecutor = ExecutorServiceManager + .newSingleThreadScheduledExecutor(handleEsName); private volatile boolean handleFlag = false; @@ -91,8 +94,8 @@ public void init() { } public void close() { - fetchExecutor.shutdown(); - blockHandleExecutor.shutdown(); + ExecutorServiceManager.shutdownAndAwaitTermination(fetchExecutor, fetchEsName); + ExecutorServiceManager.shutdownAndAwaitTermination(blockHandleExecutor, handleEsName); } public void startSync(PeerConnection peer) { diff --git a/framework/src/main/java/org/tron/core/services/RpcApiService.java b/framework/src/main/java/org/tron/core/services/RpcApiService.java index 2a4ebca95b4..26861523311 100755 --- a/framework/src/main/java/org/tron/core/services/RpcApiService.java +++ b/framework/src/main/java/org/tron/core/services/RpcApiService.java @@ -12,7 +12,6 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Objects; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -76,6 +75,7 @@ import org.tron.api.WalletGrpc.WalletImplBase; import org.tron.api.WalletSolidityGrpc.WalletSolidityImplBase; import org.tron.common.application.Service; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.ByteArray; import org.tron.common.utils.Sha256Hash; @@ -204,6 +204,8 @@ public class RpcApiService implements Service { @Getter private MonitorApi monitorApi = new MonitorApi(); + private final String executorName = "rpc-full-executor"; + @Override public void init() { @@ -221,7 +223,8 @@ public void start() { if (parameter.getRpcThreadNum() > 0) { serverBuilder = serverBuilder - .executor(Executors.newFixedThreadPool(parameter.getRpcThreadNum())); + .executor(ExecutorServiceManager.newFixedThreadPool( + executorName, parameter.getRpcThreadNum())); } if (parameter.isSolidityNode()) { diff --git a/framework/src/main/java/org/tron/core/services/interfaceOnPBFT/RpcApiServiceOnPBFT.java b/framework/src/main/java/org/tron/core/services/interfaceOnPBFT/RpcApiServiceOnPBFT.java index c003f9d3994..2457e5fc891 100755 --- a/framework/src/main/java/org/tron/core/services/interfaceOnPBFT/RpcApiServiceOnPBFT.java +++ b/framework/src/main/java/org/tron/core/services/interfaceOnPBFT/RpcApiServiceOnPBFT.java @@ -4,7 +4,6 @@ import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -38,6 +37,7 @@ import org.tron.api.WalletSolidityGrpc.WalletSolidityImplBase; import org.tron.common.application.Service; import org.tron.common.crypto.ECKey; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.StringUtil; import org.tron.common.utils.Utils; @@ -85,6 +85,8 @@ public class RpcApiServiceOnPBFT implements Service { @Autowired private RpcApiAccessInterceptor apiAccessInterceptor; + private final String executorName = "rpc-pbft-executor"; + @Override public void init() { } @@ -104,7 +106,8 @@ public void start() { if (args.getRpcThreadNum() > 0) { serverBuilder = serverBuilder - .executor(Executors.newFixedThreadPool(args.getRpcThreadNum())); + .executor(ExecutorServiceManager.newFixedThreadPool( + executorName, args.getRpcThreadNum())); } serverBuilder = serverBuilder.addService(new WalletPBFTApi()); diff --git a/framework/src/main/java/org/tron/core/services/interfaceOnSolidity/RpcApiServiceOnSolidity.java b/framework/src/main/java/org/tron/core/services/interfaceOnSolidity/RpcApiServiceOnSolidity.java index 3f4bfeda731..5bf6f1846e5 100755 --- a/framework/src/main/java/org/tron/core/services/interfaceOnSolidity/RpcApiServiceOnSolidity.java +++ b/framework/src/main/java/org/tron/core/services/interfaceOnSolidity/RpcApiServiceOnSolidity.java @@ -5,7 +5,6 @@ import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -38,6 +37,7 @@ import org.tron.common.application.Service; import org.tron.common.crypto.SignInterface; import org.tron.common.crypto.SignUtils; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; import org.tron.common.utils.Sha256Hash; import org.tron.common.utils.StringUtil; @@ -87,6 +87,8 @@ public class RpcApiServiceOnSolidity implements Service { @Autowired private RpcApiAccessInterceptor apiAccessInterceptor; + private final String executorName = "rpc-solidity-executor"; + @Override public void init() { } @@ -105,7 +107,8 @@ public void start() { if (parameter.getRpcThreadNum() > 0) { serverBuilder = serverBuilder - .executor(Executors.newFixedThreadPool(parameter.getRpcThreadNum())); + .executor(ExecutorServiceManager.newFixedThreadPool( + executorName, parameter.getRpcThreadNum())); } serverBuilder = serverBuilder.addService(new WalletSolidityApi()); diff --git a/framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcServlet.java b/framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcServlet.java index 61d163a3e8a..878b71d86b5 100644 --- a/framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcServlet.java +++ b/framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcServlet.java @@ -26,11 +26,7 @@ public class JsonRpcServlet extends RateLimiterServlet { private JsonRpcServer rpcServer = null; @Autowired - private NodeInfoService nodeInfoService; - @Autowired - private Wallet wallet; - @Autowired - private Manager manager; + private TronJsonRpc tronJsonRpc; @Autowired private JsonRpcInterceptor interceptor; @@ -40,10 +36,9 @@ public void init(ServletConfig config) throws ServletException { super.init(config); ClassLoader cl = Thread.currentThread().getContextClassLoader(); - TronJsonRpcImpl jsonRpcImpl = new TronJsonRpcImpl(nodeInfoService, wallet, manager); Object compositeService = ProxyUtil.createCompositeServiceProxy( cl, - new Object[] {jsonRpcImpl}, + new Object[] {tronJsonRpc}, new Class[] {TronJsonRpc.class}, true); diff --git a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java index 9b9dbbdfe70..0ca57a3b98c 100644 --- a/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java +++ b/framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java @@ -13,6 +13,8 @@ import com.alibaba.fastjson.JSON; import com.google.protobuf.ByteString; import com.google.protobuf.GeneratedMessageV3; +import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -22,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; import lombok.Getter; @@ -30,12 +31,15 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.bouncycastle.util.encoders.Hex; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import org.tron.api.GrpcAPI.BytesMessage; import org.tron.api.GrpcAPI.EstimateEnergyMessage; import org.tron.api.GrpcAPI.Return; import org.tron.api.GrpcAPI.Return.response_code; import org.tron.api.GrpcAPI.TransactionExtention; import org.tron.common.crypto.Hash; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.logsfilter.ContractEventParser; import org.tron.common.logsfilter.capsule.BlockFilterCapsule; import org.tron.common.logsfilter.capsule.LogsFilterCapsule; @@ -91,7 +95,8 @@ import org.tron.protos.contract.SmartContractOuterClass.TriggerSmartContract; @Slf4j(topic = "API") -public class TronJsonRpcImpl implements TronJsonRpc { +@Component +public class TronJsonRpcImpl implements TronJsonRpc, Closeable { public enum RequestSource { FULLNODE, @@ -148,12 +153,15 @@ public enum RequestSource { private final NodeInfoService nodeInfoService; private final Wallet wallet; private final Manager manager; + private final String esName = "query-section"; - public TronJsonRpcImpl(NodeInfoService nodeInfoService, Wallet wallet, Manager manager) { + @Autowired + public TronJsonRpcImpl(@Autowired NodeInfoService nodeInfoService, @Autowired Wallet wallet, + @Autowired Manager manager) { this.nodeInfoService = nodeInfoService; this.wallet = wallet; this.manager = manager; - this.sectionExecutor = Executors.newFixedThreadPool(5); + this.sectionExecutor = ExecutorServiceManager.newFixedThreadPool(esName, 5); } public static void handleBLockFilter(BlockFilterCapsule blockFilterCapsule) { @@ -1370,4 +1378,9 @@ public static Object[] getFilterResult(String filterId, Map cache, byte[] root) { public static ExecutorService getExecutor() { if (executor == null) { - executor = Executors.newFixedThreadPool(4, - new ThreadFactoryBuilder().setNameFormat("trie-calc-thread-%d").build()); + executor = ExecutorServiceManager.newFixedThreadPool("trie-calc", 4); } return executor; }