Skip to content

Commit

Permalink
Refresh user sessions info actively in local turms-gateway #1265
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesChenX committed Oct 11, 2023
1 parent a0a49dc commit 46870a2
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ public class UserStatusService {

private static final long NODE_STATUS_TTL_MILLIS = 15_000L;

private final RedisScript addOnlineUserScript;
private final RedisScript getUsersDeviceDetailsScript =
private final RedisScript<ByteBuf> addOnlineUserScript;
private final RedisScript<List<Object>> getUsersDeviceDetailsScript =
RedisScript.get(new ClassPathResource("redis/session/get_users_device_details.lua"),
ScriptOutputType.MULTI);
private final RedisScript removeUserStatusesScript =
private final RedisScript<Boolean> removeUserStatusesScript =
RedisScript.get(new ClassPathResource("redis/session/remove_user_statuses.lua"),
ScriptOutputType.BOOLEAN);
private final RedisScript updateUsersTtlScript =
private final RedisScript<List<ByteBuf>> updateUsersTtlScript =
RedisScript.get(new ClassPathResource("redis/session/update_users_ttl.lua"),
ScriptOutputType.MULTI);
private final RedisScript updateOnlineUserStatusIfPresent = RedisScript.get(
private final RedisScript<Boolean> updateOnlineUserStatusIfPresent = RedisScript.get(
new ClassPathResource("redis/session/update_online_user_status_if_present.lua"),
ScriptOutputType.BOOLEAN);

Expand All @@ -112,7 +112,7 @@ public class UserStatusService {
* | | ... | ... |
* +-------------+-------------------------+-----------------------------------+
* </pre>
*
* <p>
* "$" is the fixed hash key of the user status value, and its value is the user status value
* represented in number.
* <p>
Expand Down Expand Up @@ -156,7 +156,7 @@ public UserStatusService(
deviceStatusTtlMillis = deviceStatusTtlSeconds * 1000L;
addOnlineUserScript = RedisScript.get(
new ClassPathResource("redis/session/try_add_online_user_with_ttl.lua"),
ScriptOutputType.BOOLEAN,
ScriptOutputType.VALUE,
Map.of("DEVICE_DETAILS_TTL",
deviceDetailsExpireAfterSeconds,
"DEVICE_STATUS_TTL",
Expand Down Expand Up @@ -251,11 +251,20 @@ public Mono<Boolean> updateOnlineUserStatusIfPresent(
} catch (ResponseException e) {
return Mono.error(e);
}
Mono<Boolean> result = sessionRedisClientManager.eval(userId,
updateOnlineUserStatusIfPresent,
userId,
(byte) userStatus.getNumber());
return result.timeout(operationTimeout, HashedWheelScheduler.getDaemon());
Mono<Boolean> mono = sessionRedisClientManager
.eval(userId,
updateOnlineUserStatusIfPresent,
userId,
(byte) userStatus.getNumber())
.timeout(operationTimeout, HashedWheelScheduler.getDaemon());
if (cacheUserSessionsStatus) {
return mono.doOnNext(exists -> {
if (!exists) {
userIdToStatusCache.invalidate(userId);
}
});
}
return mono;
}

public Mono<Set<Long>> updateOnlineUsersTtl(
Expand All @@ -279,13 +288,16 @@ public Mono<Set<Long>> updateOnlineUsersTtl(
}
int size = userIdGenerator.estimatedSize();
Set<Long> nonexistentUserIds = CollectionUtil.newSetWithExpectedSize(size);
for (Object value : objects) {
List<ByteBuf> buffers = (List<ByteBuf>) value;
for (List<ByteBuf> buffers : objects) {
if (buffers.size() == 1 && buffers.get(0) == null) {
return Collections.emptySet();
}
for (ByteBuf buffer : buffers) {
nonexistentUserIds.add(buffer.readLong());
Long userId = buffer.readLong();
nonexistentUserIds.add(userId);
if (cacheUserSessionsStatus) {
userIdToStatusCache.invalidate(userId);
}
}
}
return nonexistentUserIds;
Expand Down Expand Up @@ -527,8 +539,7 @@ public Mono<Map<Long, Map<String, String>>> fetchDeviceDetails(
.map(results -> {
Map<Long, Map<String, String>> userIdToDetails =
CollectionUtil.newMapWithExpectedSize(userIds.size());
for (Object rawElements : results) {
List<Object> elements = (List<Object>) rawElements;
for (List<Object> elements : results) {
int elementCount = elements.size();
if (elementCount == 0) {
continue;
Expand Down Expand Up @@ -590,6 +601,9 @@ public Mono<Boolean> removeStatusByUserIdAndDeviceTypes(
return Mono.error(new InputOutputException("Failed to encode arguments", e));
}
Mono<Boolean> mono = sessionRedisClientManager.eval(userId, removeUserStatusesScript, args);
if (cacheUserSessionsStatus) {
mono = mono.doOnSuccess(ignored -> userIdToStatusCache.invalidate(userId));
}
return mono.timeout(operationTimeout, HashedWheelScheduler.getDaemon());
}

Expand Down Expand Up @@ -678,7 +692,34 @@ public Mono<Boolean> addOnlineDeviceIfAbsent(
ReferenceCountUtil.ensureReleased(args, 0, index);
return Mono.error(new InputOutputException("Failed to encode arguments", e));
}
return sessionRedisClientManager.eval(userId, addOnlineUserScript, args);
return sessionRedisClientManager.eval(userId, addOnlineUserScript, args)
.map(buffer -> {
byte returnCode = buffer.readByte();
return switch (returnCode) {
case '0' -> false;
case '1' -> {
if (cacheUserSessionsStatus) {
Map<DeviceType, UserDeviceSessionInfo> deviceTypeToSessions =
new FastEnumMap<>(DeviceType.class);
deviceTypeToSessions.put(deviceType,
new UserDeviceSessionInfo(
this.localNodeId,
System.currentTimeMillis(),
true));
userIdToStatusCache.put(userId,
new UserSessionsStatus(
userId,
userStatus,
deviceTypeToSessions));
}
yield true;
}
case '2' -> true;
default -> throw new RuntimeException(
"Unexpected return code: "
+ returnCode);
};
});
}

private record NodeStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,14 @@ public Mono<Long> georem(Object key, Object... members) {

// Scripting

public <T> Mono<T> eval(RedisScript script, ByteBuf... keys) {
public <T> Mono<T> eval(RedisScript<T> script, ByteBuf... keys) {
return eval(script, keys.length, keys);
}

/**
* @param keyLength the real key length
*/
public <T> Mono<T> eval(RedisScript script, int keyLength, ByteBuf... keys) {
public <T> Mono<T> eval(RedisScript<T> script, int keyLength, ByteBuf... keys) {
return Mono.defer(() -> {
ByteBuf key;
for (int i = 0, length = keys.length; i < length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ public Mono<Long> georem(Long shardKey, Object key, Object... members) {

// Scripting

public <T> Mono<T> eval(Long shardKey, RedisScript script, Object... keys) {
public <T> Mono<T> eval(Long shardKey, RedisScript<T> script, Object... keys) {
ByteBuf[] buffers = ByteBufUtil.writeObjects(keys);
return getClient(shardKey).eval(script, buffers);
}

public <T> Mono<T> eval(Long shardKey, RedisScript script, ByteBuf[] buffers) {
public <T> Mono<T> eval(Long shardKey, RedisScript<T> script, ByteBuf[] buffers) {
return getClient(shardKey).eval(script, buffers);
}

Expand All @@ -165,7 +165,7 @@ public <T> Mono<T> eval(Long shardKey, RedisScript script, ByteBuf[] buffers) {
* will throw
*/
public <T> Flux<T> eval(
RedisScript script,
RedisScript<T> script,
short firstKey,
byte[] secondKey,
LongKeyGenerator keyGenerator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* @author James Chen
*/
public record RedisScript(
public record RedisScript<T>(
ByteBuf script,
ByteBuf digest,
ScriptOutputType outputType
Expand All @@ -49,14 +49,14 @@ public record RedisScript(
/**
* @param outputType {@link BaseRedisCommandBuilder#newScriptOutput}
*/
public static RedisScript get(ClassPathResource resource, ScriptOutputType outputType) {
public static <T> RedisScript<T> get(ClassPathResource resource, ScriptOutputType outputType) {
return get(resource, outputType, null);
}

/**
* @param outputType {@link BaseRedisCommandBuilder#newScriptOutput}
*/
public static RedisScript get(
public static <T> RedisScript<T> get(
ClassPathResource resource,
ScriptOutputType outputType,
@Nullable Map<String, Object> placeholders) {
Expand All @@ -81,7 +81,7 @@ public static RedisScript get(
+ bytes.length;
throw new InputOutputException(message);
}
return new RedisScript(
return new RedisScript<>(
ByteBufUtil.getUnreleasableDirectBuffer(bytes),
ByteBufUtil
.getUnreleasableDirectBuffer(StringUtil.getBytes(Base16.digest(bytes))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ local existing_status = values[2]
local now
if existing_node_id then
if existing_node_id == node_id then
return false
return '0'
end
local existing_device_timestamp = tonumber(redis_call('HGET', user_id, existing_node_id))
if not existing_device_timestamp then
return false
return '0'
end
now = tonumber(redis_call('TIME')[1])
local diff = now - existing_device_timestamp
Expand All @@ -36,7 +36,7 @@ if existing_node_id then
or expected_device_timestamp == ''
or expected_existing_node_id ~= existing_node_id
or expected_device_timestamp ~= existing_device_timestamp) then
return false
return '0'
end
end

Expand Down Expand Up @@ -85,4 +85,8 @@ if count - 7 > 0 then
end
end

return true
if redis_call('HLEN', user_id) == 3 then
return '1'
else
return '2'
end

0 comments on commit 46870a2

Please sign in to comment.