From 5ab68b615a47cbd592ece4e3676f7e2e30d50df0 Mon Sep 17 00:00:00 2001 From: Francis Gulotta Date: Sun, 28 Apr 2019 11:42:04 -0400 Subject: [PATCH] feat: Async-Iterator interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is the next generation of serial port interfaces. Streams are never going away but the async iterator interface is much more flexible. It can be combined with async-iterator tools (such as [streaming-iterables](https://www.npmjs.com/package/streaming-iterables)) to make buffers and parsers, and can even be combined with our existing stream based parsers. This is very experimental. I’ve tried to bring a lot of these changes in https://github.com/node-serialport/node-serialport/tree/reconbot/typescript2 but I haven’t had time for a full typescript rewrite. So maybe this smaller api change lets us get some of these advantages without having to rewrite everything. ## Todo - [ ] api feedback - [ ] docs for website and readme - [ ] abstract away get/set/update borrowing from https://github.com/node-serialport/node-serialport/issues/1679 for local and remote state and parity with web serial - [ ] tests ## Example Usage ```js const { open, list } = require('@serialport/async-iterator') const ports = await list() const arduinoPort = ports.find(info => (info.manufacture || '').includes('Arduino')) const port = await open(arduinoPort) // read bytes until close for await (const bytes of port) { console.log(`read ${bytes.length} bytes`) } // read 12 bytes const { value, end } = await port.next(12) console.log(`read ${value.length} bytes / port closed: ${end}`) // write a buffer await port.write(Buffer.from('hello!')) ``` --- greenkeeper.json | 1 + packages/async-iterator/.npmignore | 4 + packages/async-iterator/README.md | 13 +++ packages/async-iterator/lib/index.js | 88 +++++++++++++++++++ .../async-iterator/lib/index.node-10-test.js | 86 ++++++++++++++++++ packages/async-iterator/lib/index.test.js | 8 ++ packages/async-iterator/package.json | 22 +++++ 7 files changed, 222 insertions(+) create mode 100644 packages/async-iterator/.npmignore create mode 100644 packages/async-iterator/README.md create mode 100644 packages/async-iterator/lib/index.js create mode 100644 packages/async-iterator/lib/index.node-10-test.js create mode 100644 packages/async-iterator/lib/index.test.js create mode 100644 packages/async-iterator/package.json diff --git a/greenkeeper.json b/greenkeeper.json index 9fffeee3b..3a2f8b2ba 100644 --- a/greenkeeper.json +++ b/greenkeeper.json @@ -3,6 +3,7 @@ "default": { "packages": [ "package.json", + "packages/async-iterator/package.json", "packages/binding-abstract/package.json", "packages/binding-mock/package.json", "packages/bindings/package.json", diff --git a/packages/async-iterator/.npmignore b/packages/async-iterator/.npmignore new file mode 100644 index 000000000..234ae0f52 --- /dev/null +++ b/packages/async-iterator/.npmignore @@ -0,0 +1,4 @@ +.DS_Store +*.test.js +*.node-10-test.js +CHANGELOG.md diff --git a/packages/async-iterator/README.md b/packages/async-iterator/README.md new file mode 100644 index 000000000..d096a34b9 --- /dev/null +++ b/packages/async-iterator/README.md @@ -0,0 +1,13 @@ +# @serialport/AsyncIterator + +This is a node SerialPort project! This package does some neat stuff. + +- [Guides and API Docs](https://serialport.io/) + +This is why you'd use it. + +This is how you use it. +```js +const asyncIterator = new AsyncIterator() + +``` diff --git a/packages/async-iterator/lib/index.js b/packages/async-iterator/lib/index.js new file mode 100644 index 000000000..7bbdef9ae --- /dev/null +++ b/packages/async-iterator/lib/index.js @@ -0,0 +1,88 @@ +const debug = require('debug')('serialport/async-iterator') + +/** + * An AsyncIterator that does something pretty cool. + * @param {Object} options open options + * @example ``` +// To use the `AsyncIterator` interface: +const { open } = require('@serialport/async-iterator') +const Binding = require('@serialport/bindings') +const ports = await Binding.list() +const arduinoPort = ports.find(info => (info.manufacture || '').includes('Arduino')) +const port = await open(arduinoPort) + +// read bytes until close +for await (const bytes of port) { + console.log(`read ${bytes.length} bytes`) +} + +// read 12 bytes +const { value, end } = await port.next(12) +console.log(`read ${value.length} bytes / port closed: ${end}`) + +// write a buffer +await port.write(Buffer.from('hello!')) +``` +*/ + +/** + * Wrap an async function so that subsequent calls are queued behind the previous promise resolving + */ +const promiseQueue = func => { + const queuedFunc = (...args) => { + queuedFunc.previousCall = queuedFunc.previousCall.then(() => func(...args)) + return queuedFunc.previousCall + } + queuedFunc.previousCall = Promise.resolve() + return queuedFunc +} + +const open = async ({ Binding, readSize = 1024, path, bindingOptions = {}, ...openOptions }) => { + const binding = new Binding(bindingOptions) + debug('opening with', { path, openOptions }) + await binding.open(path, openOptions) + + const next = async (bytesToRead = readSize) => { + if (!binding.isOpen) { + debug('next: port is closed') + return { value: undefined, done: true } + } + + const readBuffer = Buffer.allocUnsafe(bytesToRead) + try { + debug(`next: read starting`) + const { bytesRead } = await binding.read(readBuffer, 0, bytesToRead) + debug(`next: read ${bytesRead} bytes`) + const value = readBuffer.slice(0, bytesRead) + return { value, done: false } + } catch (error) { + if (error.canceled) { + debug(`next: read canceled`) + return { value: undefined, done: true } + } + debug(`next: read error ${error.message}`) + throw error + } + } + + const port = { + [Symbol.asyncIterator]: () => port, + next: promiseQueue(next), + write: promiseQueue(data => binding.write(data)), + close: () => binding.close(), + update: opt => binding.update(opt), + set: opt => binding.set(opt), + get: () => binding.get(), + flush: () => binding.flush(), + drain: () => binding.drain(), + binding, + get isOpen() { + return binding.isOpen + }, + } + return port +} + +module.exports = { + open, +} diff --git a/packages/async-iterator/lib/index.node-10-test.js b/packages/async-iterator/lib/index.node-10-test.js new file mode 100644 index 000000000..afc752000 --- /dev/null +++ b/packages/async-iterator/lib/index.node-10-test.js @@ -0,0 +1,86 @@ +const assert = require('assert') +const { open } = require('./index') +const MockBinding = require('@serialport/binding-mock') + +const testPath = '/dev/coolPort' +describe('AsyncIterator', () => { + if (process.versions.node.split('.')[0] < 10) { + // eslint-disable-next-line mocha/no-pending-tests + it('Requires Node 10 or higher') + return + } + describe('.open', () => { + beforeEach(() => { + MockBinding.reset() + MockBinding.createPort(testPath) + }) + it('Opens port', async () => { + const port = await open({ Binding: MockBinding, path: testPath }) + assert.strictEqual(port.isOpen, true) + await port.close() + assert.strictEqual(port.isOpen, false) + }) + }) + describe('reading', () => { + beforeEach(() => { + MockBinding.reset() + MockBinding.createPort(testPath) + }) + it('reads data', async () => { + const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 }) + const buffers = [] + const testData = Buffer.from('This is test data.') + port.binding.emitData(testData) + for await (const bytes of port) { + buffers.push(bytes) + // if we're done reading available data close the port + if (port.binding.port.data.length === 0) { + await port.close() + } + } + assert.deepStrictEqual(Buffer.concat(buffers), testData, 'data does not match') + assert.strictEqual(buffers.length, testData.length) + }) + it('deals with concurrent reads', async () => { + const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 }) + const testData = Buffer.from('This is test data.') + port.binding.emitData(testData) + const buffers = await Promise.all([port.next(testData.length / 2), port.next(testData.length / 2)]) + assert.deepStrictEqual(Buffer.concat(buffers.map(itr => itr.value)), testData, 'data does not match') + await port.close() + }) + it('deals with huge read requests', async () => { + const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 }) + const testData = Buffer.from('This is test data.') + port.binding.emitData(testData) + const data = await port.next(10000) + assert.deepStrictEqual(data.value, testData) + await port.close() + }) + it('deals with the port being closed', async () => { + const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 }) + const read = port.next() + await port.close() + assert.deepStrictEqual(await read, { value: undefined, done: true }) + }) + }) + describe('writes', () => { + beforeEach(() => { + MockBinding.reset() + MockBinding.createPort(testPath) + }) + it('writes data', async () => { + const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 }) + const testData = Buffer.from('This is test data.') + await port.write(testData) + assert.deepStrictEqual(port.binding.lastWrite, testData) + }) + it('queues writes', async () => { + const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 }) + const testData = Buffer.from('This is test data.') + const testData2 = Buffer.from('this is also test data') + await Promise.all([port.write(testData), port.write(testData2)]) + assert.deepStrictEqual(port.binding.lastWrite, testData2) + }) + }) +}) diff --git a/packages/async-iterator/lib/index.test.js b/packages/async-iterator/lib/index.test.js new file mode 100644 index 000000000..4b4a47826 --- /dev/null +++ b/packages/async-iterator/lib/index.test.js @@ -0,0 +1,8 @@ +if (process.versions.node.split('.')[0] < 10) { + describe('AsyncIterator', () => { + // eslint-disable-next-line mocha/no-pending-tests + it('Requires Node 10 or higher') + }) +} else { + require('./index.node-10-test') +} diff --git a/packages/async-iterator/package.json b/packages/async-iterator/package.json new file mode 100644 index 000000000..ac0038a86 --- /dev/null +++ b/packages/async-iterator/package.json @@ -0,0 +1,22 @@ +{ + "name": "@serialport/async-iterator", + "version": "8.0.5", + "main": "lib/async-iterator.js", + "dependencies": { + "debug": "^4.1.1" + }, + "devDependencies": { + "@serialport/binding-mock": "^8.0.4" + }, + "engines": { + "node": ">=10.3.0" + }, + "publishConfig": { + "access": "public" + }, + "license": "MIT", + "repository": { + "type": "git", + "url": "git://github.com/node-serialport/node-serialport.git" + } +}