Skip to content

Commit

Permalink
Have a notify message
Browse files Browse the repository at this point in the history
  • Loading branch information
o0101 committed Sep 24, 2023
1 parent cc7effa commit ed9bff5
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 40 deletions.
14 changes: 13 additions & 1 deletion src/ws-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@
const TabNumbers = new Map();

export let LatestCSRFToken = '';
let notifyBandwidthIssue;
let BANDWIDTH_ISSUE_STATE = false;
let serverOrigin;
let messageQueueRunning = false;
let requestId = 0;
Expand Down Expand Up @@ -198,6 +200,12 @@
const sockets = new Set();
const websockets = new Set();
const peers = new Set();
const notifyBandwidthIssue = throttle(function (bandwidthIssue) {
zl.act.fanOut(
socket => so(socket, {bandwidthIssue}),
zombie_port
);
});

let latestMessageId = 0;

Expand Down Expand Up @@ -578,7 +586,11 @@
}
if ( screenshotAck ) {
DEBUG.acks && console.log('client sent screenshot ack', messageId, screenshotAck);
zl.act.screenshotAck(connectionId, zombie_port, screenshotAck, {Data, Frames, Meta, State, receivesFrames: false, messageId});
const {bandwidthIssue} = zl.act.screenshotAck(connectionId, zombie_port, screenshotAck, {Data, Frames, Meta, State, receivesFrames: false, messageId});
if ( bandwidthIssue != BANDWIDTH_ISSUE_STATE ) {
BANDWIDTH_ISSUE_STATE = bandwidthIssue;
notifyBandwidthIssue(bandwidthIssue);
}
}
if ( zombie ) {
const {events} = zombie;
Expand Down
82 changes: 43 additions & 39 deletions src/zombie-lord/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const Options = {
const goLowRes = throttle((connection, ...args) => connection.shrinkImagery(...args), 10000);
const goHighRes = throttle((connection, ...args) => connection.growImagery(...args), 10000);


const controller_api = {
zombieIsDead(port) {
const connection = connections.get(port);
Expand Down Expand Up @@ -72,6 +71,7 @@ const controller_api = {
async screenshotAck(connectionId, port, receivedFrameId, channel) {
const {frameId, castSessionId} = receivedFrameId;
const connection = connections.get(port);
let bandwidthIssue = false;
//DEBUG.debugCast && console.log('Acking', connectionId, port, receivedFrameId);
if ( connection ) {
const channels = connection.links.get(connectionId);
Expand Down Expand Up @@ -137,61 +137,65 @@ const controller_api = {
DEBUG.debugAdaptiveImagery && console.log(`Average roundtrip time: ${avgRoundtrip}ms, actual: ${roundtripTime}ms`);
if ( avgRoundtrip > MAX_ROUNDTRIP /*|| roundtripTime > MAX_ROUNDTRIP */ ) {
goLowRes(connection);
bandwidthIssue = true;
} else if ( avgRoundtrip < MIN_ROUNDTRIP /*|| roundtripTime < MIN_SPOT_ROUNDTRIP */) {
goHighRes(connection);
bandwidthIssue = false;
}
}
}

if ( ack.sending ) return;
//DEBUG.debugCast && console.log("Sending frames", ack);
ack.sending = true;

await sleep(10);

while ( ack.count && DEBUG.bufSend && ack.bufSend && ack.buffer.length ) {
DEBUG.acks && console.log(`Got ack from ${connectionId} and have buffered unsent frame. Will send now.`);

const {peer, socket, fastest} = channels;
const channel = DEBUG.chooseFastest && fastest ? fastest :
DEBUG.useWebRTC && peer ? peer : socket;
const [imgBuf, frameId] = ack.buffer.pop();
DEBUG.shotDebug && console.log('Sending', frameId);
connection.so(channel, imgBuf);
DEBUG.adaptiveImagery && ack.sent.set(frameId, Date.now());

if ( DEBUG.chooseFastest && DEBUG.useWebRTC && socket && peer ) {
const choice = Math.random() >= RACE_SAMPLE;
if ( choice ) {
const otherChannel = channel === peer ? socket : peer;
connection.so(otherChannel, imgBuf);
DEBUG.logFastest && console.log('Race started');
if ( !ack.sending ) {
//DEBUG.debugCast && console.log("Sending frames", ack);
ack.sending = true;

await sleep(10);

while ( ack.count && DEBUG.bufSend && ack.bufSend && ack.buffer.length ) {
DEBUG.acks && console.log(`Got ack from ${connectionId} and have buffered unsent frame. Will send now.`);

const {peer, socket, fastest} = channels;
const channel = DEBUG.chooseFastest && fastest ? fastest :
DEBUG.useWebRTC && peer ? peer : socket;
const [imgBuf, frameId] = ack.buffer.pop();
DEBUG.shotDebug && console.log('Sending', frameId);
connection.so(channel, imgBuf);
DEBUG.adaptiveImagery && ack.sent.set(frameId, Date.now());

if ( DEBUG.chooseFastest && DEBUG.useWebRTC && socket && peer ) {
const choice = Math.random() >= RACE_SAMPLE;
if ( choice ) {
const otherChannel = channel === peer ? socket : peer;
connection.so(otherChannel, imgBuf);
DEBUG.logFastest && console.log('Race started');
}
}
}

//ack.bufSend = false;
ack.received = 0;
ack.count -= 1;
if ( ack.count < 0 ) {
ack.count = 0;
ack.buffer.length = 0;
//ack.bufSend = false;
ack.received = 0;
ack.count -= 1;
if ( ack.count < 0 ) {
ack.count = 0;
ack.buffer.length = 0;
}
await sleep(MIN_TIME_BETWEEN_SHOTS);
}
await sleep(MIN_TIME_BETWEEN_SHOTS);
}

ack.sending = false;
ack.sending = false;

if ( ack.sent.size > FRAME_GC_LIMIT ) {
for( const key of ack.sent.keys() ) {
if ( key < frameId ) ack.sent.delete(key);
if ( ack.sent.size > FRAME_GC_LIMIT ) {
for( const key of ack.sent.keys() ) {
if ( key < frameId ) ack.sent.delete(key);
}
}
}

DEBUG.acks && console.log(`Set ack received ${connectionId}`);
DEBUG.acks && console.log(`Set ack received ${connectionId}`);
}
} catch(e) {
console.warn('screenshotAck error', e);
ack.sending = false;
}
return {bandwidthIssue};
} else {
throw new TypeError(`No connection on port ${port}`);
}
Expand Down

0 comments on commit ed9bff5

Please sign in to comment.