Skip to content

Commit

Permalink
split out send checking, multipart init
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Jan 2, 2024
1 parent 0e861f2 commit 5d10458
Showing 1 changed file with 26 additions and 30 deletions.
56 changes: 26 additions & 30 deletions src/CMQMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,18 @@ class CMQMaster {
}

int send(SEXP cmd) {
if (peers.find(cur) == peers.end())
Rcpp::stop("Trying to send to worker that does not exist");
auto &w = peers[cur];
if (w.status != wlife_t::active)
Rcpp::stop("Trying to send to worker that is not active");
auto &w = check_current_worker(wlife_t::active);
bool is_proxied = ! w.via.empty();
std::set<std::string> new_env;
std::set_difference(env_names.begin(), env_names.end(), w.env.begin(), w.env.end(),
std::inserter(new_env, new_env.end()));
std::vector<std::string> proxy_add_env;
std::set<std::string> *via_env;

zmq::multipart_t mp;
if (is_proxied) {
via_env = &peers[w.via].env;
mp.push_back(zmq::message_t(w.via));
}
mp.push_back(zmq::message_t(cur));
mp.push_back(zmq::message_t(0));
mp.push_back(int2msg(wlife_t::active));
auto mp = init_multipart(w, wlife_t::active);
mp.push_back(r2msg(cmd));
if (is_proxied)
via_env = &peers[w.via].env;

for (auto &str : new_env) {
w.env.insert(str);
Expand All @@ -160,19 +151,8 @@ class CMQMaster {
return w.call_ref;
}
void send_shutdown() {
if (peers.find(cur) == peers.end())
Rcpp::stop("Trying to send to worker that does not exist");
auto &w = peers[cur];
if (w.status != wlife_t::active)
Rcpp::stop("Trying to send to worker that is not active");

zmq::multipart_t mp;
if (!w.via.empty())
mp.push_back(zmq::message_t(w.via));
mp.push_back(zmq::message_t(cur));
mp.push_back(zmq::message_t(0));
mp.push_back(int2msg(wlife_t::shutdown));

auto &w = check_current_worker(wlife_t::active);
auto mp = init_multipart(w, wlife_t::shutdown);
w.call = R_NilValue;
w.status = wlife_t::shutdown;
mp.send(sock);
Expand All @@ -187,10 +167,8 @@ class CMQMaster {
// msgs[1] == delimiter
// msgs[2] == wlife_t::proxy_cmd

zmq::multipart_t mp;
mp.push_back(zmq::message_t(cur));
mp.push_back(zmq::message_t(0));
mp.push_back(int2msg(wlife_t::proxy_cmd));
auto &w = check_current_worker(wlife_t::proxy_cmd);
auto mp = init_multipart(w, wlife_t::proxy_cmd);
mp.push_back(r2msg(args));
mp.send(sock);
}
Expand Down Expand Up @@ -293,6 +271,24 @@ class CMQMaster {
std::unordered_map<std::string, zmq::message_t> env;
std::set<std::string> env_names;

worker_t &check_current_worker(const wlife_t status) {
if (peers.find(cur) == peers.end())
Rcpp::stop("Trying to send to worker that does not exist");
auto &w = peers[cur];
if (w.status != status)
Rcpp::stop("Trying to send to worker with invalid status");
return w;
}
zmq::multipart_t init_multipart(const worker_t &w, const wlife_t status) const {
zmq::multipart_t mp;
if (!w.via.empty())
mp.push_back(zmq::message_t(w.via));
mp.push_back(zmq::message_t(cur));
mp.push_back(zmq::message_t(0));
mp.push_back(int2msg(status));
return mp;
}

int poll(int timeout=-1) {
auto pitems = std::vector<zmq::pollitem_t>(1);
pitems[0].socket = sock;
Expand Down

0 comments on commit 5d10458

Please sign in to comment.