-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmprpc_echo_server.cpp
133 lines (98 loc) · 3.74 KB
/
mprpc_echo_server.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#include "solid/frame/manager.hpp"
#include "solid/frame/scheduler.hpp"
#include "solid/frame/service.hpp"
#include "solid/frame/aio/aioresolver.hpp"
#include "solid/frame/mprpc/mprpcconfiguration.hpp"
#include "solid/frame/mprpc/mprpcservice.hpp"
#include "mprpc_echo_messages.hpp"
#include <iostream>
using namespace solid;
using namespace std;
using AioSchedulerT = frame::Scheduler<frame::aio::Reactor<frame::mprpc::EventT>>;
//-----------------------------------------------------------------------------
// Parameters
//-----------------------------------------------------------------------------
/// Command-line configurable settings of the echo server.
///
/// A default-constructed instance carries port "0" and address "0.0.0.0";
/// parseArguments() may overwrite either field.
struct Parameters {
    /// Port (service) part of the listening endpoint, kept as text.
    string listener_port{"0"};
    /// Host/address part of the listening endpoint, kept as text.
    string listener_addr{"0.0.0.0"};
};
//-----------------------------------------------------------------------------
namespace rpc_echo_server {
/**
 * Completion callback for messages of type M.
 *
 * main() registers one instantiation of this template per message type via
 * registerMessage<T>(). The callback implements the "echo": a received
 * message is handed straight back to the peer as the response.
 *
 * Every condition is enforced with solid_check, a project macro whose failure
 * behaviour is not visible in this file (NOTE(review): confirm what it does
 * on a failed check).
 *
 * @param _rctx           Connection context; used to reach the owning service
 *                        and the recipient id the response is sent to.
 * @param _rsent_msg_ptr  Going by its name, the message this side sent whose
 *                        sending completed; expected to be null whenever
 *                        _rrecv_msg_ptr is set.
 * @param _rrecv_msg_ptr  Going by its name, the message received from the
 *                        peer; when set it is moved into sendResponse().
 * @param _rerror         Completion status; checked to be "no error".
 */
template <class M>
void complete_message(
frame::mprpc::ConnectionContext& _rctx,
frame::mprpc::MessagePointerT<M>& _rsent_msg_ptr,
frame::mprpc::MessagePointerT<M>& _rrecv_msg_ptr,
ErrorConditionT const& _rerror)
{
// No completion error is tolerated by this server.
solid_check(!_rerror);
if (_rrecv_msg_ptr) {
// A received message must not arrive together with a sent one.
solid_check(!_rsent_msg_ptr);
// Echo: send the very same message object back; the pointer is moved into
// sendResponse() and the returned error condition is checked to be unset.
solid_check(!_rctx.service().sendResponse(_rctx.recipientId(), std::move(_rrecv_msg_ptr)));
}
if (_rsent_msg_ptr) {
// Completion of a sent message: no received message is expected with it.
solid_check(!_rrecv_msg_ptr);
}
}
} // namespace rpc_echo_server
//-----------------------------------------------------------------------------
bool parseArguments(Parameters& _par, int argc, char* argv[]);
//-----------------------------------------------------------------------------
// main
//-----------------------------------------------------------------------------
/**
 * Entry point of the mprpc echo server.
 *
 * Steps, in order:
 *  1. Parse the optional "address:port" command-line argument into Parameters.
 *  2. Create the scheduler, manager and mprpc service, and start the scheduler.
 *  3. Build the serialization protocol, registering
 *     rpc_echo_server::complete_message<T> for every message type that
 *     rpc_echo::configure_protocol() reports.
 *  4. Configure the server side with the "<addr>:<port>" listen string, start
 *     the service and print the last address it reports listening on.
 *  5. Block until the user presses ENTER; leaving the scope then destroys the
 *     service, manager and scheduler in reverse order of declaration.
 *
 * @param argc Number of command-line arguments.
 * @param argv Command-line arguments; argv[1], when it is the only argument,
 *             is interpreted by parseArguments().
 * @return 0 on normal termination (or when parseArguments() returns false),
 *         1 when the started service reports no listening address.
 */
int main(int argc, char* argv[])
{
    Parameters p;

    if (!parseArguments(p, argc, argv))
        return 0;

    {
        // Declaration order matters: objects are destroyed in reverse order
        // when this scope is left.
        AioSchedulerT          scheduler;
        frame::Manager         manager;
        frame::mprpc::ServiceT rpcservice(manager);

        // NOTE(review): the argument is presumably the number of worker
        // threads - confirm against the Scheduler documentation.
        scheduler.start(1);

        {
            auto proto = frame::mprpc::serialization_v3::create_protocol<reflection::v1::metadata::Variant, uint8_t>(
                reflection::v1::metadata::factory,
                [&](auto& _rmap) {
                    // Invoked by configure_protocol() once per message type T:
                    // bind T to its id/name and to the echo completion callback.
                    auto lambda = [&]<typename T>(const uint8_t _id, const std::string_view _name, type_identity<T> const& _rtype) {
                        _rmap.template registerMessage<T>(_id, _name, rpc_echo_server::complete_message<T>);
                    };
                    rpc_echo::configure_protocol(lambda);
                });
            frame::mprpc::Configuration cfg(scheduler, proto);

            // Listen endpoint in "<address>:<port>" form.
            cfg.server.listener_address_str = p.listener_addr;
            cfg.server.listener_address_str += ':';
            cfg.server.listener_address_str += p.listener_port;

            cfg.server.connection_start_state = frame::mprpc::ConnectionState::Active;

            {
                frame::mprpc::ServiceStartStatus start_status;
                rpcservice.start(start_status, std::move(cfg));

                // back() on an empty container is undefined behaviour, so make
                // sure the service actually reported a listening address.
                if (start_status.listen_addr_vec_.empty()) {
                    cerr << "server reported no listening address for: " << p.listener_addr << ':' << p.listener_port << endl;
                    return 1;
                }
                cout << "server listens on: " << start_status.listen_addr_vec_.back() << endl;
            }
        }

        cout << "Press ENTER to stop: ";
        cin.ignore();
    }
    return 0;
}
//-----------------------------------------------------------------------------
/**
 * Fills @p _par from the command line.
 *
 * Only the exact form "<program> <endpoint>" (argc == 2) is interpreted; with
 * any other argument count @p _par is left untouched.
 *
 * The endpoint is split at its LAST ':':
 *  - "addr:port" -> listener_addr = "addr", listener_port = "port"
 *  - "addr"      -> listener_addr = "addr", listener_port unchanged
 * Either side of the colon may end up empty (e.g. "addr:" or ":port").
 *
 * @param _par  Parameters to update.
 * @param argc  Argument count as received by main().
 * @param argv  Argument vector as received by main().
 * @return Always true.
 */
bool parseArguments(Parameters& _par, int argc, char* argv[])
{
    if (argc != 2) {
        return true;
    }

    const string endpoint(argv[1]);
    const auto   colon = endpoint.rfind(':');

    if (colon == string::npos) {
        // No port given: the whole argument is the address.
        _par.listener_addr = endpoint;
    } else {
        _par.listener_port = endpoint.substr(colon + 1);
        _par.listener_addr = endpoint.substr(0, colon);
    }
    return true;
}