Skip to content

Commit

Permalink
Add a new RPC request CreateUserRequest #1445 + Organize the RPC cl…
Browse files Browse the repository at this point in the history
…asses under domains
  • Loading branch information
JamesChenX committed Apr 29, 2024
1 parent 5905009 commit 18d2cad
Show file tree
Hide file tree
Showing 48 changed files with 385 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import im.turms.gateway.infra.proto.TurmsNotificationParser;
import im.turms.server.common.access.client.dto.constant.DeviceType;
import im.turms.server.common.access.client.dto.notification.TurmsNotification;
import im.turms.server.common.domain.notification.service.INotificationService;
import im.turms.server.common.domain.notification.rpc.service.RpcNotificationService;
import im.turms.server.common.domain.session.bo.UserSessionId;
import im.turms.server.common.infra.collection.CollectionUtil;
import im.turms.server.common.infra.exception.IncompatibleInternalChangeException;
Expand All @@ -56,7 +56,7 @@
* @author James Chen
*/
@Component
public class NotificationService implements INotificationService {
public class NotificationService implements RpcNotificationService {

private static final Logger LOGGER = LoggerFactory.getLogger(NotificationService.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import org.springframework.stereotype.Service;

import im.turms.gateway.domain.session.service.SessionService;
import im.turms.server.common.domain.observation.service.IStatisticsService;
import im.turms.server.common.domain.observation.rpc.service.RpcStatisticsService;

/**
* @author James Chen
*/
@Service
public class StatisticsService implements IStatisticsService {
public class StatisticsService implements RpcStatisticsService {

private final SessionService sessionService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
import im.turms.server.common.domain.session.bo.UserSessionInfo;
import im.turms.server.common.domain.session.bo.UserSessionsInfo;
import im.turms.server.common.domain.session.bo.UserSessionsStatus;
import im.turms.server.common.domain.session.rpc.SetUserOfflineRequest;
import im.turms.server.common.domain.session.service.ISessionService;
import im.turms.server.common.domain.session.rpc.dto.SetUserOfflineRequest;
import im.turms.server.common.domain.session.rpc.service.RpcSessionService;
import im.turms.server.common.domain.session.service.SessionLocationService;
import im.turms.server.common.domain.session.service.UserStatusService;
import im.turms.server.common.infra.cluster.node.Node;
Expand Down Expand Up @@ -91,7 +91,7 @@
* @author James Chen
*/
@Service
public class SessionService implements ISessionService {
public class SessionService implements RpcSessionService {

private static final Logger LOGGER = LoggerFactory.getLogger(SessionService.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import im.turms.server.common.domain.blocklist.manager.BlocklistServiceManager;
import im.turms.server.common.domain.session.bo.CloseReason;
import im.turms.server.common.domain.session.bo.SessionCloseStatus;
import im.turms.server.common.domain.session.service.ISessionService;
import im.turms.server.common.domain.session.rpc.service.RpcSessionService;
import im.turms.server.common.infra.cluster.node.Node;
import im.turms.server.common.infra.cluster.node.NodeType;
import im.turms.server.common.infra.collection.ChunkedArrayList;
Expand Down Expand Up @@ -93,7 +93,7 @@ public BlocklistService(
TurmsRedisClient ipBlocklistRedisClient,
TurmsRedisClient userIdBlocklistRedisClient,
TurmsPropertiesManager propertiesManager,
@Autowired(required = false) ISessionService sessionService) {
@Autowired(required = false) RpcSessionService sessionService) {
BlocklistProperties blocklistProperties = propertiesManager.getLocalProperties()
.getSecurity()
.getBlocklist();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.notification.rpc;
package im.turms.server.common.domain.notification.rpc.dto;

import java.util.Set;
import jakarta.annotation.Nullable;
Expand All @@ -28,7 +28,7 @@
import reactor.core.publisher.Mono;

import im.turms.server.common.access.client.dto.constant.DeviceType;
import im.turms.server.common.domain.notification.service.INotificationService;
import im.turms.server.common.domain.notification.rpc.service.RpcNotificationService;
import im.turms.server.common.domain.session.bo.UserSessionId;
import im.turms.server.common.infra.cluster.service.rpc.NodeTypeToHandleRpc;
import im.turms.server.common.infra.cluster.service.rpc.dto.RpcRequest;
Expand All @@ -40,7 +40,7 @@
public class SendNotificationRequest extends RpcRequest<Set<Long>> {

private static final String NAME = "sendNotification";
private static INotificationService notificationService;
private static RpcNotificationService notificationService;

private final ByteBuf notificationBuffer;
private final Set<Long> recipientIds;
Expand Down Expand Up @@ -87,7 +87,7 @@ public boolean isAsync() {
public void setApplicationContext(ApplicationContext applicationContext) {
super.setApplicationContext(applicationContext);
if (notificationService == null) {
notificationService = getBean(INotificationService.class);
notificationService = getBean(RpcNotificationService.class);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.notification.rpc;
package im.turms.server.common.domain.notification.rpc.dto;

import java.util.Collections;
import java.util.Set;
Expand Down Expand Up @@ -115,4 +115,4 @@ public ByteBuf byteBufToComposite(SendNotificationRequest data) {
return data.getNotificationBuffer();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.notification.service;
package im.turms.server.common.domain.notification.rpc.service;

import java.util.Set;
import jakarta.annotation.Nullable;
Expand All @@ -32,7 +32,7 @@
/**
* @author James Chen
*/
public interface INotificationService {
public interface RpcNotificationService {

Mono<Set<Long>> sendNotificationToLocalClients(
@NotNull TracingContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package im.turms.server.common.domain.observation.rpc;
package im.turms.server.common.domain.observation.rpc.dto;

import lombok.Data;
import org.springframework.context.ApplicationContext;

import im.turms.server.common.domain.observation.service.IStatisticsService;
import im.turms.server.common.domain.observation.rpc.service.RpcStatisticsService;
import im.turms.server.common.infra.cluster.service.rpc.NodeTypeToHandleRpc;
import im.turms.server.common.infra.cluster.service.rpc.dto.RpcRequest;

Expand All @@ -31,7 +31,7 @@
public class CountOnlineUsersRequest extends RpcRequest<Integer> {

private static final String NAME = "countOnlineUsers";
private static IStatisticsService statisticsService;
private static RpcStatisticsService statisticsService;

@Override
public String name() {
Expand All @@ -57,7 +57,7 @@ public boolean isAsync() {
public void setApplicationContext(ApplicationContext applicationContext) {
super.setApplicationContext(applicationContext);
if (statisticsService == null) {
statisticsService = getBean(IStatisticsService.class);
statisticsService = getBean(RpcStatisticsService.class);
}
}

Expand All @@ -66,4 +66,4 @@ public Integer call() {
return statisticsService.countLocalOnlineUsers();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.observation.rpc;
package im.turms.server.common.domain.observation.rpc.dto;

import im.turms.server.common.infra.cluster.service.codec.codec.CodecId;
import im.turms.server.common.infra.cluster.service.codec.io.CodecStreamInput;
Expand All @@ -41,4 +41,4 @@ public CountOnlineUsersRequest readRequestData(CodecStreamInput input) {
return new CountOnlineUsersRequest();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.observation.service;
package im.turms.server.common.domain.observation.rpc.service;

import java.util.Map;

Expand All @@ -24,7 +24,7 @@
/**
* @author James Chen
*/
public interface IStatisticsService {
public interface RpcStatisticsService {

default Mono<Map<String, Integer>> countOnlineUsersByNodes() {
return Mono.empty();
Expand All @@ -34,4 +34,4 @@ default int countLocalOnlineUsers() {
return 0;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.session.rpc;
package im.turms.server.common.domain.session.rpc.dto;

import java.util.List;
import java.util.Set;
Expand All @@ -24,7 +24,7 @@
import org.springframework.context.ApplicationContext;

import im.turms.server.common.domain.session.bo.UserSessionsInfo;
import im.turms.server.common.domain.session.service.ISessionService;
import im.turms.server.common.domain.session.rpc.service.RpcSessionService;
import im.turms.server.common.infra.cluster.service.rpc.NodeTypeToHandleRpc;
import im.turms.server.common.infra.cluster.service.rpc.dto.RpcRequest;

Expand All @@ -34,7 +34,7 @@
@Data
public class QueryUserSessionsRequest extends RpcRequest<List<UserSessionsInfo>> {

private static ISessionService sessionService;
private static RpcSessionService sessionService;

private final Set<Long> userIds;

Expand All @@ -61,7 +61,7 @@ public NodeTypeToHandleRpc nodeTypeToRespond() {
public void setApplicationContext(ApplicationContext applicationContext) {
super.setApplicationContext(applicationContext);
if (sessionService == null) {
sessionService = getBean(ISessionService.class);
sessionService = getBean(RpcSessionService.class);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.session.rpc;
package im.turms.server.common.domain.session.rpc.dto;

import im.turms.server.common.infra.cluster.service.codec.codec.CodecId;
import im.turms.server.common.infra.cluster.service.codec.io.CodecStreamInput;
Expand Down Expand Up @@ -50,4 +50,4 @@ public int initialCapacityForRequest(QueryUserSessionsRequest data) {
return Stream.computeVarint32Size(size) + size * Long.BYTES;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.session.rpc;
package im.turms.server.common.domain.session.rpc.dto;

import java.util.Set;
import jakarta.annotation.Nullable;
Expand All @@ -28,7 +28,7 @@
import im.turms.server.common.access.client.dto.constant.DeviceType;
import im.turms.server.common.domain.session.bo.CloseReason;
import im.turms.server.common.domain.session.bo.SessionCloseStatus;
import im.turms.server.common.domain.session.service.ISessionService;
import im.turms.server.common.domain.session.rpc.service.RpcSessionService;
import im.turms.server.common.infra.cluster.service.rpc.NodeTypeToHandleRpc;
import im.turms.server.common.infra.cluster.service.rpc.dto.RpcRequest;

Expand All @@ -39,7 +39,7 @@
public class SetUserOfflineRequest extends RpcRequest<Boolean> {

private static final String NAME = "setUserOffline";
private static ISessionService sessionService;
private static RpcSessionService sessionService;

private final Long userId;
private final Set<DeviceType> deviceTypes;
Expand Down Expand Up @@ -73,7 +73,7 @@ public NodeTypeToHandleRpc nodeTypeToRespond() {
public void setApplicationContext(ApplicationContext applicationContext) {
super.setApplicationContext(applicationContext);
if (sessionService == null) {
sessionService = getBean(ISessionService.class);
sessionService = getBean(RpcSessionService.class);
}
}

Expand All @@ -89,4 +89,4 @@ public Mono<Boolean> callAsync() {
return mono.map(count -> count > 0);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.session.rpc;
package im.turms.server.common.domain.session.rpc.dto;

import java.util.Set;

Expand Down Expand Up @@ -72,4 +72,4 @@ public int initialCapacityForRequest(SetUserOfflineRequest data) {
return Long.BYTES + Short.BYTES + capacityForDeviceTypes;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.session.rpc;
package im.turms.server.common.domain.session.rpc.dto;

import java.util.ArrayList;
import java.util.Date;
Expand Down Expand Up @@ -132,4 +132,4 @@ public int initialCapacity(UserSessionsInfo data) {
return Long.BYTES + Byte.BYTES + Stream.computeVarint32Size(size) + 64 * size;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package im.turms.server.common.domain.session.service;
package im.turms.server.common.domain.session.rpc.service;

import java.util.List;
import java.util.Set;
Expand All @@ -32,7 +32,7 @@
/**
* @author James Chen
*/
public interface ISessionService {
public interface RpcSessionService {

Mono<Integer> closeLocalSessions(@NotNull byte[] ip, @NotNull CloseReason closeReason);

Expand Down
Loading

0 comments on commit 18d2cad

Please sign in to comment.