Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded JSON-RPC with HTTP 1.1 Keep-Alive support #434

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 96 additions & 37 deletions src/bitcoinrpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ CReserveKey* pMiningKey = NULL;

static std::string strRPCUserColonPass;

void ThreadRPCServer3(void* parg);

Object JSONRPCError(int code, const string& message)
{
Object error;
Expand Down Expand Up @@ -298,7 +300,7 @@ Value getwork(const Array& params, bool fHelp)


typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t;
static mapNewBlock_t mapNewBlock;
static mapNewBlock_t mapNewBlock; // FIXME: thread safety
static vector<CBlock*> vNewBlock;

if (params.size() == 0)
Expand Down Expand Up @@ -825,7 +827,7 @@ string rfc1123Time()
return string(buffer);
}

static string HTTPReply(int nStatus, const string& strMsg)
static string HTTPReply(int nStatus, const string& strMsg, bool keepalive)
{
if (nStatus == 401)
return strprintf("HTTP/1.0 401 Authorization Required\r\n"
Expand Down Expand Up @@ -854,7 +856,7 @@ static string HTTPReply(int nStatus, const string& strMsg)
return strprintf(
"HTTP/1.1 %d %s\r\n"
"Date: %s\r\n"
"Connection: close\r\n"
"Connection: %s\r\n"
"Content-Length: %d\r\n"
"Content-Type: application/json\r\n"
"Server: paycoin-json-rpc/%s\r\n"
Expand All @@ -863,19 +865,24 @@ static string HTTPReply(int nStatus, const string& strMsg)
nStatus,
cStatus,
rfc1123Time().c_str(),
keepalive ? "keep-alive" : "close",
strMsg.size(),
FormatFullVersion().c_str(),
strMsg.c_str());
}

int ReadHTTPStatus(std::basic_istream<char>& stream)
int ReadHTTPStatus(std::basic_istream<char>& stream, int &proto)
{
string str;
getline(stream, str);
vector<string> vWords;
boost::split(vWords, str, boost::is_any_of(" "));
if (vWords.size() < 2)
return 500;
proto = 0;
const char *ver = strstr(str.c_str(), "HTTP/1.");
if (ver != NULL)
proto = atoi(ver+7);
return atoi(vWords[1].c_str());
}

Expand Down Expand Up @@ -910,7 +917,8 @@ int ReadHTTP(std::basic_istream<char>& stream, map<string, string>& mapHeadersRe
strMessageRet = "";

// Read status
int nStatus = ReadHTTPStatus(stream);
int nProto;
int nStatus = ReadHTTPStatus(stream, nProto);

// Read header
int nLen = ReadHTTPHeader(stream, mapHeadersRet);
Expand All @@ -925,6 +933,16 @@ int ReadHTTP(std::basic_istream<char>& stream, map<string, string>& mapHeadersRe
strMessageRet = string(vch.begin(), vch.end());
}

string sConHdr = mapHeadersRet["connection"];

if ((sConHdr != "close") && (sConHdr != "keep-alive"))
{
if (nProto >= 1)
mapHeadersRet["connection"] = "keep-alive";
else
mapHeadersRet["connection"] = "close";
}

return nStatus;
}

Expand Down Expand Up @@ -983,7 +1001,7 @@ void ErrorReply(std::ostream& stream, const Object& objError, const Value& id)
if (code == -32600) nStatus = 400;
else if (code == -32601) nStatus = 404;
string strReply = JSONRPCReply(Value::null, objError, id);
stream << HTTPReply(nStatus, strReply) << std::flush;
stream << HTTPReply(nStatus, strReply, false) << std::flush;
}

bool ClientAllowed(const string& strAddress)
Expand Down Expand Up @@ -1049,22 +1067,36 @@ class SSLIOStreamDevice : public iostreams::device<iostreams::bidirectional> {
SSLStream& stream;
};

class AcceptedConnection
{
public:
SSLStream sslStream;
SSLIOStreamDevice d;
iostreams::stream<SSLIOStreamDevice> stream;

ip::tcp::endpoint peer;

AcceptedConnection(asio::io_service &io_service, ssl::context &context,
bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL),
stream(d) { ; }
};

