Skip to content

Commit

Permalink
Use monotonic clock to measure time for robustness at the cost of per…
Browse files Browse the repository at this point in the history
…formance
  • Loading branch information
JamesChenX committed Dec 18, 2024
1 parent fa62569 commit b980316
Show file tree
Hide file tree
Showing 44 changed files with 273 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import im.turms.server.common.infra.property.TurmsPropertiesManager;
import im.turms.server.common.infra.proto.ProtoDecoder;
import im.turms.server.common.infra.proto.ProtoEncoder;
import im.turms.server.common.infra.time.DateTimeUtil;
import im.turms.server.common.infra.tracing.TracingCloseableContext;
import im.turms.server.common.infra.tracing.TracingContext;

Expand Down Expand Up @@ -166,6 +167,7 @@ public Mono<ByteBuf> handleRequest0(
}
// Parse and handle service requests
long requestTime = System.currentTimeMillis();
long startTime = System.nanoTime();
int requestSize = serviceRequestBuffer.readableBytes();
SimpleTurmsRequest request;
SimpleTurmsRequest tempRequest;
Expand Down Expand Up @@ -250,7 +252,7 @@ public Mono<ByteBuf> handleRequest0(
requestSize,
requestTime,
notification,
System.currentTimeMillis() - requestTime);
(System.nanoTime() - startTime) / DateTimeUtil.NANOS_PER_MILLI);
}
}
return ProtoEncoder.getDirectByteBuffer(notification);
Expand Down Expand Up @@ -287,8 +289,7 @@ public Mono<TurmsNotification> handleServiceRequest(
}

