Skip to content

Commit

Permalink
Feat: Segment validation (#336)
Browse files Browse the repository at this point in the history
* segment validation

* improvements

* test func

* error type

* fixed p2p segment validation

* Revert abort callback type

---------

Co-authored-by: Andriy Lysnevych <[email protected]>
  • Loading branch information
DimaDemchenko and mrlika authored Feb 29, 2024
1 parent 051db68 commit f728163
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 21 deletions.
9 changes: 6 additions & 3 deletions packages/p2p-media-loader-core/src/p2p/peer-protocol.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { PeerConnection } from "bittorrent-tracker";
import * as Command from "./commands";
import * as Utils from "../utils/utils";
import { Settings } from "../types";
import * as Utils from "../utils/utils";
import * as Command from "./commands";

export type PeerSettings = Pick<
Settings,
"p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" | "p2pErrorRetries"
| "p2pNotReceivingBytesTimeoutMs"
| "webRtcMaxMessageSize"
| "p2pErrorRetries"
| "validateP2PSegment"
>;

export class PeerProtocol {
Expand Down
46 changes: 34 additions & 12 deletions packages/p2p-media-loader-core/src/p2p/peer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { PeerConnection } from "bittorrent-tracker";
import { PeerProtocol, PeerSettings } from "./peer-protocol";
import debug from "debug";
import {
PeerRequestErrorType,
Request,
RequestControls,
RequestError,
PeerRequestErrorType,
RequestInnerErrorType,
} from "../requests/request";
import * as Command from "./commands";
import { Segment } from "../types";
import * as Utils from "../utils/utils";
import debug from "debug";
import * as Command from "./commands";
import { PeerProtocol, PeerSettings } from "./peer-protocol";

const { PeerCommandType } = Command;
type PeerEventHandlers = {
Expand Down Expand Up @@ -45,6 +45,7 @@ export class Peer {
this.id = Peer.getPeerIdFromConnection(connection);
this.peerProtocol = new PeerProtocol(connection, settings, {
onSegmentChunkReceived: this.onSegmentChunkReceived,
// eslint-disable-next-line @typescript-eslint/no-misused-promises
onCommandReceived: this.onCommandReceived,
});

Expand All @@ -62,7 +63,7 @@ export class Peer {
if (this.httpLoadingSegments.has(externalId)) return "http-loading";
}

private onCommandReceived = (command: Command.PeerCommand) => {
private onCommandReceived = async (command: Command.PeerCommand) => {
switch (command.c) {
case PeerCommandType.SegmentsAnnouncement:
this.loadedSegments = new Set(command.l);
Expand All @@ -85,19 +86,24 @@ export class Peer {
request.setTotalBytes(command.s);
} else if (request.totalBytes - request.loadedBytes !== command.s) {
request.clearLoadedBytes();
this.cancelSegmentDownloading("peer-response-bytes-mismatch");
this.sendCancelSegmentRequestCommand(request.segment);
this.cancelSegmentDownloading(
"peer-response-bytes-length-mismatch",
);
this.destroy();
}
}
break;

case PeerCommandType.SegmentDataSendingCompleted: {
if (!this.downloadingContext?.isSegmentDataCommandReceived) return;
const downloadingContext = this.downloadingContext;

if (!downloadingContext?.isSegmentDataCommandReceived) return;

const { request, controls } = this.downloadingContext;
const { request, controls } = downloadingContext;

const isWrongSegment =
this.downloadingContext.request.segment.externalId !== command.i;
downloadingContext.request.segment.externalId !== command.i;

if (isWrongSegment) {
request.clearLoadedBytes();
Expand All @@ -110,7 +116,22 @@ export class Peer {

if (isWrongBytes) {
request.clearLoadedBytes();
this.cancelSegmentDownloading("peer-response-bytes-mismatch");
this.cancelSegmentDownloading("peer-response-bytes-length-mismatch");
this.destroy();
return;
}

const isValid =
(await this.settings.validateP2PSegment?.(
request.segment.url,
request.segment.byteRange,
)) ?? true;

if (this.downloadingContext !== downloadingContext) return;

if (!isValid) {
request.clearLoadedBytes();
this.cancelSegmentDownloading("p2p-segment-validation-failed");
this.destroy();
return;
}
Expand Down Expand Up @@ -145,7 +166,7 @@ export class Peer {

if (isOverflow) {
request.clearLoadedBytes();
this.cancelSegmentDownloading("peer-response-bytes-mismatch");
this.cancelSegmentDownloading("peer-response-bytes-length-mismatch");
this.destroy();
return;
}
Expand All @@ -168,9 +189,10 @@ export class Peer {
abort: (error) => {
if (!this.downloadingContext) return;
const { request } = this.downloadingContext;

this.sendCancelSegmentRequestCommand(request.segment);
this.downloadingContext = undefined;
this.downloadingErrors.push(error);
this.downloadingContext = undefined;

const timeoutErrors = this.downloadingErrors.filter(
(error) => error.type === "bytes-receiving-timeout",
Expand Down
11 changes: 6 additions & 5 deletions packages/p2p-media-loader-core/src/requests/request.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Segment, Playback, BandwidthCalculators } from "../types";
import debug from "debug";
import { BandwidthCalculators, Playback, Segment } from "../types";
import * as LoggerUtils from "../utils/logger";
import * as StreamUtils from "../utils/stream";
import * as Utils from "../utils/utils";
import * as LoggerUtils from "../utils/logger";
import debug from "debug";

export type LoadProgress = {
startTimestamp: number;
Expand Down Expand Up @@ -323,10 +323,11 @@ export type HttpRequestErrorType =
| "http-unexpected-status-code";

export type PeerRequestErrorType =
| "peer-response-bytes-mismatch"
| "peer-response-bytes-length-mismatch"
| "peer-protocol-violation"
| "peer-segment-absent"
| "peer-closed";
| "peer-closed"
| "p2p-segment-validation-failed";

type RequestErrorType =
| RequestInnerErrorType
Expand Down
3 changes: 2 additions & 1 deletion packages/p2p-media-loader-core/src/types.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RequestAttempt } from "./requests/request";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { RequestAttempt } from "./requests/request";

export type StreamType = "main" | "secondary";

Expand Down Expand Up @@ -53,6 +53,7 @@ export type Settings = {
httpNotReceivingBytesTimeoutMs: number;
httpErrorRetries: number;
p2pErrorRetries: number;
validateP2PSegment?: (url: string, byteRange?: ByteRange) => Promise<boolean>;
};

export type CoreEventHandlers = {
Expand Down

0 comments on commit f728163

Please sign in to comment.