diff --git a/src/binpickingtask.cpp b/src/binpickingtask.cpp index fb9fc833..9dac09c1 100644 --- a/src/binpickingtask.cpp +++ b/src/binpickingtask.cpp @@ -1817,14 +1817,12 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti socket.reset(); } socket.reset(new zmq::socket_t((*_zmqcontext.get()),ZMQ_SUB)); - socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect - socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - std::stringstream ss; ss << std::setprecision(std::numeric_limits::digits10+1); - ss << _heartbeatPort; - socket->connect(("tcp://"+ _mujinControllerIp+":"+ss.str()).c_str()); - socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); + socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect + socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + socket->connect("tcp://" + _mujinControllerIp + ":" + std::to_string(_heartbeatPort)); + socket->set(zmq::sockopt::subscribe, ""); zmq::pollitem_t pollitem; memset(&pollitem, 0, sizeof(zmq::pollitem_t)); @@ -1833,13 +1831,12 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti unsigned long long lastheartbeat = GetMilliTime(); while (!_bShutdownHeartbeatMonitor && (GetMilliTime() - lastheartbeat) / 1000.0f < reinitializetimeout) { - zmq::poll(&pollitem,1, 50); // wait 50 ms for message + zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message if (pollitem.revents & ZMQ_POLLIN) { zmq::message_t reply; - socket->recv(&reply); - std::string replystring((char *)reply.data (), (size_t)reply.size()); + socket->recv(reply); //if ((size_t)reply.size() == 1 && ((char *)reply.data())[0]==255) { - if (replystring == "255") { + if (reply.to_string() == "255") { lastheartbeat = GetMilliTime(); } } @@ -1859,37 +1856,36 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti std::string utils::GetHeartbeat(const std::string& endpoint) { zmq::context_t zmqcontext(1); zmq::socket_t socket(zmqcontext, ZMQ_SUB); - socket.connect(endpoint.c_str()); - socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); + socket.connect(endpoint); + socket.set(zmq::sockopt::subscribe, ""); zmq::pollitem_t pollitem; memset(&pollitem, 0, sizeof(zmq::pollitem_t)); pollitem.socket = socket; pollitem.events = ZMQ_POLLIN; - zmq::poll(&pollitem,1, 50); // wait 50 ms for message + zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message if (!(pollitem.revents & ZMQ_POLLIN)) { return ""; } zmq::message_t reply; - socket.recv(&reply); - const std::string received((char *)reply.data (), (size_t)reply.size()); + socket.recv(reply); #ifndef _WIN32 - return received; + return reply.to_string(); #else // sometimes buffer can container \n or \\, which windows does not like std::string newbuffer; std::vector< std::pair > serachpairs(2); serachpairs[0].first = "\n"; serachpairs[0].second = ""; serachpairs[1].first = "\\"; serachpairs[1].second = ""; - SearchAndReplace(newbuffer, received, serachpairs); + SearchAndReplace(newbuffer, reply.to_string(), serachpairs); return newbuffer; #endif } - namespace { + std::string FindSmallestSlaveRequestId(const rapidjson::Value& pt) { // get all slave request ids std::vector slavereqids; @@ -1936,13 +1932,11 @@ std::string FindSmallestSlaveRequestId(const rapidjson::Value& pt) { return slavereqids[smallest_suffix_index]; } -std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, - const std::string& key) +std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, const std::string& key) { rapidjson::Document pt(rapidjson::kObjectType); - std::stringstream ss(heartbeat); - ParseJson(pt, ss.str()); + ParseJson(pt, heartbeat); try { const std::string slavereqid = FindSmallestSlaveRequestId(pt); std::string result; @@ -1954,8 +1948,8 @@ std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, } } -} +} // anonymous namespace std::string mujinclient::utils::GetScenePkFromHeartbeat(const std::string& heartbeat) { static const std::string prefix("mujin:/"); @@ -1964,8 +1958,7 @@ std::string mujinclient::utils::GetScenePkFromHeartbeat(const std::string& heart std::string utils::GetSlaveRequestIdFromHeartbeat(const std::string& heartbeat) { rapidjson::Document pt; - std::stringstream ss(heartbeat); - ParseJson(pt, ss.str()); + ParseJson(pt, heartbeat); try { static const std::string prefix("slaverequestid-"); return FindSmallestSlaveRequestId(pt).substr(prefix.length()); diff --git a/src/binpickingtaskzmq.cpp b/src/binpickingtaskzmq.cpp index 6a54ba0d..cac8747d 100644 --- a/src/binpickingtaskzmq.cpp +++ b/src/binpickingtaskzmq.cpp @@ -220,14 +220,12 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ socket.reset(); } socket.reset(new zmq::socket_t((*_zmqcontext.get()),ZMQ_SUB)); - socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect - socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - std::stringstream ss; ss << std::setprecision(std::numeric_limits::digits10+1); - ss << _heartbeatPort; - socket->connect (("tcp://"+ _mujinControllerIp+":"+ss.str()).c_str()); - socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); + socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect + socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + socket->connect("tcp://" + _mujinControllerIp + ":" + std::to_string(_heartbeatPort)); + socket->set(zmq::sockopt::subscribe, ""); zmq::pollitem_t pollitem; memset(&pollitem, 0, sizeof(zmq::pollitem_t)); @@ -235,15 +233,13 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ pollitem.events = ZMQ_POLLIN; unsigned long long lastheartbeat = GetMilliTime(); while (!_bShutdownHeartbeatMonitor && (GetMilliTime() - lastheartbeat) / 1000.0f < reinitializetimeout) { - zmq::poll(&pollitem,1, 50); // wait 50 ms for message + zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message if (pollitem.revents & ZMQ_POLLIN) { zmq::message_t reply; - socket->recv(&reply); - std::string replystring((char *)reply.data (), (size_t)reply.size()); + socket->recv(reply); rapidjson::Document pt(rapidjson::kObjectType); try{ - std::stringstream replystring_ss(replystring); - ParseJson(pt, replystring_ss.str()); + ParseJson(pt, reply.to_string()); heartbeat.Parse(pt); { boost::mutex::scoped_lock lock(_mutexTaskState); @@ -257,7 +253,7 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ } catch (std::exception const &e) { MUJIN_LOG_ERROR("HeartBeat reply is not JSON"); - MUJIN_LOG_ERROR(replystring); + MUJIN_LOG_ERROR(reply.to_string()); MUJIN_LOG_ERROR(e.what()); continue; } @@ -272,6 +268,4 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ MUJIN_LOG_DEBUG(str(boost::format("Stopped controller %s monitoring thread on port %d for slaverequestid=%s.")%_mujinControllerIp%_heartbeatPort%_slaverequestid)); } - - } // end namespace mujinclient diff --git a/src/controllerclientimpl.cpp b/src/controllerclientimpl.cpp index 4a018d35..b5bd39f6 100644 --- a/src/controllerclientimpl.cpp +++ b/src/controllerclientimpl.cpp @@ -17,7 +17,7 @@ #include #include #include -#include +#include #define SKIP_PEER_VERIFICATION // temporary //#define SKIP_HOSTNAME_VERIFICATION @@ -126,7 +126,7 @@ ControllerClientImpl::ControllerClientImpl(const std::string& usernamepassword, _mujinControllerIp = _baseuri.substr(_baseuri.find("//")+2, _baseuri.size()); _mujinControllerIp.erase(remove(_mujinControllerIp.begin(), _mujinControllerIp.end(), '/'), _mujinControllerIp.end()); // Remove / from string // Remove port - int idx = _mujinControllerIp.find(':'); + size_t idx = _mujinControllerIp.find(':'); if (idx != std::string::npos) { _mujinControllerIp = _mujinControllerIp.substr(0, idx); } diff --git a/src/mujinzmq.cpp b/src/mujinzmq.cpp index fcfb814e..357814a0 100644 --- a/src/mujinzmq.cpp +++ b/src/mujinzmq.cpp @@ -151,16 +151,14 @@ void ZmqSubscriber::_InitializeSocket(boost::shared_ptr context) _sharedcontext = false; } _socket.reset(new zmq::socket_t ((*_context.get()), ZMQ_SUB)); - _socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - _socket->setsockopt(ZMQ_SNDHWM, 2); - _socket->setsockopt(ZMQ_LINGER, 100); // ms - std::ostringstream port_stream; - port_stream << _port; - _socket->connect (("tcp://" + _host + ":" + port_stream.str()).c_str()); - _socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); + _socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect + _socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + _socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + _socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + _socket->set(zmq::sockopt::sndhwm, 2); + _socket->set(zmq::sockopt::linger, 100); // ms + _socket->connect("tcp://" + _host + ':' + std::to_string(_port)); + _socket->set(zmq::sockopt::subscribe, ""); } void ZmqSubscriber::_DestroySocket() @@ -189,7 +187,7 @@ bool ZmqPublisher::Publish(const std::string& messagestr) { zmq::message_t message(messagestr.size()); memcpy(message.data(), messagestr.data(), messagestr.size()); - return _socket->send(message); + return (bool)_socket->send(message, zmq::send_flags::none); } void ZmqPublisher::_InitializeSocket(boost::shared_ptr context) @@ -203,15 +201,13 @@ void ZmqPublisher::_InitializeSocket(boost::shared_ptr context) _sharedcontext = false; } _socket.reset(new zmq::socket_t ((*(zmq::context_t*)_context.get()), ZMQ_PUB)); - _socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - _socket->setsockopt(ZMQ_SNDHWM, 2); - _socket->setsockopt(ZMQ_LINGER, 100); // ms - std::ostringstream port_stream; - port_stream << _port; - _socket->bind (("tcp://*:" + port_stream.str()).c_str()); + _socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect + _socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + _socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + _socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + _socket->set(zmq::sockopt::sndhwm, 2); + _socket->set(zmq::sockopt::linger, 100); // ms + _socket->bind("tcp://*:" + std::to_string(_port)); } void ZmqPublisher::_DestroySocket() @@ -251,7 +247,7 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const bool recreatedonce = false; while (GetMilliTime() - starttime < timeout*1000.0) { try { - _socket->send(request); + _socket->send(request, zmq::send_flags::none); break; } catch (const zmq::error_t& e) { if (e.num() == EAGAIN) { @@ -277,10 +273,9 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const _InitializeSocket(_context); recreatedonce = true; } else{ - std::stringstream ss; - ss << "Failed to send request after re-creating socket."; - MUJIN_LOG_ERROR(ss.str()); - throw MujinException(ss.str(), MEC_Failed); + std::string ss = "Failed to send request after re-creating socket."; + MUJIN_LOG_ERROR(ss); + throw MujinException(ss, MEC_Failed); } } if( !!_preemptfn ) { @@ -289,15 +284,14 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const } if (GetMilliTime() - starttime > timeout*1000.0) { - std::stringstream ss; - ss << "Timed out trying to send request."; - MUJIN_LOG_ERROR(ss.str()); + std::string ss = "Timed out trying to send request."; + MUJIN_LOG_ERROR(ss); if (msg.length() > 1000) { MUJIN_LOG_INFO(msg.substr(0,1000) << "..."); } else { MUJIN_LOG_INFO(msg); } - throw MujinException(ss.str(), MEC_Timeout); + throw MujinException(ss, MEC_Timeout); } //recv recreatedonce = false; @@ -321,12 +315,11 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const timeoutms = timeout * 1000.0; } - zmq::poll(&pollitem, 1, timeoutms); + zmq::poll(&pollitem, 1, std::chrono::milliseconds{timeoutms}); receivedonce = true; if (pollitem.revents & ZMQ_POLLIN) { - _socket->recv(&reply); - std::string replystring((char *) reply.data (), (size_t) reply.size()); - return replystring; + _socket->recv(reply); + return reply.to_string(); } else{ std::stringstream ss; if (msg.length() > 1000) { @@ -378,15 +371,14 @@ std::string ZmqClient::Call(const std::string& msg, const double timeout, const } } if (GetMilliTime() - starttime > timeout*1000.0) { - std::stringstream ss; - ss << "timed out trying to receive request"; - MUJIN_LOG_ERROR(ss.str()); + std::string ss = "timed out trying to receive request"; + MUJIN_LOG_ERROR(ss); if (msg.length() > 1000) { MUJIN_LOG_INFO(msg.substr(0,1000) << "..."); } else { MUJIN_LOG_INFO(msg); } - throw MujinException(ss.str(), MEC_Failed); + throw MujinException(ss, MEC_Failed); } return ""; @@ -402,22 +394,19 @@ void ZmqClient::_InitializeSocket(boost::shared_ptr context) _sharedcontext = false; } _socket.reset(new zmq::socket_t ((*(zmq::context_t*)_context.get()), ZMQ_REQ)); - _socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer - std::ostringstream port_stream; - port_stream << _port; - std::stringstream ss; - ss << "connecting to socket at " << _host << ":" << _port; - MUJIN_LOG_INFO(ss.str()); - _socket->connect (("tcp://" + _host + ":" + port_stream.str()).c_str()); + _socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect + _socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + _socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + _socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + std::string endpoint = "tcp://" + _host + ':' + std::to_string(_port); + MUJIN_LOG_INFO("connecting to socket at " + endpoint); + _socket->connect(endpoint); } void ZmqClient::_DestroySocket() { if (!!_socket) { - _socket->setsockopt(ZMQ_LINGER, 0); + _socket->set(zmq::sockopt::linger, 0); _socket->close(); _socket.reset(); } @@ -438,7 +427,7 @@ unsigned int ZmqServer::Recv(std::string& data, long timeout) { // wait timeout in millisecond for message if (timeout > 0) { - zmq::poll(&_pollitem, 1, timeout); + zmq::poll(&_pollitem, 1, std::chrono::milliseconds{timeout}); if ((_pollitem.revents & ZMQ_POLLIN) == 0) { // did not receive anything @@ -446,10 +435,9 @@ unsigned int ZmqServer::Recv(std::string& data, long timeout) } } - const bool ret = _socket->recv(&_reply, ZMQ_NOBLOCK); + const bool ret = (bool)_socket->recv(_reply, zmq::recv_flags::dontwait); if (ret && _reply.size() > 0) { - data.resize(_reply.size()); - std::copy((uint8_t*)_reply.data(), (uint8_t*)_reply.data() + _reply.size(), data.begin()); + data = _reply.to_string(); return _reply.size(); } else { return 0; @@ -460,7 +448,7 @@ void ZmqServer::Send(const std::string& message) { zmq::message_t request(message.size()); memcpy((void *)request.data(), message.c_str(), message.size()); - _socket->send(request); + _socket->send(request, zmq::send_flags::none); } void ZmqServer::_InitializeSocket(boost::shared_ptr context) @@ -474,28 +462,25 @@ void ZmqServer::_InitializeSocket(boost::shared_ptr context) } _socket.reset(new zmq::socket_t((*(zmq::context_t*)_context.get()), ZMQ_REP)); - _socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime - _socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer + _socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect + _socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further + _socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime + _socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer // setup the pollitem memset(&_pollitem, 0, sizeof(_pollitem)); _pollitem.socket = _socket->operator void*(); _pollitem.events = ZMQ_POLLIN; - std::ostringstream endpoint; - endpoint << "tcp://*:" << _port; - _socket->bind(endpoint.str().c_str()); - std::stringstream ss; - ss << "binded to " << endpoint.str(); - MUJIN_LOG_INFO(ss.str()); + std::string endpoint = "tcp://*:" + std::to_string(_port); + _socket->bind(endpoint); + MUJIN_LOG_INFO("binded to " + endpoint); } void ZmqServer::_DestroySocket() { if (!!_socket) { - _socket->setsockopt(ZMQ_LINGER, 0); + _socket->set(zmq::sockopt::linger, 0); _socket->close(); _socket.reset(); }