forked from splitice/node-netflowv9
-
Notifications
You must be signed in to change notification settings - Fork 0
/
netflowv9.js
111 lines (97 loc) · 3.58 KB
/
netflowv9.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/**
* This version supports only compiled code and works with the streams API
*/
//require('debug').enable('NetFlowV9');
var debug = require('debug')('NetFlowV9');
var dgram = require('dgram');
var util = require('util');
var EventEmitter = require('asynchronous-emitter');
var nf9PktDecode = require('./lib/nf9/nf9decode');
const FifoQueue = require('./lib/FifoQueue')
const NfTypes = require('./lib/nf9/nftypes');
var nfInfoTemplates = require('./lib/nf9/nfinfotempl');
/**
 * Route a raw datagram to the decoder matching its header version.
 *
 * The version is read as a big-endian unsigned 16-bit integer from the first
 * two bytes of `msg`. Must be invoked with `this` bound to an object that
 * provides `nf9PktDecode` (it is installed on the NetFlowV9 prototype).
 *
 * @param {Buffer} msg - Raw datagram payload.
 * @param {object} rinfo - Remote address info passed through to the decoder.
 * @returns {*} Whatever `this.nf9PktDecode` returns for a version 9 packet;
 *   `undefined` for any other version (which is logged via `debug`).
 */
function nfPktDecode(msg,rinfo) {
    const headerVersion = msg.readUInt16BE(0);
    if (headerVersion === 9) {
        return this.nf9PktDecode(msg, rinfo);
    }
    // Any other version is unsupported here: log it and yield nothing.
    debug('bad header version %d', headerVersion);
    return undefined;
}
/**
 * NetFlow v9 collector.
 *
 * Owns a dgram socket whose incoming datagrams are pushed onto a bounded
 * FIFO queue and drained asynchronously by `fetch()`. Every datagram that
 * decodes to a truthy result is emitted as a `'data'` event, with the
 * sender info (`rinfo`) and the raw buffer (`packet`) attached to it.
 */
class NetFlowV9 extends EventEmitter {
    /**
     * @param {object|function} [options] - Either a callback (stored as
     *   `this.cb`) or an options object.
     * @param {function} [options.cb] - Stored as `this.cb`.
     * @param {function} [options.templateCb] - Stored as `this.templateCb`.
     * @param {number} [options.queueSize=2048] - Size handed to FifoQueue.
     * @param {object} [options.nfTypes] - Merged into the type table returned by NfTypes.
     * @param {object} [options.nfScope] - Merged into the scope table returned by NfTypes.
     * @param {object} [options.templates] - Replaces the initially empty template store.
     * @param {string} [options.socketType='udp4'] - Passed to `dgram.createSocket`.
     * @param {number} [options.port] - When truthy, `listen(port, host)` is called immediately.
     * @param {string} [options.host] - Host passed to `listen` together with `port`.
     */
    constructor(options = {}) {
        super();
        this.templates = {};
        const nft = NfTypes(options);
        this.nfTypes = nft.nfTypes;
        this.nfScope = nft.nfScope;
        this.cb = null;
        this.templateCb = null;
        this.port = null;
        // `closed` flips to true on the socket's 'close' event. `set` is true
        // while no drain is scheduled or running; both are initialised here so
        // the message handler behaves the same even if the socket is bound
        // without going through listen().
        this.closed = false;
        this.set = true;
        this.fifo = new FifoQueue(options.queueSize || 2048);

        if (typeof options === 'function') {
            this.cb = options;
        } else if (typeof options.cb === 'function') {
            this.cb = options.cb;
        }
        if (typeof options.templateCb === 'function') {
            this.templateCb = options.templateCb;
        }
        if (typeof options === 'object') {
            // Object.assign replaces the deprecated util._extend.
            if (options.nfTypes) this.nfTypes = Object.assign(this.nfTypes, options.nfTypes); // Inherit nfTypes
            if (options.nfScope) this.nfScope = Object.assign(this.nfScope, options.nfScope); // Inherit nfScope
            if (options.port) this.port = options.port;
            if (options.templates) this.templates = options.templates;
        }

        this.server = dgram.createSocket(options.socketType || 'udp4');
        this.server.on('message', (msg, rinfo) => {
            this.fifo.push({ msg, rinfo });
            // Schedule at most one drain at a time; fetch() sets `set` back to
            // true when it finishes.
            if (!this.closed && this.set) {
                this.set = false;
                setImmediate(() => this.fetch());
            }
        });
        this.server.on('close', () => {
            this.closed = true;
        });

        if (this.port) this.listen(options.port, options.host);
    }

