diff --git a/include/mujincontrollerclient/mujinjson.h b/include/mujincontrollerclient/mujinjson.h index 90cae29f..f4354f3c 100644 --- a/include/mujincontrollerclient/mujinjson.h +++ b/include/mujincontrollerclient/mujinjson.h @@ -164,7 +164,7 @@ inline void ParseJson(rapidjson::Document& d, const std::string& str) { inline void ParseJson(rapidjson::Document& d, std::istream& is) { rapidjson::IStreamWrapper isw(is); // see note in: void ParseJson(rapidjson::Document& d, const std::string& str) - rapidjson::Document tempDoc; + rapidjson::Document(tempDoc); tempDoc.ParseStream(isw); // parse float in full precision mode if (tempDoc.HasParseError()) { throw MujinJSONException(boost::str(boost::format("Json stream is invalid (offset %u) %s")%((unsigned)d.GetErrorOffset())%GetParseError_En(d.GetParseError())), MJE_Failed); diff --git a/src/binpickingtask.cpp b/src/binpickingtask.cpp index 20f9da71..b33ab4cd 100644 --- a/src/binpickingtask.cpp +++ b/src/binpickingtask.cpp @@ -1785,12 +1785,14 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti socket.reset(); } socket.reset(new zmq::socket_t((*_zmqcontext.get()),ZMQ_SUB)); - socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect - socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - socket->connect("tcp://" + _mujinControllerIp + ":" + std::to_string(_heartbeatPort)); - socket->set(zmq::sockopt::subscribe, ""); + socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect + socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + std::stringstream ss; ss << std::setprecision(std::numeric_limits::digits10+1); + ss << _heartbeatPort; + socket->connect(("tcp://"+ _mujinControllerIp+":"+ss.str()).c_str()); + socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); zmq::pollitem_t pollitem; memset(&pollitem, 0, sizeof(zmq::pollitem_t)); @@ -1799,12 +1801,13 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti unsigned long long lastheartbeat = GetMilliTime(); while (!_bShutdownHeartbeatMonitor && (GetMilliTime() - lastheartbeat) / 1000.0f < reinitializetimeout) { - zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message + zmq::poll(&pollitem,1, 50); // wait 50 ms for message if (pollitem.revents & ZMQ_POLLIN) { zmq::message_t reply; - socket->recv(reply); + socket->recv(&reply); + std::string replystring((char *)reply.data (), (size_t)reply.size()); //if ((size_t)reply.size() == 1 && ((char *)reply.data())[0]==255) { - if (reply.to_string() == "255") { + if (replystring == "255") { lastheartbeat = GetMilliTime(); } } @@ -1824,36 +1827,37 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti std::string utils::GetHeartbeat(const std::string& endpoint) { zmq::context_t zmqcontext(1); zmq::socket_t socket(zmqcontext, ZMQ_SUB); - socket.connect(endpoint); - socket.set(zmq::sockopt::subscribe, ""); + socket.connect(endpoint.c_str()); + socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); zmq::pollitem_t pollitem; memset(&pollitem, 0, sizeof(zmq::pollitem_t)); pollitem.socket = socket; pollitem.events = ZMQ_POLLIN; - zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message + zmq::poll(&pollitem,1, 50); // wait 50 ms for message if (!(pollitem.revents & ZMQ_POLLIN)) { return ""; } zmq::message_t reply; - socket.recv(reply); + socket.recv(&reply); + const std::string received((char *)reply.data (), (size_t)reply.size()); #ifndef _WIN32 - return reply.to_string(); + return received; #else // sometimes buffer can container \n or \\, which windows does not like std::string newbuffer; std::vector< std::pair > serachpairs(2); serachpairs[0].first = "\n"; serachpairs[0].second = ""; serachpairs[1].first = "\\"; serachpairs[1].second = ""; - SearchAndReplace(newbuffer, reply.to_string(), serachpairs); + SearchAndReplace(newbuffer, received, serachpairs); return newbuffer; #endif } -namespace { +namespace { std::string FindSmallestSlaveRequestId(const rapidjson::Value& pt) { // get all slave request ids std::vector slavereqids; @@ -1894,11 +1898,13 @@ std::string FindSmallestSlaveRequestId(const rapidjson::Value& pt) { return slavereqids[smallest_suffix_index]; } -std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, const std::string& key) +std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, + const std::string& key) { rapidjson::Document pt(rapidjson::kObjectType); - ParseJson(pt, heartbeat); + std::stringstream ss(heartbeat); + ParseJson(pt, ss.str()); try { const std::string slavereqid = FindSmallestSlaveRequestId(pt); std::string result; @@ -1910,8 +1916,8 @@ std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, cons } } +} -} // anonymous namespace std::string mujinclient::utils::GetScenePkFromHeartbeat(const std::string& heartbeat) { static const std::string prefix("mujin:/"); @@ -1920,7 +1926,8 @@ std::string mujinclient::utils::GetScenePkFromHeartbeat(const std::string& heart std::string utils::GetSlaveRequestIdFromHeartbeat(const std::string& heartbeat) { rapidjson::Document pt; - ParseJson(pt, heartbeat); + std::stringstream ss(heartbeat); + ParseJson(pt, ss.str()); try { static const std::string prefix("slaverequestid-"); return FindSmallestSlaveRequestId(pt).substr(prefix.length()); diff --git a/src/binpickingtaskzmq.cpp b/src/binpickingtaskzmq.cpp index 2656a167..334ad212 100644 --- a/src/binpickingtaskzmq.cpp +++ b/src/binpickingtaskzmq.cpp @@ -267,12 +267,14 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ socket.reset(); } socket.reset(new zmq::socket_t((*_zmqcontext.get()),ZMQ_SUB)); - socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect - socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - socket->connect("tcp://" + _mujinControllerIp + ":" + std::to_string(_heartbeatPort)); - socket->set(zmq::sockopt::subscribe, ""); + socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect + socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + std::stringstream ss; ss << std::setprecision(std::numeric_limits::digits10+1); + ss << _heartbeatPort; + socket->connect (("tcp://"+ _mujinControllerIp+":"+ss.str()).c_str()); + socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); zmq::pollitem_t pollitem; memset(&pollitem, 0, sizeof(zmq::pollitem_t)); @@ -280,13 +282,15 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ pollitem.events = ZMQ_POLLIN; unsigned long long lastheartbeat = GetMilliTime(); while (!_bShutdownHeartbeatMonitor && (GetMilliTime() - lastheartbeat) / 1000.0f < reinitializetimeout) { - zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message + zmq::poll(&pollitem,1, 50); // wait 50 ms for message if (pollitem.revents & ZMQ_POLLIN) { zmq::message_t reply; - socket->recv(reply); + socket->recv(&reply); + std::string replystring((char *)reply.data (), (size_t)reply.size()); rapidjson::Document pt(rapidjson::kObjectType); try{ - ParseJson(pt, reply.to_string()); + std::stringstream replystring_ss(replystring); + ParseJson(pt, replystring_ss.str()); heartbeat.Parse(pt); { boost::mutex::scoped_lock lock(_mutexTaskState); @@ -300,7 +304,7 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ } catch (std::exception const &e) { MUJIN_LOG_ERROR("HeartBeat reply is not JSON"); - MUJIN_LOG_ERROR(reply.to_string()); + MUJIN_LOG_ERROR(replystring); MUJIN_LOG_ERROR(e.what()); continue; } @@ -315,4 +319,6 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ MUJIN_LOG_DEBUG(str(boost::format("Stopped controller %s monitoring thread on port %d for slaverequestid=%s.")%_mujinControllerIp%_heartbeatPort%_slaverequestid)); } + + } // end namespace mujinclient diff --git a/src/mujinzmq.cpp b/src/mujinzmq.cpp index dc82ecc2..ff7c1896 100644 --- a/src/mujinzmq.cpp +++ b/src/mujinzmq.cpp @@ -151,14 +151,16 @@ void ZmqSubscriber::_InitializeSocket(boost::shared_ptr context) _sharedcontext = false; } _socket.reset(new zmq::socket_t ((*_context.get()), ZMQ_SUB)); - _socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect - _socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - _socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - _socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - _socket->set(zmq::sockopt::sndhwm, 2); - _socket->set(zmq::sockopt::linger, 100); // ms - _socket->connect("tcp://" + _host + ':' + std::to_string(_port)); - _socket->set(zmq::sockopt::subscribe, ""); + _socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + _socket->setsockopt(ZMQ_SNDHWM, 2); + _socket->setsockopt(ZMQ_LINGER, 100); // ms + std::ostringstream port_stream; + port_stream << _port; + _socket->connect (("tcp://" + _host + ":" + port_stream.str()).c_str()); + _socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); } void ZmqSubscriber::_DestroySocket() @@ -187,7 +189,7 @@ bool ZmqPublisher::Publish(const std::string& messagestr) { zmq::message_t message(messagestr.size()); memcpy(message.data(), messagestr.data(), messagestr.size()); - return (bool)_socket->send(message, zmq::send_flags::none); + return _socket->send(message); } void ZmqPublisher::_InitializeSocket(boost::shared_ptr context) @@ -201,13 +203,15 @@ void ZmqPublisher::_InitializeSocket(boost::shared_ptr context) _sharedcontext = false; } _socket.reset(new zmq::socket_t ((*(zmq::context_t*)_context.get()), ZMQ_PUB)); - _socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect - _socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - _socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - _socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - _socket->set(zmq::sockopt::sndhwm, 2); - _socket->set(zmq::sockopt::linger, 100); // ms - _socket->bind("tcp://*:" + std::to_string(_port)); + _socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + _socket->setsockopt(ZMQ_SNDHWM, 2); + _socket->setsockopt(ZMQ_LINGER, 100); // ms + std::ostringstream port_stream; + port_stream << _port; + _socket->bind (("tcp://*:" + port_stream.str()).c_str()); } void ZmqPublisher::_DestroySocket() @@ -247,7 +251,7 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const bool recreatedonce = false; while (GetMilliTime() - starttime < timeout*1000.0) { try { - _socket->send(request, zmq::send_flags::none); + _socket->send(request); break; } catch (const zmq::error_t& e) { if (e.num() == EAGAIN) { @@ -273,9 +277,10 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const _InitializeSocket(_context); recreatedonce = true; } else{ - std::string ss = "Failed to send request after re-creating socket."; - MUJIN_LOG_ERROR(ss); - throw MujinException(ss, MEC_Failed); + std::stringstream ss; + ss << "Failed to send request after re-creating socket."; + MUJIN_LOG_ERROR(ss.str()); + throw MujinException(ss.str(), MEC_Failed); } } if( !!_preemptfn ) { @@ -284,14 +289,15 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const } if (GetMilliTime() - starttime > timeout*1000.0) { - std::string ss = "Timed out trying to send request."; - MUJIN_LOG_ERROR(ss); + std::stringstream ss; + ss << "Timed out trying to send request."; + MUJIN_LOG_ERROR(ss.str()); if (msg.length() > 1000) { MUJIN_LOG_INFO(msg.substr(0,1000) << "..."); } else { MUJIN_LOG_INFO(msg); } - throw MujinException(ss, MEC_Timeout); + throw MujinException(ss.str(), MEC_Timeout); } //recv recreatedonce = false; @@ -315,11 +321,12 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const timeoutms = timeout * 1000.0; } - zmq::poll(&pollitem, 1, std::chrono::milliseconds{timeoutms}); + zmq::poll(&pollitem, 1, timeoutms); receivedonce = true; if (pollitem.revents & ZMQ_POLLIN) { - _socket->recv(reply); - return reply.to_string(); + _socket->recv(&reply); + std::string replystring((char *) reply.data (), (size_t) reply.size()); + return replystring; } else{ std::stringstream ss; if (msg.length() > 1000) { @@ -371,14 +378,15 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const } } if (GetMilliTime() - starttime > timeout*1000.0) { - std::string ss = "timed out trying to receive request"; - MUJIN_LOG_ERROR(ss); + std::stringstream ss; + ss << "timed out trying to receive request"; + MUJIN_LOG_ERROR(ss.str()); if (msg.length() > 1000) { MUJIN_LOG_INFO(msg.substr(0,1000) << "..."); } else { MUJIN_LOG_INFO(msg); } - throw MujinException(ss, MEC_Failed); + throw MujinException(ss.str(), MEC_Failed); } return ""; @@ -394,13 +402,16 @@ void ZmqClient::_InitializeSocket(boost::shared_ptr context) _sharedcontext = false; } _socket.reset(new zmq::socket_t ((*(zmq::context_t*)_context.get()), ZMQ_REQ)); - _socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect - _socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - _socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - _socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - std::string endpoint = "tcp://" + _host + ':' + std::to_string(_port); - MUJIN_LOG_INFO("connecting to socket at " + endpoint); - _socket->connect(endpoint); + _socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + std::ostringstream port_stream; + port_stream << _port; + std::stringstream ss; + ss << "connecting to socket at " << _host << ":" << _port; + MUJIN_LOG_INFO(ss.str()); + _socket->connect (("tcp://" + _host + ":" + port_stream.str()).c_str()); } void ZmqClient::_DestroySocket() @@ -426,7 +437,7 @@ unsigned int ZmqServer::Recv(std::string& data, long timeout) { // wait timeout in millisecond for message if (timeout > 0) { - zmq::poll(&_pollitem, 1, std::chrono::milliseconds{timeout}); + zmq::poll(&_pollitem, 1, timeout); if ((_pollitem.revents & ZMQ_POLLIN) == 0) { // did not receive anything @@ -434,9 +445,10 @@ unsigned int ZmqServer::Recv(std::string& data, long timeout) } } - const bool ret = (bool)_socket->recv(_reply, zmq::recv_flags::dontwait); + const bool ret = _socket->recv(&_reply, ZMQ_NOBLOCK); if (ret && _reply.size() > 0) { - data = _reply.to_string(); + data.resize(_reply.size()); + std::copy((uint8_t*)_reply.data(), (uint8_t*)_reply.data() + _reply.size(), data.begin()); return _reply.size(); } else { return 0; @@ -447,7 +459,7 @@ void ZmqServer::Send(const std::string& message) { zmq::message_t request(message.size()); memcpy((void *)request.data(), message.c_str(), message.size()); - _socket->send(request, zmq::send_flags::none); + _socket->send(request); } void ZmqServer::_InitializeSocket(boost::shared_ptr context) @@ -461,19 +473,22 @@ void ZmqServer::_InitializeSocket(boost::shared_ptr context) } _socket.reset(new zmq::socket_t((*(zmq::context_t*)_context.get()), ZMQ_REP)); - _socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect - _socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - _socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - _socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + _socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + _socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer // setup the pollitem memset(&_pollitem, 0, sizeof(_pollitem)); _pollitem.socket = _socket->operator void*(); _pollitem.events = ZMQ_POLLIN; - std::string endpoint = "tcp://*:" + std::to_string(_port); - _socket->bind(endpoint); - MUJIN_LOG_INFO("binded to " + endpoint); + std::ostringstream endpoint; + endpoint << "tcp://*:" << _port; + _socket->bind(endpoint.str().c_str()); + std::stringstream ss; + ss << "binded to " << endpoint.str(); + MUJIN_LOG_INFO(ss.str()); } void ZmqServer::_DestroySocket()