diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java index aaba4b8d0bac..1f70ff44606d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java @@ -113,7 +113,7 @@ public void send(TsBlock tsBlock) { if (queue.hasNoMoreTsBlocks()) { return; } - logger.info("send TsBlocks"); + logger.info("[StartSendTsBlockOnLocal]"); synchronized (this) { blocked = queue.add(tsBlock); } @@ -129,9 +129,8 @@ public synchronized void send(int partition, List tsBlocks) { public void setNoMoreTsBlocks() { synchronized (queue) { synchronized (this) { - logger.info("set noMoreTsBlocks."); + logger.info("[StartSetNoMoreTsBlocksOnLocal]"); if (aborted || closed) { - logger.info("SinkHandle has been aborted={} or closed={}.", aborted, closed); return; } queue.setNoMoreTsBlocks(true); @@ -139,12 +138,12 @@ public void setNoMoreTsBlocks() { } } checkAndInvokeOnFinished(); - logger.info("noMoreTsBlocks has been set."); + logger.info("[EndSetNoMoreTsBlocksOnLocal]"); } @Override public void abort() { - logger.info("Sink handle is being aborted."); + logger.info("[StartAbortLocalSinkHandle]"); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -155,12 +154,12 @@ public void abort() { sinkHandleListener.onAborted(this); } } - logger.info("Sink handle is aborted"); + logger.info("[EndAbortLocalSinkHandle]"); } @Override public void close() { - logger.info("Sink handle is being closed."); + logger.info("[StartCloseLocalSinkHandle]"); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -171,7 +170,7 @@ public void close() { sinkHandleListener.onFinish(this); } } - logger.info("Sink handle is closed"); + logger.info("[EndCloseLocalSinkHandle]"); } public TFragmentInstanceId getRemoteFragmentInstanceId() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java index 35c7cda82372..6c17791efd78 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java @@ -20,11 +20,11 @@ package org.apache.iotdb.db.mpp.execution.exchange; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +61,7 @@ public LocalSourceHandle( this.queue = Validate.notNull(queue); this.queue.setSourceHandle(this); this.sourceHandleListener = Validate.notNull(sourceHandleListener); - this.threadName = - createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." + "SourceHandle"); + this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId); } @Override @@ -95,7 +94,9 @@ public TsBlock receive() { if (tsBlock != null) { currSequenceId++; logger.info( - "Receive {} TsdBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes()); + "[GetTsBlockFromQueue] TsBlock:{} size:{}", + currSequenceId, + tsBlock.getRetainedSizeInBytes()); } checkAndInvokeOnFinished(); return tsBlock; @@ -139,7 +140,7 @@ public void abort() { return; } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("Source handle is being aborted."); + logger.info("[StartAbortLocalSourceHandle]"); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -150,7 +151,7 @@ public void abort() { sourceHandleListener.onAborted(this); } } - logger.info("Source handle is aborted"); + logger.info("[EndAbortLocalSourceHandle]"); } } @@ -160,7 +161,7 @@ public void close() { return; } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("Source handle is being closed."); + logger.info("[StartCloseLocalSourceHandle]"); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -171,7 +172,7 @@ public void close() { sourceHandleListener.onFinished(this); } } - logger.info("Source handle is closed"); + logger.info("[EndCloseLocalSourceHandle]"); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index e14672ec8824..628fe7ebb602 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService; import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent; import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent; @@ -33,7 +34,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent; import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -78,12 +78,15 @@ class MPPDataExchangeServiceImpl implements MPPDataExchangeService.Iface { @Override public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req) throws TException { try (SetThreadName fragmentInstanceName = - new SetThreadName(createFullIdFrom(req.sourceFragmentInstanceId, "SinkHandle"))) { + new SetThreadName( + createFullId( + req.sourceFragmentInstanceId.queryId, + req.sourceFragmentInstanceId.fragmentId, + req.sourceFragmentInstanceId.instanceId))) { logger.debug( - "Get data block request received, for data blocks whose sequence ID in [{}, {}) from {}.", + "[ProcessGetTsBlockRequest] sequence ID in [{}, {})", req.getStartSequenceId(), - req.getEndSequenceId(), - req.getSourceFragmentInstanceId()); + req.getEndSequenceId()); if (!sinkHandles.containsKey(req.getSourceFragmentInstanceId())) { throw new TException( "Source fragment instance not found. Fragment instance ID: " @@ -105,9 +108,13 @@ public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req) throws TExce } @Override - public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) throws TException { + public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) { try (SetThreadName fragmentInstanceName = - new SetThreadName(createFullIdFrom(e.sourceFragmentInstanceId, "SinkHandle"))) { + new SetThreadName( + createFullId( + e.sourceFragmentInstanceId.queryId, + e.sourceFragmentInstanceId.fragmentId, + e.sourceFragmentInstanceId.instanceId))) { logger.debug( "Acknowledge data block event received, for data blocks whose sequence ID in [{}, {}) from {}.", e.getStartSequenceId(), @@ -131,8 +138,7 @@ public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) throws TEx @Override public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException { try (SetThreadName fragmentInstanceName = - new SetThreadName( - createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId + ".SourceHandle"))) { + new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) { logger.debug( "New data block event received, for plan node {} of {} from {}.", e.getTargetPlanNodeId(), @@ -170,8 +176,7 @@ public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException { @Override public void onEndOfDataBlockEvent(TEndOfDataBlockEvent e) throws TException { try (SetThreadName fragmentInstanceName = - new SetThreadName( - createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId + ".SourceHandle"))) { + new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) { logger.debug( "End of data block event received, for plan node {} of {} from {}.", e.getTargetPlanNodeId(), @@ -215,12 +220,12 @@ public SourceHandleListenerImpl(IMPPDataExchangeManagerCallback onFai @Override public void onFinished(ISourceHandle sourceHandle) { - logger.info("finished and release resources"); + logger.info("[ScHListenerOnFinish]"); if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) || !sourceHandles .get(sourceHandle.getLocalFragmentInstanceId()) .containsKey(sourceHandle.getLocalPlanNodeId())) { - logger.info("resources has already been released"); + logger.warn("[ScHListenerAlreadyReleased]"); } else { sourceHandles .get(sourceHandle.getLocalFragmentInstanceId()) @@ -234,7 +239,7 @@ public void onFinished(ISourceHandle sourceHandle) { @Override public void onAborted(ISourceHandle sourceHandle) { - logger.info("onAborted is invoked"); + logger.info("[ScHListenerOnAbort]"); onFinished(sourceHandle); } @@ -262,28 +267,29 @@ public SinkHandleListenerImpl( @Override public void onFinish(ISinkHandle sinkHandle) { - logger.info("onFinish is invoked"); + logger.info("[SkHListenerOnFinish]"); removeFromMPPDataExchangeManager(sinkHandle); context.finished(); } @Override public void onEndOfBlocks(ISinkHandle sinkHandle) { + logger.info("[SkHListenerOnEndOfTsBlocks]"); context.transitionToFlushing(); } @Override public void onAborted(ISinkHandle sinkHandle) { - logger.info("onAborted is invoked"); + logger.info("[SkHListenerOnAbort]"); removeFromMPPDataExchangeManager(sinkHandle); } private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) { - logger.info("release resources of finished sink handle"); - if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) { - logger.info("resources already been released"); + if (sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()) == null) { + logger.warn("[RemoveNoSinkHandle]"); + } else { + logger.info("[RemoveSinkHandle]"); } - sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()); } @Override @@ -493,7 +499,7 @@ public ISourceHandle createSourceHandle( *

This method should be called when a fragment instance finished in an abnormal state. */ public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstanceId) { - logger.info("Force deregister fragment instance"); + logger.info("[StartForceReleaseFIDataExchangeResource]"); if (sinkHandles.containsKey(fragmentInstanceId)) { ISinkHandle sinkHandle = sinkHandles.get(fragmentInstanceId); sinkHandle.abort(); @@ -502,11 +508,12 @@ public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstance if (sourceHandles.containsKey(fragmentInstanceId)) { Map planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId); for (Entry entry : planNodeIdToSourceHandle.entrySet()) { - logger.info("Close source handle {}", sourceHandles); + logger.info("[CloseSourceHandle] {}", entry.getKey()); entry.getValue().abort(); } sourceHandles.remove(fragmentInstanceId); } + logger.info("[EndForceReleaseFIDataExchangeResource]"); } /** @param suffix should be like [PlanNodeId].SourceHandle/SinHandle */ diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index 1222ee5ef094..ed044814a2a6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -96,7 +96,7 @@ public void setSourceHandle(LocalSourceHandle sourceHandle) { /** Notify no more tsblocks will be added to the queue. */ public void setNoMoreTsBlocks(boolean noMoreTsBlocks) { - logger.info("SharedTsBlockQueue receive no more TsBlocks signal."); + logger.info("[SignalNoMoreTsBlockOnQueue]"); if (closed) { logger.warn("queue has been destroyed"); return; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 703a04d7606b..387dc6467921 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent; @@ -34,7 +35,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +48,7 @@ import java.util.concurrent.ExecutorService; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; -import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom; +import static org.apache.iotdb.db.mpp.common.FragmentInstanceId.createFullId; import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; public class SinkHandle implements ISinkHandle { @@ -113,7 +113,11 @@ public SinkHandle( this.sinkHandleListener = Validate.notNull(sinkHandleListener); this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; - this.threadName = createFullIdFrom(localFragmentInstanceId, "SinkHandle"); + this.threadName = + createFullId( + localFragmentInstanceId.queryId, + localFragmentInstanceId.fragmentId, + localFragmentInstanceId.instanceId); this.blocked = localMemoryManager .getQueryPool() @@ -167,7 +171,7 @@ public synchronized void send(int partition, List tsBlocks) { } private void sendEndOfDataBlockEvent() throws Exception { - logger.info("send end of data block event"); + logger.info("[NotifyNoMoreTsBlock]"); int attempt = 0; TEndOfDataBlockEvent endOfDataBlockEvent = new TEndOfDataBlockEvent( @@ -193,7 +197,7 @@ private void sendEndOfDataBlockEvent() throws Exception { @Override public synchronized void setNoMoreTsBlocks() { - logger.info("start to set no-more-tsblocks"); + logger.info("[StartSetNoMoreTsBlocks]"); if (aborted || closed) { return; } @@ -202,20 +206,17 @@ public synchronized void setNoMoreTsBlocks() { } catch (Exception e) { throw new RuntimeException("Send EndOfDataBlockEvent failed", e); } - logger.info("set noMoreTsBlocks to true"); noMoreTsBlocks = true; if (isFinished()) { - logger.info("revoke onFinish() of sinkHandleListener"); sinkHandleListener.onFinish(this); } - logger.info("revoke onEndOfBlocks() of sinkHandleListener"); sinkHandleListener.onEndOfBlocks(this); } @Override public synchronized void abort() { - logger.info("SinkHandle is being aborted."); + logger.info("[StartAbortSinkHandle]"); sequenceIdToTsBlock.clear(); aborted = true; bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked); @@ -226,12 +227,12 @@ public synchronized void abort() { bufferRetainedSizeInBytes = 0; } sinkHandleListener.onAborted(this); - logger.info("SinkHandle is aborted"); + logger.info("[EndAbortSinkHandle]"); } @Override public synchronized void close() { - logger.info("SinkHandle is being closed."); + logger.info("[StartCloseSinkHandle]"); sequenceIdToTsBlock.clear(); closed = true; bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked); @@ -242,7 +243,7 @@ public synchronized void close() { bufferRetainedSizeInBytes = 0; } sinkHandleListener.onFinish(this); - logger.info("SinkHandle is closed"); + logger.info("[EndCloseSinkHandle]"); } @Override @@ -307,7 +308,7 @@ void acknowledgeTsBlock(int startSequenceId, int endSequenceId) { freedBytes += entry.getValue().right; bufferRetainedSizeInBytes -= entry.getValue().right; iterator.remove(); - logger.info("ack TsBlock {}.", entry.getKey()); + logger.info("[ACKTsBlock] {}.", entry.getKey()); } } if (isFinished()) { @@ -381,9 +382,7 @@ class SendNewDataBlockEventTask implements Runnable { public void run() { try (SetThreadName sinkHandleName = new SetThreadName(threadName)) { logger.info( - "Send new data block event [{}, {})", - startSequenceId, - startSequenceId + blockSizes.size()); + "[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size()); int attempt = 0; TNewDataBlockEvent newDataBlockEvent = new TNewDataBlockEvent( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index bd95291eb98c..0b4f2c4a11a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest; @@ -35,7 +36,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,8 +111,7 @@ public SourceHandle( this.bufferRetainedSizeInBytes = 0L; this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; - this.threadName = - createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." + "SourceHandle"); + this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId); } @Override @@ -130,13 +129,13 @@ public synchronized TsBlock receive() { return null; } long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); - logger.info("Receive {} TsBlock, size is {}", currSequenceId, retainedSize); + logger.info("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", currSequenceId, retainedSize); currSequenceId += 1; bufferRetainedSizeInBytes -= retainedSize; localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), retainedSize); if (sequenceIdToTsBlock.isEmpty() && !isFinished()) { - logger.info("no buffered TsBlock, blocked"); + logger.info("[WaitForMoreTsBlock]"); blocked = SettableFuture.create(); } if (isFinished()) { @@ -148,70 +147,67 @@ public synchronized TsBlock receive() { } private synchronized void trySubmitGetDataBlocksTask() { - try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - if (aborted || closed) { - return; - } - if (blockedOnMemory != null && !blockedOnMemory.isDone()) { - return; - } - - final int startSequenceId = nextSequenceId; - int endSequenceId = nextSequenceId; - long reservedBytes = 0L; - Pair, Boolean> pair = null; - long blockedSize = 0L; - while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) { - Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId); - if (bytesToReserve == null) { - throw new IllegalStateException("Data block size is null."); - } - pair = - localMemoryManager - .getQueryPool() - .reserve(localFragmentInstanceId.getQueryId(), bytesToReserve); - bufferRetainedSizeInBytes += bytesToReserve; - endSequenceId += 1; - reservedBytes += bytesToReserve; - if (!pair.right) { - blockedSize = bytesToReserve; - break; - } - } + if (aborted || closed) { + return; + } + if (blockedOnMemory != null && !blockedOnMemory.isDone()) { + return; + } - if (pair == null) { - // Next data block not generated yet. Do nothing. - return; + final int startSequenceId = nextSequenceId; + int endSequenceId = nextSequenceId; + long reservedBytes = 0L; + Pair, Boolean> pair = null; + long blockedSize = 0L; + while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) { + Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId); + if (bytesToReserve == null) { + throw new IllegalStateException("Data block size is null."); } - nextSequenceId = endSequenceId; - + pair = + localMemoryManager + .getQueryPool() + .reserve(localFragmentInstanceId.getQueryId(), bytesToReserve); + bufferRetainedSizeInBytes += bytesToReserve; + endSequenceId += 1; + reservedBytes += bytesToReserve; if (!pair.right) { - endSequenceId--; - reservedBytes -= blockedSize; - // The future being not completed indicates, - // 1. Memory has been reserved for blocks in [startSequenceId, endSequenceId). - // 2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked. - // 3. Have not reserve memory for the rest of blocks. - // - // startSequenceId endSequenceId - 1 endSequenceId - // |-------- reserved --------|--- blocked ---|--- not reserved ---| - - // Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks. - blockedOnMemory = pair.left; - final int blockedSequenceId = endSequenceId; - final long blockedRetainedSize = blockedSize; - blockedOnMemory.addListener( - () -> - executorService.submit( - new GetDataBlocksTask( - blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)), - executorService); + blockedSize = bytesToReserve; + break; } + } - if (endSequenceId > startSequenceId) { - executorService.submit( - new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes)); - } + if (pair == null) { + // Next data block not generated yet. Do nothing. + return; + } + nextSequenceId = endSequenceId; + + if (!pair.right) { + endSequenceId--; + reservedBytes -= blockedSize; + // The future being not completed indicates, + // 1. Memory has been reserved for blocks in [startSequenceId, endSequenceId). + // 2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked. + // 3. Have not reserve memory for the rest of blocks. + // + // startSequenceId endSequenceId - 1 endSequenceId + // |-------- reserved --------|--- blocked ---|--- not reserved ---| + + // Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks. + blockedOnMemory = pair.left; + final int blockedSequenceId = endSequenceId; + final long blockedRetainedSize = blockedSize; + blockedOnMemory.addListener( + () -> + executorService.submit( + new GetDataBlocksTask( + blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)), + executorService); + } + + if (endSequenceId > startSequenceId) { + executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes)); } } @@ -222,7 +218,7 @@ public synchronized ListenableFuture isBlocked() { } synchronized void setNoMoreTsBlocks(int lastSequenceId) { - logger.info("receive NoMoreTsBlock event. "); + logger.info("[ReceiveNoMoreTsBlockEvent]"); this.lastSequenceId = lastSequenceId; if (!blocked.isDone() && remoteTsBlockedConsumedUp()) { blocked.set(null); @@ -234,7 +230,7 @@ synchronized void setNoMoreTsBlocks(int lastSequenceId) { synchronized void updatePendingDataBlockInfo(int startSequenceId, List dataBlockSizes) { logger.info( - "receive newDataBlockEvent. [{}, {}), each size is: {}", + "[ReceiveNewTsBlockNotification] [{}, {}), each size is: {}", startSequenceId, startSequenceId + dataBlockSizes.size(), dataBlockSizes); @@ -380,7 +376,7 @@ class GetDataBlocksTask implements Runnable { @Override public void run() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("try to get data blocks [{}, {}) ", startSequenceId, endSequenceId); + logger.info("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId); TGetDataBlockRequest req = new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId); int attempt = 0; @@ -394,7 +390,7 @@ public void run() { TsBlock tsBlock = serde.deserialize(byteBuffer); tsBlocks.add(tsBlock); } - logger.info("got data blocks. count: {}", tsBlocks.size()); + logger.info("[EndPullTsBlocksFromRemote] Count:{}", tsBlocks.size()); executorService.submit( new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId)); synchronized (SourceHandle.this) { @@ -404,6 +400,7 @@ public void run() { for (int i = startSequenceId; i < endSequenceId; i++) { sequenceIdToTsBlock.put(i, tsBlocks.get(i - startSequenceId)); } + logger.info("[PutTsBlocksIntoBuffer]"); if (!blocked.isDone()) { blocked.set(null); } @@ -452,7 +449,7 @@ public SendAcknowledgeDataBlockEventTask(int startSequenceId, int endSequenceId) @Override public void run() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("send ack data block event [{}, {}).", startSequenceId, endSequenceId); + logger.info("[SendACKTsBlock] [{}, {}).", startSequenceId, endSequenceId); int attempt = 0; TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent = new TAcknowledgeDataBlockEvent( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index df99f50d5839..678f5cadce24 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -22,9 +22,9 @@ import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle; import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler; +import org.apache.iotdb.db.utils.SetThreadName; import com.google.common.collect.ImmutableList; -import io.airlift.concurrent.SetThreadName; import io.airlift.stats.CounterStat; import static java.util.Objects.requireNonNull; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index f27aedb2173d..16bf91ba364a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -30,8 +30,8 @@ import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.utils.SetThreadName; -import io.airlift.concurrent.SetThreadName; import io.airlift.stats.CounterStat; import io.airlift.units.Duration; import org.slf4j.Logger; @@ -186,7 +186,7 @@ public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentIns /** Cancels a FragmentInstance. */ public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) { - logger.debug("cancelTask"); + logger.debug("[CancelFI]"); requireNonNull(instanceId, "taskId is null"); FragmentInstanceContext context = instanceContext.remove(instanceId); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java index 699777d8f0f2..d19e2c7f35a1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java @@ -21,11 +21,11 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.StateMachine; import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener; +import org.apache.iotdb.db.utils.SetThreadName; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +78,7 @@ public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Execu instanceState.addStateChangeListener( newState -> { try (SetThreadName threadName = new SetThreadName(fragmentInstanceId.getFullId())) { - LOGGER.info("State transfer to {}", newState); + LOGGER.info("[StateChanged] To {}", newState); } }); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java index c5fc410f4c7e..148b688e0db7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java @@ -20,8 +20,8 @@ import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask; +import org.apache.iotdb.db.utils.SetThreadName; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +63,7 @@ public void run() { new SetThreadName(next.getFragmentInstance().getInfo().getFullId())) { execute(next); } catch (Throwable t) { - logger.error("execute failed", t); + logger.error("[ExecuteFailed]", t); if (next != null) { next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED); scheduler.toAborted(next); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java index f3d33b3df20c..4416cb84c961 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java @@ -34,9 +34,9 @@ import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskID; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,13 +93,13 @@ private DriverScheduler() { public void start() throws StartupException { for (int i = 0; i < WORKER_THREAD_NUM; i++) { AbstractDriverThread t = - new DriverTaskThread("Worker-Thread-" + i, workerGroups, readyQueue, scheduler); + new DriverTaskThread("Query-Worker-Thread-" + i, workerGroups, readyQueue, scheduler); threads.add(t); t.start(); } AbstractDriverThread t = new DriverTaskTimeoutSentinelThread( - "Sentinel-Thread", workerGroups, timeoutQueue, scheduler); + "Query-Sentinel-Thread", workerGroups, timeoutQueue, scheduler); threads.add(t); t.start(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java index e56560d0ac74..52b890797e06 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java @@ -23,10 +23,10 @@ import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.db.utils.stats.CpuTimer; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.SetThreadName; import io.airlift.units.Duration; import java.util.concurrent.Executor; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java index d4f8ac8e9a1e..4dfd933b40b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java @@ -38,8 +38,8 @@ import org.apache.iotdb.db.mpp.plan.execution.config.ConfigExecution; import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement; import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.utils.SetThreadName; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,7 +120,7 @@ public ExecutionResult execute( QueryId globalQueryId = queryIdGenerator.createNextQueryId(); try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) { if (sql != null && sql.length() > 0) { - LOGGER.info("start executing sql: {}", sql); + LOGGER.info("[QueryStart] sql: {}", sql); } MPPQueryContext queryContext = new MPPQueryContext( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index a5dace95a88e..47ab3a348d15 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java @@ -137,10 +137,6 @@ public AnalyzeVisitor( this.schemaFetcher = schemaFetcher; } - private String getLogHeader() { - return String.format("Query[%s]:", context.getQueryId()); - } - @Override public Analysis visitNode(StatementNode node, MPPQueryContext context) { throw new UnsupportedOperationException( @@ -169,9 +165,9 @@ public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext contex analysis.setStatement(queryStatement); // request schema fetch API - logger.info("{} fetch query schema...", getLogHeader()); + logger.info("[StartFetchSchema]"); ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); - logger.info("{} fetch schema done", getLogHeader()); + logger.info("[EndFetchSchema]"); // If there is no leaf node in the schema tree, the query should be completed immediately if (schemaTree.isEmpty()) { if (queryStatement.isLastQuery()) { @@ -1355,9 +1351,9 @@ public Analysis visitShowTimeSeries( if (showTimeSeriesStatement.isOrderByHeat()) { patternTree.constructTree(); // request schema fetch API - logger.info("{} fetch query schema...", getLogHeader()); + logger.info("[StartFetchSchema]"); ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); - logger.info("{} fetch schema done", getLogHeader()); + logger.info("[EndFetchSchema]]"); List allSelectedPath = schemaTree.getAllMeasurement(); Set sourceExpressions = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java index 567030ff5194..9697bf29e26b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java @@ -22,13 +22,8 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.plan.statement.Statement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** Analyze the statement and generate Analysis. */ public class Analyzer { - private static final Logger logger = LoggerFactory.getLogger(Analyzer.class); - private final MPPQueryContext context; private final IPartitionFetcher partitionFetcher; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java index 77ff8e9abb21..c3dbb79aeaba 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement; import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -54,8 +55,6 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import io.airlift.concurrent.SetThreadName; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index e9d6857ae10f..48607b996f08 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -56,13 +56,13 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +153,7 @@ public QueryExecution( if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) { - logger.info("release resource because Query State is: {}", state); + logger.info("[ReleaseQueryResource] state is: {}", state); releaseResource(); } } @@ -162,7 +162,7 @@ public QueryExecution( public void start() { if (skipExecute()) { - logger.info("execution of query will be skipped. Transit to RUNNING immediately."); + logger.info("[SkipExecute]"); constructResultForMemorySource(); stateMachine.transitionToRunning(); return; @@ -184,14 +184,14 @@ public void start() { private ExecutionResult retry() { if (retryCount >= MAX_RETRY_COUNT) { - logger.error("reach max retry count. transit query to failed"); + logger.warn("[ReachMaxRetryCount]"); stateMachine.transitionToFailed(); return getStatus(); } logger.warn("error when executing query. {}", stateMachine.getFailureMessage()); // stop and clean up resources the QueryExecution used this.stopAndCleanup(); - logger.info("wait {}ms before retry...", RETRY_INTERVAL_IN_MS); + logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS); try { Thread.sleep(RETRY_INTERVAL_IN_MS); } catch (InterruptedException e) { @@ -199,7 +199,7 @@ private ExecutionResult retry() { Thread.currentThread().interrupt(); } retryCount++; - logger.info("start to retry. Retry count is: {}", retryCount); + logger.info("[Retry] retry count is: {}", retryCount); stateMachine.transitionToQueued(); // force invalid PartitionCache partitionFetcher.invalidAllCache(); @@ -326,7 +326,7 @@ public Optional getBatchResult() throws IoTDBException { while (true) { try { if (resultHandle.isAborted()) { - logger.info("resultHandle for client is aborted"); + logger.warn("[ResultHandleAborted]"); stateMachine.transitionToAborted(); if (stateMachine.getFailureStatus() != null) { throw new IoTDBException( @@ -339,7 +339,7 @@ public Optional getBatchResult() throws IoTDBException { // Once the resultHandle is finished, we should transit the state of this query to // FINISHED. // So that the corresponding cleanup work could be triggered. - logger.info("resultHandle for client is finished"); + logger.info("[ResultHandleFinished]"); stateMachine.transitionToFinished(); return Optional.empty(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index f7ba31d0d913..cc0d7e7dfd52 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -32,10 +32,10 @@ import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +134,7 @@ private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachin freeMemoryForOperators -= estimatedMemorySize; LOGGER.info( String.format( - "consume memory: %d, current remaining memory: %d", + "[ConsumeMemory] consume: %d, current remaining memory: %d", estimatedMemorySize, freeMemoryForOperators)); } } @@ -148,7 +148,7 @@ private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachin this.freeMemoryForOperators += estimatedMemorySize; LOGGER.info( String.format( - "release memory: %d, current remaining memory: %d", + "[ReleaseMemory] release: %d, current remaining memory: %d", estimatedMemorySize, freeMemoryForOperators)); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java index 08563d93619f..5c50b80e9b26 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java @@ -27,8 +27,8 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.utils.SetThreadName; -import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +100,7 @@ private void fetchStateAndUpdate() { instanceStateMap.computeIfAbsent( instance.getId(), k -> new InstanceStateMetrics(instance.isRoot())); if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) { - logger.info("State is {}", state); + logger.info("[PrintFIState] state is {}", state); metrics.reset(state); } else { metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index e2b5a885914f..e1d01c975340 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TPlanNode; import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq; @@ -48,7 +49,6 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +109,7 @@ private Future dispatchRead(List i } catch (FragmentInstanceDispatchException e) { return new FragInstanceDispatchResult(e.getFailureStatus()); } catch (Throwable t) { - logger.error("cannot dispatch FI for read operation", t); + logger.error("[DispatchFailed]", t); return new FragInstanceDispatchResult( RpcUtils.getStatus( TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())); @@ -126,7 +126,7 @@ private Future dispatchWriteSync(List