Skip to content

Commit

Permalink
Pull out the c++ write queue (#1198)
Browse files Browse the repository at this point in the history
  • Loading branch information
reconbot authored Jun 20, 2017
1 parent 06543aa commit 4984943
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 206 deletions.
175 changes: 18 additions & 157 deletions src/serialport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,81 +6,6 @@
#include "./read-poller.h"
#endif

struct _WriteQueue {
const int _fd; // the fd that is associated with this write queue
QueuedWrite _write_queue;
uv_mutex_t _write_queue_mutex;
_WriteQueue *_next;

_WriteQueue(const int fd) : _fd(fd), _write_queue(), _next(NULL) {
uv_mutex_init(&_write_queue_mutex);
}

void lock() { uv_mutex_lock(&_write_queue_mutex); }
void unlock() { uv_mutex_unlock(&_write_queue_mutex); }

QueuedWrite &get() { return _write_queue; }
};

static _WriteQueue *write_queues = NULL;

static _WriteQueue *qForFD(const int fd) {
_WriteQueue *q = write_queues;
while (q != NULL) {
if (q->_fd == fd) {
return q;
}
q = q->_next;
}
return NULL;
}

static _WriteQueue *newQForFD(const int fd) {
_WriteQueue *q = qForFD(fd);

if (q == NULL) {
if (write_queues == NULL) {
write_queues = new _WriteQueue(fd);
return write_queues;
} else {
q = write_queues;
while (q->_next != NULL) {
q = q->_next;
}
q->_next = new _WriteQueue(fd);
return q->_next;
}
}

return q;
}

static void deleteQForFD(const int fd) {
if (write_queues == NULL)
return;

_WriteQueue *q = write_queues;
if (write_queues->_fd == fd) {
write_queues = write_queues->_next;
delete q;

return;
}

while (q->_next != NULL) {
if (q->_next->_fd == fd) {
_WriteQueue *out_q = q->_next;
q->_next = q->_next->_next;
delete out_q;

return;
}
q = q->_next;
}

// It wasn't found...
}

v8::Local<v8::Value> getValueFromObject(v8::Local<v8::Object> options, std::string key) {
v8::Local<v8::String> v8str = Nan::New<v8::String>(key).ToLocalChecked();
return Nan::Get(options, v8str).ToLocalChecked();
Expand Down Expand Up @@ -161,9 +86,6 @@ void EIO_AfterOpen(uv_work_t* req) {
} else {
argv[0] = Nan::Null();
argv[1] = Nan::New<v8::Int32>(data->result);

int fd = Nan::To<v8::Int32>(argv[1]).ToLocalChecked()->Value();
newQForFD(fd);
}

data->callback.Call(2, argv);
Expand Down Expand Up @@ -251,46 +173,24 @@ NAN_METHOD(Write) {
return;
}

WriteBaton* data = new WriteBaton();
memset(data, 0, sizeof(WriteBaton));
data->fd = fd;
data->buffer.Reset(buffer);
data->bufferData = bufferData;
data->bufferLength = bufferLength;
data->offset = 0;
data->callback.Reset(info[2].As<v8::Function>());

QueuedWrite* queuedWrite = new QueuedWrite();
memset(queuedWrite, 0, sizeof(QueuedWrite));
queuedWrite->baton = data;
queuedWrite->req.data = queuedWrite;

_WriteQueue *q = qForFD(fd);
if (!q) {
v8::Local<v8::Value> argv[1];
argv[0] = v8::Exception::Error(Nan::New<v8::String>("There's no write queue for file descriptor").ToLocalChecked());
data->callback.Call(1, argv);
delete data;
return;
}

q->lock();
QueuedWrite &write_queue = q->get();
bool empty = write_queue.empty();
WriteBaton* baton = new WriteBaton();
memset(baton, 0, sizeof(WriteBaton));
baton->fd = fd;
baton->buffer.Reset(buffer);
baton->bufferData = bufferData;
baton->bufferLength = bufferLength;
baton->offset = 0;
baton->callback.Reset(info[2].As<v8::Function>());

write_queue.insert_tail(queuedWrite);
uv_work_t* req = new uv_work_t();
req->data = baton;

if (empty) {
uv_queue_work(uv_default_loop(), &queuedWrite->req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
}
q->unlock();
uv_queue_work(uv_default_loop(), req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
}

void EIO_AfterWrite(uv_work_t* req) {
Nan::HandleScope scope;

QueuedWrite* queuedWrite = static_cast<QueuedWrite*>(req->data);
WriteBaton* data = static_cast<WriteBaton*>(queuedWrite->baton);
WriteBaton* data = static_cast<WriteBaton*>(req->data);

v8::Local<v8::Value> argv[1];
if (data->errorString[0]) {
Expand All @@ -299,41 +199,16 @@ void EIO_AfterWrite(uv_work_t* req) {
argv[0] = Nan::Null();
}

if (data->offset < data->bufferLength && !data->errorString[0]) {
// We're not done with this baton, so throw it right back onto the queue.
// Don't re-push the write in the event loop if there was an error; because same error could occur again!
// TODO: Add a uv_poll here for unix...
// If there's no error and there's still data to write, lets keep trying
// TODO: Add a uv_poll here for unix
if (!data->errorString[0] && data->offset < data->bufferLength) {
uv_queue_work(uv_default_loop(), req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
return;
}

// throwing errors instead of returning them at this point is rude
int fd = data->fd;
_WriteQueue *q = qForFD(fd);
if (!q) {
Nan::ThrowTypeError("There's no write queue for that file descriptor (after write)");
return;
}

q->lock();
QueuedWrite &write_queue = q->get();

// remove this one from the list
queuedWrite->remove();

data->callback.Call(1, argv);

// If there are any left, start a new thread to write the next one.
if (!write_queue.empty()) {
// Always pull the next work item from the head of the queue
QueuedWrite* nextQueuedWrite = write_queue.next;
uv_queue_work(uv_default_loop(), &nextQueuedWrite->req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
}
q->unlock();

data->buffer.Reset();
delete data;
delete queuedWrite;
delete req;
}

#ifdef WIN32
Expand All @@ -357,7 +232,7 @@ NAN_METHOD(Read) {
return;
}
int offset = Nan::To<v8::Int32>(info[2]).ToLocalChecked()->Value();

// bytes to read
if (!info[3]->IsInt32()) {
Nan::ThrowTypeError("Fourth argument must be an int");
Expand Down Expand Up @@ -390,7 +265,7 @@ void EIO_AfterRead(uv_work_t* req) {
ReadBaton* data = static_cast<ReadBaton*>(req->data);

v8::Local<v8::Value> argv[2];

if (data->errorString[0]) {
argv[0] = Nan::Error(data->errorString);
argv[1] = Nan::Undefined();
Expand Down Expand Up @@ -438,20 +313,6 @@ void EIO_AfterClose(uv_work_t* req) {
argv[0] = v8::Exception::Error(Nan::New<v8::String>(data->errorString).ToLocalChecked());
} else {
argv[0] = Nan::Null();

// We don't have an error, so clean up the write queue for that fd
_WriteQueue *q = qForFD(data->fd);
if (q) {
q->lock();
QueuedWrite &write_queue = q->get();
while (!write_queue.empty()) {
QueuedWrite *del_q = write_queue.next;
del_q->baton->buffer.Reset();
del_q->remove();
}
q->unlock();
deleteQForFD(data->fd);
}
}
data->callback.Call(1, argv);

Expand Down
37 changes: 0 additions & 37 deletions src/serialport.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,43 +110,6 @@ struct WriteBaton {
char errorString[ERROR_STRING_SIZE];
};

struct QueuedWrite {
uv_work_t req;
QueuedWrite *prev;
QueuedWrite *next;
WriteBaton* baton;

QueuedWrite() {
prev = this;
next = this;

baton = 0;
}

~QueuedWrite() {
remove();
}

void remove() {
prev->next = next;
next->prev = prev;

next = this;
prev = this;
}

void insert_tail(QueuedWrite *qw) {
qw->next = this;
qw->prev = this->prev;
qw->prev->next = qw;
this->prev = qw;
}

bool empty() {
return next == this;
}
};

#ifdef WIN32
struct ReadBaton {
int fd;
Expand Down
12 changes: 2 additions & 10 deletions src/serialport_unix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,7 @@ int setup(int fd, OpenBaton *data) {
}

void EIO_Write(uv_work_t* req) {
QueuedWrite* queuedWrite = static_cast<QueuedWrite*>(req->data);
WriteBaton* data = static_cast<WriteBaton*>(queuedWrite->baton);
WriteBaton* data = static_cast<WriteBaton*>(req->data);
int bytesWritten = 0;

do {
Expand All @@ -336,19 +335,12 @@ void EIO_Write(uv_work_t* req) {
continue;
}

// The write call was interrupted before anything was written, try again immediately.
if (errno == EINTR) {
// why try again right away instead of in another event loop?
continue;
}

// Try again in another event loop
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
return;
}

// EBAD would mean we're "disconnected"

// a real error so lets bail
snprintf(data->errorString, sizeof(data->errorString), "Error: %s, calling write", strerror(errno));
return;
Expand Down
3 changes: 1 addition & 2 deletions src/serialport_win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,7 @@ bool IsClosingHandle(int fd) {
}

void EIO_Write(uv_work_t* req) {
QueuedWrite* queuedWrite = static_cast<QueuedWrite*>(req->data);
WriteBaton* data = static_cast<WriteBaton*>(queuedWrite->baton);
WriteBaton* data = static_cast<WriteBaton*>(req->data);
data->result = 0;

do {
Expand Down

0 comments on commit 4984943

Please sign in to comment.