Skip to content

Commit

Permalink
Refactor: segment-storage (#410)
Browse files Browse the repository at this point in the history
* refactor: segment-storage.ts

* refactor: optimize segment storage deletion logic

* refactor: Implemented segments-storage interface

* refactor: update segment locking logic in HybridLoader

* refactor: hasSegment function

* refactor: Remove unused segment storage related code

* refactor: Update segment storage initialization

* refactor: P2P configuration

* refactor: Custom segment storage handling

* refactor: Remove async keyword from destroy method in segments-storage.interface.ts

* fix: lint error

* refactor: Update segment storage initialization and handling

* refactor: Segments storage clear logic

* refactor: segments-storage-interface

* refactor: Files structure

* refactor: Improve clear segments storage logic

* docs: Add ISegmentStorage docs

* refactor: Improve stream time window handling in SegmentsMemoryStorage

* refactor: segments-storage interface

* refactor: Update initialize segment storage logic

* refactor: Update SegmentsStorage interface

* refactor: Added validation of customSegmentStorage from config

* refactor: Swap func params in correct order

* refactor: Update segment storage classes and interfaces

* fix: imports

* refactor: Naming

* refactor: Improve segment storage event handling

* refactor: Optimize segment memory storage

- Improve segment storage event handling
- Update segment storage classes and interfaces
- Swap function parameters in correct order
- Set memory storage limit based on user agent
- Clear segments based on memory storage limit

* refactor: Optimize segment memory storage and update segment storage classes and interfaces

- Refactored the segment-memory-storage.ts file to optimize the memory storage of segments.
- Updated the segment storage classes and interfaces to improve performance and efficiency.

* refactor: Update segment memory storage limit configuration

- Change the `segmentsMemoryStorageLimit` configuration in the `Core` class to allow for an undefined value, instead of a specific number. This provides more flexibility in managing the memory storage limit for segments.

- Update the `CommonCoreConfig` type definition in the `types.ts` file to reflect the change in the `segmentsMemoryStorageLimit` property.

* refactor: Add segment categories in clear logic

This commit optimizes the segment memory storage by introducing segment storage categories. The new SegmentCategories type is added to classify segments into different categories such as obsolete, beyondHalfHttpWindowBehind, behindPlayback, and aheadHttpWindow. The segment removal logic is updated to use these categories for better organization and efficiency.

* refactor: Update segment memory storage limit description

* refactor: Simplify segment memory storage limit configuration

* refactor: Simplify segment memory storage limit configuration and optimize segment memory storage

* refactor: Improve clear logic and added getAvailableSpace func

* refactor: Simplify segment memory storage limit configuration and optimize segment memory storage

- Added a new function getAvailableMemoryPercent() to calculate the available memory percentage.
- Updated the generateQueue() function to pass the available memory percentage to QueueUtils.generateQueue().
- Modified the getUsedMemory() function in SegmentMemoryStorage to return the memory limit and memory used.
- Updated the getSegmentPlaybackStatuses() function in utils/stream.ts to calculate the time windows based on the available memory percentage.

* refactor: Disable random http downloads if memory storage is running out of memory

* refactor: Clear logic

* Revert "refactor: Clear logic"

This reverts commit 8a631e7.

* refactor: Improve segment memory storage and clear logic

* refactor: Improve segment memory storage

* refactor: Naming

* refactor: Improve segment memory storage interface and getUsage() logic

* Refactor segment-memory-storage.ts: Swap parameters in getStoredSegmentIds()

* refactor: Swap parameters in getSegmentData()

* refactor: Update setSegmentChangeCallback parameter name
  • Loading branch information
DimaDemchenko authored Sep 26, 2024
1 parent fb5300b commit 21a030f
Show file tree
Hide file tree
Showing 13 changed files with 622 additions and 260 deletions.
1 change: 1 addition & 0 deletions .eslintrc.common.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ module.exports = {
tsx: "never",
},
],
"@typescript-eslint/no-unused-vars": ["warn", { argsIgnorePattern: "^_" }],
},
};
44 changes: 33 additions & 11 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
import { BandwidthCalculators, StreamDetails } from "./internal-types.js";
import * as StreamUtils from "./utils/stream.js";
import { BandwidthCalculator } from "./bandwidth-calculator.js";
import { SegmentsMemoryStorage } from "./segments-storage.js";
import { SegmentMemoryStorage } from "./segment-storage/segment-memory-storage.js";
import { EventTarget } from "./utils/event-target.js";
import {
overrideConfig,
Expand All @@ -24,13 +24,14 @@ import {
filterUndefinedProps,
} from "./utils/utils.js";
import { TRACKER_CLIENT_VERSION_PREFIX } from "./utils/peer.js";
import { SegmentStorage } from "./segment-storage/index.js";

/** Core class for managing media streams loading via P2P. */
export class Core<TStream extends Stream = Stream> {
/** Default configuration for common core settings. */
static readonly DEFAULT_COMMON_CORE_CONFIG: CommonCoreConfig = {
cachedSegmentExpiration: undefined,
cachedSegmentsCount: 0,
segmentMemoryStorageLimit: undefined,
customSegmentStorageFactory: undefined,
};

/** Default configuration for stream settings. */
Expand Down Expand Up @@ -74,7 +75,7 @@ export class Core<TStream extends Stream = Stream> {
all: new BandwidthCalculator(),
http: new BandwidthCalculator(),
};
private segmentStorage?: SegmentsMemoryStorage;
private segmentStorage?: SegmentStorage;
private mainStreamLoader?: HybridLoader;
private secondaryStreamLoader?: HybridLoader;
private streamDetails: StreamDetails = {
Expand Down Expand Up @@ -280,10 +281,7 @@ export class Core<TStream extends Stream = Stream> {
throw new Error("Manifest response url is not defined");
}

if (!this.segmentStorage) {
this.segmentStorage = new SegmentsMemoryStorage(this.commonCoreConfig);
await this.segmentStorage.initialize();
}
await this.initializeSegmentStorage();

const segment = this.identifySegment(segmentRuntimeId);

Expand Down Expand Up @@ -327,7 +325,7 @@ export class Core<TStream extends Stream = Stream> {
}

/**
* Updates the 'isLive' status of the stream.
* Updates the 'isLive' status of the stream
*
* @param isLive - Boolean indicating whether the stream is live.
*/
Expand Down Expand Up @@ -372,14 +370,38 @@ export class Core<TStream extends Stream = Stream> {
this.streams.clear();
this.mainStreamLoader?.destroy();
this.secondaryStreamLoader?.destroy();
void this.segmentStorage?.destroy();
this.segmentStorage?.destroy();
this.mainStreamLoader = undefined;
this.secondaryStreamLoader = undefined;
this.segmentStorage = undefined;
this.manifestResponseUrl = undefined;
this.streamDetails = { isLive: false, activeLevelBitrate: 0 };
}

private async initializeSegmentStorage() {
if (this.segmentStorage) return;

const isLive = this.streamDetails.isLive;
const createCustomStorage =
this.commonCoreConfig.customSegmentStorageFactory;

if (createCustomStorage && typeof createCustomStorage !== "function") {
throw new Error("Storage configuration is invalid");
}

const segmentStorage = createCustomStorage
? createCustomStorage(isLive)
: new SegmentMemoryStorage();

await segmentStorage.initialize(
this.commonCoreConfig,
this.mainStreamConfig,
this.secondaryStreamConfig,
);

this.segmentStorage = segmentStorage;
}

private identifySegment(segmentRuntimeId: string): SegmentWithStream {
if (!this.manifestResponseUrl) {
throw new Error("Manifest response url is undefined");
Expand Down Expand Up @@ -439,7 +461,7 @@ export class Core<TStream extends Stream = Stream> {
throw new Error("Manifest response url is not defined");
}

if (!this.segmentStorage?.isInitialized) {
if (!this.segmentStorage) {
throw new Error("Segment storage is not initialized");
}

Expand Down
110 changes: 88 additions & 22 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { HttpRequestExecutor } from "./http-loader.js";
import { SegmentsMemoryStorage } from "./segments-storage.js";
import {
CoreEventMap,
EngineCallbacks,
Expand All @@ -22,6 +21,7 @@ import * as Utils from "./utils/utils.js";
import debug from "debug";
import { QueueItem } from "./utils/queue.js";
import { EventTarget } from "./utils/event-target.js";
import { SegmentStorage } from "./segment-storage/index.js";

const FAILED_ATTEMPTS_CLEAR_INTERVAL = 60000;
const PEER_UPDATE_LATENCY = 1000;
Expand All @@ -45,7 +45,7 @@ export class HybridLoader {
private readonly streamDetails: Required<Readonly<StreamDetails>>,
private readonly config: StreamConfig,
private readonly bandwidthCalculators: BandwidthCalculators,
private readonly segmentStorage: SegmentsMemoryStorage,
private readonly segmentStorage: SegmentStorage,
private readonly eventTarget: EventTarget<CoreEventMap>,
) {
const activeStream = this.lastRequestedSegment.stream;
Expand All @@ -59,17 +59,10 @@ export class HybridLoader {
this.eventTarget,
);

if (!this.segmentStorage.isInitialized) {
if (!this.segmentStorage) {
throw new Error("Segment storage is not initialized.");
}
this.segmentStorage.addIsSegmentLockedPredicate((segment) => {
if (segment.stream !== activeStream) return false;
return StreamUtils.isSegmentActualInPlayback(
segment,
this.playback,
this.config,
);
});

this.p2pLoaders = new P2PLoadersContainer(
this.streamManifestUrl,
this.lastRequestedSegment.stream,
Expand Down Expand Up @@ -109,18 +102,46 @@ export class HybridLoader {
}
this.lastRequestedSegment = segment;

const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, stream);

this.segmentStorage.onSegmentRequested(
swarmId,
streamSwarmId,
segment.externalId,
segment.startTime,
segment.endTime,
stream.type,
this.streamDetails.isLive,
);
const engineRequest = new EngineRequest(segment, callbacks);
if (this.segmentStorage.hasSegment(segment)) {
// TODO: error handling
const data = await this.segmentStorage.getSegmentData(segment);
if (data) {
const { queueDownloadRatio } = this.generateQueue();
engineRequest.resolve(data, this.getBandwidth(queueDownloadRatio));

try {
const hasSegment = this.segmentStorage.hasSegment(
swarmId,
streamSwarmId,
segment.externalId,
);

if (hasSegment) {
const data = await this.segmentStorage.getSegmentData(
swarmId,
streamSwarmId,
segment.externalId,
);
if (data) {
const { queueDownloadRatio } = this.generateQueue();
engineRequest.resolve(data, this.getBandwidth(queueDownloadRatio));
return;
}
}
} else {

this.engineRequest = engineRequest;
} catch {
engineRequest.reject();
} finally {
this.requestProcessQueueMicrotask();
}
this.requestProcessQueueMicrotask();
}

private requestProcessQueueMicrotask = (force = true) => {
Expand Down Expand Up @@ -185,9 +206,18 @@ export class HybridLoader {
this.engineRequest = undefined;
}
this.requests.remove(request);

const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, stream);

void this.segmentStorage.storeSegment(
request.segment,
swarmId,
streamSwarmId,
segment.externalId,
request.data,
segment.startTime,
segment.endTime,
segment.stream.type,
this.streamDetails.isLive,
);
break;
Expand Down Expand Up @@ -350,6 +380,10 @@ export class HybridLoader {
}

private loadRandomThroughHttp() {
const availableStorageCapacityPercent =
this.getAvailableStorageCapacityPercent();
if (availableStorageCapacityPercent <= 10) return;

const { simultaneousHttpDownloads, httpErrorRetries } = this.config;
const p2pLoader = this.p2pLoaders.currentLoader;

Expand All @@ -366,11 +400,22 @@ export class HybridLoader {
this.playback,
this.config,
this.p2pLoaders.currentLoader,
availableStorageCapacityPercent,
)) {
const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(
swarmId,
segment.stream,
);

if (
!statuses.isHttpDownloadable ||
statuses.isP2PDownloadable ||
this.segmentStorage.hasSegment(segment)
this.segmentStorage.hasSegment(
swarmId,
streamSwarmId,
segment.externalId,
)
) {
continue;
}
Expand Down Expand Up @@ -450,21 +495,41 @@ export class HybridLoader {
return false;
}

private getAvailableStorageCapacityPercent(): number {
const { totalCapacity, usedCapacity } = this.segmentStorage.getUsage();
return 100 - (usedCapacity / totalCapacity) * 100;
}

private generateQueue() {
const queue: QueueItem[] = [];
const queueSegmentIds = new Set<string>();
let maxPossibleLength = 0;
let alreadyLoadedCount = 0;

const availableStorageCapacityPercent =
this.getAvailableStorageCapacityPercent();
for (const item of QueueUtils.generateQueue(
this.lastRequestedSegment,
this.playback,
this.config,
this.p2pLoaders.currentLoader,
availableStorageCapacityPercent,
)) {
maxPossibleLength++;
const { segment } = item;

const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(
swarmId,
segment.stream,
);

if (
this.segmentStorage.hasSegment(segment) ||
this.segmentStorage.hasSegment(
swarmId,
streamSwarmId,
segment.externalId,
) ||
this.requests.get(segment)?.status === "succeed"
) {
alreadyLoadedCount++;
Expand Down Expand Up @@ -534,6 +599,7 @@ export class HybridLoader {
this.logger("position significantly changed");
this.engineRequest?.markAsShouldBeStartedImmediately();
}
this.segmentStorage.onPlaybackUpdated(position, rate);
void this.requestProcessQueueMicrotask(isPositionSignificantlyChanged);
}

Expand Down
1 change: 1 addition & 0 deletions packages/p2p-media-loader-core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { Core } from "./core.js";
export * from "./types.js";
export type { SegmentStorage } from "./segment-storage/index.js";
export { debug } from "debug";
Loading

0 comments on commit 21a030f

Please sign in to comment.