-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
75 lines (55 loc) · 1.72 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
const {Duplex, Readable} = require('stream')
/**
 * No-op `read()` implementation handed to the stream constructor.
 *
 * Nothing is pulled on demand: incoming data is pushed into the stream from
 * the `message` event listener instead.
 */
function read()
{
  // Intentionally empty
}
// Based on
// https://github.com/maxogden/websocket-stream/blob/3f39dcbc098f661ee12d7deba297b1420a0a0e07/stream.js#L154

/**
 * Normalise one buffered write entry so it can be given to `Buffer.concat()`.
 *
 * @param {{chunk: *}} item - Entry of the `chunks` array received by
 *   `writev()`; only its `chunk` property is read.
 * @returns {*} A UTF-8 encoded `Buffer` when `chunk` is a string, otherwise
 *   `chunk` itself, untouched.
 */
function mapChunks(item)
{
  const {chunk} = item

  // Encode the string payload itself, not the `{chunk, ...}` wrapper object
  // (`Buffer.from()` throws when given a plain object)
  return typeof chunk === 'string' ? Buffer.from(chunk, 'utf8') : chunk
}
/**
 * `_writev()` implementation: merge all the buffered chunks into a single
 * `Buffer` and hand it over to `this._write()` as one message.
 *
 * @this {object} Stream whose `_write()` method sends the merged data.
 * @param {Array} chunks - Buffered write entries, each one converted with
 *   `mapChunks()` before being concatenated.
 * @param {Function} cb - Callback forwarded as-is to `this._write()`.
 */
function writev(chunks, cb)
{
  const buffers = chunks.map(mapChunks)
  const payload = Buffer.concat(buffers)

  this._write(payload, 'binary', cb)
}
module.exports = function MessageEventStream(ws, {closeOnFinish, ...options} = {})
{
const send = ws.send || ws.postMessage
const Constructor = send ? Duplex : Readable
const stream = new Constructor({...options, read})
.once('close', ws.close.bind(ws))
// Configure `MessageEvent` emitter object
ws.addEventListener('close', function(event)
{
stream.destroy(event.code !== undefined && event)
})
ws.addEventListener('error', stream.destroy.bind(stream))
ws.addEventListener('message', function({data})
{
stream.push(data)
})
// Configure writable part of the `Duplex` stream
if(send)
{
stream._write = function(chunk, _, callback)
{
send.call(ws, chunk)
callback()
}
if(closeOnFinish) stream.once('finish', stream.emit.bind(stream, 'close'))
// Allow to concatenate and sent messages as one if not in `objectMode`
if(!options.objectMode) stream._writev = writev
// Don't call to `write()` until the `MessageEvent` emitter object is ready
const {OPEN, readyState} = ws
if(readyState !== OPEN && readyState !== 'open')
{
stream.cork()
function onOpen()
{
ws.removeEventListener('open', onOpen)
process.nextTick(stream.uncork.bind(stream))
}
ws.addEventListener('open', onOpen)
}
}
return stream
}