Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standarised websocket and YT stream changes #2562

Merged
merged 12 commits into from
Aug 14, 2024
7 changes: 3 additions & 4 deletions lib/saito/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,19 +325,17 @@ class Server {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const ws = require('ws');

const wss = new ws.Server({
const wss = new ws.WebSocketServer({
noServer: true,
path: '/wsopen'
});
webserver.on('upgrade', (request: any, socket: any, head: any) => {
// console.debug("connection upgrade ----> " + request.url);
console.debug("connection upgrade ----> " + request.url);
const { pathname } = parse(request.url);
if (pathname === '/wsopen') {
wss.handleUpgrade(request, socket, head, (websocket: any) => {
wss.emit('connection', websocket, request);
});
} else {
socket.destroy();
}
});
webserver.on('error', (error) => {
Expand Down Expand Up @@ -367,6 +365,7 @@ class Server {

});

this.app.modules.onWebSocketServer(webserver);
}

initialize() {
Expand Down
96 changes: 71 additions & 25 deletions lib/saito/modules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import Peer from './peer';
import Transaction from './transaction';
import path from 'path';
import fs from 'fs';
import ws from 'ws';
import { parse } from 'url';

class Mods {

Expand Down Expand Up @@ -126,8 +128,8 @@ class Mods {
}
}

} catch (err) { }

} catch (err) {
}


for (let iii = 0; iii < this.mods.length; iii++) {
Expand Down Expand Up @@ -283,11 +285,11 @@ class Mods {
//
// ... setup moderation / filter functions
//
for (let xmod of this.app.modules.respondTo('saito-moderation-app')) {
this.app_filter_func.push(xmod.respondTo('saito-moderation-app').filter_func);
for (let xmod of this.app.modules.respondTo('saito-moderation-app')) {
this.app_filter_func.push(xmod.respondTo('saito-moderation-app').filter_func);
}
for (let xmod of this.app.modules.respondTo('saito-moderation-core')) {
this.core_filter_func.push(xmod.respondTo('saito-moderation-core').filter_func);
for (let xmod of this.app.modules.respondTo('saito-moderation-core')) {
this.core_filter_func.push(xmod.respondTo('saito-moderation-core').filter_func);
}

//
Expand All @@ -311,7 +313,7 @@ class Mods {
'handshake_complete',
async (peerIndex: bigint) => {

if (this.app.BROWSER){
if (this.app.BROWSER) {
// broadcasts my keylist to other peers
await this.app.wallet.setKeyList(this.app.keychain.returnWatchedPublicKeys());
}
Expand Down Expand Up @@ -372,16 +374,18 @@ class Mods {
//
// 1 = permit, -1 = do not permit
//
moderateModule(tx=null, mod=null) {
moderateModule(tx = null, mod = null) {

if (mod == null || tx == null) { return 0; }
if (mod == null || tx == null) {
return 0;
}

for (let z = 0; z < this.app_filter_func.length; z++) {
let permit_through = this.app_filter_func[z](mod, tx);
if (permit_through == 1) {
if (permit_through == 1) {
return 1;
}
if (permit_through == -1) {
if (permit_through == -1) {
return -1;
}
}
Expand All @@ -394,16 +398,18 @@ class Mods {
//
// 1 = permit, -1 = do not permit
//
moderateCore(tx=null) {
moderateCore(tx = null) {

if (tx == null) { return 0; }
if (tx == null) {
return 0;
}

for (let z = 0; z < this.core_filter_func.length; z++) {
let permit_through = this.core_filter_func[z](tx);
if (permit_through == 1) {
if (permit_through == 1) {
return 1;
}
if (permit_through == -1) {
if (permit_through == -1) {
return -1;
}
}
Expand All @@ -412,25 +418,28 @@ class Mods {
}



moderateAddress(publickey="") {
moderateAddress(publickey = '') {
let newtx = new Transaction();
newtx.addFrom(publickey);
return this.moderate(newtx);
}

moderate(tx=null, app="") {
moderate(tx = null, app = '') {

let permit_through = 0;

//
// if there is a relevant app-filter-function, respect it
//
for (let i = 0; i < this.mods.length; i++) {
if (this.mods[i].name == app || app == "*") {
if (this.mods[i].name == app || app == '*') {
permit_through = this.moderateModule(tx, this.mods[i]);
if (permit_through == -1) { return -1; }
if (permit_through == 1) { return 1; }
if (permit_through == -1) {
return -1;
}
if (permit_through == 1) {
return 1;
}
}
}

Expand All @@ -439,9 +448,13 @@ class Mods {
//
permit_through = this.moderateCore(tx);

if (permit_through == -1) { return -1; }
if (permit_through == 1) { return 1; }

if (permit_through == -1) {
return -1;
}
if (permit_through == 1) {
return 1;
}

//
// seems OK if we made it this far
//
Expand All @@ -456,7 +469,7 @@ class Mods {
await this.mods[icb].render(this.app, this.mods[icb]);
}
}
this.app.connection.emit("saito-render-complete");
this.app.connection.emit('saito-render-complete');
return null;
}

Expand Down Expand Up @@ -697,6 +710,39 @@ class Mods {
}
}

async onWebSocketServer(webserver) {
for (let i = 0; i < this.mods.length; i++) {
let mod = this.mods[i];
let path = mod.getWebsocketPath();
if (!path) {
continue;
}
console.log('creating websocket server for module :' + mod.name + " on path : "+path);
let wss = new ws.WebSocketServer({
noServer: true,
// todo : check if the path is already being used or reserved?
path: "/"+path
});
webserver.on('upgrade', (request: any, socket: any, head: any) => {
console.debug("connection on module : "+mod.name+" upgrade ----> " + request.url);
const parsedUrl = parse(request.url);
const pathname = parsedUrl.pathname;
const pathParts = pathname.split('/').filter(Boolean);
const subdirectory = pathParts.length > 0 ? pathParts[0] : null;
console.log(subdirectory + " - " + path);
if (subdirectory === path) {
console.log('inside handleUpgrade');
wss.handleUpgrade(request, socket, head, (websocket: any) => {
console.log("handling upgrade ///");
wss.emit('connection', websocket, request);
});
}
});

mod.onWebSocketServer(wss);
}
}

/*
async getBuildNumber() {
for (let i = 0; i < this.mods.length; i++) {
Expand Down
Loading