Skip to content

Commit

Permalink
Revert "fix(windows): Asynchronous callbacks for reading and writing …
Browse files Browse the repository at this point in the history
…on Windows" (#1323)

This reverts commit 2c2a8b6.
  • Loading branch information
reconbot authored Sep 2, 2017
1 parent 2c2a8b6 commit c7a3be4
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 163 deletions.
262 changes: 109 additions & 153 deletions src/serialport_win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ typedef BOOL (WINAPI *CancelIoExType)(HANDLE hFile, LPOVERLAPPED lpOverlapped);

std::list<int> g_closingHandles;


void ErrorCodeToString(const char* prefix, int errorCode, char *errorStr) {
switch (errorCode) {
case ERROR_FILE_NOT_FOUND:
Expand Down Expand Up @@ -153,10 +152,11 @@ void EIO_Open(uv_work_t* req) {
return;
}

// Set the timeouts for read and write operations.
// Read operation will wait for at least 1 byte to be received.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = 0; // Never timeout, always wait for data.
// Set the timeouts for read and write operations read operation is to return
// immediately with the bytes that have already been received, even if no bytes
// have been received.
COMMTIMEOUTS commTimeouts = {0};
commTimeouts.ReadIntervalTimeout = MAXDWORD; // Never timeout
commTimeouts.ReadTotalTimeoutMultiplier = 0; // Do not allow big read timeout when big read buffer used
commTimeouts.ReadTotalTimeoutConstant = 0; // Total read timeout (period of read loop)
commTimeouts.WriteTotalTimeoutConstant = 0; // Const part of write timeout
Expand Down Expand Up @@ -291,63 +291,58 @@ NAN_METHOD(Write) {
baton->bufferLength = bufferLength;
baton->offset = 0;
baton->callback.Reset(info[2].As<v8::Function>());
baton->complete = false;
// WriteFileEx requires a thread that can block. Create a new thread to
// run the write operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, WriteThread, baton, 0, NULL);
}

void __stdcall WriteIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
WriteBaton* baton = static_cast<WriteBaton*>(ov->hEvent);
DWORD bytesWritten;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesWritten, TRUE)) {
errorCode = GetLastError();
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", errorCode, baton->errorString);
baton->complete = true;
return;
}
if (bytesWritten) {
baton->offset += bytesWritten;
if (baton->offset >= baton->bufferLength) {
baton->complete = true;
}
}
uv_work_t* req = new uv_work_t();
req->data = baton;

uv_queue_work(uv_default_loop(), req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
}

DWORD WriteThread(void* param) {
WriteBaton* baton = static_cast<WriteBaton*>(param);

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(ov));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
char* offsetPtr = baton->bufferData + baton->offset;
// WriteFileEx requires calling GetLastError even upon success. Clear the error beforehand.
SetLastError(0);
WriteFileEx((HANDLE)baton->fd, offsetPtr, static_cast<DWORD>(baton->bufferLength), ov, WriteIOCompletion);
// Error codes when call is successful, such as ERROR_MORE_DATA.
DWORD lastError = GetLastError();
if (lastError != ERROR_SUCCESS) {
ErrorCodeToString("Writing to COM port (WriteFileEx)", lastError, baton->errorString);
break;
void EIO_Write(uv_work_t* req) {
WriteBaton* data = static_cast<WriteBaton*>(req->data);
data->result = 0;

do {
OVERLAPPED ov = {0};
// Event used by GetOverlappedResult(..., TRUE) to wait for outgoing data or timeout
// Event MUST be used if program has several simultaneous asynchronous operations
// on the same handle (i.e. ReadFile and WriteFile)
ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

// Start write operation - synchronous or asynchronous
DWORD bytesWritten = 0;
if (!WriteFile((HANDLE)data->fd, data->bufferData, static_cast<DWORD>(data->bufferLength), &bytesWritten, &ov)) {
DWORD lastError = GetLastError();
if (lastError != ERROR_IO_PENDING) {
// Write operation error
ErrorCodeToString("Writing to COM port (WriteFile)", lastError, data->errorString);
CloseHandle(ov.hEvent);
return;
}
// Write operation is completing asynchronously
// We MUST wait for the operation completion before deallocation of OVERLAPPED struct
// or write data buffer

// block for async write operation completion
bytesWritten = 0;
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesWritten, TRUE)) {
// Write operation error
DWORD lastError = GetLastError();
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", lastError, data->errorString);
CloseHandle(ov.hEvent);
return;
}
}
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterWrite);
async->data = baton;
uv_async_send(async);
ExitThread(0);
// Write operation completed synchronously
data->result = bytesWritten;
data->offset += data->result;
CloseHandle(ov.hEvent);
} while (data->bufferLength > data->offset);
}

