Skip to content

Commit

Permalink
feat: Async-Iterator interface
Browse files Browse the repository at this point in the history
This is the next generation of serial port interfaces. Streams are never going away but the async iterator interface is much more flexible. It can be combined with async-iterator tools (such as [streaming-iterables](https://www.npmjs.com/package/streaming-iterables)) to make buffers and parsers, and can even be combined with our existing stream based parsers.

This is very experimental. I’ve tried to bring a lot of these changes in https://github.com/node-serialport/node-serialport/tree/reconbot/typescript2 but I haven’t had time for a full typescript rewrite. So maybe this smaller api change lets us get some of these advantages without having to rewrite everything.

## Todo
- [ ] api feedback
- [ ] docs for website and readme
- [ ] abstract away get/set/update borrowing from #1679 for local and remote state and parity with web serial
- [ ] tests

## Example Usage
```js
const { open, list } = require('@serialport/async-iterator')
const ports = await list()
const arduinoPort = ports.find(info => (info.manufacture || '').includes('Arduino'))
const port = await open(arduinoPort)

// read bytes until close
for await (const bytes of port) {
  console.log(`read ${bytes.length} bytes`)
}

// read 12 bytes
const { value, end } = await port.next(12)
console.log(`read ${value.length} bytes / port closed: ${end}`)

// write a buffer
await port.write(Buffer.from('hello!'))
```
  • Loading branch information
reconbot committed Oct 30, 2019
1 parent d500a5b commit 5ab68b6
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 0 deletions.
1 change: 1 addition & 0 deletions greenkeeper.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"default": {
"packages": [
"package.json",
"packages/async-iterator/package.json",
"packages/binding-abstract/package.json",
"packages/binding-mock/package.json",
"packages/bindings/package.json",
Expand Down
4 changes: 4 additions & 0 deletions packages/async-iterator/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.DS_Store
*.test.js
*.node-10-test.js
CHANGELOG.md
13 changes: 13 additions & 0 deletions packages/async-iterator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# @serialport/AsyncIterator

This is a node SerialPort project! This package does some neat stuff.

- [Guides and API Docs](https://serialport.io/)

This is why you'd use it.

This is how you use it.
```js
const asyncIterator = new AsyncIterator()

```
88 changes: 88 additions & 0 deletions packages/async-iterator/lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
const debug = require('debug')('serialport/async-iterator')

/**
* An AsyncIterator that does something pretty cool.
* @param {Object} options open options
* @example ```
// To use the `AsyncIterator` interface:
const { open } = require('@serialport/async-iterator')
const Binding = require('@serialport/bindings')
const ports = await Binding.list()
const arduinoPort = ports.find(info => (info.manufacture || '').includes('Arduino'))
const port = await open(arduinoPort)
// read bytes until close
for await (const bytes of port) {
console.log(`read ${bytes.length} bytes`)
}
// read 12 bytes
const { value, end } = await port.next(12)
console.log(`read ${value.length} bytes / port closed: ${end}`)
// write a buffer
await port.write(Buffer.from('hello!'))
```
*/

/**
* Wrap an async function so that subsequent calls are queued behind the previous promise resolving
*/
const promiseQueue = func => {
const queuedFunc = (...args) => {
queuedFunc.previousCall = queuedFunc.previousCall.then(() => func(...args))
return queuedFunc.previousCall
}
queuedFunc.previousCall = Promise.resolve()
return queuedFunc
}

const open = async ({ Binding, readSize = 1024, path, bindingOptions = {}, ...openOptions }) => {
const binding = new Binding(bindingOptions)
debug('opening with', { path, openOptions })
await binding.open(path, openOptions)

const next = async (bytesToRead = readSize) => {
if (!binding.isOpen) {
debug('next: port is closed')
return { value: undefined, done: true }
}

const readBuffer = Buffer.allocUnsafe(bytesToRead)
try {
debug(`next: read starting`)
const { bytesRead } = await binding.read(readBuffer, 0, bytesToRead)
debug(`next: read ${bytesRead} bytes`)
const value = readBuffer.slice(0, bytesRead)
return { value, done: false }
} catch (error) {
if (error.canceled) {
debug(`next: read canceled`)
return { value: undefined, done: true }
}
debug(`next: read error ${error.message}`)
throw error
}
}

const port = {
[Symbol.asyncIterator]: () => port,
next: promiseQueue(next),
write: promiseQueue(data => binding.write(data)),
close: () => binding.close(),
update: opt => binding.update(opt),
set: opt => binding.set(opt),
get: () => binding.get(),
flush: () => binding.flush(),
drain: () => binding.drain(),
binding,
get isOpen() {
return binding.isOpen
},
}
return port
}

module.exports = {
open,
}
86 changes: 86 additions & 0 deletions packages/async-iterator/lib/index.node-10-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
const assert = require('assert')
const { open } = require('./index')
const MockBinding = require('@serialport/binding-mock')

const testPath = '/dev/coolPort'
describe('AsyncIterator', () => {
if (process.versions.node.split('.')[0] < 10) {
// eslint-disable-next-line mocha/no-pending-tests
it('Requires Node 10 or higher')
return
}
describe('.open', () => {
beforeEach(() => {
MockBinding.reset()
MockBinding.createPort(testPath)
})
it('Opens port', async () => {
const port = await open({ Binding: MockBinding, path: testPath })
assert.strictEqual(port.isOpen, true)
await port.close()
assert.strictEqual(port.isOpen, false)
})
})
describe('reading', () => {
beforeEach(() => {
MockBinding.reset()
MockBinding.createPort(testPath)
})
it('reads data', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const buffers = []
const testData = Buffer.from('This is test data.')
port.binding.emitData(testData)
for await (const bytes of port) {
buffers.push(bytes)
// if we're done reading available data close the port
if (port.binding.port.data.length === 0) {
await port.close()
}
}
assert.deepStrictEqual(Buffer.concat(buffers), testData, 'data does not match')
assert.strictEqual(buffers.length, testData.length)
})
it('deals with concurrent reads', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const testData = Buffer.from('This is test data.')
port.binding.emitData(testData)
const buffers = await Promise.all([port.next(testData.length / 2), port.next(testData.length / 2)])
assert.deepStrictEqual(Buffer.concat(buffers.map(itr => itr.value)), testData, 'data does not match')
await port.close()
})
it('deals with huge read requests', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const testData = Buffer.from('This is test data.')
port.binding.emitData(testData)
const data = await port.next(10000)
assert.deepStrictEqual(data.value, testData)
await port.close()
})
it('deals with the port being closed', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const read = port.next()
await port.close()
assert.deepStrictEqual(await read, { value: undefined, done: true })
})
})
describe('writes', () => {
beforeEach(() => {
MockBinding.reset()
MockBinding.createPort(testPath)
})
it('writes data', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const testData = Buffer.from('This is test data.')
await port.write(testData)
assert.deepStrictEqual(port.binding.lastWrite, testData)
})
it('queues writes', async () => {
const port = await open({ Binding: MockBinding, path: testPath, readSize: 1 })
const testData = Buffer.from('This is test data.')
const testData2 = Buffer.from('this is also test data')
await Promise.all([port.write(testData), port.write(testData2)])
assert.deepStrictEqual(port.binding.lastWrite, testData2)
})
})
})
8 changes: 8 additions & 0 deletions packages/async-iterator/lib/index.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
if (process.versions.node.split('.')[0] < 10) {
describe('AsyncIterator', () => {
// eslint-disable-next-line mocha/no-pending-tests
it('Requires Node 10 or higher')
})
} else {
require('./index.node-10-test')
}
22 changes: 22 additions & 0 deletions packages/async-iterator/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "@serialport/async-iterator",
"version": "8.0.5",
"main": "lib/async-iterator.js",
"dependencies": {
"debug": "^4.1.1"
},
"devDependencies": {
"@serialport/binding-mock": "^8.0.4"
},
"engines": {
"node": ">=10.3.0"
},
"publishConfig": {
"access": "public"
},
"license": "MIT",
"repository": {
"type": "git",
"url": "git://github.com/node-serialport/node-serialport.git"
}
}

0 comments on commit 5ab68b6

Please sign in to comment.