Skip to content

Commit

Permalink
Merge branch 'refactor/segmentStorage' into custom-storage-example
Browse files Browse the repository at this point in the history
  • Loading branch information
DimaDemchenko committed Aug 23, 2024
2 parents 496a0c7 + 4fbb299 commit 38dd3e2
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 95 deletions.
21 changes: 16 additions & 5 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
import { BandwidthCalculators, StreamDetails } from "./internal-types.js";
import * as StreamUtils from "./utils/stream.js";
import { BandwidthCalculator } from "./bandwidth-calculator.js";
import { SegmentsMemoryStorage } from "./segments-storage.js";
import { SegmentsMemoryStorage } from "./segments-storage/segments-storage.js";
import { EventTarget } from "./utils/event-target.js";
import {
overrideConfig,
Expand All @@ -30,7 +30,6 @@ import { ISegmentsStorage } from "./segments-storage/segments-storage.interface.
export class Core<TStream extends Stream = Stream> {
/** Default configuration for common core settings. */
static readonly DEFAULT_COMMON_CORE_CONFIG: CommonCoreConfig = {
cachedSegmentExpiration: undefined,
cachedSegmentsCount: 0,
customSegmentStorage: undefined,
};
Expand Down Expand Up @@ -282,10 +281,22 @@ export class Core<TStream extends Stream = Stream> {
throw new Error("Manifest response url is not defined");
}

if (
this.segmentStorage?.isInitialized() &&
this.streamDetails.isLive &&
!(this.segmentStorage instanceof SegmentsMemoryStorage)
) {
this.segmentStorage.destroy();
this.segmentStorage = undefined;
}

if (!this.segmentStorage) {
this.segmentStorage = this.commonCoreConfig.customSegmentStorage
? new this.commonCoreConfig.customSegmentStorage()
: new SegmentsMemoryStorage();
const StorageToUseIfNotLive =
this.commonCoreConfig.customSegmentStorage ?? SegmentsMemoryStorage;

this.segmentStorage = this.streamDetails.isLive
? new SegmentsMemoryStorage()
: new StorageToUseIfNotLive();

await this.segmentStorage.initialize(
this.commonCoreConfig,
Expand Down
2 changes: 1 addition & 1 deletion packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class HybridLoader {
if (!this.segmentStorage.isInitialized()) {
throw new Error("Segment storage is not initialized.");
}
this.segmentStorage.setEngineRequestSegmentDurationCallback(() => {
this.segmentStorage.setLastRequestedSegmentDurationCallback(() => {
return {
startTime: this.lastRequestedSegment.startTime,
endTime: this.lastRequestedSegment.endTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ export interface ISegmentsStorage {

setSegmentPlaybackCallback(getCurrentPlaybackTime: () => number): void;

setEngineRequestSegmentDurationCallback(
getSegmentDurationFromEngineRequest: () => {
setLastRequestedSegmentDurationCallback(
getLastRequestedSegmentDuration: () => {
startTime: number;
endTime: number;
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { CommonCoreConfig, StreamConfig } from "./types.js";
import { CommonCoreConfig, StreamConfig } from "../types.js";
import debug from "debug";
import { EventTarget } from "./utils/event-target.js";
import { ISegmentsStorage } from "./segments-storage/segments-storage.interface.js";
import { EventTarget } from "../utils/event-target.js";
import { ISegmentsStorage } from "./segments-storage.interface.js";

type SegmentDataItem = {
segmentId: number;
Expand All @@ -16,8 +16,8 @@ type StorageEventHandlers = {
[key in `onStorageUpdated-${string}`]: () => void;
};

function getStorageItemId(streamSwarmId: string, externalId: number) {
return `${streamSwarmId}|${externalId}`;
function getStorageItemId(streamId: string, segmentId: number) {
return `${streamId}|${segmentId}`;
}

export class SegmentsMemoryStorage implements ISegmentsStorage {
Expand All @@ -33,7 +33,10 @@ export class SegmentsMemoryStorage implements ISegmentsStorage {
private mainStreamConfig?: StreamConfig;
private secondaryStreamConfig?: StreamConfig;
private getCurrentPlaybackTime?: () => number;
private getSegmentDuration?: () => { startTime: number; endTime: number };
private getLastRequestedSegmentDuration?: () => {
startTime: number;
endTime: number;
};

constructor() {
this.logger = debug("p2pml-core:segment-memory-storage");
Expand Down Expand Up @@ -62,13 +65,13 @@ export class SegmentsMemoryStorage implements ISegmentsStorage {
this.getCurrentPlaybackTime = getCurrentPlaybackTime;
}

setEngineRequestSegmentDurationCallback(
setLastRequestedSegmentDurationCallback(
getSegmentDurationFromEngineRequest: () => {
startTime: number;
endTime: number;
},
) {
this.getSegmentDuration = getSegmentDurationFromEngineRequest;
this.getLastRequestedSegmentDuration = getSegmentDurationFromEngineRequest;
}

// eslint-disable-next-line @typescript-eslint/require-await
Expand All @@ -82,7 +85,6 @@ export class SegmentsMemoryStorage implements ISegmentsStorage {
isLiveStream: boolean,
) {
const storageId = getStorageItemId(streamId, segmentId);

this.cache.set(storageId, {
data,
segmentId,
Expand All @@ -92,7 +94,7 @@ export class SegmentsMemoryStorage implements ISegmentsStorage {
streamType,
});

this.logger(`add segment: ${segmentId}`);
this.logger(`add segment: ${segmentId} to ${streamId}`);
this.dispatchStorageUpdatedEvent(streamId);
void this.clear(isLiveStream);
}
Expand Down Expand Up @@ -130,54 +132,47 @@ export class SegmentsMemoryStorage implements ISegmentsStorage {

// eslint-disable-next-line @typescript-eslint/require-await
private async clear(isLiveStream: boolean): Promise<boolean> {
if (isLiveStream) {
return this.clearLive();
}

return this.clearVOD();
}

subscribeOnUpdate(
streamId: string,
listener: StorageEventHandlers["onStorageUpdated-"],
) {
this.eventTarget.addEventListener(`onStorageUpdated-${streamId}`, listener);
}

unsubscribeFromUpdate(
streamId: string,
listener: StorageEventHandlers["onStorageUpdated-"],
) {
this.eventTarget.removeEventListener(
`onStorageUpdated-${streamId}`,
listener,
);
}

private clearLive() {
if (
!this.getCurrentPlaybackTime ||
!this.mainStreamConfig ||
!this.secondaryStreamConfig
!this.secondaryStreamConfig ||
!this.storageConfig
) {
return false;
}

const currentPlayback = this.getCurrentPlaybackTime();
const affectedStreams = new Set<string>();
const maxStorageCapacity = isLiveStream
? Infinity
: this.getStorageMaxCacheCount();

for (const [itemId, item] of this.cache.entries()) {
const { endTime, streamType, streamId } = item;
if (
!isLiveStream &&
(maxStorageCapacity === 0 || this.cache.size <= maxStorageCapacity)
) {
return false;
}

for (const [storageId, segmentData] of this.cache.entries()) {
const { endTime, streamType, streamId } = segmentData;
const highDemandTimeWindow = this.getHighDemandTimeWindow(streamType);

const isPastHighDemandWindow =
currentPlayback > endTime + highDemandTimeWindow;
let shouldRemove = false;

if (isPastHighDemandWindow) {
this.logger(`remove segment: ${item.segmentId}`);
if (isLiveStream) {
shouldRemove = currentPlayback >= highDemandTimeWindow + endTime;
} else {
const httpDownloadTimeWindow =
this.getHttpDownloadTimeWindow(streamType);
shouldRemove =
currentPlayback >= endTime + httpDownloadTimeWindow * 1.05;
}

if (shouldRemove) {
this.logger(`remove segment: ${segmentData.segmentId}`);
this.cache.delete(storageId);
affectedStreams.add(streamId);
this.cache.delete(itemId);
}
}

Expand All @@ -188,50 +183,52 @@ export class SegmentsMemoryStorage implements ISegmentsStorage {
return affectedStreams.size > 0;
}

private clearVOD() {
if (
!this.getCurrentPlaybackTime ||
!this.mainStreamConfig ||
!this.secondaryStreamConfig ||
!this.storageConfig
) {
return false;
}
subscribeOnUpdate(
streamId: string,
listener: StorageEventHandlers["onStorageUpdated-"],
) {
this.eventTarget.addEventListener(`onStorageUpdated-${streamId}`, listener);
}

const cachedSegmentsCount = this.storageConfig.cachedSegmentsCount;
unsubscribeFromUpdate(
streamId: string,
listener: StorageEventHandlers["onStorageUpdated-"],
) {
this.eventTarget.removeEventListener(
`onStorageUpdated-${streamId}`,
listener,
);
}

private getStorageMaxCacheCount() {
if (
cachedSegmentsCount === 0 ||
this.cache.size + 1 <= cachedSegmentsCount
!this.storageConfig ||
!this.getLastRequestedSegmentDuration ||
!this.mainStreamConfig ||
!this.secondaryStreamConfig
) {
return false;
return 0;
}

const currentPlayback = this.getCurrentPlaybackTime();
const affectedStreams = new Set<string>();

for (const [itemId, item] of this.cache.entries()) {
const { endTime, streamType, streamId } = item;

const httpDownloadTimeWindow = this.getHttpDownloadTimeWindow(streamType);
const highDemandTimeWindow = this.getHighDemandTimeWindow(streamType);
const cachedSegmentsCount = this.storageConfig.cachedSegmentsCount;
if (cachedSegmentsCount === 0) return 0;

const isPastThreshold =
endTime <
currentPlayback - (httpDownloadTimeWindow - highDemandTimeWindow);
const maxHttpTimeWindow =
this.mainStreamConfig.httpDownloadTimeWindow >=
this.secondaryStreamConfig.httpDownloadTimeWindow
? this.mainStreamConfig.httpDownloadTimeWindow
: this.secondaryStreamConfig.httpDownloadTimeWindow;

if (isPastThreshold) {
this.logger(`remove segment: ${item.segmentId}`);
this.cache.delete(itemId);
affectedStreams.add(streamId);
}
}
const { startTime, endTime } = this.getLastRequestedSegmentDuration();
const segmentDuration = endTime - startTime;
const segmentsInTimeWindow = Math.ceil(maxHttpTimeWindow / segmentDuration);

affectedStreams.forEach((stream) =>
this.dispatchStorageUpdatedEvent(stream),
);
const isCachedSegmentCountValid =
cachedSegmentsCount >= segmentsInTimeWindow;

return affectedStreams.size > 0;
return isCachedSegmentCountValid
? cachedSegmentsCount
: segmentsInTimeWindow;
}

private dispatchStorageUpdatedEvent(streamId: string) {
Expand Down
10 changes: 0 additions & 10 deletions packages/p2p-media-loader-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,6 @@ export type DynamicCoreConfig = Partial<

/** Represents the configuration for the Core functionality that is common to all streams. */
export type CommonCoreConfig = {
/**
* Time after which a cached segment expires, in seconds.
* If set to undefined, the cacheSegmentExpiration is disabled for VOD streams, and a default value (20 minutes) is used for live streams.
*
* @default
* ```typescript
* cachedSegmentExpiration: undefined
* ```
*/
cachedSegmentExpiration?: number;
/**
* Maximum number of segments to store in the cache.
* Has to be less then httpDownloadTimeWindow and p2pDownloadTimeWindow.
Expand Down

0 comments on commit 38dd3e2

Please sign in to comment.