Skip to content

Commit

Permalink
Merge pull request #52 from vinceau/fix/folder-stream
Browse files Browse the repository at this point in the history
Fix custom-hud example.
  • Loading branch information
vinceau authored Nov 16, 2020
2 parents d2ef354 + c45fc29 commit 5844fe7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 26 deletions.
8 changes: 4 additions & 4 deletions examples/custom-hud/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ const errHandler = (err) => {
if (err) {
console.error(err);
}
}
};

const setPlayerStock = (player, stock) => {
fs.writeFile(path.join(playerInfoFolder, `player${player}Stocks.txt`), stock, errHandler);
}
};

const setPlayerPercent = (player, percent) => {
fs.writeFile(path.join(playerInfoFolder, `player${player}Percent.txt`), percent, errHandler);
}
};

// Connect to the relay
const stream = new SlpFolderStream();
const realtime = new SlpRealTime();
realtime.setStream(stream);
realtime.game.start$.subscribe(() => {
console.log(`Detected a new game in ${stream.getCurrentFilename()}`);
console.log(`Detected a new game in ${stream.latestFile()}`);
});
realtime.stock.percentChange$.subscribe((payload) => {
const player = payload.playerIndex + 1;
Expand Down
65 changes: 43 additions & 22 deletions src/stream/slpFolderStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import tailstream, { TailStream } from "tailstream";
import { RxSlpStream } from "./rxSlpStream";
import { SlpFileWriterOptions, SlpStreamSettings, SlpStreamMode } from "@slippi/slippi-js";
import { WritableOptions } from "stream";
import { Subject, Observable, fromEvent } from "rxjs";
import { Subject, fromEvent, BehaviorSubject } from "rxjs";
import { map, switchMap, share, takeUntil } from "rxjs/operators";

/**
Expand All @@ -27,7 +27,7 @@ import { map, switchMap, share, takeUntil } from "rxjs/operators";
export class SlpFolderStream extends RxSlpStream {
private startRequested$ = new Subject<[string, boolean]>();
private stopRequested$ = new Subject<void>();
private newFile$: Observable<string>;
private newFile$ = new BehaviorSubject<string | null>(null);
private readStream: TailStream | null = null;

public constructor(
Expand All @@ -36,28 +36,17 @@ export class SlpFolderStream extends RxSlpStream {
opts?: WritableOptions,
) {
super(options, { ...slpOptions, mode: SlpStreamMode.MANUAL }, opts);
this.newFile$ = this.startRequested$.pipe(
switchMap(([slpFolder, includeSubfolders]) => {
// End any existing read streams
this.endReadStream();
this._setupSubjects();
}

// Initialize watcher.
const subFolderGlob = includeSubfolders ? "**" : "";
const slpGlob = path.join(slpFolder, subFolderGlob, "*.slp");
const watcher = chokidar.watch(slpGlob, {
ignored: /(^|[\/\\])\../, // ignore dotfiles
persistent: true,
ignoreInitial: true,
ignorePermissionErrors: true,
});
return fromEvent<[string, any]>(watcher, "add").pipe(
share(),
map(([filename]) => path.resolve(filename)),
takeUntil(this.stopRequested$),
);
}),
);
private _setupSubjects(): void {
// Handle what happens when we detect a new file
this.newFile$.subscribe((filePath) => {
// Filepath can be null if it's not subscription hasn't started
if (!filePath) {
return;
}

console.log(`found a new file: ${filePath}`);
this.endReadStream();

Expand All @@ -67,6 +56,31 @@ export class SlpFolderStream extends RxSlpStream {
this.readStream = tailstream.createReadStream(filePath);
this.readStream.pipe(this, { end: false });
});

// Set up the new file listener
this.startRequested$
.pipe(
switchMap(([slpFolder, includeSubfolders]) => {
// End any existing read streams
this.endReadStream();

// Initialize watcher.
const subFolderGlob = includeSubfolders ? "**" : "";
const slpGlob = path.join(slpFolder, subFolderGlob, "*.slp");
const watcher = chokidar.watch(slpGlob, {
ignored: /(^|[\/\\])\../, // ignore dotfiles
persistent: true,
ignoreInitial: true,
ignorePermissionErrors: true,
});
return fromEvent<[string, any]>(watcher, "add").pipe(
share(),
map(([filename]) => path.resolve(filename)),
takeUntil(this.stopRequested$),
);
}),
)
.subscribe(this.newFile$);
}

private endReadStream(): void {
Expand All @@ -93,4 +107,11 @@ export class SlpFolderStream extends RxSlpStream {
this.endReadStream();
this.stopRequested$.next();
}

/**
* Returns the latest created file that was found by folder monitoring.
*/
public latestFile(): string | null {
return this.newFile$.value;
}
}

0 comments on commit 5844fe7

Please sign in to comment.