Skip to content

Commit

Permalink
Add log event and change Thread Name Format (apache#7307)
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 authored Sep 15, 2022
1 parent ca37376 commit 8c42eed
Show file tree
Hide file tree
Showing 25 changed files with 210 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void send(TsBlock tsBlock) {
if (queue.hasNoMoreTsBlocks()) {
return;
}
logger.info("send TsBlocks");
logger.info("[StartSendTsBlockOnLocal]");
synchronized (this) {
blocked = queue.add(tsBlock);
}
Expand All @@ -129,22 +129,21 @@ public synchronized void send(int partition, List<TsBlock> tsBlocks) {
public void setNoMoreTsBlocks() {
synchronized (queue) {
synchronized (this) {
logger.info("set noMoreTsBlocks.");
logger.info("[StartSetNoMoreTsBlocksOnLocal]");
if (aborted || closed) {
logger.info("SinkHandle has been aborted={} or closed={}.", aborted, closed);
return;
}
queue.setNoMoreTsBlocks(true);
sinkHandleListener.onEndOfBlocks(this);
}
}
checkAndInvokeOnFinished();
logger.info("noMoreTsBlocks has been set.");
logger.info("[EndSetNoMoreTsBlocksOnLocal]");
}

@Override
public void abort() {
logger.info("Sink handle is being aborted.");
logger.info("[StartAbortLocalSinkHandle]");
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
Expand All @@ -155,12 +154,12 @@ public void abort() {
sinkHandleListener.onAborted(this);
}
}
logger.info("Sink handle is aborted");
logger.info("[EndAbortLocalSinkHandle]");
}

@Override
public void close() {
logger.info("Sink handle is being closed.");
logger.info("[StartCloseLocalSinkHandle]");
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
Expand All @@ -171,7 +170,7 @@ public void close() {
sinkHandleListener.onFinish(this);
}
}
logger.info("Sink handle is closed");
logger.info("[EndCloseLocalSinkHandle]");
}

public TFragmentInstanceId getRemoteFragmentInstanceId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
package org.apache.iotdb.db.mpp.execution.exchange;

import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;

import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -61,8 +61,7 @@ public LocalSourceHandle(
this.queue = Validate.notNull(queue);
this.queue.setSourceHandle(this);
this.sourceHandleListener = Validate.notNull(sourceHandleListener);
this.threadName =
createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." + "SourceHandle");
this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId);
}

@Override
Expand Down Expand Up @@ -95,7 +94,9 @@ public TsBlock receive() {
if (tsBlock != null) {
currSequenceId++;
logger.info(
"Receive {} TsdBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes());
"[GetTsBlockFromQueue] TsBlock:{} size:{}",
currSequenceId,
tsBlock.getRetainedSizeInBytes());
}
checkAndInvokeOnFinished();
return tsBlock;
Expand Down Expand Up @@ -139,7 +140,7 @@ public void abort() {
return;
}
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
logger.info("Source handle is being aborted.");
logger.info("[StartAbortLocalSourceHandle]");
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
Expand All @@ -150,7 +151,7 @@ public void abort() {
sourceHandleListener.onAborted(this);
}
}
logger.info("Source handle is aborted");
logger.info("[EndAbortLocalSourceHandle]");
}
}

Expand All @@ -160,7 +161,7 @@ public void close() {
return;
}
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
logger.info("Source handle is being closed.");
logger.info("[StartCloseLocalSourceHandle]");
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
Expand All @@ -171,7 +172,7 @@ public void close() {
sourceHandleListener.onFinished(this);
}
}
logger.info("Source handle is closed");
logger.info("[EndCloseLocalSourceHandle]");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
Expand All @@ -33,7 +34,6 @@
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;