    /**
     * Drain anything already queued, then bind the socket.
     *
     * @param {number} port - Port to bind.
     * @param {string|function} [host] - Host to bind, or the bind callback
     *   when no host is given.
     * @param {function} [cb] - Bind callback; used only when `host` is a
     *   string or is falsy.
     */
    listen (port,host,cb) {
        this.fetch();
        if (host && typeof host === 'function') {
            // listen(port, cb): the second argument is really the callback.
            this.server.bind(port, host);
        } else if (host && typeof host === 'string') {
            if (cb) {
                this.server.bind(port, host, cb);
            } else {
                this.server.bind(port, host);
            }
        } else if (!host && cb) {
            this.server.bind(port, cb);
        } else {
            this.server.bind(port);
        }
    }

    /**
     * @returns {*} The queue's `dropped` value.
     */
    getDropped (){
        return this.fifo.dropped;
    }

    /**
     * Drain the queue, decoding each datagram and emitting `'data'` for every
     * one that yields a truthy decode result.
     *
     * Runt and template-only datagrams are skipped individually, and a failure
     * while handling one datagram is logged without aborting the rest of the
     * batch. Because `emit` is awaited, more datagrams can be queued while a
     * batch is in flight, so the queue is drained again until a pass finds it
     * empty or the socket has closed.
     *
     * @returns {Promise<void>} Settles once the queue has been drained; errors
     *   are logged via `debug`, not propagated.
     */
    async fetch () {
        try {
            let handled;
            do {
                handled = 0;
                for (const { msg, rinfo } of this.fifo.shiftAll()) {
                    handled += 1;
                    try {
                        // Shorter than the 20-byte NetFlow v9 packet header: skip it.
                        if (rinfo.size < 20) continue;
                        const o = this.nfPktDecode(msg, rinfo);
                        // If the packet does not contain flows, only templates, there is nothing to emit
                        if (!o) continue;
                        o.rinfo = rinfo;
                        o.packet = msg;
                        await this.emit('data', o);
                    } catch (ex) {
                        debug(`An exception occurred while handling message, ex: ${ex}`);
                    }
                }
            } while (handled > 0 && !this.closed);
        } catch (ex) {
            // Reached only if pulling the batch itself fails.
            debug(`An exception occurred while handling message, ex: ${ex}`);
        } finally {
            // Let the socket's 'message' handler schedule the next drain.
            this.set = true;
        }
    }

    /**
     * Format a numeric IPv4 address as dotted-quad text.
     *
     * @param {number} t - Address as an unsigned 32-bit integer.
     * @returns {string} For example `3232235777` becomes `'192.168.1.1'`.
     */
    static decIpv4ToString(t){
        return ((~~(t/16777216))%256)+'.'+((~~(t/65536))%256)+'.'+((~~(t/256))%256)+'.'+(t%256);
    }
}
// Install the shared template table and the packet decoders on the prototype
// so every collector instance reaches them through `this`.
Object.assign(NetFlowV9.prototype, {
    nfInfoTemplates,
    nfPktDecode,
    nf9PktDecode,
});

// The collector class is the module's sole export.
module.exports = NetFlowV9;