Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run tcp test in branch #9

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ script:
- cd build
- cmake ..
- make
- export ARGS="-VV"
- make test
after_success:
- codecov
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ExternalProject_Add(nanomsg
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/nanomsg
GIT_REPOSITORY https://github.com/nanomsg/nanomsg.git
GIT_TAG "master"
PATCH_COMMAND patch -p1 < ${PATCHES_DIR}/btcp.patch
CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR}
)
add_library(libnanomsg STATIC SHARED IMPORTED)
Expand Down
13 changes: 13 additions & 0 deletions patches/btcp.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
diff --git a/src/transports/tcp/btcp.c b/src/transports/tcp/btcp.c
index 4fe8b58..b87f0a1 100644
--- a/src/transports/tcp/btcp.c
+++ b/src/transports/tcp/btcp.c
@@ -154,7 +154,7 @@ int nn_btcp_create (struct nn_ep *ep)

rc = nn_btcp_listen (self);
if (rc != 0) {
- // I suspect we might need to do nn_free here.
+ nn_free (self);
return rc;
}

115 changes: 67 additions & 48 deletions src/libparodus.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ typedef struct {
int reconnect_count;
libpd_cfg_t cfg;
bool connect_on_every_send; // always false, currently
int rcv_sock;
int stop_rcv_sock;
int send_sock;
nn_sock_t rcv_sock;
nn_sock_t stop_rcv_sock;
nn_sock_t send_sock;
char wrp_queue_name[QNAME_SIZE];
libpd_mq_t wrp_queue;
pthread_t wrp_receiver_tid;
Expand Down Expand Up @@ -118,6 +118,12 @@ static void getParodusUrl(__instance_t *inst)
libpd_log (LEVEL_INFO, ("LIBPARODUS: client url is %s\n", inst->client_url));
}

static void init_nn_sock (nn_sock_t *sock)
{
sock->sock = -1;
sock->eid = -1;
}

