diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 42affa36daf6013..0d94752760a6932 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3023,4 +3023,17 @@ public static int metaServiceRpcRetryTimes() { //========================================================================== // end of cloud config //========================================================================== + //========================================================================== + // start of lock config + @ConfField(description = {"是否开启死锁检测", + "Whether to enable deadlock detection"}) + public static boolean enable_deadlock_detection = false; + + @ConfField(description = {"死锁检测间隔时间,单位分钟", + "Deadlock detection interval time, unit minute"}) + public static long deadlock_detection_interval_minute = 5; + + @ConfField(mutable = true, description = {"表示最大锁持有时间,超过该时间会打印告警日志,单位秒", + "Maximum lock hold time; logs a warning if exceeded"}) + public static long max_lock_hold_threshold_seconds = 10; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index de7feda719f7b3f..d028f3aeae14371 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -27,6 +27,7 @@ import org.apache.doris.common.LogUtils; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.Version; +import org.apache.doris.common.lock.DeadlockMonitor; import org.apache.doris.common.util.JdkUtils; import org.apache.doris.common.util.NetUtils; import org.apache.doris.httpv2.HttpServer; @@ -60,6 +61,7 @@ import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.nio.file.StandardOpenOption; +import java.util.concurrent.TimeUnit; public class DorisFE { private static final Logger LOG = LogManager.getLogger(DorisFE.class); @@ -95,6 +97,13 @@ public static void main(String[] args) { start(DORIS_HOME_DIR, PID_DIR, args, options); } + private static void startMonitor() { + if (Config.enable_deadlock_detection) { + DeadlockMonitor deadlockMonitor = new DeadlockMonitor(); + deadlockMonitor.startMonitoring(Config.deadlock_detection_interval_minute, TimeUnit.MINUTES); + } + } + // entrance for doris frontend public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) { if (System.getenv("DORIS_LOG_TO_STDERR") != null) { @@ -214,7 +223,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star } ThreadPoolManager.registerAllThreadPoolMetric(); - + startMonitor(); while (true) { Thread.sleep(2000); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index 28fc0ad55b8523c..d253e4bae8c0133 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -26,6 +26,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.persist.ColocatePersistInfo; @@ -57,7 +58,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -183,7 +183,7 @@ public static boolean isGlobalGroupName(String groupName) { // save some error msg of the group for show. no need to persist private Map group2ErrMsgs = Maps.newHashMap(); - private transient ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private transient MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(); public ColocateTableIndex() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index 5637e27e0d765b0..6862c3b61c02a31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -29,9 +29,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.common.util.QueryableReentrantReadWriteLock; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.persist.CreateTableInfo; @@ -87,7 +87,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf, @SerializedName(value = "fullQualifiedName") private volatile String fullQualifiedName; - private QueryableReentrantReadWriteLock rwLock; + private MonitoredReentrantReadWriteLock rwLock; // table family group map private final Map idToTable; @@ -138,7 +138,7 @@ public Database(long id, String name) { if (this.fullQualifiedName == null) { this.fullQualifiedName = ""; } - this.rwLock = new QueryableReentrantReadWriteLock(true); + this.rwLock = new MonitoredReentrantReadWriteLock(true); this.idToTable = Maps.newConcurrentMap(); this.nameToTable = Maps.newConcurrentMap(); this.lowerCaseToTableName = Maps.newConcurrentMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index eb8b10bbfe4d8b0..9df93357f012569 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -112,6 +112,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.io.Text; +import org.apache.doris.common.lock.MonitoredReentrantLock; import org.apache.doris.common.publish.TopicPublisher; import org.apache.doris.common.publish.TopicPublisherThread; import org.apache.doris.common.publish.WorkloadGroupPublisher; @@ -123,7 +124,6 @@ import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.common.util.QueryableReentrantLock; import org.apache.doris.common.util.SmallFileMgr; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; @@ -367,7 +367,7 @@ public class Env { // We use fair ReentrantLock to avoid starvation. Do not use this lock in critical code pass // because fair lock has poor performance. // Using QueryableReentrantLock to print owner thread in debug mode. - private QueryableReentrantLock lock; + private MonitoredReentrantLock lock; private CatalogMgr catalogMgr; private GlobalFunctionMgr globalFunctionMgr; @@ -689,7 +689,7 @@ public Env(boolean isCheckpointCatalog) { this.syncJobManager = new SyncJobManager(); this.alter = new Alter(); this.consistencyChecker = new ConsistencyChecker(); - this.lock = new QueryableReentrantLock(true); + this.lock = new MonitoredReentrantLock(true); this.backupHandler = new BackupHandler(this); this.metaDir = Config.meta_dir; this.publishVersionDaemon = new PublishVersionDaemon(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java index 2d1df457c050a25..ae2885d1103adb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java @@ -26,6 +26,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest; import org.apache.doris.rpc.BackendServiceProxy; @@ -43,7 +44,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.locks.ReentrantReadWriteLock; public class StorageVaultMgr { private static final Logger LOG = LogManager.getLogger(StorageVaultMgr.class); @@ -51,8 +51,10 @@ public class StorageVaultMgr { private final SystemInfoService systemInfoService; // private Pair defaultVaultInfo; + private Map vaultNameToVaultId = new HashMap<>(); - private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + + private MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock(); public StorageVaultMgr(SystemInfoService systemInfoService) { this.systemInfoService = systemInfoService; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 548bea2c3844139..906a710f369c621 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -27,7 +27,8 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.QueryableReentrantReadWriteLock; +import org.apache.doris.common.lock.MonitoredReentrantLock; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -58,7 +59,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** @@ -83,11 +83,11 @@ public abstract class Table extends MetaObject implements Writable, TableIf, Gso protected TableType type; @SerializedName(value = "createTime") protected long createTime; - protected QueryableReentrantReadWriteLock rwLock; + protected MonitoredReentrantReadWriteLock rwLock; // Used for queuing commit transactifon tasks to avoid fdb transaction conflicts, // especially to reduce conflicts when obtaining delete bitmap update locks for // MoW table - protected ReentrantLock commitLock; + protected MonitoredReentrantLock commitLock; /* * fullSchema and nameToColumn should contains all columns, both visible and shadow. @@ -133,11 +133,11 @@ public Table(TableType type) { this.type = type; this.fullSchema = Lists.newArrayList(); this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - this.rwLock = new QueryableReentrantReadWriteLock(true); + this.rwLock = new MonitoredReentrantReadWriteLock(true); if (Config.check_table_lock_leaky) { this.readLockThreads = Maps.newConcurrentMap(); } - this.commitLock = new ReentrantLock(true); + this.commitLock = new MonitoredReentrantLock(true); } public Table(long id, String tableName, TableType type, List fullSchema) { @@ -157,12 +157,12 @@ public Table(long id, String tableName, TableType type, List fullSchema) // Only view in with-clause have null base Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns"); } - this.rwLock = new QueryableReentrantReadWriteLock(true); + this.rwLock = new MonitoredReentrantReadWriteLock(true); this.createTime = Instant.now().getEpochSecond(); if (Config.check_table_lock_leaky) { this.readLockThreads = Maps.newConcurrentMap(); } - this.commitLock = new ReentrantLock(true); + this.commitLock = new MonitoredReentrantLock(true); } public void markDropped() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 6380ba8e3aa0e6d..4102f4f117e464a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -27,6 +27,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; @@ -51,7 +52,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -128,7 +128,7 @@ public TabletHealth() { private long cooldownReplicaId = -1; @SerializedName(value = "ctm", alternate = {"cooldownTerm"}) private long cooldownTerm = -1; - private ReentrantReadWriteLock cooldownConfLock = new ReentrantReadWriteLock(); + private MonitoredReentrantReadWriteLock cooldownConfLock = new MonitoredReentrantReadWriteLock(); // last time that the tablet checker checks this tablet. // no need to persist diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/lock/AbstractMonitoredLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/lock/AbstractMonitoredLock.java new file mode 100644 index 000000000000000..7389ed0d61b6b21 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/lock/AbstractMonitoredLock.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.lock; + +import org.apache.doris.common.Config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class for a monitored lock that tracks lock acquisition, + * release, and attempt times. It provides mechanisms for monitoring the + * duration for which a lock is held and logging any instances where locks + * are held longer than a specified timeout or fail to be acquired within + * a specified timeout. + */ +public abstract class AbstractMonitoredLock { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractMonitoredLock.class); + + // Thread-local variable to store the lock start time + private final ThreadLocal lockStartTime = new ThreadLocal<>(); + + + /** + * Method to be called after successfully acquiring the lock. + * Sets the start time for the lock. + */ + protected void afterLock() { + lockStartTime.set(System.nanoTime()); + } + + /** + * Method to be called after releasing the lock. + * Calculates the lock hold time and logs a warning if it exceeds the hold timeout. + */ + protected void afterUnlock() { + Long startTime = lockStartTime.get(); + if (startTime != null) { + long lockHoldTimeNanos = System.nanoTime() - startTime; + long lockHoldTimeMs = lockHoldTimeNanos >> 20; + if (lockHoldTimeMs > Config.max_lock_hold_threshold_seconds * 1000) { + Thread currentThread = Thread.currentThread(); + String stackTrace = getThreadStackTrace(currentThread.getStackTrace()); + LOG.warn("Thread ID: {}, Thread Name: {} - Lock held for {} ms, exceeding hold timeout of {} ms " + + "Thread stack trace:{}", + currentThread.getId(), currentThread.getName(), lockHoldTimeMs, lockHoldTimeMs, stackTrace); + } + lockStartTime.remove(); + } + } + + /** + * Method to be called after attempting to acquire the lock using tryLock. + * Logs a warning if the lock was not acquired within a reasonable time. + * + * @param acquired Whether the lock was successfully acquired + * @param startTime The start time of the lock attempt + */ + protected void afterTryLock(boolean acquired, long startTime) { + if (acquired) { + afterLock(); + return; + } + if (LOG.isDebugEnabled()) { + long elapsedTime = (System.nanoTime() - startTime) >> 20; + Thread currentThread = Thread.currentThread(); + String stackTrace = getThreadStackTrace(currentThread.getStackTrace()); + LOG.debug("Thread ID: {}, Thread Name: {} - Failed to acquire the lock within {} ms" + + "\nThread blocking info:\n{}", + currentThread.getId(), currentThread.getName(), elapsedTime, stackTrace); + } + } + + /** + * Utility method to format the stack trace of a thread. + * + * @param stackTrace The stack trace elements of the thread + * @return A formatted string of the stack trace + */ + private String getThreadStackTrace(StackTraceElement[] stackTrace) { + StringBuilder sb = new StringBuilder(); + for (StackTraceElement element : stackTrace) { + sb.append("\tat ").append(element).append("\n"); + } + return sb.toString().replace("\n", "\\n"); + } +} + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/lock/DeadlockMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/common/lock/DeadlockMonitor.java new file mode 100644 index 000000000000000..4fcda97dbd1ad52 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/lock/DeadlockMonitor.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.lock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A utility class for monitoring and reporting deadlocks in a Java application. + *

+ * This class uses the Java Management API to periodically check for deadlocked threads + * and logs detailed information about any detected deadlocks. It can be configured to + * run at a fixed interval. + *

+ */ +public class DeadlockMonitor { + private static final Logger LOG = LoggerFactory.getLogger(DeadlockMonitor.class); + private final ThreadMXBean threadMXBean; + private final ScheduledExecutorService scheduler; + + public DeadlockMonitor() { + this.threadMXBean = ManagementFactory.getThreadMXBean(); + this.scheduler = Executors.newScheduledThreadPool(1); + } + + /** + * Starts monitoring for deadlocks at a fixed rate. + * + * @param period the period between successive executions + * @param unit the time unit of the period parameter + */ + public void startMonitoring(long period, TimeUnit unit) { + scheduler.scheduleAtFixedRate(this::detectAndReportDeadlocks, 5, period, unit); + } + + /** + * Detects and reports deadlocks if any are found. + */ + public void detectAndReportDeadlocks() { + // Get IDs of threads that are deadlocked + long[] deadlockedThreadIds = threadMXBean.findDeadlockedThreads(); + + // Check if there are no deadlocked threads + if (deadlockedThreadIds == null || deadlockedThreadIds.length == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("No deadlocks detected."); + } + return; + } + + // Get information about deadlocked threads + ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreadIds, true, true); + String deadlockReportString = Arrays.toString(threadInfos).replace("\n", "\\n"); + // Log the deadlock report + LOG.warn("Deadlocks detected {}", deadlockReportString); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantLock.java new file mode 100644 index 000000000000000..60211a6a8a8c9cb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantLock.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.lock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A monitored version of ReentrantLock that provides additional monitoring capabilities + * for lock acquisition and release. + */ +public class MonitoredReentrantLock extends ReentrantLock { + private static final long serialVersionUID = 1L; + + // Monitor for tracking lock acquisition and release + private final AbstractMonitoredLock lockMonitor = new AbstractMonitoredLock() { + }; + + // Constructor for creating a monitored lock with fairness option + public MonitoredReentrantLock(boolean fair) { + super(fair); + } + + // Constructor for creating a monitored lock with fairness option + public MonitoredReentrantLock() { + } + + /** + * Acquires the lock. + * Records the time when the lock is acquired. + */ + @Override + public void lock() { + super.lock(); + lockMonitor.afterLock(); + } + + /** + * Releases the lock. + * Records the time when the lock is released and logs the duration. + */ + @Override + public void unlock() { + lockMonitor.afterUnlock(); + super.unlock(); + } + + /** + * Tries to acquire the lock. + * Records the time when the lock attempt started and logs the result. + * + * @return true if the lock was acquired, false otherwise + */ + @Override + public boolean tryLock() { + long start = System.nanoTime(); // Record start time + boolean acquired = super.tryLock(); // Attempt to acquire the lock + lockMonitor.afterTryLock(acquired, start); // Log result and elapsed time + return acquired; + } + + /** + * Tries to acquire the lock within the specified time limit. + * Records the time when the lock attempt started and logs the result. + * + * @param timeout the time to wait for the lock + * @param unit the time unit of the timeout argument + * @return true if the lock was acquired, false if the waiting time elapsed before the lock was acquired + * @throws InterruptedException if the current thread is interrupted while waiting + */ + @Override + public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { + long start = System.nanoTime(); // Record start time + boolean acquired = super.tryLock(timeout, unit); // Attempt to acquire the lock + lockMonitor.afterTryLock(acquired, start); // Log result and elapsed time + return acquired; + } + + @Override + public Thread getOwner() { + return super.getOwner(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantReadWriteLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantReadWriteLock.java new file mode 100644 index 000000000000000..7a6f0db5938b23c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/lock/MonitoredReentrantReadWriteLock.java @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.lock; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A monitored version of ReentrantReadWriteLock that provides additional + * monitoring capabilities for read and write locks. + */ +public class MonitoredReentrantReadWriteLock extends ReentrantReadWriteLock { + // Monitored read and write lock instances + private final ReadLock readLock = new ReadLock(this); + private final WriteLock writeLock = new WriteLock(this); + + // Constructor for creating a monitored lock with fairness option + public MonitoredReentrantReadWriteLock(boolean fair) { + super(fair); + } + + public MonitoredReentrantReadWriteLock() { + } + + /** + * Monitored read lock class that extends ReentrantReadWriteLock.ReadLock. + */ + public class ReadLock extends ReentrantReadWriteLock.ReadLock { + private static final long serialVersionUID = 1L; + private final AbstractMonitoredLock monitor = new AbstractMonitoredLock() {}; + + /** + * Constructs a new ReadLock instance. + * + * @param lock The ReentrantReadWriteLock this lock is associated with + */ + protected ReadLock(ReentrantReadWriteLock lock) { + super(lock); + } + + /** + * Acquires the read lock. + * Records the time when the lock is acquired. + */ + @Override + public void lock() { + super.lock(); + monitor.afterLock(); + } + + /** + * Releases the read lock. + * Records the time when the lock is released and logs the duration. + */ + @Override + public void unlock() { + monitor.afterUnlock(); + super.unlock(); + } + } + + /** + * Monitored write lock class that extends ReentrantReadWriteLock.WriteLock. + */ + public class WriteLock extends ReentrantReadWriteLock.WriteLock { + private static final long serialVersionUID = 1L; + private final AbstractMonitoredLock monitor = new AbstractMonitoredLock() {}; + + /** + * Constructs a new WriteLock instance. + * + * @param lock The ReentrantReadWriteLock this lock is associated with + */ + protected WriteLock(ReentrantReadWriteLock lock) { + super(lock); + } + + /** + * Acquires the write lock. + * Records the time when the lock is acquired. + */ + @Override + public void lock() { + super.lock(); + monitor.afterLock(); + } + + /** + * Releases the write lock. + * Records the time when the lock is released and logs the duration. + */ + @Override + public void unlock() { + monitor.afterUnlock(); + super.unlock(); + } + } + + /** + * Returns the read lock associated with this lock. + * + * @return The monitored read lock + */ + @Override + public ReadLock readLock() { + return readLock; + } + + /** + * Returns the write lock associated with this lock. + * + * @return The monitored write lock + */ + @Override + public WriteLock writeLock() { + return writeLock; + } + + @Override + public Thread getOwner() { + return super.getOwner(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java deleted file mode 100644 index 1f0283434f99a03..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -import java.util.concurrent.locks.ReentrantLock; - -/* - * This Lock is for exposing the getOwner() method, - * which is a protected method of ReentrantLock - */ -public class QueryableReentrantLock extends ReentrantLock { - private static final long serialVersionUID = 1L; - - public QueryableReentrantLock() { - super(); - } - - public QueryableReentrantLock(boolean fair) { - super(fair); - } - - @Override - public Thread getOwner() { - return super.getOwner(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantReadWriteLock.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantReadWriteLock.java deleted file mode 100644 index 3f55b54229710fb..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryableReentrantReadWriteLock.java +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/* - * This Lock is for exposing the getOwner() method, - * which is a protected method of ReentrantLock - */ -public class QueryableReentrantReadWriteLock extends ReentrantReadWriteLock { - private static final long serialVersionUID = 1L; - - public QueryableReentrantReadWriteLock() { - super(); - } - - public QueryableReentrantReadWriteLock(boolean fair) { - super(fair); - } - - @Override - public Thread getOwner() { - return super.getOwner(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index c2a5ea673c22d99..c3ffd208d5558b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -42,6 +42,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; @@ -71,7 +72,6 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -89,7 +89,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { public static final String METADATA_REFRESH_INTERVAL_SEC = "metadata_refresh_interval_sec"; public static final String CATALOG_TYPE_PROP = "type"; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private final MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(true); @SerializedName(value = "idToCatalog") private final Map>> idToCatalog = Maps.newConcurrentMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index e2fe6cdd7a5b044..d653a5a178e484a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -28,6 +28,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaTable; @@ -58,7 +59,6 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Base class of external database. @@ -69,7 +69,7 @@ public abstract class ExternalDatabase implements DatabaseIf, Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(ExternalDatabase.class); - protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); + protected MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock(true); @SerializedName(value = "id") protected long id; @@ -446,7 +446,7 @@ public void gsonPostProcess() throws IOException { } } idToTbl = tmpIdToTbl; - rwLock = new ReentrantReadWriteLock(true); + rwLock = new MonitoredReentrantReadWriteLock(true); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 2b7f6090bb9ecda..f67dc6189486b1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -131,13 +131,13 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.common.lock.MonitoredReentrantLock; import org.apache.doris.common.util.DbUtil; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.IdGeneratorUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.common.util.QueryableReentrantLock; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; @@ -220,7 +220,7 @@ public class InternalCatalog implements CatalogIf { private static final Logger LOG = LogManager.getLogger(InternalCatalog.class); - private QueryableReentrantLock lock = new QueryableReentrantLock(true); + private MonitoredReentrantLock lock = new MonitoredReentrantLock(true); private transient ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); private transient ConcurrentHashMap fullNameToDb = new ConcurrentHashMap<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java index 60765d705d554d5..d5e8a39e605a8b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.planner.ColumnBound; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; @@ -43,15 +44,13 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @Data public class TablePartitionValues { public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"; - private final ReadWriteLock readWriteLock; + private final MonitoredReentrantReadWriteLock readWriteLock; private long lastUpdateTimestamp; private long nextPartitionId; private final Map idToPartitionItem; @@ -68,7 +67,7 @@ public class TablePartitionValues { private Map> singleUidToColumnRangeMap; public TablePartitionValues() { - readWriteLock = new ReentrantReadWriteLock(); + readWriteLock = new MonitoredReentrantReadWriteLock(); lastUpdateTimestamp = 0; nextPartitionId = 0; idToPartitionItem = new HashMap<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java index c510ca99a0fcb00..b9f5b599e6b36d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserManager.java @@ -28,7 +28,7 @@ import org.apache.doris.common.PatternMatcherException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.QueryableReentrantReadWriteLock; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.mysql.MysqlPassword; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; @@ -60,7 +60,7 @@ public class UserManager implements Writable, GsonPostProcessable { public static final String ANY_HOST = "%"; private static final Logger LOG = LogManager.getLogger(UserManager.class); - private static final QueryableReentrantReadWriteLock rwLock = new QueryableReentrantReadWriteLock(false); + private static final MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock(false); private static final Lock rlock = rwLock.readLock(); private static final Lock wlock = rwLock.writeLock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java index 88454eaecdc4e0f..c0ea85dc9f21779 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java @@ -23,6 +23,7 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.insert.InsertJob; @@ -33,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -41,7 +41,7 @@ */ public class LabelProcessor { private final Map>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>(); - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private final MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(true); private void readLock() { lock.readLock().lock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java index 11fc547e6b40cc5..6a0442c0569f76d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; +import org.apache.doris.common.lock.MonitoredReentrantLock; import org.apache.doris.proto.Types; import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.system.Backend; @@ -34,8 +35,6 @@ import java.util.List; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * Use consistent hashing to find the BE corresponding to the key to @@ -48,7 +47,7 @@ public class CacheCoordinator { public boolean debugModel = false; private Hashtable realNodes = new Hashtable<>(); private SortedMap virtualNodes = new TreeMap<>(); - private static Lock belock = new ReentrantLock(); + private static MonitoredReentrantLock belock = new MonitoredReentrantLock(); private long lastRefreshTime; private static CacheCoordinator cachePartition; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 4da88db08ccf3a6..5e9c22bede7e82c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -45,6 +45,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.InternalDatabaseUtil; @@ -94,7 +95,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -123,7 +123,7 @@ private enum PublishResult { // the lock is used to control the access to transaction states // no other locks should be inside this lock - private final ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true); + private final MonitoredReentrantReadWriteLock transactionLock = new MonitoredReentrantReadWriteLock(true); // transactionId -> running TransactionState private final Map idToRunningTransactionState = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java index f8f7b2178f971bb..1608b1d6efa3e55 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java @@ -17,6 +17,8 @@ package org.apache.doris.common.util; +import org.apache.doris.common.lock.MonitoredReentrantLock; + import org.junit.Assert; import org.junit.Test; @@ -24,7 +26,7 @@ public class QueryableReentrantLockTest { - private QueryableReentrantLock lock = new QueryableReentrantLock(true); + private MonitoredReentrantLock lock = new MonitoredReentrantLock(true); @Test public void test() throws InterruptedException { diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index 24853b0a0c6cd28..e9448b340144ac0 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -79,3 +79,6 @@ master_sync_policy = WRITE_NO_SYNC replica_sync_policy = WRITE_NO_SYNC enable_advance_next_id = true +# enable deadlock detection +enable_deadlock_detection = true +max_lock_hold_threshold_seconds = 10