Skip to content

Commit

Permalink
Refactor: commands (#392)
Browse files Browse the repository at this point in the history
* refactor: commands

* Update peer protocol version

* Add requestId in uploading context

* Update requestId generation in Peer class
  • Loading branch information
DimaDemchenko authored Jul 17, 2024
1 parent 6ab020d commit 6549f70
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 28 deletions.
6 changes: 3 additions & 3 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ export class Core<TStream extends Stream = Stream> {
/** Default configuration for stream settings. */
static readonly DEFAULT_STREAM_CONFIG: StreamConfig = {
isP2PDisabled: false,
simultaneousHttpDownloads: 3,
simultaneousHttpDownloads: 2,
simultaneousP2PDownloads: 3,
highDemandTimeWindow: 15,
httpDownloadTimeWindow: 3000,
p2pDownloadTimeWindow: 6000,
webRtcMaxMessageSize: 64 * 1024 - 1,
p2pNotReceivingBytesTimeoutMs: 1000,
p2pNotReceivingBytesTimeoutMs: 2000,
p2pInactiveLoaderDestroyTimeoutMs: 30 * 1000,
httpNotReceivingBytesTimeoutMs: 1000,
httpNotReceivingBytesTimeoutMs: 3000,
httpErrorRetries: 3,
p2pErrorRetries: 3,
trackerClientVersionPrefix: TRACKER_CLIENT_VERSION_PREFIX,
Expand Down
3 changes: 3 additions & 0 deletions packages/p2p-media-loader-core/src/p2p/commands/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ function serializePeerSegmentCommand(
) {
const creator = new BinaryCommandCreator(command.c, maxChunkSize);
creator.addInteger("i", command.i);
creator.addInteger("r", command.r);
creator.complete();
return creator.getResultBuffers();
}
Expand All @@ -39,6 +40,7 @@ function serializePeerSendSegmentCommand(
const creator = new BinaryCommandCreator(command.c, maxChunkSize);
creator.addInteger("i", command.i);
creator.addInteger("s", command.s);
creator.addInteger("r", command.r);
creator.complete();
return creator.getResultBuffers();
}
Expand All @@ -49,6 +51,7 @@ function serializePeerSegmentRequestCommand(
) {
const creator = new BinaryCommandCreator(command.c, maxChunkSize);
creator.addInteger("i", command.i);
creator.addInteger("r", command.r);
if (command.b) creator.addInteger("b", command.b);
creator.complete();
return creator.getResultBuffers();
Expand Down
3 changes: 3 additions & 0 deletions packages/p2p-media-loader-core/src/p2p/commands/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ export type PeerSegmentCommand = BasePeerCommand<
| PeerCommandType.SegmentDataSendingCompleted
> & {
i: number; // segment id
r: number; // request id
};

export type PeerRequestSegmentCommand =
BasePeerCommand<PeerCommandType.SegmentRequest> & {
i: number; // segment id
r: number; // request id
b?: number; // byte from
};

Expand All @@ -34,6 +36,7 @@ export type PeerSegmentAnnouncementCommand =
export type PeerSendSegmentCommand =
BasePeerCommand<PeerCommandType.SegmentData> & {
i: number; // segment id
r: number; // request id
s: number; // size in bytes
};

Expand Down
4 changes: 3 additions & 1 deletion packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ export class P2PLoader {
private onSegmentRequested = async (
peer: Peer,
segmentExternalId: number,
requestId: number,
byteFrom?: number,
) => {
const segment = StreamUtils.getSegmentFromStreamByExternalId(
Expand All @@ -132,11 +133,12 @@ export class P2PLoader {
if (!segment) return;
const segmentData = await this.segmentStorage.getSegmentData(segment);
if (!segmentData) {
peer.sendSegmentAbsentCommand(segmentExternalId);
peer.sendSegmentAbsentCommand(segmentExternalId, requestId);
return;
}
await peer.uploadSegmentData(
segment,
requestId,
byteFrom !== undefined ? segmentData.slice(byteFrom) : segmentData,
);
};
Expand Down
16 changes: 12 additions & 4 deletions packages/p2p-media-loader-core/src/p2p/peer-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export type PeerConfig = Pick<

export class PeerProtocol {
private commandChunks?: Command.BinaryCommandChunksJoiner;
private uploadingContext?: { stopUploading: () => void };
private uploadingContext?: { stopUploading: () => void; requestId: number };
private readonly onChunkDownloaded: CoreEventMap["onChunkDownloaded"];
private readonly onChunkUploaded: CoreEventMap["onChunkUploaded"];

Expand All @@ -39,7 +39,7 @@ export class PeerProtocol {
} else {
this.eventHandlers.onSegmentChunkReceived(data);

this.onChunkDownloaded(data.length, "p2p", this.connection.idUtf8);
this.onChunkDownloaded(data.byteLength, "p2p", this.connection.idUtf8);
}
};

Expand All @@ -49,7 +49,7 @@ export class PeerProtocol {
this.peerConfig.webRtcMaxMessageSize,
);
for (const buffer of binaryCommandBuffers) {
this.connection.send(buffer);
this.connection.write(buffer);
}
}

Expand All @@ -58,7 +58,14 @@ export class PeerProtocol {
this.uploadingContext = undefined;
}

async splitSegmentDataToChunksAndUploadAsync(data: Uint8Array) {
getUploadingRequestId() {
return this.uploadingContext?.requestId;
}

async splitSegmentDataToChunksAndUploadAsync(
data: Uint8Array,
requestId: number,
) {
if (this.uploadingContext) {
throw new Error(`Some segment data is already uploading.`);
}
Expand All @@ -71,6 +78,7 @@ export class PeerProtocol {
stopUploading: () => {
isUploadingSegmentData = false;
},
requestId,
};

this.uploadingContext = uploadingContext;
Expand Down
66 changes: 50 additions & 16 deletions packages/p2p-media-loader-core/src/p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type PeerEventHandlers = {
onSegmentRequested: (
peer: Peer,
segmentId: number,
requestId: number,
byteFrom?: number,
) => void;
onSegmentsAnnouncement: () => void;
Expand All @@ -31,6 +32,7 @@ export class Peer {
request: Request;
controls: RequestControls;
isSegmentDataCommandReceived: boolean;
requestId: number;
};
private loadedSegments = new Set<number>();
private httpLoadingSegments = new Set<number>();
Expand Down Expand Up @@ -91,16 +93,26 @@ export class Peer {

case PeerCommandType.SegmentRequest:
this.peerProtocol.stopUploadingSegmentData();
this.eventHandlers.onSegmentRequested(this, command.i, command.b);
this.eventHandlers.onSegmentRequested(
this,
command.i,
command.r,
command.b,
);
break;

case PeerCommandType.SegmentData:
{
if (!this.downloadingContext) break;
if (this.downloadingContext.isSegmentDataCommandReceived) break;

const { request, controls } = this.downloadingContext;
if (request.segment.externalId !== command.i) break;
const { request, controls, requestId } = this.downloadingContext;
if (
request.segment.externalId !== command.i ||
requestId !== command.r
) {
break;
}

this.downloadingContext.isSegmentDataCommandReceived = true;
controls.firstBytesReceived();
Expand All @@ -109,7 +121,7 @@ export class Peer {
request.setTotalBytes(command.s);
} else if (request.totalBytes - request.loadedBytes !== command.s) {
request.clearLoadedBytes();
this.sendCancelSegmentRequestCommand(request.segment);
this.sendCancelSegmentRequestCommand(request.segment, requestId);
this.cancelSegmentDownloading(
"peer-response-bytes-length-mismatch",
);
Expand All @@ -126,7 +138,8 @@ export class Peer {
const { request, controls } = downloadingContext;

const isWrongSegment =
downloadingContext.request.segment.externalId !== command.i;
downloadingContext.request.segment.externalId !== command.i ||
downloadingContext.requestId !== command.r;

if (isWrongSegment) {
request.clearLoadedBytes();
Expand Down Expand Up @@ -166,13 +179,20 @@ export class Peer {
}

case PeerCommandType.SegmentAbsent:
if (this.downloadingContext?.request.segment.externalId === command.i) {
if (
this.downloadingContext?.request.segment.externalId === command.i &&
this.downloadingContext?.requestId === command.r
) {
this.cancelSegmentDownloading("peer-segment-absent");
this.loadedSegments.delete(command.i);
}
break;

case PeerCommandType.CancelSegmentRequest:
const uploadingRequestId = this.peerProtocol.getUploadingRequestId();

if (uploadingRequestId !== command.r) break;

this.peerProtocol.stopUploadingSegmentData();
break;
}
Expand Down Expand Up @@ -203,6 +223,7 @@ export class Peer {
}
this.downloadingContext = {
request: segmentRequest,
requestId: Math.floor(Math.random() * 1000),
isSegmentDataCommandReceived: false,
controls: segmentRequest.start(
{ downloadSource: "p2p", peerId: this.id },
Expand All @@ -211,9 +232,8 @@ export class Peer {
this.peerConfig.p2pNotReceivingBytesTimeoutMs,
abort: (error) => {
if (!this.downloadingContext) return;
const { request } = this.downloadingContext;

this.sendCancelSegmentRequestCommand(request.segment);
const { request, requestId } = this.downloadingContext;
this.sendCancelSegmentRequestCommand(request.segment, requestId);
this.downloadingErrors.push(error);
this.downloadingContext = undefined;

Expand All @@ -230,26 +250,33 @@ export class Peer {
};
const command: Command.PeerRequestSegmentCommand = {
c: PeerCommandType.SegmentRequest,
r: this.downloadingContext.requestId,
i: segmentRequest.segment.externalId,
};
if (segmentRequest.loadedBytes) command.b = segmentRequest.loadedBytes;
this.peerProtocol.sendCommand(command);
}

async uploadSegmentData(segment: SegmentWithStream, data: ArrayBuffer) {
async uploadSegmentData(
segment: SegmentWithStream,
requestId: number,
data: ArrayBuffer,
) {
const { externalId } = segment;
this.logger(`send segment ${segment.externalId} to ${this.id}`);
const command: Command.PeerSendSegmentCommand = {
c: PeerCommandType.SegmentData,
i: externalId,
r: requestId,
s: data.byteLength,
};
this.peerProtocol.sendCommand(command);
try {
await this.peerProtocol.splitSegmentDataToChunksAndUploadAsync(
data as Uint8Array,
requestId,
);
this.sendSegmentDataSendingCompletedCommand(segment);
this.sendSegmentDataSendingCompletedCommand(segment, requestId);
this.logger(`segment ${externalId} has been sent to ${this.id}`);
} catch (err) {
this.logger(`cancel segment uploading ${externalId}`);
Expand Down Expand Up @@ -279,23 +306,32 @@ export class Peer {
this.peerProtocol.sendCommand(command);
}

sendSegmentAbsentCommand(segmentExternalId: number) {
sendSegmentAbsentCommand(segmentExternalId: number, requestId: number) {
this.peerProtocol.sendCommand({
c: PeerCommandType.SegmentAbsent,
i: segmentExternalId,
r: requestId,
});
}

private sendCancelSegmentRequestCommand(segment: SegmentWithStream) {
private sendCancelSegmentRequestCommand(
segment: SegmentWithStream,
requestId: number,
) {
this.peerProtocol.sendCommand({
c: PeerCommandType.CancelSegmentRequest,
i: segment.externalId,
r: requestId,
});
}

private sendSegmentDataSendingCompletedCommand(segment: SegmentWithStream) {
private sendSegmentDataSendingCompletedCommand(
segment: SegmentWithStream,
requestId: number,
) {
this.peerProtocol.sendCommand({
c: PeerCommandType.SegmentDataSendingCompleted,
r: requestId,
i: segment.externalId,
});
}
Expand All @@ -313,8 +349,6 @@ export class Peer {
this.destroy();
} else if (code === "ERR_CONNECTION_FAILURE") {
this.destroy();
} else if (code === "ERR_CONNECTION_FAILURE") {
this.destroy();
}
};

Expand Down
7 changes: 6 additions & 1 deletion packages/p2p-media-loader-core/src/p2p/tracker-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ type PeerItem = {

type P2PTrackerClientEventHandlers = {
onPeerConnected: (peer: Peer) => void;
onSegmentRequested: (peer: Peer, segmentExternalId: number) => void;
onSegmentRequested: (
peer: Peer,
segmentExternalId: number,
requestId: number,
bytesFrom?: number,
) => void;
onSegmentsAnnouncement: () => void;
};

Expand Down
2 changes: 0 additions & 2 deletions packages/p2p-media-loader-core/src/requests/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ export type RequestStatus =
| "aborted";

export class Request {
readonly id: string;
private currentAttempt?: RequestAttempt;
private _failedAttempts = new FailedRequestAttempts();
private finalData?: ArrayBuffer;
Expand Down Expand Up @@ -86,7 +85,6 @@ export class Request {
this.onSegmentStart = eventTarget.getEventDispatcher("onSegmentStart");
this.onSegmentLoaded = eventTarget.getEventDispatcher("onSegmentLoaded");

this.id = this.segment.runtimeId;
const { byteRange } = this.segment;
if (byteRange) {
const { end, start } = byteRange;
Expand Down
2 changes: 1 addition & 1 deletion packages/p2p-media-loader-core/src/utils/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export type PlaybackTimeWindowsConfig = Pick<
"highDemandTimeWindow" | "httpDownloadTimeWindow" | "p2pDownloadTimeWindow"
>;

const PEER_PROTOCOL_VERSION = "v1";
const PEER_PROTOCOL_VERSION = "v2";

export function getStreamSwarmId(
swarmId: string,
Expand Down

0 comments on commit 6549f70

Please sign in to comment.