Skip to content

Commit

Permalink
Created a simple JsonRPC based chat server example, shows:
Browse files Browse the repository at this point in the history
- Asynchronous JsonRPC handling on both client and server side
- Errors and timeout handling
  • Loading branch information
artyom-beilis committed Dec 11, 2011
1 parent f591a95 commit 7da94a4
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 21 deletions.
44 changes: 42 additions & 2 deletions contrib/client_side/jsonrpc/jsonrpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,33 @@
/// - id - JSONRPC id. It should be null for notification methods
/// and it should be some integer or string for function methods
///
/// Each method given in the constructor would have following properties:
///
/// on_error(e) - Returned error, where e.type is one of 'transport', 'protocol', 'response' and
/// e.error is the error object.
/// on_result(r) - Returned method result, or on_result() - for notifications.
///
/// For example
///
/// var rpc = new JsonRPC('/chat',['getValue','getStatistics'],['updateValue']);
///
/// // Asynchronouse method
///
/// rpc.getValue.on_error = function(e) { alert('Error:' + e.error); }
/// rpc.getValue.on_result = function(r) { alert(r); }
///
/// rpc.getValue();
///
/// // Synchronous method
///
/// // not setting callbacks or setting on_error and on_result to null
/// // makes them synchronous rpc calls. For example;
///
/// alert(rpc.getStatistics());
/// rpc.updateValue(10);
///
///

function JsonRPC(uri,function_methods,notification_methods) {
if(!(this instanceof JsonRPC))
return new JsonRPC(uri,function_methods,notification_methods);
Expand Down Expand Up @@ -75,7 +102,13 @@ JsonRPC.prototype.syncCall = function(name,id,params) {
if(xhr.status!=200)
throw Error('Invalid response:' + xhr.status);
if(id!=null) {
var response = JSON.parse(xhr.responseText);
var response = null;
try {
response = JSON.parse(xhr.responseText);
}
catch(e) {
throw Error('Invalid JSON-RPC response');
}
if(response.error != null)
throw Error(response.error);
return response.result;
Expand All @@ -95,7 +128,14 @@ JsonRPC.prototype.asyncCall = function(name,id,params,on_result,on_error) {
return;
if(xhr.status==200) {
if(id!=null) {
var response = JSON.parse(xhr.responseText);
var response = null;
try {
response = JSON.parse(xhr.responseText);
}
catch(e) {
on_error({'type' : 'protocol', 'error' : 'invalid response'});
return;
}
if(response.error != null) {
on_error({'type': 'response', 'error' : response.error });
}
Expand Down
50 changes: 37 additions & 13 deletions examples/json_rpc_chat/chat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,48 +41,55 @@ class chat : public cppcms::rpc::json_rpc_server {
cppcms::rpc::json_rpc_server(srv),
timer_(srv.get_io_service())
{
// Our main methods
bind("post",cppcms::rpc::json_method(&chat::post,this),notification_role);
bind("get",cppcms::rpc::json_method(&chat::get,this),method_role);

// Add timeouts to the system
last_wake_ = time(0);
on_timer(booster::system::error_code());
}

// Handle new message call
void post(std::string const &author,std::string const &message)
{
cppcms::json::value obj;
obj["author"]=author;
obj["message"]=message;
messages_.push_back(obj);
last_wake_ = time(0);
broadcast(messages_.size()-1);
}

void on_timer(booster::system::error_code const &e)
{
if(e) return; // cancelation
if(last_wake_ - time(0) > 10) {

// check idle connections for more then 10 seconds
if(time(0) - last_wake_ > 10) {
broadcast(messages_.size());
last_wake_ = time(0);
}
// restart timer
timer_.expires_from_now(booster::ptime::seconds(1));
timer_.async_wait(boost::bind(&chat::on_timer,booster::intrusive_ptr<chat>(this),_1));
std::cout << "Status: \n"
<< "Waiters: " << waiters_.size() << '\n'
<< "Messages:" << messages_.size() <<'\n'
<< "[";
for(size_t i=0;i<messages_.size();i++)
std::cout << messages_[i] << std::endl;
std::cout <<"]" << std::endl;
}

// Handle request
void get(unsigned from)
{
if(from < messages_.size()) {
// not long polling - return result now
return_result(make_response(from));
return;
}
else if(from == messages_.size()) {
// Can't answer now

// Add long polling request to the list

booster::shared_ptr<cppcms::rpc::json_call> call=release_call();
waiters_.insert(call);

// set disconnect callback
call->context().async_on_peer_reset(
boost::bind(
&chat::remove_context,
Expand All @@ -93,36 +100,53 @@ class chat : public cppcms::rpc::json_rpc_server {
return_error("Invalid position");
}
}

// handle client disconnect
void remove_context(booster::shared_ptr<cppcms::rpc::json_call> call)
{
waiters_.erase(call);
}

void broadcast(size_t from)
{
// update timeout
last_wake_ = time(0);
// Prepare response
cppcms::json::value response = make_response(from);
// Send it to everybody
for(waiters_type::iterator waiter=waiters_.begin();waiter!=waiters_.end();++waiter) {
booster::shared_ptr<cppcms::rpc::json_call> call = *waiter;
call->return_result(make_response(from));
call->return_result(response);
}
waiters_.clear();
}

// Prepare response to the client
cppcms::json::value make_response(size_t n)
{
cppcms::json::value v;

// Small optimization
v=cppcms::json::array();
cppcms::json::array &ar = v.array();

ar.reserve(messages_.size() - n);

// prepare all messages
for(size_t i=n;i<messages_.size();i++) {
ar.push_back(messages_[i]);
}
std::cout << "Response to client:" << v << std::endl;
return v;
}
private:

// message store
std::vector<cppcms::json::value> messages_;

// long poll requests
typedef std::set<booster::shared_ptr<cppcms::rpc::json_call> > waiters_type;
waiters_type waiters_;

// timer for resetting idle requests
booster::aio::deadline_timer timer_;
time_t last_wake_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,16 @@
<title>Chat Room</title>

<script type="text/javascript">
message_count = 0;
// Global values:

// RPC object with two methods:
//
// get(counter), returns array of objects with properties author and message
// post(author,message) posts new chat message
//
rpc = new JsonRPC('/chat',['get'],['post']);
// Messages counter - where to get new messages from, parameter for rpc.get
message_count = 0;

function make_error(what,e)
{
Expand All @@ -25,7 +33,6 @@
'<dd>' + m.message + '</dd>';
message_count++;
}
//messagesHtml.innerHTML += '<p>JSON: ' + JSON.stringify(messages) + '</p>' ;

restart();
}
Expand All @@ -35,25 +42,29 @@
document.getElementById('reconnect').disabled = false;
}

rpc.post.on_error = function(e) {
make_error('Posting New Messages',e);
}
rpc.post.on_result = function() {
// reset the form content
document.getElementById("message").value = '';
}
rpc.post.on_error = function(e) {
make_error('Posting New Messages',e);
}

function restart()
{
rpc.get(message_count);
}

function reconnect_to_server()
{
message_count = 0;
document.getElementById('error_message').innerHTML = '';
document.getElementById('messages').innerHTML = '';
document.getElementById('reconnect').disabled = true;
restart();
return false;
}

function send_data() {
author = document.getElementById('author').value;
message = document.getElementById("message").value;
Expand Down

0 comments on commit 7da94a4

Please sign in to comment.