void ThreadRPCServer(void* parg)
{
// getwork/getblocktemplate mining rewards paid here:
pMiningKey = new CReserveKey(pwalletMain);

try
{
vnThreadsRunning[THREAD_RPCSERVER]++;
vnThreadsRunning[THREAD_RPCLISTENER]++;
ThreadRPCServer2(parg);
vnThreadsRunning[THREAD_RPCSERVER]--;
vnThreadsRunning[THREAD_RPCLISTENER]--;
}
catch (std::exception& e) {
vnThreadsRunning[THREAD_RPCSERVER]--;
vnThreadsRunning[THREAD_RPCLISTENER]--;
PrintException(&e, "ThreadRPCServer()");
} catch (...) {
vnThreadsRunning[THREAD_RPCSERVER]--;
vnThreadsRunning[THREAD_RPCLISTENER]--;
PrintException(NULL, "ThreadRPCServer()");
}

Expand Down Expand Up @@ -1200,55 +1232,77 @@ void ThreadRPCServer2(void* parg)
for (;;)
{
// Accept connection
SSLStream sslStream(io_service, context);
SSLIOStreamDevice d(sslStream, fUseSSL);
iostreams::stream<SSLIOStreamDevice> stream(d);

ip::tcp::endpoint peer;
vnThreadsRunning[THREAD_RPCSERVER]--;
acceptor.accept(sslStream.lowest_layer(), peer);
vnThreadsRunning[4]++;
AcceptedConnection *conn =
new AcceptedConnection(io_service, context, fUseSSL);

vnThreadsRunning[THREAD_RPCLISTENER]--;
acceptor.accept(conn->sslStream.lowest_layer(), conn->peer);
vnThreadsRunning[THREAD_RPCLISTENER]++;

if (fShutdown)
{
delete conn;
return;
}

// Restrict callers by IP
if (!ClientAllowed(peer.address().to_string()))
// Restrict callers by IP. It is important to
// do this before starting client thread, to filter out
// certain DoS and misbehaving clients.
if (!ClientAllowed(conn->peer.address().to_string()))
{
// Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
if (!fUseSSL)
stream << HTTPReply(403, "") << std::flush;
continue;
conn->stream << HTTPReply(403, "", false) << std::flush;
delete conn;
}

// start HTTP client thread
else if (!NewThread(ThreadRPCServer3, conn)) {
printf("Failed to create RPC server client thread\n");
delete conn;
}
}
}

void ThreadRPCServer3(void* parg)
{
vnThreadsRunning[THREAD_RPCHANDLER]++;
AcceptedConnection *conn = (AcceptedConnection *) parg;

bool fRun = true;
for (;;) {
if (fShutdown || !fRun)
{
conn->stream.close();
delete conn;
--vnThreadsRunning[THREAD_RPCHANDLER];
return;
}
map<string, string> mapHeaders;
string strRequest;

boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest));
if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
{ // Timed out:
acceptor.cancel();
printf("ThreadRPCServer ReadHTTP timeout\n");
continue;
}
ReadHTTP(conn->stream, mapHeaders, strRequest);

// Check authorization
if (mapHeaders.count("authorization") == 0)
{
stream << HTTPReply(401, "") << std::flush;
continue;
conn->stream << HTTPReply(401, "", false) << std::flush;
break;
}
if (!HTTPAuthorized(mapHeaders))
{
printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str());
printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str());
/* Deter brute-forcing short passwords.
If this results in a DOS the user really
shouldn't have their RPC port exposed.*/
if (mapArgs["-rpcpassword"].size() < 20)
Sleep(250);

stream << HTTPReply(401, "") << std::flush;
continue;
conn->stream << HTTPReply(401, "", false) << std::flush;
break;
}
if (mapHeaders["connection"] == "close")
fRun = false;

Value id = Value::null;
try
Expand All @@ -1271,17 +1325,22 @@ void ThreadRPCServer2(void* parg)
throw JSONRPCError(-32600, "Top-level object parse error");

// Send reply
stream << HTTPReply(200, strReply) << std::flush;
conn->stream << HTTPReply(200, strReply, fRun) << std::flush;
}
catch (Object& objError)
{
ErrorReply(stream, objError, id);
ErrorReply(conn->stream, objError, id);
break;
}
catch (std::exception& e)
{
ErrorReply(stream, JSONRPCError(-32700, e.what()), id);
ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id);
break;
}
}

delete conn;
vnThreadsRunning[THREAD_RPCHANDLER]--;
}

json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array &params) const
Expand Down
5 changes: 3 additions & 2 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1953,13 +1953,14 @@ bool StopNode()
if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n");
if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
if (vnThreadsRunning[THREAD_MINTER] > 0) printf("ThreadStakeMinter still running\n");
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0)
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
Sleep(20);
Sleep(50);
DumpAddresses();
Expand Down
3 changes: 2 additions & 1 deletion src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ enum threadId
THREAD_OPENCONNECTIONS,
THREAD_MESSAGEHANDLER,
THREAD_MINER,
THREAD_RPCSERVER,
THREAD_RPCLISTENER,
THREAD_UPNP,
THREAD_DNSSEED,
THREAD_ADDEDCONNECTIONS,
THREAD_DUMPADDRESS,
THREAD_MINTER,
THREAD_RPCHANDLER,

THREAD_MAX
};
Expand Down