diff --git a/src/CMQMaster.h b/src/CMQMaster.h index e47914d..bd38833 100644 --- a/src/CMQMaster.h +++ b/src/CMQMaster.h @@ -24,10 +24,10 @@ class CMQMaster { return sock.get(zmq::sockopt::last_endpoint); } catch(zmq::error_t const &e) { if (errno != EADDRINUSE) - Rf_error(e.what()); + Rcpp::stop(e.what()); } } - Rf_error("Could not bind port to any address in provided pool"); + Rcpp::stop("Could not bind port to any address in provided pool"); } void close(int timeout=0) { @@ -45,7 +45,7 @@ class CMQMaster { SEXP recv(int timeout=-1) { if (peers.size() + pending_workers <= 0) - Rf_error("Trying to receive data without workers"); + Rcpp::stop("Trying to receive data without workers"); int data_offset; std::vector msgs; @@ -215,7 +215,7 @@ class CMQMaster { rc = zmq::poll(pitems, time_left); } catch (zmq::error_t const &e) { if (errno != EINTR || pending_interrupt()) - Rf_error(e.what()); + Rcpp::stop(e.what()); } if (timeout != -1) { @@ -252,7 +252,7 @@ class CMQMaster { w.via = msgs[0].to_string(); if (msgs[++cur_i].size() != 0) - Rf_error("No frame delimiter found at expected position"); + Rcpp::stop("No frame delimiter found at expected position"); // handle status frame if present, else it's a disconnect notification if (msgs.size() > ++cur_i) @@ -265,14 +265,14 @@ class CMQMaster { if (it->second.status == wlife_t::shutdown) it = peers.erase(it); else - Rf_error("Proxy disconnect with active worker(s)"); + Rcpp::stop("Proxy disconnect with active worker(s)"); } } peers.erase(cur); } else if (w.status == wlife_t::shutdown) peers.erase(cur); else - Rf_error("Unexpected worker disconnect"); + Rcpp::stop("Unexpected worker disconnect"); } w.time = msg2r(msgs[++cur_i], true); diff --git a/src/CMQProxy.h b/src/CMQProxy.h index 10c2ba3..ab75b35 100644 --- a/src/CMQProxy.h +++ b/src/CMQProxy.h @@ -36,7 +36,7 @@ class CMQProxy { to_master.set(zmq::sockopt::routing_id, "proxy"); if (zmq_socket_monitor(to_master, "inproc://monitor", ZMQ_EVENT_DISCONNECTED) < 0) - Rf_error("failed to create socket monitor"); + Rcpp::stop("failed to create socket monitor"); mon = zmq::socket_t(*ctx, ZMQ_PAIR); mon.connect("inproc://monitor"); @@ -69,10 +69,10 @@ class CMQProxy { return to_worker.get(zmq::sockopt::last_endpoint); } catch(zmq::error_t const &e) { if (errno != EADDRINUSE) - Rf_error(e.what()); + Rcpp::stop(e.what()); } } - Rf_error("Could not bind port to any address in provided pool"); + Rcpp::stop("Could not bind port to any address in provided pool"); } bool process_one() { @@ -91,7 +91,7 @@ class CMQProxy { rc = zmq::poll(pitems, time_left); } catch (zmq::error_t const &e) { if (errno != EINTR || pending_interrupt()) - Rf_error(e.what()); + Rcpp::stop(e.what()); } } while (rc == 0); diff --git a/src/CMQWorker.h b/src/CMQWorker.h index b150860..9c5690e 100644 --- a/src/CMQWorker.h +++ b/src/CMQWorker.h @@ -17,7 +17,7 @@ class CMQWorker { if (mon.handle() == nullptr) { if (zmq_socket_monitor(sock, "inproc://monitor", ZMQ_EVENT_DISCONNECTED) < 0) - Rf_error("failed to create socket monitor"); + Rcpp::stop("failed to create socket monitor"); mon = zmq::socket_t(*ctx, ZMQ_PAIR); mon.connect("inproc://monitor"); } @@ -30,7 +30,7 @@ class CMQWorker { sock.send(r2msg(gc()), zmq::send_flags::sndmore); sock.send(r2msg(R_NilValue), zmq::send_flags::none); } catch (zmq::error_t const &e) { - Rf_error(e.what()); + Rcpp::stop(e.what()); } } @@ -63,10 +63,10 @@ class CMQWorker { zmq::poll(pitems); } catch (zmq::error_t const &e) { if (errno != EINTR || pending_interrupt()) - Rf_error(e.what()); + Rcpp::stop(e.what()); } if (pitems[1].revents > 0) - Rf_error("Unexpected peer disconnect"); + Rcpp::stop("Unexpected peer disconnect"); total_sock_ev = pitems[0].revents; } while (total_sock_ev == 0); } @@ -125,7 +125,7 @@ class CMQWorker { zmq::poll(pitems, time_left); } catch (zmq::error_t const &e) { if (errno != EINTR || pending_interrupt()) - Rf_error(e.what()); + Rcpp::stop(e.what()); } auto ms_diff = std::chrono::duration_cast(Time::now() - start);