Skip to content

Commit

Permalink
independant busy states for TCP send/recv
Browse files Browse the repository at this point in the history
  • Loading branch information
jadeblaquiere committed Oct 23, 2018
1 parent e90371b commit 4fb1f86
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 8 deletions.
18 changes: 10 additions & 8 deletions tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ struct dill_tcp_conn {
struct dill_bsock_vfs bvfs;
int fd;
struct dill_fd_rxbuf rxbuf;
unsigned int busy : 1;
unsigned int rbusy : 1;
unsigned int sbusy : 1;
unsigned int indone : 1;
unsigned int outdone: 1;
unsigned int inerr : 1;
Expand All @@ -77,7 +78,8 @@ static int dill_tcp_makeconn(int fd, void *mem) {
self->bvfs.brecvl = dill_tcp_brecvl;
self->fd = fd;
dill_fd_initrxbuf(&self->rxbuf);
self->busy = 0;
self->rbusy = 0;
self->sbusy = 0;
self->indone = 0;
self->outdone = 0;
self->inerr = 0;
Expand Down Expand Up @@ -168,12 +170,12 @@ int dill_tcp_connect(const struct dill_ipaddr *addr, int64_t deadline) {
static int dill_tcp_bsendl(struct dill_bsock_vfs *bvfs,
struct dill_iolist *first, struct dill_iolist *last, int64_t deadline) {
struct dill_tcp_conn *self = dill_cont(bvfs, struct dill_tcp_conn, bvfs);
if(dill_slow(self->busy)) {errno = EBUSY; return -1;}
if(dill_slow(self->sbusy)) {errno = EBUSY; return -1;}
if(dill_slow(self->outdone)) {errno = EPIPE; return -1;}
if(dill_slow(self->outerr)) {errno = ECONNRESET; return -1;}
self->busy = 1;
self->sbusy = 1;
ssize_t sz = dill_fd_send(self->fd, first, last, deadline);
self->busy = 0;
self->sbusy = 0;
if(dill_fast(sz >= 0)) return sz;
self->outerr = 1;
return -1;
Expand All @@ -182,12 +184,12 @@ static int dill_tcp_bsendl(struct dill_bsock_vfs *bvfs,
static int dill_tcp_brecvl(struct dill_bsock_vfs *bvfs,
struct dill_iolist *first, struct dill_iolist *last, int64_t deadline) {
struct dill_tcp_conn *self = dill_cont(bvfs, struct dill_tcp_conn, bvfs);
if(dill_slow(self->busy)) {errno = EBUSY; return -1;}
if(dill_slow(self->rbusy)) {errno = EBUSY; return -1;}
if(dill_slow(self->indone)) {errno = EPIPE; return -1;}
if(dill_slow(self->inerr)) {errno = ECONNRESET; return -1;}
self->busy = 1;
self->rbusy = 1;
int rc = dill_fd_recv(self->fd, &self->rxbuf, first, last, deadline);
self->busy = 0;
self->rbusy = 0;
if(dill_fast(rc == 0)) return 0;
if(errno == EPIPE) self->indone = 1;
else self->inerr = 1;
Expand Down
144 changes: 144 additions & 0 deletions tests/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,147 @@ coroutine void client4(int port) {
errno_assert(rc == -1 && errno == ETIMEDOUT);
}

coroutine void tcp_forward(int s1, int s2, int ch) {
uint8_t c;
while (1) {
int err = brecv(s1, &c, 1, -1);
if(err) break;
err = bsend(s2, &c, 1, -1);
if(err) break;
}
// send something to channel on err - close signal
chsend(ch, &c, 1, -1);
return;
}

coroutine void tcp_proxy(int s1, int s2) {
int u_ch[2];
int d_ch[2];
int r;

int err = chmake(u_ch);
if(err) return;
err = chmake(d_ch);
if(err) return;
int up = go(tcp_forward(s1, s2, u_ch[1]));
if(up < 0) return;
int dn = go(tcp_forward(s2, s1, d_ch[1]));
if(dn < 0) {hclose(up); return;}
struct chclause cc[] = {{CHRECV, u_ch[0], &r, sizeof(int)},
{CHRECV, d_ch[0], &r, sizeof(int)}};
int i=choose(cc, 2, -1);
assert(errno == 0);
if(i == 0) {
err = bundle_wait(up, -1);
assert(!err);
hclose(dn);
err = bundle_wait(dn, -1);
} else {
err = bundle_wait(dn, -1);
assert(!err);
hclose(up);
err = bundle_wait(up, -1);
}
return;
}

coroutine void tcp_echo(int s) {
while (1) {
uint8_t c;
int err = brecv(s, &c, 1, -1);
if(err) break;
err = bsend(s, &c, 1, -1);
if(err) break;
}
return;
}

coroutine void middle_segment(void) {
struct ipaddr laddr, raddr;
int rc = ipaddr_local(&laddr, NULL, 5678, 0);
errno_assert(rc == 0);
rc = ipaddr_local(&raddr, NULL, 8765, 0);
errno_assert(rc == 0);

int rl = tcp_listen(&raddr, 10);
errno_assert(rl >- 0);
int ll = tcp_listen(&laddr, 10);
errno_assert(ll >- 0);

int ls = tcp_accept(ll, NULL, -1);
errno_assert(ls >= 0);
int rs = tcp_accept(rl, NULL, -1);
errno_assert(rs >= 0);

tcp_proxy(ls, rs);
tcp_close(ls, -1);
tcp_close(rs, -1);
return;
}

coroutine void end_segment(void) {
struct ipaddr addr;
int rc = ipaddr_local(&addr, NULL, 8765, 0);
errno_assert(rc == 0);

int ls = tcp_connect(&addr, -1);
errno_assert(ls >= 0);

tcp_echo(ls);

return;
}

coroutine void middle_and_end_segments(void) {
int b = bundle();
int err = bundle_go(b, middle_segment());
errno_assert(!err);
msleep(now()+250);
err = bundle_go(b, end_segment());
errno_assert(!err);
err = bundle_wait(b, -1);
return;
}

coroutine void send10k(int s) {
for (int i = 0; i < 10000; i++) {
int err = bsend(s, &i, sizeof(i), -1);
errno_assert(!err);
}
return;
}

coroutine void recv10k(int s) {
for (int i = 0; i < 10000; i++) {
int err = bsend(s, &i, sizeof(i), -1);
errno_assert(!err);
}
return;
}

static void tcp_concurrency_test(void) {
int b1 = bundle();
errno_assert(b1 >= 0);
int err = bundle_go(b1, middle_and_end_segments());
errno_assert(!err);
msleep(now() + 500);

struct ipaddr addr;
int rc = ipaddr_local(&addr, NULL, 5678, 0);
errno_assert(rc == 0);
int s = tcp_connect(&addr, -1);
errno_assert(s >= 0);
int b2 = bundle();
errno_assert(b2 >= 0);
err = bundle_go(b2, recv10k(s));
errno_assert(!err);
err = bundle_go(b2, send10k(s));
errno_assert(!err);
err = bundle_wait(b2, -1);
errno_assert(!err);
hclose(b1);
}

static void move_lots_of_data(size_t nbytes, size_t buffer_size);
static void test_fromfd();

Expand Down Expand Up @@ -221,6 +362,9 @@ int main(void) {
move_lots_of_data(5000, 3000);

test_fromfd();

/* test concurrency in TCP */
tcp_concurrency_test();

return 0;
}
Expand Down

0 comments on commit 4fb1f86

Please sign in to comment.