Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](lock)add deadlock detection tool and monitored lock implementations #39015 #22

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2987,4 +2987,17 @@ public static int metaServiceRpcRetryTimes() {
//==========================================================================
// end of cloud config
//==========================================================================
//==========================================================================
// start of lock config
@ConfField(description = {"是否开启死锁检测",
"Whether to enable deadlock detection"})
public static boolean enable_deadlock_detection = false;

@ConfField(description = {"死锁检测间隔时间,单位分钟",
"Deadlock detection interval time, unit minute"})
public static long deadlock_detection_interval_minute = 5;

@ConfField(mutable = true, description = {"表示最大锁持有时间,超过该时间会打印告警日志,单位秒",
"Maximum lock hold time; logs a warning if exceeded"})
public static long max_lock_hold_threshold_seconds = 10;
}
11 changes: 10 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.LogUtils;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.lock.DeadlockMonitor;
import org.apache.doris.common.util.JdkUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.HttpServer;
Expand Down Expand Up @@ -60,6 +61,7 @@
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;

public class DorisFE {
private static final Logger LOG = LogManager.getLogger(DorisFE.class);
Expand Down Expand Up @@ -95,6 +97,13 @@ public static void main(String[] args) {
start(DORIS_HOME_DIR, PID_DIR, args, options);
}

private static void startMonitor() {
if (Config.enable_deadlock_detection) {
DeadlockMonitor deadlockMonitor = new DeadlockMonitor();
deadlockMonitor.startMonitoring(Config.deadlock_detection_interval_minute, TimeUnit.MINUTES);
}
}

// entrance for doris frontend
public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) {
if (System.getenv("DORIS_LOG_TO_STDERR") != null) {
Expand Down Expand Up @@ -214,7 +223,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
}

ThreadPoolManager.registerAllThreadPoolMetric();

startMonitor();
while (true) {
Thread.sleep(2000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.ColocatePersistInfo;
Expand Down Expand Up @@ -57,7 +58,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -183,7 +183,7 @@ public static boolean isGlobalGroupName(String groupName) {
// save some error msg of the group for show. no need to persist
private Map<GroupId, String> group2ErrMsgs = Maps.newHashMap();

private transient ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private transient MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock();

public ColocateTableIndex() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.persist.CreateTableInfo;
Expand Down Expand Up @@ -87,7 +87,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>,
@SerializedName(value = "fullQualifiedName")
private volatile String fullQualifiedName;

private QueryableReentrantReadWriteLock rwLock;
private MonitoredReentrantReadWriteLock rwLock;

// table family group map
private final Map<Long, Table> idToTable;
Expand Down Expand Up @@ -138,7 +138,7 @@ public Database(long id, String name) {
if (this.fullQualifiedName == null) {
this.fullQualifiedName = "";
}
this.rwLock = new QueryableReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
this.idToTable = Maps.newConcurrentMap();
this.nameToTable = Maps.newConcurrentMap();
this.lowerCaseToTableName = Maps.newConcurrentMap();
Expand Down
6 changes: 3 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.common.publish.TopicPublisherThread;
import org.apache.doris.common.publish.WorkloadGroupPublisher;
Expand All @@ -122,7 +123,6 @@
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.QueryableReentrantLock;
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
Expand Down Expand Up @@ -362,7 +362,7 @@ public class Env {
// We use fair ReentrantLock to avoid starvation. Do not use this lock in critical code pass
// because fair lock has poor performance.
// Using QueryableReentrantLock to print owner thread in debug mode.
private QueryableReentrantLock lock;
private MonitoredReentrantLock lock;

private CatalogMgr catalogMgr;
private GlobalFunctionMgr globalFunctionMgr;
Expand Down Expand Up @@ -680,7 +680,7 @@ public Env(boolean isCheckpointCatalog) {
this.syncJobManager = new SyncJobManager();
this.alter = new Alter();
this.consistencyChecker = new ConsistencyChecker();
this.lock = new QueryableReentrantLock(true);
this.lock = new MonitoredReentrantLock(true);
this.backupHandler = new BackupHandler(this);
this.metaDir = Config.meta_dir;
this.publishVersionDaemon = new PublishVersionDaemon();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest;
import org.apache.doris.rpc.BackendServiceProxy;
Expand All @@ -43,8 +44,6 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StorageVaultMgr {
private static final Logger LOG = LogManager.getLogger(StorageVaultMgr.class);
Expand All @@ -54,7 +53,7 @@ public class StorageVaultMgr {

private Map<String, String> vaultNameToVaultId = new HashMap<>();

private ReadWriteLock rwLock = new ReentrantReadWriteLock();
private MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock();

private static final ExecutorService ALTER_BE_SYNC_THREAD_POOL = Executors.newFixedThreadPool(1);

Expand Down
16 changes: 8 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.persist.gson.GsonPostProcessable;
Expand Down Expand Up @@ -59,7 +60,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
Expand All @@ -84,11 +84,11 @@ public abstract class Table extends MetaObject implements Writable, TableIf, Gso
protected TableType type;
@SerializedName(value = "createTime")
protected long createTime;
protected QueryableReentrantReadWriteLock rwLock;
protected MonitoredReentrantReadWriteLock rwLock;
// Used for queuing commit transactifon tasks to avoid fdb transaction conflicts,
// especially to reduce conflicts when obtaining delete bitmap update locks for
// MoW table
protected ReentrantLock commitLock;
protected MonitoredReentrantLock commitLock;

/*
* fullSchema and nameToColumn should contains all columns, both visible and shadow.
Expand Down Expand Up @@ -134,11 +134,11 @@ public Table(TableType type) {
this.type = type;
this.fullSchema = Lists.newArrayList();
this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
this.rwLock = new QueryableReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
this.commitLock = new ReentrantLock(true);
this.commitLock = new MonitoredReentrantLock(true);
}

public Table(long id, String tableName, TableType type, List<Column> fullSchema) {
Expand All @@ -158,12 +158,12 @@ public Table(long id, String tableName, TableType type, List<Column> fullSchema)
// Only view in with-clause have null base
Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns");
}
this.rwLock = new QueryableReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
this.createTime = Instant.now().getEpochSecond();
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
this.commitLock = new ReentrantLock(true);
this.commitLock = new MonitoredReentrantLock(true);
}

public void markDropped() {
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
Expand All @@ -51,7 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -98,7 +98,7 @@ public enum TabletStatus {
private long cooldownReplicaId = -1;
@SerializedName(value = "ctm", alternate = {"cooldownTerm"})
private long cooldownTerm = -1;
private ReentrantReadWriteLock cooldownConfLock = new ReentrantReadWriteLock();
private MonitoredReentrantReadWriteLock cooldownConfLock = new MonitoredReentrantReadWriteLock();

// last time that the tablet checker checks this tablet.
// no need to persist
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.lock;

import org.apache.doris.common.Config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Abstract base class for a monitored lock that tracks lock acquisition,
* release, and attempt times. It provides mechanisms for monitoring the
* duration for which a lock is held and logging any instances where locks
* are held longer than a specified timeout or fail to be acquired within
* a specified timeout.
*/
public abstract class AbstractMonitoredLock {

private static final Logger LOG = LoggerFactory.getLogger(AbstractMonitoredLock.class);

// Thread-local variable to store the lock start time
private final ThreadLocal<Long> lockStartTime = new ThreadLocal<>();


/**
* Method to be called after successfully acquiring the lock.
* Sets the start time for the lock.
*/
protected void afterLock() {
lockStartTime.set(System.nanoTime());
}

/**
* Method to be called after releasing the lock.
* Calculates the lock hold time and logs a warning if it exceeds the hold timeout.
*/
protected void afterUnlock() {
Long startTime = lockStartTime.get();
if (startTime != null) {
long lockHoldTimeNanos = System.nanoTime() - startTime;
long lockHoldTimeMs = lockHoldTimeNanos >> 20;
if (lockHoldTimeMs > Config.max_lock_hold_threshold_seconds * 1000) {
Thread currentThread = Thread.currentThread();
String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
LOG.warn("Thread ID: {}, Thread Name: {} - Lock held for {} ms, exceeding hold timeout of {} ms "
+ "Thread stack trace:{}",
currentThread.getId(), currentThread.getName(), lockHoldTimeMs, lockHoldTimeMs, stackTrace);
}
lockStartTime.remove();
}
}

/**
* Method to be called after attempting to acquire the lock using tryLock.
* Logs a warning if the lock was not acquired within a reasonable time.
*
* @param acquired Whether the lock was successfully acquired
* @param startTime The start time of the lock attempt
*/
protected void afterTryLock(boolean acquired, long startTime) {
if (acquired) {
afterLock();
return;
}
if (LOG.isDebugEnabled()) {
long elapsedTime = (System.nanoTime() - startTime) >> 20;
Thread currentThread = Thread.currentThread();
String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
LOG.debug("Thread ID: {}, Thread Name: {} - Failed to acquire the lock within {} ms"
+ "\nThread blocking info:\n{}",
currentThread.getId(), currentThread.getName(), elapsedTime, stackTrace);
}
}

/**
* Utility method to format the stack trace of a thread.
*
* @param stackTrace The stack trace elements of the thread
* @return A formatted string of the stack trace
*/
private String getThreadStackTrace(StackTraceElement[] stackTrace) {
StringBuilder sb = new StringBuilder();
for (StackTraceElement element : stackTrace) {
sb.append("\tat ").append(element).append("\n");
}
return sb.toString().replace("\n", "\\n");
}
}


Loading
Loading