void EIO_AfterWrite(uv_async_t* req) {
void EIO_AfterWrite(uv_work_t* req) {
Nan::HandleScope scope;
WriteBaton* baton = static_cast<WriteBaton*>(req->data);
WaitForSingleObject(baton->hThread, INFINITE);
delete req;

v8::Local<v8::Value> argv[1];
Expand Down Expand Up @@ -409,120 +404,81 @@ NAN_METHOD(Read) {
baton->bufferLength = bufferLength;
baton->bufferData = node::Buffer::Data(buffer);
baton->callback.Reset(info[4].As<v8::Function>());
baton->complete = false;
// ReadFileEx requires a thread that can block. Create a new thread to
// run the read operation, saving the handle so it can be deallocated later.
baton->hThread = CreateThread(NULL, 0, ReadThread, baton, 0, NULL);
}

void __stdcall ReadIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
ReadBaton* baton = static_cast<ReadBaton*>(ov->hEvent);

if (errorCode) {
ErrorCodeToString("Reading from COM port (ReadIOCompletion)", errorCode, baton->errorString);
baton->complete = true;
return;
}
uv_work_t* req = new uv_work_t();
req->data = baton;
uv_queue_work(uv_default_loop(), req, EIO_Read, (uv_after_work_cb)EIO_AfterRead);
}

DWORD lastError;
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
lastError = GetLastError();
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
baton->complete = true;
return;
}
if (bytesTransferred) {
baton->bytesRead += bytesTransferred;
baton->offset += bytesTransferred;
}
void EIO_Read(uv_work_t* req) {
ReadBaton* data = static_cast<ReadBaton*>(req->data);
data->bytesRead = 0;
int errorCode = ERROR_SUCCESS;

// ReadFileEx and GetOverlappedResult retrieved only 1 byte. Read any additional data in the input
// buffer. Set the timeout to MAXDWORD in order to disable timeouts, so the read operation will
// return immediately no matter how much data is available.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = MAXDWORD;
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
lastError = GetLastError();
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
baton->complete = true;
return;
}
char* offsetPtr = data->bufferData;
offsetPtr += data->offset;

// Store additional data after whatever data has already been read.
char* offsetPtr = baton->bufferData + baton->offset;
// Event used by GetOverlappedResult(..., TRUE) to wait for incoming data or timeout
// Event MUST be used if program has several simultaneous asynchronous operations
// on the same handle (i.e. ReadFile and WriteFile)
HANDLE hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

// ReadFile, unlike ReadFileEx, needs an event in the overlapped structure.
ov->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!ReadFile((HANDLE)baton->fd, offsetPtr, baton->bufferLength - baton->bytesRead, &bytesTransferred, ov)) {
errorCode = GetLastError();
while (true) {
OVERLAPPED ov = {0};
ov.hEvent = hEvent;

// Start read operation - synchrounous or asynchronous
DWORD bytesReadSync = 0;
if (!ReadFile((HANDLE)data->fd, offsetPtr, data->bytesToRead, &bytesReadSync, &ov)) {
errorCode = GetLastError();
if (errorCode != ERROR_IO_PENDING) {
// Read operation error
if (errorCode == ERROR_OPERATION_ABORTED) {
} else {
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, data->errorString);
CloseHandle(hEvent);
return;
}
break;
}

if (errorCode != ERROR_IO_PENDING) {
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, baton->errorString);
baton->complete = true;
CloseHandle(ov->hEvent);
return;
// Read operation is asynchronous and is pending
// We MUST wait for operation completion before deallocation of OVERLAPPED struct
// or read data buffer

// Wait for async read operation completion or timeout
DWORD bytesReadAsync = 0;
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesReadAsync, TRUE)) {
// Read operation error
errorCode = GetLastError();
if (errorCode == ERROR_OPERATION_ABORTED) {
} else {
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", errorCode, data->errorString);
CloseHandle(hEvent);
return;
}
break;
} else {
// Read operation completed asynchronously
data->bytesRead = bytesReadAsync;
}
} else {
// Read operation completed synchronously
data->bytesRead = bytesReadSync;
}

