Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check memory is specified as numeric value #233

Closed
wants to merge 15 commits into from
Closed
1 change: 1 addition & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ README\.md
CITATION
_pkgdown\.yml
clustermq_[0-9.]+\.tar\.gz
^\.github$
132 changes: 132 additions & 0 deletions .github/workflows/check-standard.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# For help debugging build failures open an issue on the RStudio community with the 'github-actions' tag.
# https://community.rstudio.com/new-topic?category=Package%20development&tags=github-actions
on:
push:
branches-ignore: gh-pages
pull_request:
branches-ignore: gh-pages
schedule:
- cron: "0 0 * * 2"

name: R-CMD-check

jobs:
R-CMD-check:
runs-on: ${{ matrix.config.os }}

name: ${{ matrix.config.os }} (${{ matrix.config.r }})

strategy:
fail-fast: false
matrix:
config:
- {os: windows-latest, r: 'release'}
- {os: macOS-latest, r: 'release'}
- {os: ubuntu-20.04, r: 'release', rspm: "https://packagemanager.rstudio.com/cran/__linux__/focal/latest"}
- {os: ubuntu-20.04, r: 'devel', rspm: "https://packagemanager.rstudio.com/cran/__linux__/focal/latest"}

env:
R_REMOTES_NO_ERRORS_FROM_WARNINGS: true
RSPM: ${{ matrix.config.rspm }}
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}

steps:
- uses: actions/checkout@v2

- uses: r-lib/actions/setup-r@v1
with:
r-version: ${{ matrix.config.r }}

- uses: r-lib/actions/setup-pandoc@v1

- name: Query dependencies
run: |
install.packages('remotes')
saveRDS(remotes::dev_package_deps(dependencies = TRUE), ".github/depends.Rds", version = 2)
writeLines(sprintf("R-%i.%i", getRversion()$major, getRversion()$minor), ".github/R-version")
shell: Rscript {0}

- name: Cache R packages
if: runner.os != 'Windows'
uses: actions/cache@v2
with:
path: ${{ env.R_LIBS_USER }}
key: ${{ runner.os }}-${{ hashFiles('.github/R-version') }}-1-${{ hashFiles('.github/depends.Rds') }}
restore-keys: ${{ runner.os }}-${{ hashFiles('.github/R-version') }}-1-

- name: Install system dependencies (Linux)
if: runner.os == 'Linux'
run: |
while read -r cmd
do
eval sudo $cmd
done < <(Rscript -e 'writeLines(remotes::system_requirements("ubuntu", "20.04"))')

- name: Install system dependencies (macOS)
if: runner.os == 'macOS'
run: |
brew update
brew install zeromq coreutils

- name: Set up local key-based SSH
if: runner.os != 'Windows' # GHA does not allow empty passphrase on Windows
run: |
ssh-keygen -t rsa -f ~/.ssh/id_rsa -N "" -q
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh-keyscan -t rsa 127.0.0.1 >> ~/.ssh/known_hosts
echo "Host 127.0.0.1" >> ~/.ssh/config
echo " IdentityFile ~/.ssh/id_rsa" >> ~/.ssh/config
echo "$(hostname) 127.0.0.1" >> ~/.hosts
chmod og-rw ~

- name: Install dependencies
run: |
remotes::install_deps(dependencies = TRUE)
remotes::install_cran("rcmdcheck")
shell: Rscript {0}

- name: Install R package and add paths
if: runner.os != 'Windows'
run: |
R CMD INSTALL .
echo '.libPaths("~/work/_temp/Library")' >> ~/.Rprofile # cmq package in R
echo "$(pwd)/tests/bin" >> $GITHUB_PATH # local cmq
sed -i "1iexport PATH=$(pwd)/tests/bin:\$PATH" ~/.bashrc || true # ssh cmq