import io.airlift.concurrent.SetThreadName;
import org.apache.commons.lang3.Validate;
import org.apache.thrift.TException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -78,12 +78,15 @@ class MPPDataExchangeServiceImpl implements MPPDataExchangeService.Iface {
@Override
public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req) throws TException {
try (SetThreadName fragmentInstanceName =
new SetThreadName(createFullIdFrom(req.sourceFragmentInstanceId, "SinkHandle"))) {
new SetThreadName(
createFullId(
req.sourceFragmentInstanceId.queryId,
req.sourceFragmentInstanceId.fragmentId,
req.sourceFragmentInstanceId.instanceId))) {
logger.debug(
"Get data block request received, for data blocks whose sequence ID in [{}, {}) from {}.",
"[ProcessGetTsBlockRequest] sequence ID in [{}, {})",
req.getStartSequenceId(),
req.getEndSequenceId(),
req.getSourceFragmentInstanceId());
req.getEndSequenceId());
if (!sinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
throw new TException(
"Source fragment instance not found. Fragment instance ID: "
Expand All @@ -105,9 +108,13 @@ public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req) throws TExce
}

@Override
public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) throws TException {
public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) {
try (SetThreadName fragmentInstanceName =
new SetThreadName(createFullIdFrom(e.sourceFragmentInstanceId, "SinkHandle"))) {
new SetThreadName(
createFullId(
e.sourceFragmentInstanceId.queryId,
e.sourceFragmentInstanceId.fragmentId,
e.sourceFragmentInstanceId.instanceId))) {
logger.debug(
"Acknowledge data block event received, for data blocks whose sequence ID in [{}, {}) from {}.",
e.getStartSequenceId(),
Expand All @@ -131,8 +138,7 @@ public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) throws TEx
@Override
public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
try (SetThreadName fragmentInstanceName =
new SetThreadName(
createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId + ".SourceHandle"))) {
new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) {
logger.debug(
"New data block event received, for plan node {} of {} from {}.",
e.getTargetPlanNodeId(),
Expand Down Expand Up @@ -170,8 +176,7 @@ public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
@Override
public void onEndOfDataBlockEvent(TEndOfDataBlockEvent e) throws TException {
try (SetThreadName fragmentInstanceName =
new SetThreadName(
createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId + ".SourceHandle"))) {
new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) {
logger.debug(
"End of data block event received, for plan node {} of {} from {}.",
e.getTargetPlanNodeId(),
Expand Down Expand Up @@ -215,12 +220,12 @@ public SourceHandleListenerImpl(IMPPDataExchangeManagerCallback<Throwable> onFai

@Override
public void onFinished(ISourceHandle sourceHandle) {
logger.info("finished and release resources");
logger.info("[ScHListenerOnFinish]");
if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
|| !sourceHandles
.get(sourceHandle.getLocalFragmentInstanceId())
.containsKey(sourceHandle.getLocalPlanNodeId())) {
logger.info("resources has already been released");
logger.warn("[ScHListenerAlreadyReleased]");
} else {
sourceHandles
.get(sourceHandle.getLocalFragmentInstanceId())
Expand All @@ -234,7 +239,7 @@ public void onFinished(ISourceHandle sourceHandle) {

@Override
public void onAborted(ISourceHandle sourceHandle) {
logger.info("onAborted is invoked");
logger.info("[ScHListenerOnAbort]");
onFinished(sourceHandle);
}

Expand Down Expand Up @@ -262,28 +267,29 @@ public SinkHandleListenerImpl(

@Override
public void onFinish(ISinkHandle sinkHandle) {
logger.info("onFinish is invoked");
logger.info("[SkHListenerOnFinish]");
removeFromMPPDataExchangeManager(sinkHandle);
context.finished();
}

@Override
public void onEndOfBlocks(ISinkHandle sinkHandle) {
logger.info("[SkHListenerOnEndOfTsBlocks]");
context.transitionToFlushing();
}

@Override
public void onAborted(ISinkHandle sinkHandle) {
logger.info("onAborted is invoked");
logger.info("[SkHListenerOnAbort]");
removeFromMPPDataExchangeManager(sinkHandle);
}

private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) {
logger.info("release resources of finished sink handle");
if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
logger.info("resources already been released");
if (sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()) == null) {
logger.warn("[RemoveNoSinkHandle]");
} else {
logger.info("[RemoveSinkHandle]");
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
}

@Override
Expand Down Expand Up @@ -493,7 +499,7 @@ public ISourceHandle createSourceHandle(
* <p>This method should be called when a fragment instance finished in an abnormal state.
*/
public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstanceId) {
logger.info("Force deregister fragment instance");
logger.info("[StartForceReleaseFIDataExchangeResource]");
if (sinkHandles.containsKey(fragmentInstanceId)) {
ISinkHandle sinkHandle = sinkHandles.get(fragmentInstanceId);
sinkHandle.abort();
Expand All @@ -502,11 +508,12 @@ public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstance
if (sourceHandles.containsKey(fragmentInstanceId)) {
Map<String, ISourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
for (Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
logger.info("Close source handle {}", sourceHandles);
logger.info("[CloseSourceHandle] {}", entry.getKey());
entry.getValue().abort();
}
sourceHandles.remove(fragmentInstanceId);
}
logger.info("[EndForceReleaseFIDataExchangeResource]");
}

/** @param suffix should be like [PlanNodeId].SourceHandle/SinHandle */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void setSourceHandle(LocalSourceHandle sourceHandle) {

/** Notify no more tsblocks will be added to the queue. */
public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
logger.info("[SignalNoMoreTsBlockOnQueue]");
if (closed) {
logger.warn("queue has been destroyed");
return;
Expand Down
Loading

0 comments on commit 8c42eed

Please sign in to comment.