static __instance_t *make_new_instance (libpd_cfg_t *cfg)
{
size_t qname_len;
Expand All @@ -128,6 +134,9 @@ static __instance_t *make_new_instance (libpd_cfg_t *cfg)
if (qname_len >= QNAME_SIZE)
return NULL;
memset ((void*) inst, 0, sizeof(__instance_t));
init_nn_sock (&inst->rcv_sock);
init_nn_sock (&inst->stop_rcv_sock);
init_nn_sock (&inst->send_sock);
pthread_mutex_init (&inst->send_mutex, NULL);
//inst->cfg = *cfg;
memcpy (&inst->cfg, cfg, sizeof(libpd_cfg_t));
Expand Down Expand Up @@ -160,13 +169,24 @@ bool is_auth_received (libpd_instance_t instance)
nn_shutdown ((sock), 0); \
(sock) = 0;

void shutdown_socket (int *sock)
void shutdown_socket (nn_sock_t *sock)
{
if (*sock >= 0) {
nn_shutdown (*sock, 0);
nn_close (*sock);
int rtn;
if (sock->sock < 0)
return;
if (sock->eid >= 0) {
rtn = nn_shutdown (sock->sock, sock->eid);
if (rtn != 0) {
libpd_log_err (LEVEL_DEBUG, errno, ("socket shutdown error\n"));
}
sock->eid = -1;
}
*sock = -1;
rtn = nn_close (sock->sock);
if (rtn != 0) {
libpd_log_err (LEVEL_DEBUG, errno, ("Unable to close socket\n"));
}

sock->sock = -1;
}

typedef enum {
Expand Down Expand Up @@ -195,38 +215,40 @@ typedef enum {
/**
* Open receive socket and bind to it.
*/
int connect_receiver (const char *rcv_url, int keepalive_timeout_secs, int *exterr)
int connect_receiver (const char *rcv_url, int keepalive_timeout_secs,
nn_sock_t *sock, int *exterr)
{
int rcv_timeout;
int sock;

init_nn_sock (sock);
*exterr = 0;
if (NULL == rcv_url) {
return CONN_RCV_ERR_NULL_URL;
}
sock = nn_socket (AF_SP, NN_PULL);
if (sock < 0) {
sock->sock = nn_socket (AF_SP, NN_PULL);
if (sock->sock < 0) {
*exterr = errno;
libpd_log_err (LEVEL_ERROR, errno, ("Unable to create rcv socket %s\n", rcv_url));
return CONN_RCV_ERR_CREATE;
}
if (keepalive_timeout_secs > 0) {
rcv_timeout = keepalive_timeout_secs * 1000;
if (nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO,
if (nn_setsockopt (sock->sock, NN_SOL_SOCKET, NN_RCVTIMEO,
&rcv_timeout, sizeof (rcv_timeout)) < 0) {
*exterr = errno;
libpd_log_err (LEVEL_ERROR, errno, ("Unable to set socket timeout: %s\n", rcv_url));
shutdown_socket (&sock);
shutdown_socket (sock);
return CONN_RCV_ERR_SETOPT;
}
}
if (nn_bind (sock, rcv_url) < 0) {
sock->eid = nn_bind (sock->sock, rcv_url);
if (sock->eid < 0) {
*exterr = errno;
libpd_log_err (LEVEL_ERROR, errno, ("Unable to bind to receive socket %s\n", rcv_url));
shutdown_socket (&sock);
shutdown_socket (sock);
return CONN_RCV_ERR_BIND;
}
return sock;
return 0;
}

typedef enum {
Expand Down Expand Up @@ -255,36 +277,37 @@ typedef enum {
/**
* Open send socket and connect to it.
*/
int connect_sender (const char *send_url, int *exterr)
int connect_sender (const char *send_url, nn_sock_t *sock, int *exterr)
{
int sock;
int send_timeout = SOCK_SEND_TIMEOUT_MS;

init_nn_sock (sock);
*exterr = 0;
if (NULL == send_url) {
return CONN_SEND_ERR_NULL;
}
sock = nn_socket (AF_SP, NN_PUSH);
if (sock < 0) {
sock->sock = nn_socket (AF_SP, NN_PUSH);
if (sock->sock < 0) {
*exterr = errno;
libpd_log_err (LEVEL_ERROR, errno, ("Unable to create send socket: %s\n", send_url));
return CONN_SEND_ERR_CREATE;
}
if (nn_setsockopt (sock, NN_SOL_SOCKET, NN_SNDTIMEO,
if (nn_setsockopt (sock->sock, NN_SOL_SOCKET, NN_SNDTIMEO,
&send_timeout, sizeof (send_timeout)) < 0) {
*exterr = errno;
libpd_log_err (LEVEL_ERROR, errno, ("Unable to set socket timeout: %s\n", send_url));
shutdown_socket (&sock);
shutdown_socket (sock);
return CONN_SEND_ERR_SETOPT;
}
if (nn_connect (sock, send_url) < 0) {
sock->eid = nn_connect (sock->sock, send_url);
if (sock->eid < 0) {
*exterr = errno;
libpd_log_err (LEVEL_ERROR, errno, ("Unable to connect to send socket %s\n",
send_url));
shutdown_socket (&sock);
shutdown_socket (sock);
return CONN_SEND_ERR_CONN;
}
return sock;
return 0;
}

static int create_thread (pthread_t *tid, void *(*thread_func) (void*),
Expand Down Expand Up @@ -414,32 +437,30 @@ int libparodus_init (libpd_instance_t *instance, libpd_cfg_t *libpd_cfg)
show_options (libpd_cfg);
if (inst->cfg.receive) {
libpd_log (LEVEL_INFO, ("LIBPARODUS: connecting receiver to %s\n", inst->client_url));
err = connect_receiver (inst->client_url, inst->cfg.keepalive_timeout_secs, &oserr);
if (err < 0) {
err = connect_receiver (inst->client_url,
inst->cfg.keepalive_timeout_secs, &inst->rcv_sock, &oserr);
if (err != 0) {
SETERR(oserr, LIBPD_ERR_INIT_RCV + err);
return CONNECT_ERR (oserr);
}
inst->rcv_sock = err;
}
if (!inst->connect_on_every_send) {
libpd_log (LEVEL_INFO, ("LIBPARODUS: connecting sender to %s\n", inst->parodus_url));
err = connect_sender (inst->parodus_url, &oserr);
if (err < 0) {
err = connect_sender (inst->parodus_url, &inst->send_sock, &oserr);
if (err != 0) {
abort_init (inst, "r");
SETERR (oserr, LIBPD_ERR_INIT_SEND + err);
return CONNECT_ERR (oserr);
}
inst->send_sock = err;
}
if (inst->cfg.receive) {
// We use the stop_rcv_sock to send a stop msg to our own receive socket.
err = connect_sender (inst->client_url, &oserr);
if (err < 0) {
err = connect_sender (inst->client_url, &inst->stop_rcv_sock, &oserr);
if (err != 0) {
abort_init (inst, "rs");
SETERR (oserr, LIBPD_ERR_INIT_TERMSOCK + err);
return CONNECT_ERR (oserr);
}
inst->stop_rcv_sock = err;
libpd_log (LEVEL_INFO, ("LIBPARODUS: Opened sockets\n"));
err = libpd_qcreate (&inst->wrp_queue, inst->wrp_queue_name, WRP_QUEUE_SIZE, &oserr);
if (err != 0) {
Expand Down Expand Up @@ -491,13 +512,13 @@ int libparodus_init (libpd_instance_t *instance, libpd_cfg_t *libpd_cfg)
}

// When msg_len is given as -1, then msg is a null terminated string
static int sock_send (int sock, const char *msg, int msg_len, int *exterr)
static int sock_send (nn_sock_t *sock, const char *msg, int msg_len, int *exterr)
{
int bytes;
*exterr = 0;
if (msg_len < 0)
msg_len = strlen (msg) + 1; // include terminating null
bytes = nn_send (sock, msg, msg_len, 0);
bytes = nn_send (sock->sock, msg, msg_len, 0);
if (bytes < 0) {
*exterr = errno;
libpd_log_err (LEVEL_ERROR, errno, ("Error sending msg\n"));
Expand All @@ -511,10 +532,10 @@ static int sock_send (int sock, const char *msg, int msg_len, int *exterr)
}

// returns 0 OK, 1 timedout, -1 error
static int sock_receive (int rcv_sock, raw_msg_t *msg, int *exterr)
static int sock_receive (nn_sock_t *rcv_sock, raw_msg_t *msg, int *exterr)
{
char *buf = NULL;
msg->len = nn_recv (rcv_sock, &buf, NN_MSG, 0);
msg->len = nn_recv (rcv_sock->sock, &buf, NN_MSG, 0);

*exterr = 0;
if (msg->len < 0) {
Expand All @@ -538,7 +559,7 @@ static void libparodus_shutdown__ (__instance_t *inst)
inst->run_state = RUN_STATE_DONE;
libpd_log (LEVEL_INFO, ("LIBPARODUS: Shutting Down\n"));
if (inst->cfg.receive) {
sock_send (inst->stop_rcv_sock, end_msg, -1, &exterr);
sock_send (&inst->stop_rcv_sock, end_msg, -1, &exterr);
rtn = pthread_join (inst->wrp_receiver_tid, NULL);
if (rtn != 0) {
libpd_log_err (LEVEL_ERROR, rtn, ("Error terminating wrp receiver thread\n"));
Expand Down Expand Up @@ -743,17 +764,16 @@ static int wrp_sock_send (__instance_t *inst, wrp_msg_t *msg)
SST (sst_start_total_timing (&sst_times);)

if (inst->connect_on_every_send) {
rtn = connect_sender (inst->parodus_url, &inst->exterr);
if (rtn < 0) {
rtn = connect_sender (inst->parodus_url, &inst->send_sock, &inst->exterr);
if (rtn != 0) {
free (msg_bytes);
pthread_mutex_unlock (&inst->send_mutex);
return -0x1200 + rtn;
}
inst->send_sock = rtn;
}

SST (sst_start_send_timing (&sst_times);)
rtn = sock_send (inst->send_sock, (const char *)msg_bytes, msg_len, &inst->exterr);
rtn = sock_send (&inst->send_sock, (const char *)msg_bytes, msg_len, &inst->exterr);
SST (sst_update_send_time (&sst_times);)

if (inst->connect_on_every_send) {
Expand Down Expand Up @@ -834,9 +854,8 @@ static void wrp_receiver_reconnect (__instance_t *inst)
}
sleep (retry_delay);
libpd_log (LEVEL_DEBUG, ("Retrying receiver connection\n"));
inst->rcv_sock = connect_receiver
(inst->client_url, inst->cfg.keepalive_timeout_secs, &exterr);
if (inst->rcv_sock < 0)
if (connect_receiver (inst->client_url,
inst->cfg.keepalive_timeout_secs, &inst->rcv_sock, &exterr) != 0)
continue;
if (send_registration_msg (inst) != 0)
continue;
Expand All @@ -859,7 +878,7 @@ static void *wrp_receiver_thread (void *arg)

libpd_log (LEVEL_INFO, ("LIBPARODUS: Starting wrp receiver thread\n"));
while (1) {
rtn = sock_receive (inst->rcv_sock, &raw_msg, &exterr);
rtn = sock_receive (&inst->rcv_sock, &raw_msg, &exterr);
if (rtn != 0) {
if (rtn == 1) { // timed out
wrp_receiver_reconnect (inst);
Expand Down
2 changes: 1 addition & 1 deletion src/libparodus_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

// if TEST_ENVIRONMENT is not defined, then the macros libpd_log and libpd_log_err
// generate nothing
//#define TEST_ENVIRONMENT 1
#define TEST_ENVIRONMENT 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what we want or a mistake?


#ifndef TEST_ENVIRONMENT
#define libpd_log(level,msg)
Expand Down
6 changes: 6 additions & 0 deletions src/libparodus_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

#include "libparodus.h"

typedef struct {
int sock;
int eid; // nanomsg endpoint ID
} nn_sock_t;


/**
* @brief libparodus internal error rtn codes
*
Expand Down
Loading