if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
lastError = GetLastError();
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
baton->complete = true;
CloseHandle(ov->hEvent);
return;
// Return data received if any
if (data->bytesRead > 0) {
break;
}
}
CloseHandle(ov->hEvent);

baton->bytesRead += bytesTransferred;
baton->complete = true;
}

DWORD __stdcall ReadThread(void* param) {
ReadBaton* baton = static_cast<ReadBaton*>(param);
DWORD lastError;

OVERLAPPED* ov = new OVERLAPPED;
memset(ov, 0, sizeof(OVERLAPPED));
ov->hEvent = static_cast<void*>(baton);

while (!baton->complete) {
// Reset the read timeout to 0, so that it will block until more data arrives.
COMMTIMEOUTS commTimeouts = {};
commTimeouts.ReadIntervalTimeout = 0;
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
lastError = GetLastError();
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
break;
}
// ReadFileEx doesn't use overlapped's hEvent, so it is reserved for user data.
ov->hEvent = static_cast<HANDLE>(baton);
char* offsetPtr = baton->bufferData + baton->offset;
// ReadFileEx requires calling GetLastError even upon success. Clear the error beforehand.
SetLastError(0);
// Only read 1 byte, so that the callback will be triggered once any data arrives.
ReadFileEx((HANDLE)baton->fd, offsetPtr, 1, ov, ReadIOCompletion);
// Error codes when call is successful, such as ERROR_MORE_DATA.
lastError = GetLastError();
if (lastError != ERROR_SUCCESS) {
ErrorCodeToString("Reading from COM port (ReadFileEx)", lastError, baton->errorString);
break;
}
// IOCompletion routine is only called once this thread is in an alertable wait state.
SleepEx(INFINITE, TRUE);
}
delete ov;
// Signal the main thread to run the callback.
uv_async_t* async = new uv_async_t;
uv_async_init(uv_default_loop(), async, EIO_AfterRead);
async->data = baton;
uv_async_send(async);
ExitThread(0);
CloseHandle(hEvent);
}

void EIO_AfterRead(uv_async_t* req) {
void EIO_AfterRead(uv_work_t* req) {
Nan::HandleScope scope;
ReadBaton* baton = static_cast<ReadBaton*>(req->data);
WaitForSingleObject(baton->hThread, INFINITE);
delete req;

v8::Local<v8::Value> argv[2];
Expand Down
12 changes: 2 additions & 10 deletions src/serialport_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ struct WriteBaton {
size_t bufferLength;
size_t offset;
size_t bytesWritten;
void* hThread;
bool complete;
Nan::Persistent<v8::Object> buffer;
Nan::Callback callback;
int result;
Expand All @@ -25,9 +23,7 @@ struct WriteBaton {

NAN_METHOD(Write);
void EIO_Write(uv_work_t* req);
void EIO_AfterWrite(uv_async_t* req);
DWORD WriteThread(void* param);

void EIO_AfterWrite(uv_work_t* req);

struct ReadBaton {
int fd;
Expand All @@ -36,17 +32,13 @@ struct ReadBaton {
size_t bytesRead;
size_t bytesToRead;
size_t offset;
void* hThread;
bool complete;
char errorString[ERROR_STRING_SIZE];
Nan::Callback callback;
};

NAN_METHOD(Read);
void EIO_Read(uv_work_t* req);
void EIO_AfterRead(uv_async_t* req);
DWORD ReadThread(void* param);

void EIO_AfterRead(uv_work_t* req);

NAN_METHOD(List);
void EIO_List(uv_work_t* req);
Expand Down

0 comments on commit c7a3be4

Please sign in to comment.