-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathtail-processor.ts
40 lines (34 loc) · 1.19 KB
/
tail-processor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import path from 'path';
import { Readable } from 'stream';
import { ITailProcessorConfig, processStream } from '../types';
import { DirectoryProcessor } from './base/directory-processor';
const Tail = require('tail').Tail;
export class TailProcessor extends DirectoryProcessor {
protected fromBeginning: boolean;
constructor(config: ITailProcessorConfig) {
super(config);
this.fromBeginning = !!config.fromBeginning;
}
async process(processStream: processStream) {
const readStream = new Readable({
objectMode: true,
read() { },
});
const tails = [];
for (const file of this.files) {
const filePath = path.join(this.path, file);
const tail = new Tail(filePath, {
fromBeginning: this.fromBeginning,
});
tail.on('line', (data: any) => {
readStream.push({ file, data });
});
tail.on('error', (exception: any) => {
readStream.emit('error', exception);
});
tails.push(tail);
};
await processStream(readStream);
tails.forEach(tail => tail.unwatch());
}
}