Skip to content

Commit

Permalink
refactor: Update SegmentsStorage interface
Browse files Browse the repository at this point in the history
  • Loading branch information
DimaDemchenko committed Sep 6, 2024
1 parent d0d318c commit 1a0f65a
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 37 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.common.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ module.exports = {
tsx: "never",
},
],

"@typescript-eslint/no-unused-vars": ["warn", { argsIgnorePattern: "^_" }],
},
};
10 changes: 6 additions & 4 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,13 @@ export class Core<TStream extends Stream = Stream> {
}

if (!this.segmentStorage) {
const storageClass = isLive
? (this.commonCoreConfig.liveSegmentsStorage ?? SegmentsMemoryStorage)
: (this.commonCoreConfig.vodSegmentsStorage ?? SegmentsMemoryStorage);
const createCustomStorage = isLive
? this.commonCoreConfig.liveSegmentsStorage
: this.commonCoreConfig.vodSegmentsStorage;

const segmentStorage = new storageClass();
const segmentStorage = createCustomStorage
? createCustomStorage(isLive)
: new SegmentsMemoryStorage();

await segmentStorage.initialize(
this.commonCoreConfig,
Expand Down
45 changes: 31 additions & 14 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,33 @@ export class HybridLoader {
segment.externalId,
segment.startTime,
segment.endTime,
this.config.swarmId ?? this.streamManifestUrl,
stream.type,
this.streamDetails.isLive,
);
const engineRequest = new EngineRequest(segment, callbacks);

const streamSwarmId = StreamUtils.getStreamSwarmId(
this.config.swarmId ?? this.streamManifestUrl,
stream,
);
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, stream);

try {
if (this.segmentStorage.hasSegment(streamSwarmId, segment.externalId)) {
if (
this.segmentStorage.hasSegment(
streamSwarmId,
segment.externalId,
swarmId,
)
) {
const data = await this.segmentStorage.getSegmentData(
streamSwarmId,
segment.externalId,
swarmId,
);
if (data) {
const { queueDownloadRatio } = this.generateQueue();
engineRequest.resolve(data, this.getBandwidth(queueDownloadRatio));
}
} else {

this.engineRequest = engineRequest;
}
} catch {
Expand Down Expand Up @@ -197,17 +205,16 @@ export class HybridLoader {
}
this.requests.remove(request);

const streamSwarmId = StreamUtils.getStreamSwarmId(
this.config.swarmId ?? this.streamManifestUrl,
stream,
);
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, stream);

void this.segmentStorage.storeSegment(
streamSwarmId,
segment.externalId,
request.data,
segment.startTime,
segment.endTime,
swarmId,
segment.stream.type,
this.streamDetails.isLive,
);
Expand Down Expand Up @@ -388,15 +395,20 @@ export class HybridLoader {
this.config,
this.p2pLoaders.currentLoader,
)) {
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(
this.config.swarmId ?? this.streamManifestUrl,
swarmId,
segment.stream,
);

if (
!statuses.isHttpDownloadable ||
statuses.isP2PDownloadable ||
this.segmentStorage.hasSegment(streamSwarmId, segment.externalId)
this.segmentStorage.hasSegment(
streamSwarmId,
segment.externalId,
swarmId,
)
) {
continue;
}
Expand Down Expand Up @@ -490,13 +502,18 @@ export class HybridLoader {
maxPossibleLength++;
const { segment } = item;

const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(
this.config.swarmId ?? this.streamManifestUrl,
swarmId,
segment.stream,
);

if (
this.segmentStorage.hasSegment(streamSwarmId, segment.externalId) ||
this.segmentStorage.hasSegment(
streamSwarmId,
segment.externalId,
swarmId,
) ||
this.requests.get(segment)?.status === "succeed"
) {
alreadyLoadedCount++;
Expand Down
14 changes: 8 additions & 6 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,16 @@ export class P2PLoader {
}

private getSegmentsAnnouncement() {
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(
this.config.swarmId ?? this.streamManifestUrl,
this.stream,
);

const loaded: number[] =
this.segmentStorage.getStoredSegmentIds(streamSwarmId);
const loaded: number[] = this.segmentStorage.getStoredSegmentIds(
swarmId,
streamSwarmId,
);
const httpLoading: number[] = [];

for (const request of this.requests.httpRequests()) {
Expand Down Expand Up @@ -137,14 +140,13 @@ export class P2PLoader {
);
if (!segment) return;

const streamSwarmId = StreamUtils.getStreamSwarmId(
this.config.swarmId ?? this.streamManifestUrl,
this.stream,
);
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, this.stream);

const segmentData = await this.segmentStorage.getSegmentData(
streamSwarmId,
segmentExternalId,
swarmId,
);
if (!segmentData) {
peer.sendSegmentAbsentCommand(segmentExternalId, requestId);
Expand Down
8 changes: 6 additions & 2 deletions packages/p2p-media-loader-core/src/p2p/loaders-container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,15 @@ export class P2PLoadersContainer {
changeCurrentLoader(stream: StreamWithSegments) {
const loaderItem = this.loaders.get(stream.runtimeId);
if (this._currentLoaderItem) {
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(
this.config.swarmId ?? this.streamManifestUrl,
swarmId,
this._currentLoaderItem.stream,
);
const ids = this.segmentStorage.getStoredSegmentIds(streamSwarmId);
const ids = this.segmentStorage.getStoredSegmentIds(
swarmId,
streamSwarmId,
);
if (!ids.length) this.destroyAndRemoveLoader(this._currentLoaderItem);
else this.setLoaderDestroyTimeout(this._currentLoaderItem);
}
Expand Down
24 changes: 18 additions & 6 deletions packages/p2p-media-loader-core/src/segments-storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { CommonCoreConfig, StreamConfig, StreamType } from "../types.js";
export interface SegmentsStorage {
/**
* Initializes storage
* @param coreConfig - Storage configuration
* @param coreConfig - Core configuration with storage options
* @param mainStreamConfig - Main stream configuration
* @param secondaryStreamConfig - Secondary stream configuration
*/
Expand All @@ -14,33 +14,40 @@ export interface SegmentsStorage {
): Promise<void>;

/**
* Updates playback position
* Provides playback position from player
* @param position - Playback position
* @param rate - Playback rate
*/
onPlaybackUpdated(position: number, rate: number): void;

/**
* Updates segment request information
* Provides segment request information from player
* @param streamId - Stream identifier
* @param segmentId - Segment identifier
* @param startTime - Segment start time
* @param endTime - Segment end time
* @param swarmId - Swarm identifier
* @param streamType - Stream type
* @param isLiveStream - Is live stream
*/
onSegmentRequested(
streamId: string,
segmentId: number,
startTime: number,
endTime: number,
swarmId: string,
streamType: StreamType,
isLiveStream: boolean,
): void;

/**
* Stores segment data
* Stores segment data
* @param streamId - Stream identifier
* @param segmentId - Segment identifier
* @param data - Segment data
* @param startTime - Segment start time
* @param endTime - Segment end time
* @param swarmId - Swarm identifier
* @param streamType - Stream type
* @param isLiveStream - Is live stream
*/
Expand All @@ -50,6 +57,7 @@ export interface SegmentsStorage {
data: ArrayBuffer,
startTime: number,
endTime: number,
swarmId: string,
streamType: StreamType,
isLiveStream: boolean,
): Promise<void>;
Expand All @@ -58,24 +66,28 @@ export interface SegmentsStorage {
* Returns segment data
* @param streamId - Stream identifier
* @param segmentId - Segment identifier
* @param swarmId - Swarm identifier
*/
getSegmentData(
streamId: string,
segmentId: number,
swarmId: string,
): Promise<ArrayBuffer | undefined>;

/**
* Returns true if segment is in storage
* @param streamId - Stream identifier
* @param segmentId - Segment identifier
* @param swarmId - Swarm identifier
*/
hasSegment(streamId: string, segmentId: number): boolean;
hasSegment(streamId: string, segmentId: number, swarmId: string): boolean;

/**
* Returns segment IDs of a stream that are stored in the storage
* @param streamId - Stream identifier
* @param swarmId - Swarm identifier
*/
getStoredSegmentIds(streamId: string): number[];
getStoredSegmentIds(streamId: string, swarmId: string): number[];

/**
* Function to subscribe on stream updates
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CommonCoreConfig, StreamConfig } from "../types.js";
import { CommonCoreConfig, StreamConfig, StreamType } from "../types.js";
import debug from "debug";
import { EventTarget } from "../utils/event-target.js";
import { SegmentsStorage } from "./index.js";
Expand Down Expand Up @@ -69,6 +69,9 @@ export class SegmentsMemoryStorage implements SegmentsStorage {
segmentId: number,
startTime: number,
endTime: number,
_swarmId: string,
_streamType: StreamType,
_isLiveStream: boolean,
): void {
this.lastRequestedSegment = {
streamId,
Expand All @@ -85,7 +88,8 @@ export class SegmentsMemoryStorage implements SegmentsStorage {
data: ArrayBuffer,
startTime: number,
endTime: number,
streamType: string,
_swarmId: string,
streamType: StreamType,
isLiveStream: boolean,
) {
const storageId = getStorageItemId(streamId, segmentId);
Expand All @@ -107,6 +111,7 @@ export class SegmentsMemoryStorage implements SegmentsStorage {
async getSegmentData(
streamId: string,
segmentId: number,
_swarmId: string,
): Promise<ArrayBuffer | undefined> {
const segmentStorageId = getStorageItemId(streamId, segmentId);
const dataItem = this.cache.get(segmentStorageId);
Expand All @@ -116,7 +121,7 @@ export class SegmentsMemoryStorage implements SegmentsStorage {
return dataItem.data;
}

hasSegment(streamId: string, externalId: number): boolean {
hasSegment(streamId: string, externalId: number, _swarmId: string): boolean {
const segmentStorageId = getStorageItemId(streamId, externalId);
const segment = this.cache.get(segmentStorageId);

Expand Down
4 changes: 2 additions & 2 deletions packages/p2p-media-loader-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export type CommonCoreConfig = {
* vodSegmentsStorage: undefined
* ```
*/
vodSegmentsStorage?: new () => SegmentsStorage;
vodSegmentsStorage?: (isLive: boolean) => SegmentsStorage;

/**
* Custom storage class for live segments.
Expand All @@ -144,7 +144,7 @@ export type CommonCoreConfig = {
* liveSegmentsStorage: undefined
* ```
*/
liveSegmentsStorage?: new () => SegmentsStorage;
liveSegmentsStorage?: (isLive: boolean) => SegmentsStorage;
};

/**
Expand Down

0 comments on commit 1a0f65a

Please sign in to comment.