Skip to content

Commit

Permalink
feat: inter byte timeout parser (#1779)
Browse files Browse the repository at this point in the history
Emits data if there is a pause between packets for the specified amount of time.
Holger-Will authored and reconbot committed Jan 20, 2019
1 parent dd08fea commit cbb8e41
Showing 7 changed files with 194 additions and 0 deletions.
22 changes: 22 additions & 0 deletions docs/api-parser-inter-byte-timeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
id: api-parser-inter-byte-timeout
title: InterByteTimeout Parser
---
```typescript
new InterByteTimeout(options)
```
Emits data if there is a pause between packets for the specified amount of time.

A transform stream that emits data as a buffer after not recieving any bytes for the specified amount of time.

Arguments
- `options.interval: number` the period of silence in milliseconds after which data is emited
- `options.maxBufferSize: number` the maximum number of bytes after which data will be emited. Defaults to 65536.

## Example
```js
const SerialPort = require('serialport')
const InterByteTimeout = require('@serialport/parser-inter-byte-timeout')
const port = new SerialPort('/dev/tty-usbserial1')
const parser = port.pipe(new InterByteTimeout({interval: 30}))
parser.on('data', console.log) // will emit data if there is a pause between packets of at least 30ms
1 change: 1 addition & 0 deletions packages/parser-inter-byte-timeout/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.test.js
1 change: 1 addition & 0 deletions packages/parser-inter-byte-timeout/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
See our api docs https://serialport.io/docs/api-parser-inter-byte-timeout
71 changes: 71 additions & 0 deletions packages/parser-inter-byte-timeout/inter-byte-timeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
const Transform = require('stream').Transform

/**
* Emits data if there is a pause between packets for the specified amount of time.
* @extends Transform
* @param {Object} options parser options object
* @param {Number} options.interval the period of silence in milliseconds after which data is emited
* @param {Number} options.maxBufferSize the maximum number of bytes after which data will be emited. Defaults to 65536.
* @summary A transform stream that emits data as a buffer after not receiving any bytes for the specified amount of time.
* @example
const SerialPort = require('serialport')
const InterByteTimeout = require('@serialport/parser-inter-byte-timeout')
const port = new SerialPort('/dev/tty-usbserial1')
const parser = port.pipe(new InterByteTimeout({interval: 30}))
parser.on('data', console.log) // will emit data if there is a pause between packets greater than 30ms
*/

class InterByteTimeoutParser extends Transform {
constructor(options) {
super()
options = Object.assign({ maxBufferSize: 65536 }, options)
if (!options.interval) {
throw new TypeError('"interval" is required')
}

if (typeof options.interval !== 'number' || Number.isNaN(options.interval)) {
throw new TypeError('"interval" is not a number')
}

if (options.interval < 1) {
throw new TypeError('"interval" is not greater than 0')
}

if (typeof options.maxBufferSize !== 'number' || Number.isNaN(options.maxBufferSize)) {
throw new TypeError('"maxBufferSize" is not a number')
}

if (options.maxBufferSize < 1) {
throw new TypeError('"maxBufferSize" is not greater than 0')
}

this.maxBufferSize = options.maxBufferSize
this.currentPacket = []
this.interval = options.interval
this.intervalID = -1
}
_transform(chunk, encoding, cb) {
clearTimeout(this.intervalID)
for (let offset = 0; offset < chunk.length; offset++) {
this.currentPacket.push(chunk[offset])
if (this.currentPacket.length >= this.maxBufferSize) {
this.emitPacket()
}
}
this.intervalID = setTimeout(this.emitPacket.bind(this), this.interval)
cb()
}
emitPacket() {
clearTimeout(this.intervalID)
if (this.currentPacket.length > 0) {
this.push(Buffer.from(this.currentPacket))
}
this.currentPacket = []
}
_flush(cb) {
this.emitPacket()
cb()
}
}

module.exports = InterByteTimeoutParser
82 changes: 82 additions & 0 deletions packages/parser-inter-byte-timeout/inter-byte-timeout.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* eslint-disable no-new */

const sinon = require('sinon')
const InterByteTimeoutParser = require('./inter-byte-timeout')

function wait(interval) {
return new Promise((resolve, reject) => {
setTimeout(resolve, interval)
if (interval < 1) reject()
})
}

describe('InterByteTimeoutParser', () => {
it('emits data events after a pause of 30ms', () => {
const spy = sinon.spy()
const parser = new InterByteTimeoutParser({ interval: 30 })
parser.on('data', spy)
parser.write(Buffer.from('I love robots Each'))
parser.write(Buffer.from('and Every One'))
wait(30).then(() => {
parser.write(Buffer.from('even you!'))
parser.write(Buffer.from('The angry red robot'))
wait(30).then(() => {
assert(spy.calledTwice, 'expecting 2 data events')
})
})
})

it('throws when interval is not a number or negative', () => {
assert.throws(() => {
new InterByteTimeoutParser({ interval: -20 })
})
assert.throws(() => {
new InterByteTimeoutParser({ interval: NaN })
})
assert.throws(() => {
new InterByteTimeoutParser({ interval: 'hello' })
})
assert.throws(() => {
new InterByteTimeoutParser()
})
})

it('throws when maxBufferSize is not a number or negative', () => {
assert.throws(() => {
new InterByteTimeoutParser({ maxBufferSize: -20, interval: 15 })
})
assert.throws(() => {
new InterByteTimeoutParser({ maxBufferSize: NaN, interval: 15 })
})
assert.throws(() => {
new InterByteTimeoutParser({ maxBufferSize: 'hello', interval: 15 })
})
})

it('emits data events when buffer is full', () => {
const spy = sinon.spy()
const parser = new InterByteTimeoutParser({ maxBufferSize: 2, interval: 15 })
parser.on('data', spy)
parser.write(Buffer.from([1, 2, 3, 4, 5, 6]))
wait(15).then(() => {
assert(spy.calledThrice, 'expecting 3 data events')
})
})

it('emits all buffered data when stream ends', () => {
const spy = sinon.spy()
const parser = new InterByteTimeoutParser({ interval: 15 })
parser.on('data', spy)
parser.write('Oh wow.')
parser.end()
assert(spy.calledOnce, 'expecting 1 data event')
})
it('emits all buffered data when stream ends', () => {
const spy = sinon.spy()
const parser = new InterByteTimeoutParser({ interval: 15 })
parser.on('data', spy)
parser.write('')
parser.end()
assert(spy.notCalled, 'expecting no data events')
})
})
16 changes: 16 additions & 0 deletions packages/parser-inter-byte-timeout/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "@serialport/parser-inter-byte-timeout",
"version": "1.0.0",
"main": "inter-byte-timeout.js",
"engines": {
"node": ">=6.0.0"
},
"publishConfig": {
"access": "public"
},
"license": "MIT",
"repository": {
"type": "git",
"url": "git://github.com/node-serialport/node-serialport.git"
}
}
1 change: 1 addition & 0 deletions packages/website/sidebars.json
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
"api-parser-byte-length",
"api-parser-cctalk",
"api-parser-delimiter",
"api-parser-inter-byte-timeout",
"api-parser-readline",
"api-parser-ready",
"api-parser-regex",

0 comments on commit cbb8e41

Please sign in to comment.