From 46870a29d6731128e31ceccc7010d4d8f277d8fc Mon Sep 17 00:00:00 2001 From: JamesChenX Date: Wed, 11 Oct 2023 21:36:30 +0800 Subject: [PATCH] Refresh user sessions info actively in local turms-gateway #1265 --- .../session/service/UserStatusService.java | 77 ++++++++++++++----- .../storage/redis/TurmsRedisClient.java | 4 +- .../redis/TurmsRedisClientManager.java | 6 +- .../storage/redis/script/RedisScript.java | 8 +- .../session/try_add_online_user_with_ttl.lua | 12 ++- 5 files changed, 76 insertions(+), 31 deletions(-) diff --git a/turms-server-common/src/main/java/im/turms/server/common/domain/session/service/UserStatusService.java b/turms-server-common/src/main/java/im/turms/server/common/domain/session/service/UserStatusService.java index c381835ddd..fd7e5ee93a 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/domain/session/service/UserStatusService.java +++ b/turms-server-common/src/main/java/im/turms/server/common/domain/session/service/UserStatusService.java @@ -78,17 +78,17 @@ public class UserStatusService { private static final long NODE_STATUS_TTL_MILLIS = 15_000L; - private final RedisScript addOnlineUserScript; - private final RedisScript getUsersDeviceDetailsScript = + private final RedisScript addOnlineUserScript; + private final RedisScript> getUsersDeviceDetailsScript = RedisScript.get(new ClassPathResource("redis/session/get_users_device_details.lua"), ScriptOutputType.MULTI); - private final RedisScript removeUserStatusesScript = + private final RedisScript removeUserStatusesScript = RedisScript.get(new ClassPathResource("redis/session/remove_user_statuses.lua"), ScriptOutputType.BOOLEAN); - private final RedisScript updateUsersTtlScript = + private final RedisScript> updateUsersTtlScript = RedisScript.get(new ClassPathResource("redis/session/update_users_ttl.lua"), ScriptOutputType.MULTI); - private final RedisScript updateOnlineUserStatusIfPresent = RedisScript.get( + private final RedisScript updateOnlineUserStatusIfPresent = RedisScript.get( new ClassPathResource("redis/session/update_online_user_status_if_present.lua"), ScriptOutputType.BOOLEAN); @@ -112,7 +112,7 @@ public class UserStatusService { * | | ... | ... | * +-------------+-------------------------+-----------------------------------+ * - * + *

* "$" is the fixed hash key of the user status value, and its value is the user status value * represented in number. *

