From b9803169ce403ee9a91e4d89958cb2e4f25a4017 Mon Sep 17 00:00:00 2001 From: JamesChenX Date: Wed, 18 Dec 2024 07:30:22 +0800 Subject: [PATCH] Use monotonic clock to measure time for robustness at the cost of performance --- .../common/ClientRequestDispatcher.java | 7 ++- .../client/common/IpRequestThrottler.java | 19 ++++--- .../access/client/common/UserSession.java | 22 +++++++- .../service/ServiceRequestService.java | 2 +- .../session/manager/HeartbeatManager.java | 56 +++++++++---------- .../session/service/SessionService.java | 13 +++-- .../infra/logging/ClientApiLogging.java | 8 +-- .../BaseAdminApiRateLimitingManager.java | 4 +- .../admin/web/HttpRequestDispatcher.java | 4 +- .../blocklist/manager/AutoBlockManager.java | 24 ++++---- .../session/service/UserStatusService.java | 12 ++-- .../common/infra/address/IpDetector.java | 17 +++--- .../application/TurmsApplicationContext.java | 15 ++--- .../service/connection/ConnectionService.java | 22 ++++---- .../service/connection/TurmsConnection.java | 4 +- .../healthcheck/MemoryHealthChecker.java | 31 +++++----- .../server/common/infra/json/JsonUtil.java | 4 +- .../common/infra/logging/AdminApiLogging.java | 4 +- .../core/layout/TurmsTemplateLayout.java | 6 +- .../server/common/infra/task/TaskManager.java | 14 ++--- .../infra/thread/ThreadDumpFormatter.java | 4 +- .../common/infra/throttle/TokenBucket.java | 30 +++++----- .../infra/throttle/TokenBucketContext.java | 5 +- .../server/common/infra/time/DateRange.java | 4 +- .../time/{DateUtil.java => DateTimeUtil.java} | 22 ++++++-- .../storage/redis/TurmsRedisClient.java | 16 +++--- .../infra/throttle/TokenBucketTests.java | 19 ++++--- ...eUtilTests.java => DateTimeUtilTests.java} | 8 +-- .../turms/server/common/testing/JsonUtil.java | 2 +- .../dispatcher/ServiceRequestDispatcher.java | 4 +- .../codec/BlockedClientSerializer.java | 4 +- .../admin/controller/BaseController.java | 6 +- .../group/service/GroupBlocklistService.java | 6 +- .../group/service/GroupInvitationService.java | 6 +- .../service/GroupJoinRequestService.java | 4 +- .../group/service/GroupMemberService.java | 4 +- .../group/service/GroupQuestionService.java | 4 +- .../domain/group/service/GroupService.java | 10 ++-- .../message/service/MessageService.java | 5 +- .../service/UserFriendRequestService.java | 4 +- .../service/UserRelationshipGroupService.java | 4 +- .../user/service/UserRelationshipService.java | 6 +- .../infra/logging/ClientApiLogging.java | 4 +- ...eUtilTests.java => DateTimeUtilTests.java} | 25 +++++---- 44 files changed, 273 insertions(+), 221 deletions(-) rename turms-server-common/src/main/java/im/turms/server/common/infra/time/{DateUtil.java => DateTimeUtil.java} (91%) rename turms-server-common/src/test/java/unit/im/turms/server/common/infra/time/{DateUtilTests.java => DateTimeUtilTests.java} (92%) rename turms-service/src/test/java/unit/im/turms/service/infra/time/{DateUtilTests.java => DateTimeUtilTests.java} (64%) diff --git a/turms-gateway/src/main/java/im/turms/gateway/access/client/common/ClientRequestDispatcher.java b/turms-gateway/src/main/java/im/turms/gateway/access/client/common/ClientRequestDispatcher.java index 06b9ffa7af..fc015c6703 100644 --- a/turms-gateway/src/main/java/im/turms/gateway/access/client/common/ClientRequestDispatcher.java +++ b/turms-gateway/src/main/java/im/turms/gateway/access/client/common/ClientRequestDispatcher.java @@ -53,6 +53,7 @@ import im.turms.server.common.infra.property.TurmsPropertiesManager; import im.turms.server.common.infra.proto.ProtoDecoder; import im.turms.server.common.infra.proto.ProtoEncoder; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.tracing.TracingCloseableContext; import im.turms.server.common.infra.tracing.TracingContext; @@ -166,6 +167,7 @@ public Mono handleRequest0( } // Parse and handle service requests long requestTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); int requestSize = serviceRequestBuffer.readableBytes(); SimpleTurmsRequest request; SimpleTurmsRequest tempRequest; @@ -250,7 +252,7 @@ public Mono handleRequest0( requestSize, requestTime, notification, - System.currentTimeMillis() - requestTime); + (System.nanoTime() - startTime) / DateTimeUtil.NANOS_PER_MILLI); } } return ProtoEncoder.getDirectByteBuffer(notification); @@ -287,8 +289,7 @@ public Mono handleServiceRequest( } // Rate limiting - long now = System.currentTimeMillis(); - if (!ipRequestThrottler.tryAcquireToken(sessionWrapper.getIp(), now)) { + if (!ipRequestThrottler.tryAcquireToken(sessionWrapper.getIp(), System.nanoTime())) { blocklistService.tryBlockIpForFrequentRequest(sessionWrapper.getIp()); UserSession userSession = sessionWrapper.getUserSession(); if (userSession != null) { diff --git a/turms-gateway/src/main/java/im/turms/gateway/access/client/common/IpRequestThrottler.java b/turms-gateway/src/main/java/im/turms/gateway/access/client/common/IpRequestThrottler.java index 5ba6037f14..4e1072f55c 100644 --- a/turms-gateway/src/main/java/im/turms/gateway/access/client/common/IpRequestThrottler.java +++ b/turms-gateway/src/main/java/im/turms/gateway/access/client/common/IpRequestThrottler.java @@ -32,6 +32,7 @@ import im.turms.server.common.infra.thread.NamedThreadFactory; import im.turms.server.common.infra.throttle.TokenBucket; import im.turms.server.common.infra.throttle.TokenBucketContext; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -41,10 +42,10 @@ public class IpRequestThrottler { private static final Logger LOGGER = LoggerFactory.getLogger(IpRequestThrottler.class); - private static final long IDLE_ENTRY_TTL = 30 * 60 * 1000L; - private static final long INTERVAL_TO_CHECK = 30 * 60 * 1000L; + private static final long IDLE_ENTRY_TTL_NANOS = 30 * 60 * DateTimeUtil.NANOS_PER_SECOND; + private static final long INTERVAL_TO_CHECK_MILLIS = 30L * 60 * 1000; private static final int BATCH_SIZE = 10_000; - private static final long SLEEP_THRESHOLD_MILLIS = 1000; + private static final long SLEEP_THRESHOLD_NANOS = 1000 * DateTimeUtil.NANOS_PER_MILLI; private static final long SLEEP_MILLIS = 1000; /** @@ -94,7 +95,7 @@ public IpRequestThrottler( LOGGER.error("Failed to remove expired request token buckets", e); } try { - Thread.sleep(INTERVAL_TO_CHECK); + Thread.sleep(INTERVAL_TO_CHECK_MILLIS); } catch (InterruptedException e) { break; } @@ -107,11 +108,11 @@ private void removeExpiredRequestTokenBuckets() throws InterruptedException { Iterator iterator = ipToRequestTokenBucket.values() .iterator(); int processed = 0; - long startTime = System.currentTimeMillis(); + long startTimeNanos = System.nanoTime(); while (iterator.hasNext()) { TokenBucket bucket = iterator.next(); - long lastAccessTime = bucket.getLastRefillTime(); - if ((startTime - lastAccessTime) > IDLE_ENTRY_TTL + long lastAccessTimeNanos = bucket.getLastRefillTimeNanos(); + if ((startTimeNanos - lastAccessTimeNanos) > IDLE_ENTRY_TTL_NANOS && bucket.isTokensMoreThanOrEqualsToInitialTokens()) { iterator.remove(); } @@ -120,9 +121,9 @@ private void removeExpiredRequestTokenBuckets() throws InterruptedException { // and cause the server cannot serve for users if (processed >= BATCH_SIZE) { processed = 0; - if (System.currentTimeMillis() - startTime > SLEEP_THRESHOLD_MILLIS) { + if (System.nanoTime() - startTimeNanos > SLEEP_THRESHOLD_NANOS) { Thread.sleep(SLEEP_MILLIS); - startTime = System.currentTimeMillis(); + startTimeNanos = System.nanoTime(); } } } diff --git a/turms-gateway/src/main/java/im/turms/gateway/access/client/common/UserSession.java b/turms-gateway/src/main/java/im/turms/gateway/access/client/common/UserSession.java index 154b036245..83a8a2d24d 100644 --- a/turms-gateway/src/main/java/im/turms/gateway/access/client/common/UserSession.java +++ b/turms-gateway/src/main/java/im/turms/gateway/access/client/common/UserSession.java @@ -85,13 +85,15 @@ public final class UserSession { * other types of requests */ private volatile long lastHeartbeatRequestTimestampMillis; + private volatile long lastHeartbeatRequestTimestampNanos; /** * Record the timestamp of the last requests except heartbeat requests */ private volatile long lastRequestTimestampMillis; + private volatile long lastRequestTimestampNanos; // No need to add volatile because it can only be accessed by one thread // (the thread "turms-client-heartbeat-refresher" in HeartbeatManager) - private long lastHeartbeatUpdateTimestampMillis; + private long lastHeartbeatUpdateTimestampNanos; /** * Note that it is acceptable that the session is still open even if the connection is closed @@ -118,6 +120,8 @@ public UserSession( @Nullable Map deviceDetails, @Nullable Location loginLocation) { Date now = new Date(); + long nowMillis = now.getTime(); + long nowNanos = System.nanoTime(); this.version = version; this.permissions = permissions; this.userId = userId; @@ -127,8 +131,10 @@ public UserSession( : deviceDetails; this.loginLocation = loginLocation; this.loginDate = now; - this.lastHeartbeatRequestTimestampMillis = now.getTime(); - this.lastRequestTimestampMillis = now.getTime(); + this.lastHeartbeatRequestTimestampMillis = nowMillis; + this.lastHeartbeatRequestTimestampNanos = nowNanos; + this.lastRequestTimestampMillis = nowMillis; + this.lastRequestTimestampNanos = nowNanos; } public void setConnection(NetConnection connection, ByteArrayWrapper ip) { @@ -136,6 +142,16 @@ public void setConnection(NetConnection connection, ByteArrayWrapper ip) { this.ip = ip; } + public void setLastHeartbeatRequestTimestampToNow() { + lastHeartbeatRequestTimestampMillis = System.currentTimeMillis(); + lastHeartbeatRequestTimestampNanos = System.nanoTime(); + } + + public void setLastRequestTimestampToNow() { + lastRequestTimestampMillis = System.currentTimeMillis(); + lastRequestTimestampNanos = System.nanoTime(); + } + /** * A session cannot reopen once closed, but the connection can close and reconnect. * diff --git a/turms-gateway/src/main/java/im/turms/gateway/domain/servicerequest/service/ServiceRequestService.java b/turms-gateway/src/main/java/im/turms/gateway/domain/servicerequest/service/ServiceRequestService.java index 812bb5886a..31bffd6e7e 100644 --- a/turms-gateway/src/main/java/im/turms/gateway/domain/servicerequest/service/ServiceRequestService.java +++ b/turms-gateway/src/main/java/im/turms/gateway/domain/servicerequest/service/ServiceRequestService.java @@ -56,7 +56,7 @@ public Mono handleServiceRequest( ServiceRequest serviceRequest) { try { // Update request timestamp - session.setLastRequestTimestampMillis(System.currentTimeMillis()); + session.setLastRequestTimestampToNow(); // Forward request serviceRequest.getTurmsRequestBuffer() .retain(); diff --git a/turms-gateway/src/main/java/im/turms/gateway/domain/session/manager/HeartbeatManager.java b/turms-gateway/src/main/java/im/turms/gateway/domain/session/manager/HeartbeatManager.java index cd3ee96301..2874d08a97 100644 --- a/turms-gateway/src/main/java/im/turms/gateway/domain/session/manager/HeartbeatManager.java +++ b/turms-gateway/src/main/java/im/turms/gateway/domain/session/manager/HeartbeatManager.java @@ -39,6 +39,7 @@ import im.turms.server.common.infra.logging.core.logger.Logger; import im.turms.server.common.infra.logging.core.logger.LoggerFactory; import im.turms.server.common.infra.thread.TurmsThread; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.time.DurationConst; /** @@ -70,12 +71,12 @@ public class HeartbeatManager { private final Map userIdToSessionsManager; private final TurmsThread workerThread; private int closeIdleSessionAfterSeconds; - private int closeIdleSessionAfterMillis; + private long closeIdleSessionAfterNanos; private int expectedFractionPerSecond; @Setter - private int minHeartbeatIntervalMillis; + private long minHeartbeatIntervalNanos; @Setter - private int switchProtocolAfterMillis; + private long switchProtocolAfterNanos; public HeartbeatManager( SessionService sessionService, @@ -90,8 +91,8 @@ public HeartbeatManager( this.userIdToSessionsManager = userIdToSessionsManager; setClientHeartbeatIntervalSeconds(clientHeartbeatIntervalSeconds); setCloseIdleSessionAfterSeconds(closeIdleSessionAfterSeconds); - this.minHeartbeatIntervalMillis = minHeartbeatIntervalSeconds * 1000; - this.switchProtocolAfterMillis = switchProtocolAfterSeconds * 1000; + this.minHeartbeatIntervalNanos = DateTimeUtil.secondsToNanos(minHeartbeatIntervalSeconds); + this.switchProtocolAfterNanos = DateTimeUtil.secondsToNanos(switchProtocolAfterSeconds); workerThread = TurmsThread.create(ThreadNameConst.CLIENT_HEARTBEAT_REFRESHER, true, () -> { Thread thread = Thread.currentThread(); while (!thread.isInterrupted()) { @@ -115,7 +116,7 @@ public HeartbeatManager( public void setCloseIdleSessionAfterSeconds(int closeIdleSessionAfterSeconds) { this.closeIdleSessionAfterSeconds = closeIdleSessionAfterSeconds; - closeIdleSessionAfterMillis = closeIdleSessionAfterSeconds * 1000; + closeIdleSessionAfterNanos = DateTimeUtil.secondsToNanos(closeIdleSessionAfterSeconds); } public void setClientHeartbeatIntervalSeconds(int clientHeartbeatIntervalSeconds) { @@ -167,7 +168,7 @@ private LongKeyGenerator collectOnlineUsersAndUpdateStatus( * onlineUserCount / expectedFractionPerSecond); Iterator iterator = managers.iterator(); return new LongKeyGenerator() { - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); @Override public int estimatedSize() { @@ -196,38 +197,37 @@ public long next() { } @Nullable - private Long closeOrUpdateSession(UserSession session, long now) { + private Long closeOrUpdateSession(UserSession session, long nowNanos) { if (!session.isOpen()) { return null; } if (UdpRequestDispatcher.isEnabled() && session.supportsSwitchingToUdp() - && session.isConnected()) { - int requestElapsedTime = (int) (now - session.getLastRequestTimestampMillis()); - if (requestElapsedTime > switchProtocolAfterMillis) { - session.getConnection() - .switchToUdp(); - return null; - } + && session.isConnected() + && nowNanos - session.getLastRequestTimestampNanos() > switchProtocolAfterNanos) { + session.getConnection() + .switchToUdp(); + return null; } // Limit the frequency of sending heartbeat requests to Redis - long lastHeartbeatUpdateTimestamp = session.getLastHeartbeatUpdateTimestampMillis(); - int localMinHeartbeatIntervalMillis = minHeartbeatIntervalMillis; - if (localMinHeartbeatIntervalMillis > 0 - && now - lastHeartbeatUpdateTimestamp < localMinHeartbeatIntervalMillis) { + long lastHeartbeatUpdateTimestampNanos = session.getLastHeartbeatUpdateTimestampNanos(); + long localMinHeartbeatIntervalNanos = minHeartbeatIntervalNanos; + if (localMinHeartbeatIntervalNanos > 0 + && nowNanos - lastHeartbeatUpdateTimestampNanos < localMinHeartbeatIntervalNanos) { return null; } // Only sends heartbeat requests to Redis if the client has // sent any request to the local node after the last heartbeat update request - long lastHeartbeatRequestTimestamp = - Math.max(session.getLastHeartbeatRequestTimestampMillis(), - session.getLastRequestTimestampMillis()); - if (lastHeartbeatRequestTimestamp <= lastHeartbeatUpdateTimestamp) { + long lastHeartbeatRequestTimestampNanos = + Math.max(session.getLastHeartbeatRequestTimestampNanos(), + session.getLastRequestTimestampNanos()); + if (lastHeartbeatRequestTimestampNanos <= lastHeartbeatUpdateTimestampNanos) { return null; } - int localCloseIdleSessionAfterMillis = closeIdleSessionAfterMillis; - if (localCloseIdleSessionAfterMillis > 0 - && (int) (now - lastHeartbeatRequestTimestamp) > localCloseIdleSessionAfterMillis) { + long localCloseIdleSessionAfterNanos = closeIdleSessionAfterNanos; + if (localCloseIdleSessionAfterNanos > 0 + && nowNanos + - lastHeartbeatRequestTimestampNanos > localCloseIdleSessionAfterNanos) { sessionService .closeLocalSession(session.getUserId(), session.getDeviceType(), @@ -239,7 +239,7 @@ private Long closeOrUpdateSession(UserSession session, long now) { HEARTBEAT_TIMEOUT)); return null; } - session.setLastHeartbeatUpdateTimestampMillis(now); + session.setLastHeartbeatUpdateTimestampNanos(nowNanos); return session.getUserId(); } -} +} \ No newline at end of file diff --git a/turms-gateway/src/main/java/im/turms/gateway/domain/session/service/SessionService.java b/turms-gateway/src/main/java/im/turms/gateway/domain/session/service/SessionService.java index 96b030597c..78209b4eba 100644 --- a/turms-gateway/src/main/java/im/turms/gateway/domain/session/service/SessionService.java +++ b/turms-gateway/src/main/java/im/turms/gateway/domain/session/service/SessionService.java @@ -82,6 +82,7 @@ import im.turms.server.common.infra.property.env.gateway.session.SessionProperties; import im.turms.server.common.infra.reactor.PublisherPool; import im.turms.server.common.infra.reactor.PublisherUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidDeviceType; import im.turms.server.common.infra.validation.Validator; @@ -180,10 +181,10 @@ public SessionService( newSessionProperties.getClientHeartbeatIntervalSeconds()); heartbeatManager.setCloseIdleSessionAfterSeconds( newSessionProperties.getCloseIdleSessionAfterSeconds()); - heartbeatManager.setMinHeartbeatIntervalMillis( - newSessionProperties.getMinHeartbeatIntervalSeconds() * 1000); - heartbeatManager.setSwitchProtocolAfterMillis( - newSessionProperties.getSwitchProtocolAfterSeconds() * 1000); + heartbeatManager.setMinHeartbeatIntervalNanos(DateTimeUtil + .secondsToNanos(newSessionProperties.getMinHeartbeatIntervalSeconds())); + heartbeatManager.setSwitchProtocolAfterNanos(DateTimeUtil + .secondsToNanos(newSessionProperties.getSwitchProtocolAfterSeconds())); }); propertiesManager.addLocalPropertiesChangeListener(this::updateLocalProperties); @@ -222,7 +223,7 @@ public Mono destroy() { } public void handleHeartbeatUpdateRequest(UserSession session) { - session.setLastHeartbeatRequestTimestampMillis(System.currentTimeMillis()); + session.setLastHeartbeatRequestTimestampToNow(); } public Mono handleLoginRequest( @@ -596,7 +597,7 @@ public UserSession authAndUpdateHeartbeatTimestamp( && session.getId() == sessionId && !session.getConnection() .isConnectionRecovering()) { - session.setLastHeartbeatRequestTimestampMillis(System.currentTimeMillis()); + session.setLastHeartbeatRequestTimestampToNow(); return session; } } diff --git a/turms-gateway/src/main/java/im/turms/gateway/infra/logging/ClientApiLogging.java b/turms-gateway/src/main/java/im/turms/gateway/infra/logging/ClientApiLogging.java index 53e805251b..b6d1d37706 100644 --- a/turms-gateway/src/main/java/im/turms/gateway/infra/logging/ClientApiLogging.java +++ b/turms-gateway/src/main/java/im/turms/gateway/infra/logging/ClientApiLogging.java @@ -27,7 +27,7 @@ import im.turms.server.common.infra.lang.NumberFormatter; import im.turms.server.common.infra.lang.StringUtil; import im.turms.server.common.infra.netty.ByteBufUtil; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import static im.turms.server.common.infra.logging.CommonLogger.CLIENT_API_LOGGER; import static im.turms.server.common.infra.logging.CommonLogger.LOG_FIELD_DELIMITER; @@ -70,7 +70,7 @@ public static void log( NumberFormatter.toCharBytes(requestId), requestType.name(), NumberFormatter.toCharBytes(requestSize), - DateUtil.toBytes(requestTime), + DateTimeUtil.toBytes(requestTime), // response information NumberFormatter.toCharBytes(response.getCode()), response.hasData() @@ -107,7 +107,7 @@ public static void log( NumberFormatter.toCharBytes(requestId), requestType.name(), NumberFormatter.toCharBytes(requestSize), - DateUtil.toBytes(requestTime), + DateTimeUtil.toBytes(requestTime), // response information NumberFormatter.toCharBytes(responseCode), null, // Response data type @@ -142,7 +142,7 @@ public static void log( NumberFormatter.toCharBytes(requestId), requestType, NumberFormatter.toCharBytes(requestSize), - DateUtil.toBytes(requestTime), + DateTimeUtil.toBytes(requestTime), // response information NumberFormatter.toCharBytes(responseCode), responseDataType, diff --git a/turms-server-common/src/main/java/im/turms/server/common/access/admin/throttle/BaseAdminApiRateLimitingManager.java b/turms-server-common/src/main/java/im/turms/server/common/access/admin/throttle/BaseAdminApiRateLimitingManager.java index 4b8343dca9..ab253a8a9d 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/access/admin/throttle/BaseAdminApiRateLimitingManager.java +++ b/turms-server-common/src/main/java/im/turms/server/common/access/admin/throttle/BaseAdminApiRateLimitingManager.java @@ -44,7 +44,7 @@ protected BaseAdminApiRateLimitingManager( () -> { Iterator iterator = ipToTokenBucket.values() .iterator(); - long now = System.currentTimeMillis(); + long now = System.nanoTime(); while (iterator.hasNext()) { TokenBucket bucket = iterator.next(); bucket.refill(now); @@ -60,7 +60,7 @@ protected BaseAdminApiRateLimitingManager( public boolean tryAcquireTokenByIp(String ip) { TokenBucket bucket = ipToTokenBucket.computeIfAbsent(ip, key -> new TokenBucket(tokenBucketContext)); - return bucket.tryAcquire(System.currentTimeMillis()); + return bucket.tryAcquire(System.nanoTime()); } } \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/access/admin/web/HttpRequestDispatcher.java b/turms-server-common/src/main/java/im/turms/server/common/access/admin/web/HttpRequestDispatcher.java index 22c973efad..b9ac99d35d 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/access/admin/web/HttpRequestDispatcher.java +++ b/turms-server-common/src/main/java/im/turms/server/common/access/admin/web/HttpRequestDispatcher.java @@ -80,6 +80,7 @@ import im.turms.server.common.infra.property.TurmsPropertiesManager; import im.turms.server.common.infra.property.env.common.adminapi.AdminHttpProperties; import im.turms.server.common.infra.property.env.common.adminapi.BaseAdminApiProperties; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.time.DurationConst; import im.turms.server.common.infra.tracing.TracingCloseableContext; import im.turms.server.common.infra.tracing.TracingContext; @@ -310,6 +311,7 @@ private Mono handle(HttpServerRequest request, HttpServerResponse response .send(); } long requestTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); TracingContext tracingContext = new TracingContext(); RequestContext requestContext = new RequestContext(); String ip = request.remoteAddress() @@ -336,7 +338,7 @@ private Mono handle(HttpServerRequest request, HttpServerResponse response requestTime, requestContext.getAction(), requestContext.getParamValues(), - (int) (System.currentTimeMillis() - requestTime), + (int) ((System.nanoTime() - startTime) / DateTimeUtil.NANOS_PER_MILLI), signal.getThrowable()); }) .onErrorResume(t -> { diff --git a/turms-server-common/src/main/java/im/turms/server/common/domain/blocklist/manager/AutoBlockManager.java b/turms-server-common/src/main/java/im/turms/server/common/domain/blocklist/manager/AutoBlockManager.java index 4b083f2af6..c765a5dde1 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/domain/blocklist/manager/AutoBlockManager.java +++ b/turms-server-common/src/main/java/im/turms/server/common/domain/blocklist/manager/AutoBlockManager.java @@ -27,6 +27,7 @@ import im.turms.server.common.infra.collection.CollectionUtil; import im.turms.server.common.infra.property.env.common.security.AutoBlockItemProperties; import im.turms.server.common.infra.property.env.common.security.AutoBlockItemProperties.BlockLevel; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -66,19 +67,20 @@ public void tryBlockClient(T id) { return; } blockedClientIdToStatus.compute(id, (key, status) -> { - long now = System.currentTimeMillis(); + long now = System.nanoTime(); if (status == null) { status = new BlockStatus(UNSET_BLOCK_LEVEL, null, 0, now); + } else { + status.lastBlockTriggerTimeNanos = now; } // Update status - long previousBlockTriggerTime = status.lastBlockTriggerTime; - status.lastBlockTriggerTime = now; - int reduceOneTriggerTimeInterval = + long previousBlockTriggerTimeNanos = status.lastBlockTriggerTimeNanos; + int reduceOneTriggerTimeIntervalMillis = status.currentLevelProperties.getReduceOneTriggerTimeIntervalMillis(); int times = status.triggerTimes; - if (reduceOneTriggerTimeInterval > 0) { - times -= (int) (status.lastBlockTriggerTime - previousBlockTriggerTime) - / reduceOneTriggerTimeInterval; + if (reduceOneTriggerTimeIntervalMillis > 0) { + times -= (int) ((status.lastBlockTriggerTimeNanos - previousBlockTriggerTimeNanos) + / (reduceOneTriggerTimeIntervalMillis * DateTimeUtil.NANOS_PER_MILLI)); if (times < 0) { times = 0; } @@ -116,7 +118,7 @@ public void evictExpiredBlockedClients() { if (!isEnabled) { return; } - long now = System.currentTimeMillis(); + long now = System.nanoTime(); Iterator iterator = blockedClientIdToStatus.values() .iterator(); while (iterator.hasNext()) { @@ -124,8 +126,8 @@ public void evictExpiredBlockedClients() { int reduceOneTriggerTimeInterval = status.currentLevelProperties.getReduceOneTriggerTimeIntervalMillis(); if (reduceOneTriggerTimeInterval > 0) { - int times = status.triggerTimes - - (int) (now - status.lastBlockTriggerTime) / reduceOneTriggerTimeInterval; + int times = status.triggerTimes - (int) ((now - status.lastBlockTriggerTimeNanos) + / (reduceOneTriggerTimeInterval * DateTimeUtil.NANOS_PER_MILLI)); if (times <= 0) { iterator.remove(); } @@ -138,7 +140,7 @@ private static class BlockStatus { private int currentLevel; private BlockLevel currentLevelProperties; private int triggerTimes; - private long lastBlockTriggerTime; + private long lastBlockTriggerTimeNanos; } } \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/domain/session/service/UserStatusService.java b/turms-server-common/src/main/java/im/turms/server/common/domain/session/service/UserStatusService.java index d8422ce60f..8d7444a808 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/domain/session/service/UserStatusService.java +++ b/turms-server-common/src/main/java/im/turms/server/common/domain/session/service/UserStatusService.java @@ -65,6 +65,7 @@ import im.turms.server.common.infra.reactor.HashedWheelScheduler; import im.turms.server.common.infra.reactor.PublisherPool; import im.turms.server.common.infra.test.VisibleForTesting; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidDeviceType; import im.turms.server.common.infra.validation.Validator; import im.turms.server.common.storage.redis.TurmsRedisClientManager; @@ -77,7 +78,7 @@ @Service public class UserStatusService extends BaseService { - private static final long NODE_STATUS_TTL_MILLIS = 15_000L; + private static final long NODE_STATUS_TTL_NANOS = 15 * DateTimeUtil.NANOS_PER_SECOND; private final RedisScript addOnlineUserScript; private final RedisScript> getUsersDeviceDetailsScript = @@ -483,18 +484,17 @@ private Mono fetchNodeStatus(String nodeId) { Mono nodeStatusMono = nodeIdToStatusCache.computeIfAbsent(nodeId, id -> node.getDiscoveryService() .checkIfMemberExists(nodeId) - .map(isActive -> new NodeStatus(System.currentTimeMillis(), isActive))); + .map(isActive -> new NodeStatus(System.nanoTime(), isActive))); return nodeStatusMono // To not cache error .doOnError(t -> nodeIdToStatusCache.remove(nodeId, nodeStatusMono)) .flatMap(status -> { - if ((System.currentTimeMillis() - - status.recordTimestampMillis) < NODE_STATUS_TTL_MILLIS) { + if ((System.nanoTime() - status.recordTimestampNanos) < NODE_STATUS_TTL_NANOS) { return Mono.just(status); } Mono newStatus = Mono.defer(() -> node.getDiscoveryService() .checkIfMemberExists(nodeId) - .map(isActive -> new NodeStatus(System.currentTimeMillis(), isActive))); + .map(isActive -> new NodeStatus(System.nanoTime(), isActive))); boolean replaced = nodeIdToStatusCache.replace(nodeId, nodeStatusMono, newStatus); return replaced @@ -724,7 +724,7 @@ public Mono addOnlineDeviceIfAbsent( } private record NodeStatus( - long recordTimestampMillis, + long recordTimestampNanos, boolean isActive ) { } diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/address/IpDetector.java b/turms-server-common/src/main/java/im/turms/server/common/infra/address/IpDetector.java index 68f1bcba8c..db54ab43d5 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/address/IpDetector.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/address/IpDetector.java @@ -34,6 +34,7 @@ import im.turms.server.common.infra.net.InetAddressUtil; import im.turms.server.common.infra.property.TurmsPropertiesManager; import im.turms.server.common.infra.property.env.common.IpProperties; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -49,10 +50,10 @@ public class IpDetector { private final TurmsPropertiesManager propertiesManager; @Nullable private String cachedPrivateIp; - private long privateIpLastUpdatedDate; + private long privateIpLastUpdatedTimeNanos; @Nullable private String cachedPublicIp; - private long publicIpLastUpdatedDate; + private long publicIpLastUpdatedTimeNanos; public IpDetector(TurmsPropertiesManager propertiesManager) { this.propertiesManager = propertiesManager; @@ -65,8 +66,8 @@ public String queryPrivateIp() { String localCachedPrivateIp = cachedPrivateIp; if (cachedPrivateIpExpireAfterMillis > 0 && localCachedPrivateIp != null - && System.currentTimeMillis() - - privateIpLastUpdatedDate < cachedPrivateIpExpireAfterMillis) { + && System.nanoTime() - privateIpLastUpdatedTimeNanos < DateTimeUtil + .millisToNanos(cachedPrivateIpExpireAfterMillis)) { return localCachedPrivateIp; } DatagramChannel channel = null; @@ -83,7 +84,7 @@ public String queryPrivateIp() { + ip + ") is not a site local IP address"); } - privateIpLastUpdatedDate = System.currentTimeMillis(); + privateIpLastUpdatedTimeNanos = System.nanoTime(); cachedPrivateIp = ip; return ip; } catch (Exception e) { @@ -105,8 +106,8 @@ public Mono queryPublicIp() { String localCachedPublicIp = cachedPublicIp; if (cachedPublicIpExpireAfterMillis > 0 && localCachedPublicIp != null - && System.currentTimeMillis() - - publicIpLastUpdatedDate < cachedPublicIpExpireAfterMillis) { + && System.nanoTime() - publicIpLastUpdatedTimeNanos < DateTimeUtil + .millisToNanos(cachedPublicIpExpireAfterMillis)) { return Mono.just(localCachedPublicIp); } List ipDetectorAddresses = ipProperties.getPublicIpDetectorAddresses(); @@ -134,7 +135,7 @@ public Mono queryPublicIp() { } return Mono.firstWithValue(monos) .doOnNext(ip -> { - publicIpLastUpdatedDate = System.currentTimeMillis(); + publicIpLastUpdatedTimeNanos = System.nanoTime(); cachedPublicIp = ip; }) .switchIfEmpty(EXCEPTION_NO_AVAILABLE_ADDRESS_FOUND) diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/application/TurmsApplicationContext.java b/turms-server-common/src/main/java/im/turms/server/common/infra/application/TurmsApplicationContext.java index 21fd78d2dd..00753a6a7c 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/application/TurmsApplicationContext.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/application/TurmsApplicationContext.java @@ -56,6 +56,7 @@ import im.turms.server.common.infra.system.SystemUtil; import im.turms.server.common.infra.thread.NamedThreadFactory; import im.turms.server.common.infra.thread.ThreadNameConst; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -86,7 +87,7 @@ public class TurmsApplicationContext { private final BuildProperties buildProperties; private long shutdownJobGracefulTimeoutMillis; - private long shutdownJobForcedTimeoutMillis; + private long shutdownJobForcedTimeoutNanos; private final TreeMap shutdownHooks = new TreeMap<>(); public TurmsApplicationContext(Environment environment) { @@ -177,8 +178,8 @@ public void handleContextRefreshedEvent(ContextRefreshedEvent event) { ShutdownProperties properties = propertiesManager.getLocalProperties() .getShutdown(); shutdownJobGracefulTimeoutMillis = properties.getJobGracefulTimeoutMillis(); - shutdownJobForcedTimeoutMillis = - Math.max(properties.getJobForcedTimeoutMillis(), shutdownJobGracefulTimeoutMillis); + shutdownJobForcedTimeoutNanos = DateTimeUtil.millisToNanos( + Math.max(properties.getJobForcedTimeoutMillis(), shutdownJobGracefulTimeoutMillis)); } @EventListener(classes = ContextClosedEvent.class) @@ -193,20 +194,20 @@ public void handleContextClosedEvent() { Future> shutdownFuture = executor.submit(() -> orderAndJob.getValue() .run(shutdownJobGracefulTimeoutMillis)); try { - long time = System.currentTimeMillis(); + long time = System.nanoTime(); Mono mono = - shutdownFuture.get(shutdownJobForcedTimeoutMillis, TimeUnit.MILLISECONDS); + shutdownFuture.get(shutdownJobForcedTimeoutNanos, TimeUnit.NANOSECONDS); if (mono == null) { throw new IllegalArgumentException( "The result of the job \"" + jobName + "\" must not be null"); } - time = shutdownJobForcedTimeoutMillis - (System.currentTimeMillis() - time); + time = shutdownJobForcedTimeoutNanos - (System.nanoTime() - time); if (time <= 0) { throw new TimeoutException(); } - mono.block(Duration.ofMillis(time)); + mono.block(Duration.ofNanos(time)); } catch (TimeoutException e) { shutdownFuture.cancel(true); if (!isClosingLogProcessor) { diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/cluster/service/connection/ConnectionService.java b/turms-server-common/src/main/java/im/turms/server/common/infra/cluster/service/connection/ConnectionService.java index 72fd52abbb..68112445fe 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/cluster/service/connection/ConnectionService.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/cluster/service/connection/ConnectionService.java @@ -64,6 +64,7 @@ import im.turms.server.common.infra.thread.NamedThreadFactory; import im.turms.server.common.infra.thread.ThreadNameConst; import im.turms.server.common.infra.thread.TurmsThread; +import im.turms.server.common.infra.time.DateTimeUtil; import static im.turms.server.common.infra.metrics.CommonMetricNameConst.TURMS_RPC_CLIENT_TCP; @@ -94,8 +95,8 @@ public class ConnectionService implements ClusterService { private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionService.class); private final SslProperties clientSsl; - private final long keepaliveIntervalMillis; - private final long keepaliveTimeoutMillis; + private final long keepaliveIntervalNanos; + private final long keepaliveTimeoutNanos; private final Duration reconnectInterval; // Thread resources @@ -132,8 +133,10 @@ public ConnectionService(ConnectionProperties connectionProperties) { serverProperties = connectionProperties.getServer(); ConnectionClientProperties clientProperties = connectionProperties.getClient(); clientSsl = clientProperties.getSsl(); - keepaliveIntervalMillis = clientProperties.getKeepaliveIntervalSeconds() * 1000L; - keepaliveTimeoutMillis = clientProperties.getKeepaliveTimeoutSeconds() * 1000L; + keepaliveIntervalNanos = + DateTimeUtil.secondsToNanos(clientProperties.getKeepaliveIntervalSeconds()); + keepaliveTimeoutNanos = + DateTimeUtil.secondsToNanos(clientProperties.getKeepaliveTimeoutSeconds()); reconnectInterval = Duration.ofSeconds(clientProperties.getReconnectIntervalSeconds()); eventLoopGroupForClients = new NioEventLoopGroup( Runtime.getRuntime() @@ -356,7 +359,7 @@ public void keepalive(String nodeId) { "Received a keepalive request from a non-connected node: " + nodeId); } - connection.setLastKeepaliveTimestamp(System.currentTimeMillis()); + connection.setLastKeepaliveTimestampNanos(System.nanoTime()); } private void sendKeepaliveToConnectionsForever() { @@ -391,23 +394,22 @@ private void sendKeepalive(Iterator> iterator if (!connection.isLocalNodeClient()) { return; } - long now = System.currentTimeMillis(); - long elapsedTime = now - connection.getLastKeepaliveTimestamp(); - if (elapsedTime > keepaliveTimeoutMillis) { + long elapsedTime = System.nanoTime() - connection.getLastKeepaliveTimestampNanos(); + if (elapsedTime > keepaliveTimeoutNanos) { LOGGER.warn("Reconnecting to the member ({}) due to keepalive timeout", nodeId); // onConnectionClosed() will reconnect the member disconnectConnection(connection); iterator.remove(); return; } - if (elapsedTime < keepaliveIntervalMillis) { + if (elapsedTime < keepaliveIntervalNanos) { return; } rpcService.requestResponse(nodeId, new KeepaliveRequest()) .subscribe(null, t -> LOGGER.warn("Failed to send a keepalive request to the member: " + nodeId, t), - () -> connection.setLastKeepaliveTimestamp(System.currentTimeMillis())); + () -> connection.setLastKeepaliveTimestampNanos(System.nanoTime())); } // Handshake diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/cluster/service/connection/TurmsConnection.java b/turms-server-common/src/main/java/im/turms/server/common/infra/cluster/service/connection/TurmsConnection.java index 74e6d34a15..ad55824745 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/cluster/service/connection/TurmsConnection.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/cluster/service/connection/TurmsConnection.java @@ -46,7 +46,7 @@ public class TurmsConnection { private boolean isClosing; private final boolean isLocalNodeClient; - private volatile long lastKeepaliveTimestamp; + private volatile long lastKeepaliveTimestampNanos; private final List listeners; @@ -62,7 +62,7 @@ public TurmsConnection( ? Collections.emptyList() : listeners; - lastKeepaliveTimestamp = System.currentTimeMillis(); + lastKeepaliveTimestampNanos = System.nanoTime(); isClosing = false; } } \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/healthcheck/MemoryHealthChecker.java b/turms-server-common/src/main/java/im/turms/server/common/infra/healthcheck/MemoryHealthChecker.java index 59c03cf1c6..45965666fa 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/healthcheck/MemoryHealthChecker.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/healthcheck/MemoryHealthChecker.java @@ -34,6 +34,7 @@ import im.turms.server.common.infra.logging.core.logger.LoggerFactory; import im.turms.server.common.infra.logging.core.model.LogLevel; import im.turms.server.common.infra.property.env.common.healthcheck.MemoryHealthCheckProperties; +import im.turms.server.common.infra.time.DateTimeUtil; import static im.turms.server.common.infra.unit.ByteSizeUnit.MB; @@ -73,13 +74,13 @@ public final class MemoryHealthChecker extends HealthChecker { private final int directMemoryWarningThresholdPercentage; private final int heapMemoryWarningThresholdPercentage; - private final int minMemoryWarningIntervalMillis; - private long lastDirectMemoryWarningTimestamp; - private long lastHeapMemoryWarningTimestamp; + private final long minMemoryWarningIntervalNanos; + private long lastDirectMemoryWarningTimestampNanos; + private long lastHeapMemoryWarningTimestampNanos; private final int heapMemoryGcThresholdPercentage; - private final int minHeapMemoryGcIntervalMillis; - private long lastHeapMemoryGcTimestamp; + private final long minHeapMemoryGcIntervalNanos; + private long lastHeapMemoryGcTimestampNanos; private boolean isMemoryHealthy; @Nullable @@ -162,10 +163,12 @@ public MemoryHealthChecker(MemoryHealthCheckProperties properties) { directMemoryWarningThresholdPercentage = properties.getDirectMemoryWarningThresholdPercentage(); heapMemoryWarningThresholdPercentage = properties.getHeapMemoryWarningThresholdPercentage(); - minMemoryWarningIntervalMillis = properties.getMinMemoryWarningIntervalSeconds() * 1000; + minMemoryWarningIntervalNanos = + DateTimeUtil.secondsToNanos(properties.getMinMemoryWarningIntervalSeconds()); minFreeSystemMemory = properties.getMinFreeSystemMemoryBytes(); heapMemoryGcThresholdPercentage = properties.getHeapMemoryGcThresholdPercentage(); - minHeapMemoryGcIntervalMillis = properties.getMinHeapMemoryGcIntervalSeconds() * 1000; + minHeapMemoryGcIntervalNanos = + DateTimeUtil.secondsToNanos(properties.getMinHeapMemoryGcIntervalSeconds()); } @Override @@ -261,12 +264,12 @@ private void tryLog(boolean isHealthy) { asMbString(usedNonHeapMemory)); } - long now = System.currentTimeMillis(); + long now = System.nanoTime(); float usedMemoryPercentage = 100F * usedDirectMemory / maxDirectMemory; if (directMemoryWarningThresholdPercentage > 0 && directMemoryWarningThresholdPercentage < usedMemoryPercentage - && minMemoryWarningIntervalMillis < (now - lastDirectMemoryWarningTimestamp)) { - lastDirectMemoryWarningTimestamp = now; + && minMemoryWarningIntervalNanos < (now - lastDirectMemoryWarningTimestampNanos)) { + lastDirectMemoryWarningTimestampNanos = now; LOGGER.warn("The used direct memory has exceeded the warning threshold: {}/{}/{}/{}", asMbString(usedDirectMemory), asMbString(maxDirectMemory), @@ -276,8 +279,8 @@ private void tryLog(boolean isHealthy) { usedMemoryPercentage = 100F * usedHeapMemory / maxHeapMemory; if (heapMemoryWarningThresholdPercentage > 0 && heapMemoryWarningThresholdPercentage < usedMemoryPercentage - && minMemoryWarningIntervalMillis < (now - lastHeapMemoryWarningTimestamp)) { - lastHeapMemoryWarningTimestamp = now; + && minMemoryWarningIntervalNanos < (now - lastHeapMemoryWarningTimestampNanos)) { + lastHeapMemoryWarningTimestampNanos = now; LOGGER.warn("The used heap memory has exceeded the warning threshold: {}/{}/{}/{}", asMbString(usedHeapMemory), asMbString(maxHeapMemory), @@ -288,8 +291,8 @@ private void tryLog(boolean isHealthy) { if (!isHealthy && heapMemoryGcThresholdPercentage > 0 && heapMemoryGcThresholdPercentage < usedMemoryPercentage - && minHeapMemoryGcIntervalMillis < (now - lastHeapMemoryGcTimestamp)) { - lastHeapMemoryGcTimestamp = now; + && minHeapMemoryGcIntervalNanos < (now - lastHeapMemoryGcTimestampNanos)) { + lastHeapMemoryGcTimestampNanos = now; LOGGER.info( "Trying to start GC because the available memory has exceeded and the used heap memory has exceeded the GC threshold: {}/{}/{}/{}", asMbString(usedHeapMemory), diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/json/JsonUtil.java b/turms-server-common/src/main/java/im/turms/server/common/infra/json/JsonUtil.java index 6d8fbf2582..81dd0c1311 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/json/JsonUtil.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/json/JsonUtil.java @@ -38,7 +38,7 @@ import im.turms.server.common.infra.lang.StringUtil; import im.turms.server.common.infra.logging.core.logger.Logger; import im.turms.server.common.infra.logging.core.logger.LoggerFactory; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -177,7 +177,7 @@ private static int estimateNonRecordSize(Object value) { } yield size; } - case Date ignored -> DateUtil.DATE_TIME_LENGTH; + case Date ignored -> DateTimeUtil.DATE_TIME_LENGTH; case String str -> StringUtil.getLength(str); case byte[] array -> array.length; case Object[] array -> { diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/logging/AdminApiLogging.java b/turms-server-common/src/main/java/im/turms/server/common/infra/logging/AdminApiLogging.java index f3e5b545d4..ccb412e95f 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/logging/AdminApiLogging.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/logging/AdminApiLogging.java @@ -25,7 +25,7 @@ import im.turms.server.common.infra.lang.NumberFormatter; import im.turms.server.common.infra.lang.StringUtil; import im.turms.server.common.infra.netty.ByteBufUtil; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -61,7 +61,7 @@ public static void log( ip, // Request requestId, - DateUtil.toBytes(requestTime), + DateTimeUtil.toBytes(requestTime), action, params.toString(), // Response diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/logging/core/layout/TurmsTemplateLayout.java b/turms-server-common/src/main/java/im/turms/server/common/infra/logging/core/layout/TurmsTemplateLayout.java index 0a7309632e..2b83d7fdb5 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/logging/core/layout/TurmsTemplateLayout.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/logging/core/layout/TurmsTemplateLayout.java @@ -34,7 +34,7 @@ import im.turms.server.common.infra.logging.core.context.LogThreadContext; import im.turms.server.common.infra.logging.core.model.LogLevel; import im.turms.server.common.infra.netty.ReferenceCountUtil; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.tracing.TracingContext; /** @@ -141,7 +141,7 @@ private ByteBuf format0( @Nullable CharSequence msg, @Nullable Object[] args, @Nullable Throwable throwable) { - byte[] timestamp = DateUtil.toBytes(System.currentTimeMillis()); + byte[] timestamp = DateTimeUtil.toBytes(System.currentTimeMillis()); String threadName = Thread.currentThread() .getName(); @@ -193,7 +193,7 @@ private ByteBuf format0( @Nullable byte[] className, LogLevel level, ByteBuf msg) { - byte[] timestamp = DateUtil.toBytes(System.currentTimeMillis()); + byte[] timestamp = DateTimeUtil.toBytes(System.currentTimeMillis()); String threadName = Thread.currentThread() .getName(); diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/task/TaskManager.java b/turms-server-common/src/main/java/im/turms/server/common/infra/task/TaskManager.java index ee9cbc4c51..c9630dadf3 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/task/TaskManager.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/task/TaskManager.java @@ -32,6 +32,7 @@ import im.turms.server.common.infra.logging.core.logger.LoggerFactory; import im.turms.server.common.infra.thread.NamedThreadFactory; import im.turms.server.common.infra.thread.ThreadNameConst; +import im.turms.server.common.infra.time.DateTimeUtil; /** * Handle tasks in a thread @@ -42,7 +43,7 @@ public class TaskManager { private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class); - private static final int SLOW_LOG_THRESHOLD_MILLIS = 1000; + private static final long SLOW_LOG_THRESHOLD_NANOS = 1000 * DateTimeUtil.NANOS_PER_MILLI; private final Map> scheduledTaskMap; @@ -78,7 +79,7 @@ public Task(String taskName, Runnable runnable) { @Override public void run() { - long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); try { runnable.run(); } catch (Exception e) { @@ -89,19 +90,18 @@ public void run() { .getName(), e); } - long endTime = System.currentTimeMillis(); - long diff = endTime - startTime; - if (diff > SLOW_LOG_THRESHOLD_MILLIS) { + long took = System.nanoTime() - startTime; + if (took > SLOW_LOG_THRESHOLD_NANOS) { LOGGER.warn("The task \"" + taskName + "\" defined in the class (" + runnable.getClass() .getName() + ") was slow and took (" - + diff + + (took / DateTimeUtil.NANOS_PER_MILLI) + ") millis to execute"); } } } -} +} \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/thread/ThreadDumpFormatter.java b/turms-server-common/src/main/java/im/turms/server/common/infra/thread/ThreadDumpFormatter.java index db00c8503d..8a26c61a7e 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/thread/ThreadDumpFormatter.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/thread/ThreadDumpFormatter.java @@ -30,7 +30,7 @@ import im.turms.server.common.infra.lang.NumberFormatter; import im.turms.server.common.infra.lang.StringUtil; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -72,7 +72,7 @@ private ThreadDumpFormatter() { public static ByteBuf format(ThreadInfo[] threads) { ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(threads.length * 512); - buffer.writeBytes(DateUtil.toBytes(System.currentTimeMillis())) + buffer.writeBytes(DateTimeUtil.toBytes(System.currentTimeMillis())) .writeByte('\n') .writeBytes(FULL_THREAD_DUMP) .writeByte('\n'); diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/throttle/TokenBucket.java b/turms-server-common/src/main/java/im/turms/server/common/infra/throttle/TokenBucket.java index 859c99f992..5faeb3ae16 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/throttle/TokenBucket.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/throttle/TokenBucket.java @@ -38,7 +38,7 @@ public class TokenBucket { @Getter private volatile int tokens; @Getter - private volatile long lastRefillTime; + private volatile long lastRefillTimeNanos; /** * @implNote We don't validate properties here, and it should be validated when the properties @@ -47,22 +47,22 @@ public class TokenBucket { public TokenBucket(TokenBucketContext context) { this.context = context; tokens = context.initialTokens; - lastRefillTime = System.currentTimeMillis(); + lastRefillTimeNanos = System.nanoTime(); } - public boolean tryAcquire(long time) { + public boolean tryAcquire(long timestampNanos) { int tokenCount = tokens; if (tokenCount > 0) { if (TOKENS_UPDATER.compareAndSet(this, tokenCount, tokenCount - 1)) { return true; } - return tryAcquire(time); + return tryAcquire(timestampNanos); } - int refillInterval = context.refillIntervalMillis; - if (refillInterval <= 0) { + long refillIntervalNanos = context.refillIntervalNanos; + if (refillIntervalNanos <= 0) { return false; } - int periods = (int) (time - lastRefillTime) / refillInterval; + int periods = MathUtil.toInt((timestampNanos - lastRefillTimeNanos) / refillIntervalNanos); if (periods <= 0) { return false; } @@ -75,18 +75,18 @@ public boolean tryAcquire(long time) { tokenCount = capacity - 1; } if (TOKENS_UPDATER.compareAndSet(this, 0, tokenCount)) { - lastRefillTime = time; + lastRefillTimeNanos = timestampNanos; return true; } - return tryAcquire(time); + return tryAcquire(timestampNanos); } - public void refill(long time) { - int refillInterval = context.refillIntervalMillis; - if (refillInterval <= 0) { + public void refill(long timeNanos) { + long refillIntervalNanos = context.refillIntervalNanos; + if (refillIntervalNanos <= 0) { return; } - int periods = (int) (time - lastRefillTime) / refillInterval; + int periods = MathUtil.toInt((timeNanos - lastRefillTimeNanos) / refillIntervalNanos); if (periods <= 0) { return; } @@ -99,9 +99,9 @@ public void refill(long time) { newTokenCount = capacity; } if (TOKENS_UPDATER.compareAndSet(this, tokenCount, newTokenCount)) { - lastRefillTime = time; + lastRefillTimeNanos = timeNanos; } else { - refill(time); + refill(timeNanos); } } diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/throttle/TokenBucketContext.java b/turms-server-common/src/main/java/im/turms/server/common/infra/throttle/TokenBucketContext.java index ab1b42d205..bb9d433a8d 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/throttle/TokenBucketContext.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/throttle/TokenBucketContext.java @@ -21,6 +21,7 @@ import lombok.NoArgsConstructor; import im.turms.server.common.infra.property.env.common.BaseRateLimitingProperties; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -30,7 +31,7 @@ public class TokenBucketContext { int capacity; int tokensPerPeriod; - int refillIntervalMillis; + long refillIntervalNanos; int initialTokens; public TokenBucketContext(BaseRateLimitingProperties properties) { @@ -41,7 +42,7 @@ public void updateRequestTokenBucket(BaseRateLimitingProperties properties) { capacity = properties.getCapacity(); initialTokens = properties.getInitialTokens(); tokensPerPeriod = properties.getTokensPerPeriod(); - refillIntervalMillis = properties.getRefillIntervalMillis(); + refillIntervalNanos = DateTimeUtil.millisToNanos(properties.getRefillIntervalMillis()); } } \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/time/DateRange.java b/turms-server-common/src/main/java/im/turms/server/common/infra/time/DateRange.java index 658c7b873d..71e831b0e6 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/time/DateRange.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/time/DateRange.java @@ -50,7 +50,9 @@ public static boolean hasDate(@Nullable DateRange range) { public DateRange intersect(@Nullable DateRange range) { return range == null || equals(range) ? this - : new DateRange(DateUtil.max(start, range.start()), DateUtil.min(end, range.end())); + : new DateRange( + DateTimeUtil.max(start, range.start()), + DateTimeUtil.min(end, range.end())); } public DateRange move(long millis) { diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/time/DateUtil.java b/turms-server-common/src/main/java/im/turms/server/common/infra/time/DateTimeUtil.java similarity index 91% rename from turms-server-common/src/main/java/im/turms/server/common/infra/time/DateUtil.java rename to turms-server-common/src/main/java/im/turms/server/common/infra/time/DateTimeUtil.java index c09edb43a0..bcebbfbdb4 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/time/DateUtil.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/time/DateTimeUtil.java @@ -29,13 +29,17 @@ import org.apache.commons.lang3.tuple.Pair; import im.turms.server.common.infra.collection.ArrayUtil; +import im.turms.server.common.infra.lang.MathUtil; import im.turms.server.common.infra.lang.NumberFormatter; import im.turms.server.common.infra.lang.StringUtil; /** * @author James Chen */ -public final class DateUtil { +public final class DateTimeUtil { + + public static final long NANOS_PER_MILLI = 1_000_000L; + public static final long NANOS_PER_SECOND = 1_000_000_000L; private static final FastThreadLocal CALENDAR_THREAD_LOCAL = new FastThreadLocal<>() { @Override @@ -46,7 +50,7 @@ protected Calendar initialValue() { // "1970-01-01 00:00:00.000" public static final int DATE_TIME_LENGTH = 23; - private static final long HOURS_IN_MILLIS = 60 * 60 * 1000L; + private static final long MILLIS_PER_HOUR = 60 * 60 * 1000L; private static final int MAX_TWO_DIGITS_CACHE_NUMBER = 59; private static final int MAX_THREE_DIGITS_CACHE_NUMBER = 100; @@ -75,11 +79,19 @@ protected Calendar initialValue() { } } - private DateUtil() { + private DateTimeUtil() { + } + + public static long secondsToNanos(long seconds) { + return MathUtil.multiply(seconds, NANOS_PER_SECOND, Long.MAX_VALUE); + } + + public static long millisToNanos(long millis) { + return MathUtil.multiply(millis, NANOS_PER_MILLI, Long.MAX_VALUE); } public static Date addHours(long date, int hours) { - return new Date(date + hours * HOURS_IN_MILLIS); + return new Date(date + hours * MILLIS_PER_HOUR); } public static String toStr(long timeInMillis) { @@ -194,4 +206,4 @@ private static byte[] threeDigitBytes(int i) { return NumberFormatter.toCharBytes(i); } -} +} \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClient.java b/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClient.java index 9ce38a7e27..a797003f06 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClient.java +++ b/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClient.java @@ -51,6 +51,7 @@ import im.turms.server.common.infra.netty.ReferenceCountUtil; import im.turms.server.common.infra.reactor.PublisherUtil; import im.turms.server.common.infra.thread.ThreadNameConst; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.storage.redis.codec.TurmsRedisCodecAdapter; import im.turms.server.common.storage.redis.codec.context.RedisCodecContext; import im.turms.server.common.storage.redis.command.TurmsCommandEncoder; @@ -125,16 +126,17 @@ public void afterChannelInitialized(Channel channel) { public Mono destroy(long timeoutMillis) { return Mono.defer(() -> { - long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); return Mono .fromFuture(nativeClient.shutdownAsync(0, timeoutMillis, TimeUnit.MILLISECONDS)) .then(Mono.defer(() -> { - long elapsedTime = System.currentTimeMillis() - startTime; - long timeout = timeoutMillis - elapsedTime; - return PublisherUtil.whenDelayError(eventExecutorGroup - .shutdownGracefully(0, Math.max(1, timeout), TimeUnit.MILLISECONDS), - eventLoopGroupProvider - .shutdown(0, Math.max(1, timeout), TimeUnit.MILLISECONDS)); + long elapsedTime = + (System.nanoTime() - startTime) / DateTimeUtil.NANOS_PER_MILLI; + long timeout = Math.max(1, timeoutMillis - elapsedTime); + return PublisherUtil.whenDelayError( + eventExecutorGroup + .shutdownGracefully(0, timeout, TimeUnit.MILLISECONDS), + eventLoopGroupProvider.shutdown(0, timeout, TimeUnit.MILLISECONDS)); })); }); } diff --git a/turms-server-common/src/test/java/unit/im/turms/server/common/infra/throttle/TokenBucketTests.java b/turms-server-common/src/test/java/unit/im/turms/server/common/infra/throttle/TokenBucketTests.java index 729cbae567..49dea84abb 100644 --- a/turms-server-common/src/test/java/unit/im/turms/server/common/infra/throttle/TokenBucketTests.java +++ b/turms-server-common/src/test/java/unit/im/turms/server/common/infra/throttle/TokenBucketTests.java @@ -21,6 +21,7 @@ import im.turms.server.common.infra.throttle.TokenBucket; import im.turms.server.common.infra.throttle.TokenBucketContext; +import im.turms.server.common.infra.time.DateTimeUtil; import static org.assertj.core.api.Assertions.assertThat; @@ -34,7 +35,7 @@ void shouldNotAcquire_ifNoTokensAndNoRefill() { TokenBucketContext context = new TokenBucketContext(); context.setInitialTokens(0); TokenBucket bucket = new TokenBucket(context); - boolean acquired = bucket.tryAcquire(System.currentTimeMillis()); + boolean acquired = bucket.tryAcquire(System.nanoTime()); assertThat(acquired).isFalse(); } @@ -44,23 +45,23 @@ void shouldAcquire_ifTokensAreEnough() { context.setCapacity(1); context.setInitialTokens(1); TokenBucket bucket = new TokenBucket(context); - boolean acquired = bucket.tryAcquire(System.currentTimeMillis()); + boolean acquired = bucket.tryAcquire(System.nanoTime()); assertThat(acquired).isTrue(); } @Test void shouldAcquire_afterRefill() { int tokensPerPeriod = 10; - int refillIntervalMillis = 1; + long refillIntervalNanos = DateTimeUtil.NANOS_PER_MILLI; TokenBucketContext context = new TokenBucketContext(); context.setTokensPerPeriod(tokensPerPeriod); context.setCapacity(tokensPerPeriod); - context.setRefillIntervalMillis(refillIntervalMillis); + context.setRefillIntervalNanos(refillIntervalNanos); TokenBucket bucket = new TokenBucket(context); - long time = System.currentTimeMillis(); + long time = System.nanoTime(); drain(bucket, time); - time += refillIntervalMillis; + time += refillIntervalNanos; boolean acquired; for (int i = 0; i < tokensPerPeriod; i++) { acquired = bucket.tryAcquire(time); @@ -70,11 +71,11 @@ void shouldAcquire_afterRefill() { assertThat(acquired).isFalse(); } - private void drain(TokenBucket bucket, long time) { + private void drain(TokenBucket bucket, long timestampNanos) { boolean acquired; do { - acquired = bucket.tryAcquire(time); + acquired = bucket.tryAcquire(timestampNanos); } while (acquired); } -} +} \ No newline at end of file diff --git a/turms-server-common/src/test/java/unit/im/turms/server/common/infra/time/DateUtilTests.java b/turms-server-common/src/test/java/unit/im/turms/server/common/infra/time/DateTimeUtilTests.java similarity index 92% rename from turms-server-common/src/test/java/unit/im/turms/server/common/infra/time/DateUtilTests.java rename to turms-server-common/src/test/java/unit/im/turms/server/common/infra/time/DateTimeUtilTests.java index 55b9acafbd..29dc49e551 100644 --- a/turms-server-common/src/test/java/unit/im/turms/server/common/infra/time/DateUtilTests.java +++ b/turms-server-common/src/test/java/unit/im/turms/server/common/infra/time/DateTimeUtilTests.java @@ -24,14 +24,14 @@ import org.junit.jupiter.api.Test; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import static org.assertj.core.api.Assertions.assertThat; /** * @author James Chen */ -class DateUtilTests { +class DateTimeUtilTests { @Test void test() { @@ -50,9 +50,9 @@ void test() { dateTimes.forEach(dateTime -> { long millis = formatter.parse(dateTime, Instant::from) .toEpochMilli(); - String actual = DateUtil.toStr(millis); + String actual = DateTimeUtil.toStr(millis); assertThat(actual).isEqualTo(dateTime); }); } -} +} \ No newline at end of file diff --git a/turms-server-test-common/src/main/java/im/turms/server/common/testing/JsonUtil.java b/turms-server-test-common/src/main/java/im/turms/server/common/testing/JsonUtil.java index 7d9dc68ce4..54bfba3104 100644 --- a/turms-server-test-common/src/main/java/im/turms/server/common/testing/JsonUtil.java +++ b/turms-server-test-common/src/main/java/im/turms/server/common/testing/JsonUtil.java @@ -63,7 +63,7 @@ public static void assertEqual(Map actual, InputStream expected) public static JsonNode getSortedMapJsonNode(Map map) { Map sortedMap = sortMapEntries(map); - String json = null; + String json; try { json = MAPPER.writeValueAsString(sortedMap); return MAPPER.readTree(json); diff --git a/turms-service/src/main/java/im/turms/service/access/servicerequest/dispatcher/ServiceRequestDispatcher.java b/turms-service/src/main/java/im/turms/service/access/servicerequest/dispatcher/ServiceRequestDispatcher.java index a0a853ede7..db4548ab68 100644 --- a/turms-service/src/main/java/im/turms/service/access/servicerequest/dispatcher/ServiceRequestDispatcher.java +++ b/turms-service/src/main/java/im/turms/service/access/servicerequest/dispatcher/ServiceRequestDispatcher.java @@ -65,6 +65,7 @@ import im.turms.server.common.infra.property.TurmsPropertiesManager; import im.turms.server.common.infra.proto.ProtoDecoder; import im.turms.server.common.infra.proto.ProtoEncoder; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.tracing.TracingCloseableContext; import im.turms.server.common.infra.tracing.TracingContext; import im.turms.service.access.servicerequest.dto.ClientRequest; @@ -235,6 +236,7 @@ public Mono dispatch(TracingContext context, ServiceRequest ser private Mono dispatch0(TracingContext context, ServiceRequest serviceRequest) { long requestTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); // 1. Validate ServiceResponse Long userId = serviceRequest.getUserId(); DeviceType deviceType = serviceRequest.getDeviceType(); @@ -393,7 +395,7 @@ private Mono dispatch0(TracingContext context, ServiceRequest s requestSize, requestTime, response, - System.currentTimeMillis() - requestTime); + (System.nanoTime() - startTime) / DateTimeUtil.NANOS_PER_MILLI); } return response; }); diff --git a/turms-service/src/main/java/im/turms/service/domain/blocklist/codec/BlockedClientSerializer.java b/turms-service/src/main/java/im/turms/service/domain/blocklist/codec/BlockedClientSerializer.java index eec0ebf3f6..bbea8be252 100644 --- a/turms-service/src/main/java/im/turms/service/domain/blocklist/codec/BlockedClientSerializer.java +++ b/turms-service/src/main/java/im/turms/service/domain/blocklist/codec/BlockedClientSerializer.java @@ -27,7 +27,7 @@ import im.turms.server.common.domain.blocklist.bo.BlockedClient; import im.turms.server.common.infra.lang.ByteArrayWrapper; import im.turms.server.common.infra.net.InetAddressUtil; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; /** * @author James Chen @@ -47,7 +47,7 @@ public void serialize(BlockedClient value, JsonGenerator gen, SerializerProvider gen.writeStringField("id", InetAddressUtil.ipBytesToString(((ByteArrayWrapper) id).getBytes())); } - gen.writeStringField("blockEndTime", DateUtil.toStr(value.blockEndTimeMillis())); + gen.writeStringField("blockEndTime", DateTimeUtil.toStr(value.blockEndTimeMillis())); gen.writeEndObject(); } } diff --git a/turms-service/src/main/java/im/turms/service/domain/common/access/admin/controller/BaseController.java b/turms-service/src/main/java/im/turms/service/domain/common/access/admin/controller/BaseController.java index c2b13bec86..f76c740902 100644 --- a/turms-service/src/main/java/im/turms/service/domain/common/access/admin/controller/BaseController.java +++ b/turms-service/src/main/java/im/turms/service/domain/common/access/admin/controller/BaseController.java @@ -34,7 +34,7 @@ import im.turms.server.common.infra.property.TurmsPropertiesManager; import im.turms.server.common.infra.property.env.service.env.adminapi.AdminApiProperties; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.time.DivideBy; import im.turms.service.domain.common.access.admin.dto.response.StatisticsRecordDTO; @@ -80,7 +80,7 @@ public Mono> queryBetweenDate( @Nullable Boolean areGroupMessages, @Nullable Boolean areSystemMessages) { List> dates = - DateUtil.divideDuration(dateRange.start(), dateRange.end(), divideBy); + DateTimeUtil.divideDuration(dateRange.start(), dateRange.end(), divideBy); List> monos = new ArrayList<>(dates.size()); for (Pair datePair : dates) { Mono result = @@ -97,7 +97,7 @@ public Mono> queryBetweenDate( DivideBy divideBy, Function> function) { List> dates = - DateUtil.divideDuration(dateRange.start(), dateRange.end(), divideBy); + DateTimeUtil.divideDuration(dateRange.start(), dateRange.end(), divideBy); List> monos = new ArrayList<>(dates.size()); for (Pair datePair : dates) { DateRange range = DateRange.of(datePair.getLeft(), datePair.getRight()); diff --git a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupBlocklistService.java b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupBlocklistService.java index 3903399b6b..2389ede271 100644 --- a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupBlocklistService.java +++ b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupBlocklistService.java @@ -50,7 +50,7 @@ import im.turms.server.common.infra.recycler.Recyclable; import im.turms.server.common.infra.recycler.SetRecycler; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidGroupBlockedUserKey; import im.turms.server.common.infra.validation.Validator; import im.turms.server.common.storage.mongo.IMongoCollectionInitializer; @@ -267,7 +267,7 @@ public Mono queryGroupBlockedUserIdsWithVersion( } return groupVersionService.queryBlocklistVersion(groupId) .flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Recyclable> recyclableList = ListRecycler.obtain(); @@ -298,7 +298,7 @@ public Mono queryGroupBlockedUserInfosWithVersion( } return groupVersionService.queryBlocklistVersion(groupId) .flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Recyclable> recyclableList = ListRecycler.obtain(); diff --git a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupInvitationService.java b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupInvitationService.java index c6246fa4bb..af9faf6b11 100644 --- a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupInvitationService.java +++ b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupInvitationService.java @@ -54,7 +54,7 @@ import im.turms.server.common.infra.recycler.Recyclable; import im.turms.server.common.infra.task.TaskManager; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidRequestStatus; import im.turms.server.common.infra.validation.ValidResponseAction; import im.turms.server.common.infra.validation.Validator; @@ -425,7 +425,7 @@ public Mono queryUserGroupInvitationsWithVersion( ? userVersionService.querySentGroupInvitationsLastUpdatedDate(userId) : userVersionService.queryReceivedGroupInvitationsLastUpdatedDate(userId); return versionMono.flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Flux invitationFlux = areSentByUser @@ -469,7 +469,7 @@ public Mono authAndQueryGroupInvitationsWithVersion } return groupVersionService.queryGroupInvitationsVersion(groupId) .flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Recyclable> recyclableList = diff --git a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupJoinRequestService.java b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupJoinRequestService.java index 735212f183..1ecf994264 100644 --- a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupJoinRequestService.java +++ b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupJoinRequestService.java @@ -50,7 +50,7 @@ import im.turms.server.common.infra.property.env.service.business.group.GroupJoinRequestProperties; import im.turms.server.common.infra.task.TaskManager; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidRequestStatus; import im.turms.server.common.infra.validation.ValidResponseAction; import im.turms.server.common.infra.validation.Validator; @@ -338,7 +338,7 @@ public Mono authAndQueryGroupJoinRequestsWithVersi }) : userVersionService.queryGroupJoinRequestsVersion(requesterId); return versionMono.flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Flux requestFlux = searchRequestsByGroupId diff --git a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupMemberService.java b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupMemberService.java index b1d564a224..80c23e511f 100644 --- a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupMemberService.java +++ b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupMemberService.java @@ -68,7 +68,7 @@ import im.turms.server.common.infra.recycler.Recyclable; import im.turms.server.common.infra.recycler.SetRecycler; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidGroupMemberRole; import im.turms.server.common.infra.validation.Validator; import im.turms.server.common.storage.mongo.IMongoCollectionInitializer; @@ -1186,7 +1186,7 @@ public Mono authAndQueryGroupMembersWithVersion( : Mono.error(ResponseException.get( ResponseStatusCode.NOT_GROUP_MEMBER_TO_QUERY_GROUP_MEMBER_INFO))) .flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } return queryGroupMembers(Set.of(groupId), null, null, null, null, null, null) diff --git a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupQuestionService.java b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupQuestionService.java index 22396869ed..2418425020 100644 --- a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupQuestionService.java +++ b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupQuestionService.java @@ -55,7 +55,7 @@ import im.turms.server.common.infra.reactor.PublisherPool; import im.turms.server.common.infra.recycler.ListRecycler; import im.turms.server.common.infra.recycler.Recyclable; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidGroupQuestionIdAndAnswer; import im.turms.server.common.infra.validation.Validator; import im.turms.server.common.storage.mongo.IMongoCollectionInitializer; @@ -384,7 +384,7 @@ public Mono authAndQueryGroupJoinQuestionsWithVer : Mono.error(ResponseException.get( ResponseStatusCode.NOT_GROUP_OWNER_OR_MANAGER_TO_QUERY_GROUP_QUESTION_ANSWER))) .flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Recyclable> recyclableList = ListRecycler.obtain(); diff --git a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupService.java b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupService.java index 8583c9318f..a46e345b90 100644 --- a/turms-service/src/main/java/im/turms/service/domain/group/service/GroupService.java +++ b/turms-service/src/main/java/im/turms/service/domain/group/service/GroupService.java @@ -69,7 +69,7 @@ import im.turms.server.common.infra.recycler.ListRecycler; import im.turms.server.common.infra.recycler.Recyclable; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.Validator; import im.turms.server.common.storage.mongo.IMongoCollectionInitializer; import im.turms.server.common.storage.mongo.operation.OperationResultConvertor; @@ -240,12 +240,12 @@ public Mono createGroup( .then(Mono.defer(() -> { createdGroupsCounter.increment(); return groupVersionService.upsert(groupId, now) - .onErrorResume(t -> { + .onErrorComplete(t -> { LOGGER.error( "Caught an error while upserting a version for the group ({}) after creating the group", groupId, t); - return Mono.empty(); + return true; }); })) .thenReturn(group); @@ -947,7 +947,7 @@ public Mono queryJoinedGroupIdsWithVersion( @Nullable Date lastUpdatedDate) { return userVersionService.queryJoinedGroupVersion(memberId) .flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } return groupMemberService.queryUserJoinedGroupIds(memberId) @@ -970,7 +970,7 @@ public Mono queryJoinedGroupsWithVersion( @Nullable Date lastUpdatedDate) { return userVersionService.queryJoinedGroupVersion(memberId) .flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } return queryJoinedGroups(memberId).collect(CollectorUtil.toChunkedList()) diff --git a/turms-service/src/main/java/im/turms/service/domain/message/service/MessageService.java b/turms-service/src/main/java/im/turms/service/domain/message/service/MessageService.java index f34bc42bd0..b0c7f34ff2 100644 --- a/turms-service/src/main/java/im/turms/service/domain/message/service/MessageService.java +++ b/turms-service/src/main/java/im/turms/service/domain/message/service/MessageService.java @@ -83,7 +83,7 @@ import im.turms.server.common.infra.task.TaskManager; import im.turms.server.common.infra.test.VisibleForTesting; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.Validator; import im.turms.server.common.storage.mongo.IMongoCollectionInitializer; import im.turms.server.common.storage.mongo.operation.OperationResultConvertor; @@ -725,7 +725,8 @@ public Flux queryExpiredMessageIds(@NotNull Integer retentionPeriodHours) } catch (ResponseException e) { return Flux.error(e); } - Date expirationDate = DateUtil.addHours(System.currentTimeMillis(), -retentionPeriodHours); + Date expirationDate = + DateTimeUtil.addHours(System.currentTimeMillis(), -retentionPeriodHours); return messageRepository.findExpiredMessageIds(expirationDate); } diff --git a/turms-service/src/main/java/im/turms/service/domain/user/service/UserFriendRequestService.java b/turms-service/src/main/java/im/turms/service/domain/user/service/UserFriendRequestService.java index c539e88c49..ac4b4afb31 100644 --- a/turms-service/src/main/java/im/turms/service/domain/user/service/UserFriendRequestService.java +++ b/turms-service/src/main/java/im/turms/service/domain/user/service/UserFriendRequestService.java @@ -49,7 +49,7 @@ import im.turms.server.common.infra.property.env.service.business.user.FriendRequestProperties; import im.turms.server.common.infra.task.TaskManager; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidRequestStatus; import im.turms.server.common.infra.validation.ValidResponseAction; import im.turms.server.common.infra.validation.Validator; @@ -567,7 +567,7 @@ public Mono queryFriendRequestsWithVersion( ? userVersionService.querySentFriendRequestsVersion(userId) : userVersionService.queryReceivedFriendRequestsVersion(userId); return versionMono.flatMap(version -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, version)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, version)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Flux requestFlux = areSentByUser diff --git a/turms-service/src/main/java/im/turms/service/domain/user/service/UserRelationshipGroupService.java b/turms-service/src/main/java/im/turms/service/domain/user/service/UserRelationshipGroupService.java index 279aac65d7..85e5be195a 100644 --- a/turms-service/src/main/java/im/turms/service/domain/user/service/UserRelationshipGroupService.java +++ b/turms-service/src/main/java/im/turms/service/domain/user/service/UserRelationshipGroupService.java @@ -49,7 +49,7 @@ import im.turms.server.common.infra.random.RandomUtil; import im.turms.server.common.infra.reactor.PublisherPool; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.validation.ValidUserRelationshipGroupKey; import im.turms.server.common.infra.validation.ValidUserRelationshipKey; import im.turms.server.common.infra.validation.Validator; @@ -153,7 +153,7 @@ public Mono queryRelationshipGroupsInfosWithV } return userVersionService.queryRelationshipGroupsLastUpdatedDate(ownerId) .flatMap(date -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, date)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, date)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } UserRelationshipGroupsWithVersion.Builder builder = diff --git a/turms-service/src/main/java/im/turms/service/domain/user/service/UserRelationshipService.java b/turms-service/src/main/java/im/turms/service/domain/user/service/UserRelationshipService.java index de9e1068b7..1933411a3a 100644 --- a/turms-service/src/main/java/im/turms/service/domain/user/service/UserRelationshipService.java +++ b/turms-service/src/main/java/im/turms/service/domain/user/service/UserRelationshipService.java @@ -53,7 +53,7 @@ import im.turms.server.common.infra.recycler.Recyclable; import im.turms.server.common.infra.recycler.SetRecycler; import im.turms.server.common.infra.time.DateRange; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.time.DurationConst; import im.turms.server.common.infra.validation.ValidUserRelationshipKey; import im.turms.server.common.infra.validation.Validator; @@ -332,7 +332,7 @@ public Mono queryRelatedUserIdsWithVersion( @Nullable Date lastUpdatedDate) { return userVersionService.queryRelationshipsLastUpdatedDate(ownerId) .flatMap(date -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, date)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, date)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Recyclable> recyclableSet = SetRecycler.obtain(); @@ -370,7 +370,7 @@ public Mono queryRelationshipsWithVersion( @Nullable Date lastUpdatedDate) { return userVersionService.queryRelationshipsLastUpdatedDate(ownerId) .flatMap(date -> { - if (DateUtil.isAfterOrSame(lastUpdatedDate, date)) { + if (DateTimeUtil.isAfterOrSame(lastUpdatedDate, date)) { return ResponseExceptionPublisherPool.alreadyUpToUpdate(); } Recyclable> recyclableSet = SetRecycler.obtain(); diff --git a/turms-service/src/main/java/im/turms/service/infra/logging/ClientApiLogging.java b/turms-service/src/main/java/im/turms/service/infra/logging/ClientApiLogging.java index 360e5c5b73..bc6eef8437 100644 --- a/turms-service/src/main/java/im/turms/service/infra/logging/ClientApiLogging.java +++ b/turms-service/src/main/java/im/turms/service/infra/logging/ClientApiLogging.java @@ -24,7 +24,7 @@ import im.turms.server.common.access.servicerequest.dto.ServiceResponse; import im.turms.server.common.infra.lang.NumberFormatter; import im.turms.server.common.infra.netty.ByteBufUtil; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.service.access.servicerequest.dto.ClientRequest; import static im.turms.server.common.infra.logging.CommonLogger.CLIENT_API_LOGGER; @@ -69,7 +69,7 @@ public static void log( .getKindCase() .name(), NumberFormatter.toCharBytes(requestSize), - DateUtil.toBytes(requestTime), + DateTimeUtil.toBytes(requestTime), // response information NumberFormatter.toCharBytes(response.code() .getBusinessCode()), diff --git a/turms-service/src/test/java/unit/im/turms/service/infra/time/DateUtilTests.java b/turms-service/src/test/java/unit/im/turms/service/infra/time/DateTimeUtilTests.java similarity index 64% rename from turms-service/src/test/java/unit/im/turms/service/infra/time/DateUtilTests.java rename to turms-service/src/test/java/unit/im/turms/service/infra/time/DateTimeUtilTests.java index dd6212fb54..41de0728be 100644 --- a/turms-service/src/test/java/unit/im/turms/service/infra/time/DateUtilTests.java +++ b/turms-service/src/test/java/unit/im/turms/service/infra/time/DateTimeUtilTests.java @@ -24,7 +24,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Test; -import im.turms.server.common.infra.time.DateUtil; +import im.turms.server.common.infra.time.DateTimeUtil; import im.turms.server.common.infra.time.DivideBy; import static org.assertj.core.api.Assertions.assertThat; @@ -32,37 +32,38 @@ /** * @author James Chen */ -class DateUtilTests { +class DateTimeUtilTests { @Test void divideDuration() { Date earlierDate = Date.from(Instant.parse("2011-12-03T10:15:30Z")); Date laterDate = Date.from(Instant.parse("2012-06-03T10:15:30Z")); List> hours = - DateUtil.divideDuration(earlierDate, laterDate, DivideBy.HOUR); - List> days = DateUtil.divideDuration(earlierDate, laterDate, DivideBy.DAY); + DateTimeUtil.divideDuration(earlierDate, laterDate, DivideBy.HOUR); + List> days = + DateTimeUtil.divideDuration(earlierDate, laterDate, DivideBy.DAY); List> months = - DateUtil.divideDuration(earlierDate, laterDate, DivideBy.MONTH); + DateTimeUtil.divideDuration(earlierDate, laterDate, DivideBy.MONTH); assertThat(hours).hasSize(4392); assertThat(days).hasSize(183); assertThat(months).hasSize(6); - hours = DateUtil.divideDuration(laterDate, earlierDate, DivideBy.HOUR); - days = DateUtil.divideDuration(laterDate, earlierDate, DivideBy.DAY); - months = DateUtil.divideDuration(laterDate, earlierDate, DivideBy.MONTH); + hours = DateTimeUtil.divideDuration(laterDate, earlierDate, DivideBy.HOUR); + days = DateTimeUtil.divideDuration(laterDate, earlierDate, DivideBy.DAY); + months = DateTimeUtil.divideDuration(laterDate, earlierDate, DivideBy.MONTH); assertThat(hours).isEmpty(); assertThat(days).isEmpty(); assertThat(months).isEmpty(); - hours = DateUtil.divideDuration(earlierDate, earlierDate, DivideBy.HOUR); - days = DateUtil.divideDuration(earlierDate, earlierDate, DivideBy.DAY); - months = DateUtil.divideDuration(earlierDate, earlierDate, DivideBy.MONTH); + hours = DateTimeUtil.divideDuration(earlierDate, earlierDate, DivideBy.HOUR); + days = DateTimeUtil.divideDuration(earlierDate, earlierDate, DivideBy.DAY); + months = DateTimeUtil.divideDuration(earlierDate, earlierDate, DivideBy.MONTH); assertThat(hours).isEmpty(); assertThat(days).isEmpty(); assertThat(months).isEmpty(); } -} +} \ No newline at end of file