Skip to content

Commit

Permalink
separate loops for send w/o proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Jan 2, 2024
1 parent 5d10458 commit 1433cee
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions src/CMQMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,36 +114,36 @@ class CMQMaster {

int send(SEXP cmd) {
auto &w = check_current_worker(wlife_t::active);
bool is_proxied = ! w.via.empty();
std::set<std::string> new_env;
std::set_difference(env_names.begin(), env_names.end(), w.env.begin(), w.env.end(),
std::inserter(new_env, new_env.end()));
std::vector<std::string> proxy_add_env;
std::set<std::string> *via_env;

auto mp = init_multipart(w, wlife_t::active);
mp.push_back(r2msg(cmd));
if (is_proxied)
via_env = &peers[w.via].env;

for (auto &str : new_env) {
w.env.insert(str);
if (is_proxied) {
if (via_env->find(str) != via_env->end()) {
if (w.via.empty()) {
for (auto &str : new_env) {
w.env.insert(str);
mp.push_back(zmq::message_t(str));
mp.push_back(zmq::message_t(env[str].data(), env[str].size()));
}
} else {
std::vector<std::string> proxy_add_env;
auto &via_env = peers[w.via].env;
for (auto &str : new_env) {
w.env.insert(str);
if (via_env.find(str) != via_env.end()) {
// std::cout << "+from_proxy " << str << "\n";
proxy_add_env.push_back(str);
continue;
} else {
// std::cout << "+from_master " << str << "\n";
via_env->insert(str);
via_env.insert(str);
}
mp.push_back(zmq::message_t(str));
mp.push_back(zmq::message_t(env[str].data(), env[str].size()));
}
mp.push_back(zmq::message_t(str));
mp.push_back(zmq::message_t(env[str].data(), env[str].size()));
}

if (is_proxied)
mp.push_back(r2msg(Rcpp::wrap(proxy_add_env)));
}

w.call = cmd;
w.call_ref = ++call_counter;
Expand Down

0 comments on commit 1433cee

Please sign in to comment.