// Rate limiting
long now = System.currentTimeMillis();
if (!ipRequestThrottler.tryAcquireToken(sessionWrapper.getIp(), now)) {
if (!ipRequestThrottler.tryAcquireToken(sessionWrapper.getIp(), System.nanoTime())) {
blocklistService.tryBlockIpForFrequentRequest(sessionWrapper.getIp());
UserSession userSession = sessionWrapper.getUserSession();
if (userSession != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import im.turms.server.common.infra.thread.NamedThreadFactory;
import im.turms.server.common.infra.throttle.TokenBucket;
import im.turms.server.common.infra.throttle.TokenBucketContext;
import im.turms.server.common.infra.time.DateTimeUtil;

/**
* @author James Chen
Expand All @@ -41,10 +42,10 @@ public class IpRequestThrottler {

private static final Logger LOGGER = LoggerFactory.getLogger(IpRequestThrottler.class);

private static final long IDLE_ENTRY_TTL = 30 * 60 * 1000L;
private static final long INTERVAL_TO_CHECK = 30 * 60 * 1000L;
private static final long IDLE_ENTRY_TTL_NANOS = 30 * 60 * DateTimeUtil.NANOS_PER_SECOND;
private static final long INTERVAL_TO_CHECK_MILLIS = 30L * 60 * 1000;
private static final int BATCH_SIZE = 10_000;
private static final long SLEEP_THRESHOLD_MILLIS = 1000;
private static final long SLEEP_THRESHOLD_NANOS = 1000 * DateTimeUtil.NANOS_PER_MILLI;
private static final long SLEEP_MILLIS = 1000;

/**
Expand Down Expand Up @@ -94,7 +95,7 @@ public IpRequestThrottler(
LOGGER.error("Failed to remove expired request token buckets", e);
}
try {
Thread.sleep(INTERVAL_TO_CHECK);
Thread.sleep(INTERVAL_TO_CHECK_MILLIS);
} catch (InterruptedException e) {
break;
}
Expand All @@ -107,11 +108,11 @@ private void removeExpiredRequestTokenBuckets() throws InterruptedException {
Iterator<TokenBucket> iterator = ipToRequestTokenBucket.values()
.iterator();
int processed = 0;
long startTime = System.currentTimeMillis();
long startTimeNanos = System.nanoTime();
while (iterator.hasNext()) {
TokenBucket bucket = iterator.next();
long lastAccessTime = bucket.getLastRefillTime();
if ((startTime - lastAccessTime) > IDLE_ENTRY_TTL
long lastAccessTimeNanos = bucket.getLastRefillTimeNanos();
if ((startTimeNanos - lastAccessTimeNanos) > IDLE_ENTRY_TTL_NANOS
&& bucket.isTokensMoreThanOrEqualsToInitialTokens()) {
iterator.remove();
}
Expand All @@ -120,9 +121,9 @@ private void removeExpiredRequestTokenBuckets() throws InterruptedException {
// and cause the server cannot serve for users
if (processed >= BATCH_SIZE) {
processed = 0;
if (System.currentTimeMillis() - startTime > SLEEP_THRESHOLD_MILLIS) {
if (System.nanoTime() - startTimeNanos > SLEEP_THRESHOLD_NANOS) {
Thread.sleep(SLEEP_MILLIS);
startTime = System.currentTimeMillis();
startTimeNanos = System.nanoTime();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ public final class UserSession {
* other types of requests
*/
private volatile long lastHeartbeatRequestTimestampMillis;
private volatile long lastHeartbeatRequestTimestampNanos;
/**
* Record the timestamp of the last requests except heartbeat requests
*/
private volatile long lastRequestTimestampMillis;
private volatile long lastRequestTimestampNanos;
// No need to add volatile because it can only be accessed by one thread
// (the thread "turms-client-heartbeat-refresher" in HeartbeatManager)
private long lastHeartbeatUpdateTimestampMillis;
private long lastHeartbeatUpdateTimestampNanos;

/**
* Note that it is acceptable that the session is still open even if the connection is closed
Expand All @@ -118,6 +120,8 @@ public UserSession(
@Nullable Map<String, String> deviceDetails,
@Nullable Location loginLocation) {
Date now = new Date();
long nowMillis = now.getTime();
long nowNanos = System.nanoTime();
this.version = version;
this.permissions = permissions;
this.userId = userId;
Expand All @@ -127,15 +131,27 @@ public UserSession(
: deviceDetails;
this.loginLocation = loginLocation;
this.loginDate = now;
this.lastHeartbeatRequestTimestampMillis = now.getTime();
this.lastRequestTimestampMillis = now.getTime();
this.lastHeartbeatRequestTimestampMillis = nowMillis;
this.lastHeartbeatRequestTimestampNanos = nowNanos;
this.lastRequestTimestampMillis = nowMillis;
this.lastRequestTimestampNanos = nowNanos;
}

public void setConnection(NetConnection connection, ByteArrayWrapper ip) {
this.connection = connection;
this.ip = ip;
}

public void setLastHeartbeatRequestTimestampToNow() {
lastHeartbeatRequestTimestampMillis = System.currentTimeMillis();
lastHeartbeatRequestTimestampNanos = System.nanoTime();
}

public void setLastRequestTimestampToNow() {
lastRequestTimestampMillis = System.currentTimeMillis();
lastRequestTimestampNanos = System.nanoTime();
}

/**
* A session cannot reopen once closed, but the connection can close and reconnect.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Mono<TurmsNotification> handleServiceRequest(
ServiceRequest serviceRequest) {
try {
// Update request timestamp
session.setLastRequestTimestampMillis(System.currentTimeMillis());
session.setLastRequestTimestampToNow();
// Forward request
serviceRequest.getTurmsRequestBuffer()
.retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import im.turms.server.common.infra.logging.core.logger.Logger;
import im.turms.server.common.infra.logging.core.logger.LoggerFactory;
import im.turms.server.common.infra.thread.TurmsThread;
import im.turms.server.common.infra.time.DateTimeUtil;
import im.turms.server.common.infra.time.DurationConst;

/**
Expand Down Expand Up @@ -70,12 +71,12 @@ public class HeartbeatManager {
private final Map<Long, UserSessionsManager> userIdToSessionsManager;
private final TurmsThread workerThread;
private int closeIdleSessionAfterSeconds;
private int closeIdleSessionAfterMillis;
private long closeIdleSessionAfterNanos;
private int expectedFractionPerSecond;
@Setter
private int minHeartbeatIntervalMillis;
private long minHeartbeatIntervalNanos;
@Setter
private int switchProtocolAfterMillis;
private long switchProtocolAfterNanos;

public HeartbeatManager(
SessionService sessionService,
Expand All @@ -90,8 +91,8 @@ public HeartbeatManager(
this.userIdToSessionsManager = userIdToSessionsManager;
setClientHeartbeatIntervalSeconds(clientHeartbeatIntervalSeconds);
setCloseIdleSessionAfterSeconds(closeIdleSessionAfterSeconds);
this.minHeartbeatIntervalMillis = minHeartbeatIntervalSeconds * 1000;
this.switchProtocolAfterMillis = switchProtocolAfterSeconds * 1000;
this.minHeartbeatIntervalNanos = DateTimeUtil.secondsToNanos(minHeartbeatIntervalSeconds);
this.switchProtocolAfterNanos = DateTimeUtil.secondsToNanos(switchProtocolAfterSeconds);
workerThread = TurmsThread.create(ThreadNameConst.CLIENT_HEARTBEAT_REFRESHER, true, () -> {
Thread thread = Thread.currentThread();
while (!thread.isInterrupted()) {
Expand All @@ -115,7 +116,7 @@ public HeartbeatManager(

public void setCloseIdleSessionAfterSeconds(int closeIdleSessionAfterSeconds) {
this.closeIdleSessionAfterSeconds = closeIdleSessionAfterSeconds;
closeIdleSessionAfterMillis = closeIdleSessionAfterSeconds * 1000;
closeIdleSessionAfterNanos = DateTimeUtil.secondsToNanos(closeIdleSessionAfterSeconds);
}

public void setClientHeartbeatIntervalSeconds(int clientHeartbeatIntervalSeconds) {
Expand Down Expand Up @@ -167,7 +168,7 @@ private LongKeyGenerator collectOnlineUsersAndUpdateStatus(
* onlineUserCount / expectedFractionPerSecond);
Iterator<UserSessionsManager> iterator = managers.iterator();
return new LongKeyGenerator() {
final long now = System.currentTimeMillis();
final long now = System.nanoTime();

@Override
public int estimatedSize() {
Expand Down Expand Up @@ -196,38 +197,37 @@ public long next() {
}

@Nullable
private Long closeOrUpdateSession(UserSession session, long now) {
private Long closeOrUpdateSession(UserSession session, long nowNanos) {
if (!session.isOpen()) {
return null;
}
if (UdpRequestDispatcher.isEnabled()
&& session.supportsSwitchingToUdp()
&& session.isConnected()) {
int requestElapsedTime = (int) (now - session.getLastRequestTimestampMillis());
if (requestElapsedTime > switchProtocolAfterMillis) {
session.getConnection()
.switchToUdp();
return null;
}
&& session.isConnected()
&& nowNanos - session.getLastRequestTimestampNanos() > switchProtocolAfterNanos) {
session.getConnection()
.switchToUdp();
return null;
}
// Limit the frequency of sending heartbeat requests to Redis
long lastHeartbeatUpdateTimestamp = session.getLastHeartbeatUpdateTimestampMillis();
int localMinHeartbeatIntervalMillis = minHeartbeatIntervalMillis;
if (localMinHeartbeatIntervalMillis > 0
&& now - lastHeartbeatUpdateTimestamp < localMinHeartbeatIntervalMillis) {
long lastHeartbeatUpdateTimestampNanos = session.getLastHeartbeatUpdateTimestampNanos();
long localMinHeartbeatIntervalNanos = minHeartbeatIntervalNanos;
if (localMinHeartbeatIntervalNanos > 0
&& nowNanos - lastHeartbeatUpdateTimestampNanos < localMinHeartbeatIntervalNanos) {
return null;
}
// Only sends heartbeat requests to Redis if the client has
// sent any request to the local node after the last heartbeat update request
long lastHeartbeatRequestTimestamp =
Math.max(session.getLastHeartbeatRequestTimestampMillis(),
session.getLastRequestTimestampMillis());
if (lastHeartbeatRequestTimestamp <= lastHeartbeatUpdateTimestamp) {
long lastHeartbeatRequestTimestampNanos =
Math.max(session.getLastHeartbeatRequestTimestampNanos(),
session.getLastRequestTimestampNanos());
if (lastHeartbeatRequestTimestampNanos <= lastHeartbeatUpdateTimestampNanos) {
return null;
}
int localCloseIdleSessionAfterMillis = closeIdleSessionAfterMillis;
if (localCloseIdleSessionAfterMillis > 0
&& (int) (now - lastHeartbeatRequestTimestamp) > localCloseIdleSessionAfterMillis) {
long localCloseIdleSessionAfterNanos = closeIdleSessionAfterNanos;
if (localCloseIdleSessionAfterNanos > 0
&& nowNanos
- lastHeartbeatRequestTimestampNanos > localCloseIdleSessionAfterNanos) {
sessionService
.closeLocalSession(session.getUserId(),
session.getDeviceType(),
Expand All @@ -239,7 +239,7 @@ private Long closeOrUpdateSession(UserSession session, long now) {
HEARTBEAT_TIMEOUT));
return null;
}
session.setLastHeartbeatUpdateTimestampMillis(now);
session.setLastHeartbeatUpdateTimestampNanos(nowNanos);
return session.getUserId();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import im.turms.server.common.infra.property.env.gateway.session.SessionProperties;
import im.turms.server.common.infra.reactor.PublisherPool;
import im.turms.server.common.infra.reactor.PublisherUtil;
import im.turms.server.common.infra.time.DateTimeUtil;
import im.turms.server.common.infra.validation.ValidDeviceType;
import im.turms.server.common.infra.validation.Validator;

Expand Down Expand Up @@ -180,10 +181,10 @@ public SessionService(
newSessionProperties.getClientHeartbeatIntervalSeconds());
heartbeatManager.setCloseIdleSessionAfterSeconds(
newSessionProperties.getCloseIdleSessionAfterSeconds());
heartbeatManager.setMinHeartbeatIntervalMillis(
newSessionProperties.getMinHeartbeatIntervalSeconds() * 1000);
heartbeatManager.setSwitchProtocolAfterMillis(
newSessionProperties.getSwitchProtocolAfterSeconds() * 1000);
heartbeatManager.setMinHeartbeatIntervalNanos(DateTimeUtil
.secondsToNanos(newSessionProperties.getMinHeartbeatIntervalSeconds()));
heartbeatManager.setSwitchProtocolAfterNanos(DateTimeUtil
.secondsToNanos(newSessionProperties.getSwitchProtocolAfterSeconds()));
});
propertiesManager.addLocalPropertiesChangeListener(this::updateLocalProperties);

Expand Down Expand Up @@ -222,7 +223,7 @@ public Mono<Void> destroy() {
}

public void handleHeartbeatUpdateRequest(UserSession session) {
session.setLastHeartbeatRequestTimestampMillis(System.currentTimeMillis());
session.setLastHeartbeatRequestTimestampToNow();
}

public Mono<UserSession> handleLoginRequest(
Expand Down Expand Up @@ -596,7 +597,7 @@ public UserSession authAndUpdateHeartbeatTimestamp(
&& session.getId() == sessionId
&& !session.getConnection()
.isConnectionRecovering()) {
session.setLastHeartbeatRequestTimestampMillis(System.currentTimeMillis());
session.setLastHeartbeatRequestTimestampToNow();
return session;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import im.turms.server.common.infra.lang.NumberFormatter;
import im.turms.server.common.infra.lang.StringUtil;
import im.turms.server.common.infra.netty.ByteBufUtil;
import im.turms.server.common.infra.time.DateUtil;
import im.turms.server.common.infra.time.DateTimeUtil;

import static im.turms.server.common.infra.logging.CommonLogger.CLIENT_API_LOGGER;
import static im.turms.server.common.infra.logging.CommonLogger.LOG_FIELD_DELIMITER;
Expand Down Expand Up @@ -70,7 +70,7 @@ public static void log(
NumberFormatter.toCharBytes(requestId),
requestType.name(),
NumberFormatter.toCharBytes(requestSize),
DateUtil.toBytes(requestTime),
DateTimeUtil.toBytes(requestTime),
// response information
NumberFormatter.toCharBytes(response.getCode()),
response.hasData()
Expand Down Expand Up @@ -107,7 +107,7 @@ public static void log(
NumberFormatter.toCharBytes(requestId),
requestType.name(),
NumberFormatter.toCharBytes(requestSize),
DateUtil.toBytes(requestTime),
DateTimeUtil.toBytes(requestTime),
// response information
NumberFormatter.toCharBytes(responseCode),
null, // Response data type
Expand Down Expand Up @@ -142,7 +142,7 @@ public static void log(
NumberFormatter.toCharBytes(requestId),
requestType,
NumberFormatter.toCharBytes(requestSize),
DateUtil.toBytes(requestTime),
DateTimeUtil.toBytes(requestTime),
// response information
NumberFormatter.toCharBytes(responseCode),
responseDataType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected BaseAdminApiRateLimitingManager(
() -> {
Iterator<TokenBucket> iterator = ipToTokenBucket.values()
.iterator();
long now = System.currentTimeMillis();
long now = System.nanoTime();
while (iterator.hasNext()) {
TokenBucket bucket = iterator.next();
bucket.refill(now);
Expand All @@ -60,7 +60,7 @@ protected BaseAdminApiRateLimitingManager(
public boolean tryAcquireTokenByIp(String ip) {
TokenBucket bucket =
ipToTokenBucket.computeIfAbsent(ip, key -> new TokenBucket(tokenBucketContext));
return bucket.tryAcquire(System.currentTimeMillis());
return bucket.tryAcquire(System.nanoTime());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import im.turms.server.common.infra.property.TurmsPropertiesManager;
import im.turms.server.common.infra.property.env.common.adminapi.AdminHttpProperties;
import im.turms.server.common.infra.property.env.common.adminapi.BaseAdminApiProperties;
import im.turms.server.common.infra.time.DateTimeUtil;
import im.turms.server.common.infra.time.DurationConst;
import im.turms.server.common.infra.tracing.TracingCloseableContext;
import im.turms.server.common.infra.tracing.TracingContext;
Expand Down Expand Up @@ -310,6 +311,7 @@ private Mono<Void> handle(HttpServerRequest request, HttpServerResponse response
.send();
}
long requestTime = System.currentTimeMillis();
long startTime = System.nanoTime();
TracingContext tracingContext = new TracingContext();
RequestContext requestContext = new RequestContext();
String ip = request.remoteAddress()
Expand All @@ -336,7 +338,7 @@ private Mono<Void> handle(HttpServerRequest request, HttpServerResponse response
requestTime,
requestContext.getAction(),
requestContext.getParamValues(),
(int) (System.currentTimeMillis() - requestTime),
(int) ((System.nanoTime() - startTime) / DateTimeUtil.NANOS_PER_MILLI),
signal.getThrowable());
})
.onErrorResume(t -> {
Expand Down
Loading

0 comments on commit b980316

Please sign in to comment.