- name: Query capabilities
if: runner.os != 'Windows' # does not recognize -e
run: |
set -x
which R
which sbatch || echo "sbatch not found"
ssh 127.0.0.1 'which R; which sbatch; echo $PATH' || true
ssh 127.0.0.1 'R --slave --no-restore -e ".libPaths()"' || true
R --slave --no-restore -e "message(clustermq:::qsys_default)" || true
ssh 127.0.0.1 'R --slave --no-restore -e "message(clustermq:::qsys_default)"' || true

- name: make test
if: runner.os != 'Windows'
run: |
timeout 300 make test

- name: Check
env:
_R_CHECK_CRAN_INCOMING_REMOTE_: false
run: rcmdcheck::rcmdcheck(args = c("--no-manual", "--as-cran"), error_on = "warning", check_dir = "check")
shell: Rscript {0}

- name: Print logs if failure
if: failure() && runner.os != 'Windows'
run: |
set -x
cat ~/*.log || true
cat ~/worker.log || true
cat ~/ssh_proxy.log || true
cat clustermq.Rcheck/tests/* || true

- name: Upload check results
if: failure()
uses: actions/upload-artifact@main
with:
name: ${{ runner.os }}-r${{ matrix.config.r }}-results
path: check
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ src/*.so
*.log
clustermq.Rcheck
src/Makevars
.github/*.html
55 changes: 0 additions & 55 deletions .travis.yml

This file was deleted.

2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ Suggests:
tools
VignetteBuilder: knitr
Roxygen: list(r6 = FALSE)
RoxygenNote: 7.1.0
RoxygenNote: 7.1.1
2 changes: 2 additions & 0 deletions R/Q_rows.r
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Q_rows = function(df, fun, const=list(), export=list(), pkgs=c(), seed=128965,
# check if call args make sense
if (!is.null(memory))
template$memory = memory
if (is.character(memory))
stop("Expected numeric value for memory")
if (!is.null(template$memory) && template$memory < 50)
stop("Worker needs about 23 MB overhead, set memory>=50")
if (is.na(seed) || length(seed) != 1)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ClusterMQ: send R function calls as cluster jobs
================================================

[![CRAN version](http://www.r-pkg.org/badges/version/clustermq)](https://cran.r-project.org/package=clustermq)
[![Build Status](https://travis-ci.org/mschubert/clustermq.svg?branch=master)](https://travis-ci.org/mschubert/clustermq)
[![Build Status](https://github.com/mschubert/clustermq/workflows/R-CMD-check/badge.svg?branch=master)](https://github.com/mschubert/clustermq/actions)
[![CRAN downloads](http://cranlogs.r-pkg.org/badges/clustermq)](http://cran.rstudio.com/web/packages/clustermq/index.html)
[![DOI](https://zenodo.org/badge/DOI/10.1093/bioinformatics/btz284.svg)](https://doi.org/10.1093/bioinformatics/btz284)

Expand Down
4 changes: 2 additions & 2 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ elif [ "$PKGCONFIG_CFLAGS" ] || [ "$PKGCONFIG_LIBS" ]; then
elif [[ "$OSTYPE" == "darwin"* ]]; then
if [ $(command -v brew) ]; then
BREWDIR=$(brew --prefix)
PKG_CFLAGS="-I$BREWDIR/opt/$PKG_BREW_NAME/include"
PKG_LIBS="-L$BREWDIR/opt/$PKG_BREW_NAME/lib $PKG_LIBS"
else
curl -sfL "https://autobrew.github.io/scripts/$PKG_BREW_NAME" > autobrew
source autobrew
fi
PKG_CFLAGS="-I$BREWDIR/opt/$PKG_BREW_NAME/include"
PKG_LIBS="-L$BREWDIR/opt/$PKG_BREW_NAME/lib $PKG_LIBS"
fi

# For debugging
Expand Down
50 changes: 43 additions & 7 deletions src/zeromq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ZeroMQ {

std::string listen(Rcpp::CharacterVector addrs, std::string socket_type="ZMQ_REP",
std::string sid="default") {
auto sock = zmq::socket_t(ctx, str2socket(socket_type));
auto sock = MonitoredSocket(ctx, str2socket(socket_type), sid);
int i;
for (i=0; i<addrs.length(); i++) {
auto addr = Rcpp::as<std::string>(addrs[i]);
Expand All @@ -37,7 +37,7 @@ class ZeroMQ {
Rf_error("Could not bind port after ", i, " tries");
}
void connect(std::string address, std::string socket_type="ZMQ_REQ", std::string sid="default") {
auto sock = zmq::socket_t(ctx, str2socket(socket_type));
auto sock = MonitoredSocket(ctx, str2socket(socket_type), sid);
sock.connect(address);
sockets.emplace(sid, std::move(sock));
}
Expand Down Expand Up @@ -71,11 +71,13 @@ class ZeroMQ {
}
Rcpp::IntegerVector poll(Rcpp::CharacterVector sids, int timeout=-1) {
auto nsock = sids.length();
auto pitems = std::vector<zmq::pollitem_t>(nsock);
auto pitems = std::vector<zmq::pollitem_t>(nsock*2);
for (int i = 0; i < nsock; i++) {
auto socket_id = Rcpp::as<std::string>(sids[i]);
pitems[i].socket = find_socket(socket_id);
MonitoredSocket &sock = find_socket(Rcpp::as<std::string>(sids[i]));
pitems[i].socket = sock;
pitems[i].events = ZMQ_POLLIN; // | ZMQ_POLLOUT; // ssh_proxy XREP/XREQ has 2200
pitems[i+nsock].socket = sock.mon;
pitems[i+nsock].events = ZMQ_POLLIN;
}

int rc = -1;
Expand All @@ -95,6 +97,15 @@ class ZeroMQ {
}
} while(rc < 0);

int n_disc = 0;
for (int i = 0; i < nsock; i++)
if (pitems[i+nsock].revents > 0) {
auto msg = get_monitor_event(Rcpp::as<std::string>(sids[i]));
n_disc++;
}
if (n_disc > 0)
Rf_error((std::to_string(n_disc) + " peer(s) lost").c_str());

auto result = Rcpp::IntegerVector(nsock);
for (int i = 0; i < nsock; i++)
result[i] = pitems[i].revents;
Expand All @@ -103,7 +114,20 @@ class ZeroMQ {

private:
zmq::context_t ctx;
std::unordered_map<std::string, zmq::socket_t> sockets;

class MonitoredSocket : public zmq::socket_t {
public:
MonitoredSocket(zmq::context_t &ctx, int socket_type, std::string sid):
zmq::socket_t(ctx, socket_type), mon(ctx, ZMQ_PAIR) {
auto mon_addr = "inproc://" + sid;
int rc = zmq_socket_monitor(*this, mon_addr.c_str(), ZMQ_EVENT_DISCONNECTED);
if (rc < 0) // C API needs return value check
Rf_error("failed to create socket monitor");
mon.connect(mon_addr);
}
zmq::socket_t mon;
};
std::unordered_map<std::string, MonitoredSocket> sockets;

int str2socket(std::string str) {
if (str == "ZMQ_REP") {
Expand All @@ -120,7 +144,7 @@ class ZeroMQ {
return -1;
}

zmq::socket_t & find_socket(std::string socket_id) {
MonitoredSocket & find_socket(std::string socket_id) {
auto socket_iter = sockets.find(socket_id);
if (socket_iter == sockets.end())
Rf_error("Trying to access non-existing socket: ", socket_id.c_str());
Expand All @@ -137,6 +161,18 @@ class ZeroMQ {
socket.recv(message, flags);
return message;
}

std::string get_monitor_event(std::string sid) {
// receive message to clear, but we know it is disconnect
// expand this if we monitor anything else
zmq::message_t msg1, msg2;
auto & socket = find_socket(sid);
// we expect 2 frames: http://api.zeromq.org/4-1:zmq-socket-monitor
socket.mon.recv(msg1, zmq::recv_flags::dontwait);
socket.mon.recv(msg2, zmq::recv_flags::dontwait);
// do something with the info...
return std::string();
}
};

RCPP_MODULE(zmq) {
Expand Down
Loading