Skip to content

Commit

Permalink
Reflect local relationship changes to local relationship-related caches
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesChenX committed Dec 25, 2023
1 parent 9db96f6 commit 4cd2b1f
Showing 1 changed file with 77 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package im.turms.service.domain.user.service;

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -170,6 +171,7 @@ public Mono<DeleteResult> deleteOneSidedRelationships(
t);
return Mono.empty();
}))
.doOnSuccess(unused -> invalidateRelationshipCache(keys))
.thenReturn(result)))
.retryWhen(TRANSACTION_RETRY);
}
Expand Down Expand Up @@ -208,6 +210,7 @@ public Mono<Void> deleteOneSidedRelationship(
t);
return Mono.empty();
}))
.doOnSuccess(unused -> invalidateRelationshipCache(ownerId, relatedUserId))
.then();
}

Expand Down Expand Up @@ -240,13 +243,23 @@ public Mono<LongsWithVersion> queryRelatedUserIdsWithVersion(
Recyclable<Set<Long>> recyclableSet = SetRecycler.obtain();
return queryRelatedUserIds(Set.of(ownerId), groupIndexes, isBlocked)
.collect(Collectors.toCollection(recyclableSet::getValue))
.map(ids -> {
if (ids.isEmpty()) {
.map(relatedUserIds -> {
if (relatedUserIds.isEmpty()) {
throw ResponseException.get(ResponseStatusCode.NO_CONTENT);
}
if (isBlocked != null) {
Pair<Long, Long> cacheKey;
for (Long relatedUserId : relatedUserIds) {
cacheKey = Pair.of(ownerId, relatedUserId);
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache
.put(cacheKey, !isBlocked);
ownerIdAndRelatedUserIdToIsBlockedCache.put(cacheKey,
isBlocked);
}
}
return ClientMessagePool.getLongsWithVersionBuilder()
.setLastUpdatedDate(date.getTime())
.addAllLongs(ids)
.addAllLongs(relatedUserIds)
.build();
})
.doFinally(signalType -> recyclableSet.recycle());
Expand Down Expand Up @@ -276,7 +289,16 @@ public Mono<UserRelationshipsWithVersion> queryRelationshipsWithVersion(
UserRelationshipsWithVersion.Builder builder =
ClientMessagePool.getUserRelationshipsWithVersionBuilder()
.setLastUpdatedDate(date.getTime());
UserRelationship.Key relationshipKey;
Pair<Long, Long> cacheKey;
boolean blocked;
for (UserRelationship relationship : relationships) {
relationshipKey = relationship.getKey();
cacheKey = Pair.of(ownerId, relationshipKey.getRelatedUserId());
blocked = relationship.getBlockDate() != null;
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache
.put(cacheKey, !blocked);
ownerIdAndRelatedUserIdToIsBlockedCache.put(cacheKey, blocked);
builder.addUserRelationships(
ProtoModelConvertor.relationship2proto(relationship));
}
Expand Down Expand Up @@ -466,6 +488,14 @@ public Mono<Boolean> friendTwoUsers(
now,
true,
session))
.doOnSuccess(unused -> {
Pair<Long, Long> cacheKey = Pair.of(userOneId, userTwoId);
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache.put(cacheKey, true);
ownerIdAndRelatedUserIdToIsBlockedCache.put(cacheKey, false);
cacheKey = Pair.of(userTwoId, userOneId);
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache.put(cacheKey, true);
ownerIdAndRelatedUserIdToIsBlockedCache.put(cacheKey, false);
})
.then(PublisherPool.TRUE);
}

Expand Down Expand Up @@ -521,12 +551,19 @@ private Mono<Void> upsertOneSidedRelationship(
}
UserRelationship userRelationship =
new UserRelationship(ownerId, relatedUserId, blockDate, establishmentDate);
return upsert
return (upsert
? userRelationshipRepository.upsert(userRelationship, session)
: userRelationshipRepository.insert(userRelationship, session)
.onErrorMap(DuplicateKeyException.class,
e -> ResponseException
.get(ResponseStatusCode.CREATE_EXISTING_RELATIONSHIP));
.get(ResponseStatusCode.CREATE_EXISTING_RELATIONSHIP)))
.doOnSuccess(unused -> {
Pair<Long, Long> cacheKey = Pair.of(ownerId, relatedUserId);
boolean isBlocked = blockDate != null;
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache.put(cacheKey,
!isBlocked);
ownerIdAndRelatedUserIdToIsBlockedCache.put(cacheKey, isBlocked);
});
}

public Mono<Boolean> isBlocked(
Expand All @@ -539,15 +576,21 @@ public Mono<Boolean> isBlocked(
} catch (ResponseException e) {
return Mono.error(e);
}
Pair<Long, Long> key = Pair.of(ownerId, relatedUserId);
Pair<Long, Long> cacheKey = Pair.of(ownerId, relatedUserId);
if (preferCache) {
Boolean isBlocked = ownerIdAndRelatedUserIdToIsBlockedCache.getIfPresent(key);
Boolean isBlocked = ownerIdAndRelatedUserIdToIsBlockedCache.getIfPresent(cacheKey);
if (isBlocked != null) {
return Mono.just(isBlocked);
}
}
return userRelationshipRepository.isBlocked(ownerId, relatedUserId)
.doOnNext(isBlocked -> ownerIdAndRelatedUserIdToIsBlockedCache.put(key, isBlocked));
.doOnNext(isBlocked -> {
if (isBlocked) {
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache.put(cacheKey,
false);
}
ownerIdAndRelatedUserIdToIsBlockedCache.put(cacheKey, isBlocked);
});
}

public Mono<Boolean> hasNoRelationshipOrNotBlocked(
Expand Down Expand Up @@ -576,18 +619,23 @@ public Mono<Boolean> hasRelationshipAndNotBlocked(
} catch (ResponseException e) {
return Mono.error(e);
}
Pair<Long, Long> key = Pair.of(ownerId, relatedUserId);
Pair<Long, Long> cacheKey = Pair.of(ownerId, relatedUserId);
if (preferCache) {
Boolean hasRelationshipAndNotBlocked =
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache.getIfPresent(key);
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache
.getIfPresent(cacheKey);
if (hasRelationshipAndNotBlocked != null) {
return Mono.just(hasRelationshipAndNotBlocked);
}
}
return userRelationshipRepository.hasRelationshipAndNotBlocked(ownerId, relatedUserId)
.doOnNext(
hasRelationshipAndNotBlocked -> ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache
.put(key, hasRelationshipAndNotBlocked));
.doOnNext(hasRelationshipAndNotBlocked -> {
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache.put(cacheKey,
hasRelationshipAndNotBlocked);
if (hasRelationshipAndNotBlocked) {
ownerIdAndRelatedUserIdToIsBlockedCache.put(cacheKey, false);
}
});
}

public Mono<UpdateResult> updateUserOneSidedRelationships(
Expand Down Expand Up @@ -644,4 +692,20 @@ public Mono<Boolean> hasOneSidedRelationship(
return userRelationshipRepository.existsById(key);
}

// Cache

private void invalidateRelationshipCache(Collection<UserRelationship.Key> keys) {
for (UserRelationship.Key key : keys) {
Pair<Long, Long> pair = Pair.of(key.getOwnerId(), key.getRelatedUserId());
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache.invalidate(pair);
ownerIdAndRelatedUserIdToIsBlockedCache.invalidate(pair);
}
}

private void invalidateRelationshipCache(Long ownerId, Long relatedUserId) {
Pair<Long, Long> pair = Pair.of(ownerId, relatedUserId);
ownerIdAndRelatedUserIdToHasRelationshipAndNotBlockedCache.invalidate(pair);
ownerIdAndRelatedUserIdToIsBlockedCache.invalidate(pair);
}

}

0 comments on commit 4cd2b1f

Please sign in to comment.