@@ -156,7 +156,7 @@ public UserStatusService( deviceStatusTtlMillis = deviceStatusTtlSeconds * 1000L; addOnlineUserScript = RedisScript.get( new ClassPathResource("redis/session/try_add_online_user_with_ttl.lua"), - ScriptOutputType.BOOLEAN, + ScriptOutputType.VALUE, Map.of("DEVICE_DETAILS_TTL", deviceDetailsExpireAfterSeconds, "DEVICE_STATUS_TTL", @@ -251,11 +251,20 @@ public Mono updateOnlineUserStatusIfPresent( } catch (ResponseException e) { return Mono.error(e); } - Mono result = sessionRedisClientManager.eval(userId, - updateOnlineUserStatusIfPresent, - userId, - (byte) userStatus.getNumber()); - return result.timeout(operationTimeout, HashedWheelScheduler.getDaemon()); + Mono mono = sessionRedisClientManager + .eval(userId, + updateOnlineUserStatusIfPresent, + userId, + (byte) userStatus.getNumber()) + .timeout(operationTimeout, HashedWheelScheduler.getDaemon()); + if (cacheUserSessionsStatus) { + return mono.doOnNext(exists -> { + if (!exists) { + userIdToStatusCache.invalidate(userId); + } + }); + } + return mono; } public Mono> updateOnlineUsersTtl( @@ -279,13 +288,16 @@ public Mono> updateOnlineUsersTtl( } int size = userIdGenerator.estimatedSize(); Set nonexistentUserIds = CollectionUtil.newSetWithExpectedSize(size); - for (Object value : objects) { - List buffers = (List) value; + for (List buffers : objects) { if (buffers.size() == 1 && buffers.get(0) == null) { return Collections.emptySet(); } for (ByteBuf buffer : buffers) { - nonexistentUserIds.add(buffer.readLong()); + Long userId = buffer.readLong(); + nonexistentUserIds.add(userId); + if (cacheUserSessionsStatus) { + userIdToStatusCache.invalidate(userId); + } } } return nonexistentUserIds; @@ -527,8 +539,7 @@ public Mono>> fetchDeviceDetails( .map(results -> { Map> userIdToDetails = CollectionUtil.newMapWithExpectedSize(userIds.size()); - for (Object rawElements : results) { - List elements = (List) rawElements; + for (List elements : results) { int elementCount = elements.size(); if (elementCount == 0) { continue; @@ -590,6 +601,9 @@ public Mono removeStatusByUserIdAndDeviceTypes( return Mono.error(new InputOutputException("Failed to encode arguments", e)); } Mono mono = sessionRedisClientManager.eval(userId, removeUserStatusesScript, args); + if (cacheUserSessionsStatus) { + mono = mono.doOnSuccess(ignored -> userIdToStatusCache.invalidate(userId)); + } return mono.timeout(operationTimeout, HashedWheelScheduler.getDaemon()); } @@ -678,7 +692,34 @@ public Mono addOnlineDeviceIfAbsent( ReferenceCountUtil.ensureReleased(args, 0, index); return Mono.error(new InputOutputException("Failed to encode arguments", e)); } - return sessionRedisClientManager.eval(userId, addOnlineUserScript, args); + return sessionRedisClientManager.eval(userId, addOnlineUserScript, args) + .map(buffer -> { + byte returnCode = buffer.readByte(); + return switch (returnCode) { + case '0' -> false; + case '1' -> { + if (cacheUserSessionsStatus) { + Map deviceTypeToSessions = + new FastEnumMap<>(DeviceType.class); + deviceTypeToSessions.put(deviceType, + new UserDeviceSessionInfo( + this.localNodeId, + System.currentTimeMillis(), + true)); + userIdToStatusCache.put(userId, + new UserSessionsStatus( + userId, + userStatus, + deviceTypeToSessions)); + } + yield true; + } + case '2' -> true; + default -> throw new RuntimeException( + "Unexpected return code: " + + returnCode); + }; + }); } private record NodeStatus( diff --git a/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClient.java b/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClient.java index a20b9e0164..7aef46ed42 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClient.java +++ b/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClient.java @@ -244,14 +244,14 @@ public Mono georem(Object key, Object... members) { // Scripting - public Mono eval(RedisScript script, ByteBuf... keys) { + public Mono eval(RedisScript script, ByteBuf... keys) { return eval(script, keys.length, keys); } /** * @param keyLength the real key length */ - public Mono eval(RedisScript script, int keyLength, ByteBuf... keys) { + public Mono eval(RedisScript script, int keyLength, ByteBuf... keys) { return Mono.defer(() -> { ByteBuf key; for (int i = 0, length = keys.length; i < length; i++) { diff --git a/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClientManager.java b/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClientManager.java index bb7cbf6881..d52fa9e6af 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClientManager.java +++ b/turms-server-common/src/main/java/im/turms/server/common/storage/redis/TurmsRedisClientManager.java @@ -149,12 +149,12 @@ public Mono georem(Long shardKey, Object key, Object... members) { // Scripting - public Mono eval(Long shardKey, RedisScript script, Object... keys) { + public Mono eval(Long shardKey, RedisScript script, Object... keys) { ByteBuf[] buffers = ByteBufUtil.writeObjects(keys); return getClient(shardKey).eval(script, buffers); } - public Mono eval(Long shardKey, RedisScript script, ByteBuf[] buffers) { + public Mono eval(Long shardKey, RedisScript script, ByteBuf[] buffers) { return getClient(shardKey).eval(script, buffers); } @@ -165,7 +165,7 @@ public Mono eval(Long shardKey, RedisScript script, ByteBuf[] buffers) { * will throw */ public Flux eval( - RedisScript script, + RedisScript script, short firstKey, byte[] secondKey, LongKeyGenerator keyGenerator) { diff --git a/turms-server-common/src/main/java/im/turms/server/common/storage/redis/script/RedisScript.java b/turms-server-common/src/main/java/im/turms/server/common/storage/redis/script/RedisScript.java index 510bd275ca..e34b3823a7 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/storage/redis/script/RedisScript.java +++ b/turms-server-common/src/main/java/im/turms/server/common/storage/redis/script/RedisScript.java @@ -38,7 +38,7 @@ /** * @author James Chen */ -public record RedisScript( +public record RedisScript( ByteBuf script, ByteBuf digest, ScriptOutputType outputType @@ -49,14 +49,14 @@ public record RedisScript( /** * @param outputType {@link BaseRedisCommandBuilder#newScriptOutput} */ - public static RedisScript get(ClassPathResource resource, ScriptOutputType outputType) { + public static RedisScript get(ClassPathResource resource, ScriptOutputType outputType) { return get(resource, outputType, null); } /** * @param outputType {@link BaseRedisCommandBuilder#newScriptOutput} */ - public static RedisScript get( + public static RedisScript get( ClassPathResource resource, ScriptOutputType outputType, @Nullable Map placeholders) { @@ -81,7 +81,7 @@ public static RedisScript get( + bytes.length; throw new InputOutputException(message); } - return new RedisScript( + return new RedisScript<>( ByteBufUtil.getUnreleasableDirectBuffer(bytes), ByteBufUtil .getUnreleasableDirectBuffer(StringUtil.getBytes(Base16.digest(bytes))), diff --git a/turms-server-common/src/main/resources/redis/session/try_add_online_user_with_ttl.lua b/turms-server-common/src/main/resources/redis/session/try_add_online_user_with_ttl.lua index 52d8e8790c..4d207b3626 100644 --- a/turms-server-common/src/main/resources/redis/session/try_add_online_user_with_ttl.lua +++ b/turms-server-common/src/main/resources/redis/session/try_add_online_user_with_ttl.lua @@ -21,11 +21,11 @@ local existing_status = values[2] local now if existing_node_id then if existing_node_id == node_id then - return false + return '0' end local existing_device_timestamp = tonumber(redis_call('HGET', user_id, existing_node_id)) if not existing_device_timestamp then - return false + return '0' end now = tonumber(redis_call('TIME')[1]) local diff = now - existing_device_timestamp @@ -36,7 +36,7 @@ if existing_node_id then or expected_device_timestamp == '' or expected_existing_node_id ~= existing_node_id or expected_device_timestamp ~= existing_device_timestamp) then - return false + return '0' end end @@ -85,4 +85,8 @@ if count - 7 > 0 then end end -return true \ No newline at end of file +if redis_call('HLEN', user_id) == 3 then + return '1' +else + return '2' +end \ No newline at end of file