Skip to content

Commit

Permalink
Rename the client driver service (Driver)MessageService to `Protoco…
Browse files Browse the repository at this point in the history
…lMessageService` to be distinguished from the business service `MessageService`
  • Loading branch information
JamesChenX committed Sep 15, 2024
1 parent c0070ee commit 0848fda
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef TURMS_CLIENT_DRIVER_SERVICE_MESSAGE_SERVICE_H
#define TURMS_CLIENT_DRIVER_SERVICE_MESSAGE_SERVICE_H
#ifndef TURMS_CLIENT_DRIVER_SERVICE_PROTOCOL_MESSAGE_SERVICE_H
#define TURMS_CLIENT_DRIVER_SERVICE_PROTOCOL_MESSAGE_SERVICE_H

#include <boost/thread/future.hpp>
#include <random>
Expand All @@ -17,7 +17,7 @@ namespace client {
namespace driver {
namespace service {

class MessageService : public BaseService, private std::enable_shared_from_this<MessageService> {
class ProtocolMessageService : public BaseService, private std::enable_shared_from_this<ProtocolMessageService> {
private:
using ResponseException = exception::ResponseException;
using TurmsNotification = model::proto::TurmsNotification;
Expand All @@ -26,7 +26,7 @@ class MessageService : public BaseService, private std::enable_shared_from_this<
public:
using NotificationHandler = std::function<void(const TurmsNotification&)>;

MessageService(boost::asio::io_context& ioContext,
ProtocolMessageService(boost::asio::io_context& ioContext,
StateStore& stateStore,
const boost::optional<int>& requestTimeout,
const boost::optional<int>& minRequestInterval);
Expand Down Expand Up @@ -74,4 +74,4 @@ class MessageService : public BaseService, private std::enable_shared_from_this<
} // namespace client
} // namespace turms

#endif // TURMS_CLIENT_DRIVER_SERVICE_MESSAGE_SERVICE_H
#endif // TURMS_CLIENT_DRIVER_SERVICE_PROTOCOL_MESSAGE_SERVICE_H
10 changes: 5 additions & 5 deletions turms-client-cpp/include/turms/client/driver/turms_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include "turms/client/driver/service/connection_service.h"
#include "turms/client/driver/service/heartbeat_service.h"
#include "turms/client/driver/service/message_service.h"
#include "turms/client/driver/service/protocol_message_service.h"
#include "turms/client/driver/state_store.h"
#include "turms/client/model/proto/notification/turms_notification.pb.h"
#include "turms/client/transport/tcp_metrics.h"
Expand All @@ -21,10 +21,10 @@ class TurmsDriver : private boost::noncopyable, private std::enable_shared_from_
private:
using TurmsNotification = model::proto::TurmsNotification;
using TurmsRequest = model::proto::TurmsRequest;
using StateStore = driver::StateStore;
using StateStore = StateStore;
using ConnectionService = service::ConnectionService;
using HeartbeatService = service::HeartbeatService;
using MessageService = service::MessageService;
using ProtocolMessageService = service::ProtocolMessageService;
using TcpMetrics = transport::TcpMetrics;

public:
Expand Down Expand Up @@ -80,7 +80,7 @@ class TurmsDriver : private boost::noncopyable, private std::enable_shared_from_

template <typename T>
auto addNotificationListener(T&& listener) -> void {
messageService_.addNotificationListener(std::forward<T>(listener));
protocolMessageService_.addNotificationListener(std::forward<T>(listener));
}

// State
Expand All @@ -91,7 +91,7 @@ class TurmsDriver : private boost::noncopyable, private std::enable_shared_from_
std::shared_ptr<boost::asio::io_context> ioContext_;
ConnectionService connectionService_;
HeartbeatService heartbeatService_;
MessageService messageService_;
ProtocolMessageService protocolMessageService_;

// Intermediary functions as a mediator between services
auto onConnectionDisconnected(const boost::optional<std::exception>& e = boost::none) -> void;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#include "turms/client/driver/service/message_service.h"
#include "turms/client/driver/service/protocol_message_service.h"

namespace turms {
namespace client {
namespace driver {
namespace service {
MessageService::MessageService(boost::asio::io_context& ioContext,
ProtocolMessageService::ProtocolMessageService(boost::asio::io_context& ioContext,
StateStore& stateStore,
const boost::optional<int>& requestTimeout,
const boost::optional<int>& minRequestInterval)
Expand All @@ -14,7 +14,7 @@ MessageService::MessageService(boost::asio::io_context& ioContext,
minRequestInterval_(minRequestInterval.get_value_or(0)) {
}

auto MessageService::sendRequest(model::proto::TurmsRequest& request)
auto ProtocolMessageService::sendRequest(model::proto::TurmsRequest& request)
-> boost::future<TurmsNotification> {
if (request.has_create_session_request()) {
if (stateStore_.isSessionOpen) {
Expand Down Expand Up @@ -58,7 +58,7 @@ auto MessageService::sendRequest(model::proto::TurmsRequest& request)

if (enableRequestTimeout_) {
boost::asio::steady_timer timer{ioContext_, requestTimeout_};
timer.async_wait([weakThis = std::weak_ptr<MessageService>(shared_from_this()),
timer.async_wait([weakThis = std::weak_ptr<ProtocolMessageService>(shared_from_this()),
requestId](const boost::system::error_code& e) {
if (e == boost::asio::error::operation_aborted) {
return;
Expand All @@ -82,7 +82,7 @@ auto MessageService::sendRequest(model::proto::TurmsRequest& request)
}
}

auto MessageService::didReceiveNotification(const MessageService::TurmsNotification& notification)
auto ProtocolMessageService::didReceiveNotification(const ProtocolMessageService::TurmsNotification& notification)
-> void {
const bool isResponse = !notification.has_relayed_request() && notification.has_request_id();
if (isResponse) {
Expand Down Expand Up @@ -110,32 +110,32 @@ auto MessageService::didReceiveNotification(const MessageService::TurmsNotificat
notifyNotificationListeners(notification);
}

auto MessageService::close() -> boost::future<void> {
auto ProtocolMessageService::close() -> boost::future<void> {
onDisconnected(boost::none);
return boost::make_ready_future();
}

auto MessageService::onDisconnected(const boost::optional<std::exception>& exception) -> void {
auto ProtocolMessageService::onDisconnected(const boost::optional<std::exception>& exception) -> void {
rejectRequestPromises(exception::ResponseException{
model::ResponseStatusCode::kClientSessionHasBeenClosed, "", exception});
}

auto MessageService::generateRandomId() const -> int64_t {
auto ProtocolMessageService::generateRandomId() const -> int64_t {
int64_t id;
do {
id = random::nextPositiveInt64();
} while (idToRequest_.count(id) > 0);
return id;
}

auto MessageService::notifyNotificationListeners(
const MessageService::TurmsNotification& notification) const -> void {
auto ProtocolMessageService::notifyNotificationListeners(
const TurmsNotification& notification) const -> void {
for (const auto& listener : notificationListeners_) {
listener(notification);
}
}

auto MessageService::rejectRequestPromises(const std::exception& exception) -> void {
auto ProtocolMessageService::rejectRequestPromises(const std::exception& exception) -> void {
for (auto it = idToRequest_.begin(); it != idToRequest_.end();) {
it = idToRequest_.erase(it);
TurmsRequestContext& context = it->second;
Expand Down
12 changes: 6 additions & 6 deletions turms-client-cpp/src/turms/client/driver/turms_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ TurmsDriver::TurmsDriver(const std::shared_ptr<boost::asio::io_context>& ioConte
: ioContext_(ioContext),
connectionService_(*ioContext, stateStore_, host, port, connectTimeoutMillis),
heartbeatService_(*ioContext, stateStore_, heartbeatIntervalMillis),
messageService_(*ioContext, stateStore_, requestTimeoutMillis, minRequestIntervalMillis) {
protocolMessageService_(*ioContext, stateStore_, requestTimeoutMillis, minRequestIntervalMillis) {
connectionService_.addOnDisconnectedListener([this](const boost::optional<std::exception>& e) {
onConnectionDisconnected(e);
});
Expand All @@ -36,7 +36,7 @@ auto TurmsDriver::close() -> boost::future<void> {
promise->set_value();
}
});
messageService_.close().then([count, promise](const boost::future<void>&) mutable {
protocolMessageService_.close().then([count, promise](const boost::future<void>&) mutable {
if (--(*count) == 0) {
promise->set_value();
}
Expand Down Expand Up @@ -82,7 +82,7 @@ auto TurmsDriver::connectionMetrics() const -> boost::optional<TcpMetrics> {

auto TurmsDriver::send(TurmsRequest& request) -> boost::future<TurmsNotification> {
const bool isCreateSessionRequest = request.has_create_session_request();
return messageService_.sendRequest(request).then(
return protocolMessageService_.sendRequest(request).then(
[weakThis = std::weak_ptr<TurmsDriver>(shared_from_this()),
isCreateSessionRequest](boost::future<TurmsNotification> response) {
if (auto sharedThis = weakThis.lock()) {
Expand All @@ -101,7 +101,7 @@ auto TurmsDriver::stateStore() -> StateStore& {
auto TurmsDriver::onConnectionDisconnected(const boost::optional<std::exception>& e) -> void {
stateStore_.reset();
heartbeatService_.onDisconnected(e);
messageService_.onDisconnected(e);
protocolMessageService_.onDisconnected(e);
}

auto TurmsDriver::onMessage(const std::vector<uint8_t>& message) -> void {
Expand All @@ -120,7 +120,7 @@ auto TurmsDriver::onMessage(const std::vector<uint8_t>& message) -> void {
}
if (notification.has_close_status()) {
stateStore_.isSessionOpen = false;
messageService_.didReceiveNotification(notification);
protocolMessageService_.didReceiveNotification(notification);
// We must close the connection after finishing handling the notification
// to ensure notification handlers will always be triggered before connection close
// handlers.
Expand All @@ -132,7 +132,7 @@ auto TurmsDriver::onMessage(const std::vector<uint8_t>& message) -> void {
stateStore_.sessionId = session.session_id();
stateStore_.serverId = session.server_id();
}
messageService_.didReceiveNotification(notification);
protocolMessageService_.didReceiveNotification(notification);
}
}
} // namespace driver
Expand Down
12 changes: 6 additions & 6 deletions turms-client-cpp/src/turms/client/turms_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,27 @@ auto TurmsClient::close() -> boost::future<void> {
return driver_.close();
}

auto TurmsClient::driver() noexcept -> TurmsClient::TurmsDriver& {
auto TurmsClient::driver() noexcept -> TurmsDriver& {
return driver_;
}

auto TurmsClient::userService() noexcept -> TurmsClient::UserService& {
auto TurmsClient::userService() noexcept -> UserService& {
return userService_;
}

auto TurmsClient::groupService() noexcept -> TurmsClient::GroupService& {
auto TurmsClient::groupService() noexcept -> GroupService& {
return groupService_;
}

auto TurmsClient::conversationService() noexcept -> TurmsClient::ConversationService& {
auto TurmsClient::conversationService() noexcept -> ConversationService& {
return conversationService_;
}

auto TurmsClient::messageService() noexcept -> TurmsClient::MessageService& {
auto TurmsClient::messageService() noexcept -> MessageService& {
return messageService_;
}

auto TurmsClient::notificationService() noexcept -> TurmsClient::NotificationService& {
auto TurmsClient::notificationService() noexcept -> NotificationService& {
return notificationService_;
}
} // namespace client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TurmsRequestContext {
TurmsRequestContext(this.completer, this.timeoutTimer);
}

class DriverMessageService extends BaseService {
class ProtocolMessageService extends BaseService {
static const int randomMax = 1 << 32;

final Random _random = Random();
Expand All @@ -27,7 +27,7 @@ class DriverMessageService extends BaseService {
final List<NotificationListener> _notificationListeners = [];
final Map<int, TurmsRequestContext> _idToRequest = {};

DriverMessageService(super.stateStore, int? requestTimeoutMillis,
ProtocolMessageService(super.stateStore, int? requestTimeoutMillis,
int? minRequestIntervalMillis) {
_requestTimeoutMillis =
requestTimeoutMillis == null || requestTimeoutMillis <= 0
Expand Down
21 changes: 11 additions & 10 deletions turms-client-dart/lib/src/driver/turms_driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import '../model/proto/request/turms_request.pb.dart';
import '../transport/tcp_metrics.dart';
import 'service/connection_service.dart';
import 'service/heartbeat_service.dart';
import 'service/message_service.dart';
import 'service/protocol_message_service.dart';
import 'state_store.dart';

final _requestMessageNameToTagNumber = <String, int>{};
Expand All @@ -17,7 +17,7 @@ class TurmsDriver {

late final ConnectionService _connectionService;
late final HeartbeatService _heartbeatService;
late final DriverMessageService _messageService;
late final ProtocolMessageService _protocolMessageService;

TurmsDriver(
String? host,
Expand All @@ -31,7 +31,7 @@ class TurmsDriver {
..addOnDisconnectedListener(_onConnectionDisconnected)
..addMessageListener(_onMessage);
_heartbeatService = HeartbeatService(_stateStore, heartbeatIntervalMillis);
_messageService = DriverMessageService(
_protocolMessageService = ProtocolMessageService(
_stateStore, requestTimeoutMillis, minRequestIntervalMillis);
}

Expand All @@ -44,7 +44,7 @@ class TurmsDriver {
Future<void> close() => Future.wait([
_connectionService.close(),
_heartbeatService.close(),
_messageService.close()
_protocolMessageService.close()
]);

// Heartbeat Service
Expand Down Expand Up @@ -110,25 +110,26 @@ class TurmsDriver {
_requestMessageNameToTagNumber[name] = tagNumber;
}
final request = TurmsRequest.create()..setField(tagNumber, message);
final notification = await _messageService.sendRequest(request);
final notification = await _protocolMessageService.sendRequest(request);
if (request.hasCreateSessionRequest()) {
_heartbeatService.start();
}
return notification;
}

void addNotificationListener(NotificationListener listener) =>
_messageService.addNotificationListener(listener);
_protocolMessageService.addNotificationListener(listener);

void removeNotificationListener(NotificationListener listener) =>
_messageService.removeNotificationListener(listener);
_protocolMessageService.removeNotificationListener(listener);

// Intermediary functions as a mediator between services

void _onConnectionDisconnected({Object? error, StackTrace? stackTrace}) {
_stateStore.reset();
_heartbeatService.onDisconnected(error: error, stackTrace: stackTrace);
_messageService.onDisconnected(error: error, stackTrace: stackTrace);
_protocolMessageService.onDisconnected(
error: error, stackTrace: stackTrace);
}

void _onMessage(List<int> message) {
Expand All @@ -145,7 +146,7 @@ class TurmsDriver {
}
if (notification.hasCloseStatus()) {
_stateStore.isSessionOpen = false;
_messageService.didReceiveNotification(notification);
_protocolMessageService.didReceiveNotification(notification);
// We must close the connection after finishing handling the notification
// to ensure notification handlers will always be triggered before connection close handlers.
_connectionService.disconnect();
Expand All @@ -157,7 +158,7 @@ class TurmsDriver {
..sessionId = session.sessionId
..serverId = session.serverId;
}
_messageService.didReceiveNotification(notification);
_protocolMessageService.didReceiveNotification(notification);
} else {
_heartbeatService.resolveHeartbeatCompleters();
}
Expand Down
3 changes: 2 additions & 1 deletion turms-client-dart/lib/turms_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ library turms_client_dart;
export 'src/driver/service/base_service.dart' show BaseService;
export 'src/driver/service/connection_service.dart' show ConnectionService;
export 'src/driver/service/heartbeat_service.dart' show HeartbeatService;
export 'src/driver/service/message_service.dart' show DriverMessageService;
export 'src/driver/service/protocol_message_service.dart'
show ProtocolMessageService;
export 'src/driver/state_store.dart' show StateStore;
export 'src/driver/turms_driver.dart' show TurmsDriver;
export 'src/exception/response_exception.dart' show ResponseException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ interface RequestPromiseSeal {
/**
* Handle TurmsRequest and TurnsNotification
*/
export default class MessageService extends BaseService {
export default class ProtocolMessageService extends BaseService {

private readonly _requestTimeout: number;
private readonly _minRequestInterval: number;
Expand Down
Loading

0 comments on commit 0848fda

Please sign in to comment.