Skip to content

Commit

Permalink
Add flv (websocket) protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
nzws committed May 31, 2024
1 parent bed67fd commit 41cd74f
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 37 deletions.
92 changes: 80 additions & 12 deletions apps/push-serverless/src/streaming/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Server } from 'http';
import { WebSocketServer } from 'ws';
import type WebSocket from 'ws';
import WebSocket from 'ws';
import { pathToRegexp } from 'path-to-regexp';
import { Encoder } from '../services/encoder';
import { Readable } from 'stream';
Expand All @@ -9,6 +9,9 @@ import { checkToken } from '../utils/api';
const streamPushRegexp = pathToRegexp(
'/api/externals/websocket/v1/stream-push/:liveId'
);
const streamWatchRegexp = pathToRegexp(
'/api/externals/websocket/v1/stream-watch/:liveId'
);

export class Streaming {
constructor(server: Server) {
Expand All @@ -17,31 +20,53 @@ export class Streaming {
});

ws.on('connection', (socket, req) => {
try {
console.log('websocket connected', req.url);

if (req.url) {
void this.handleConnection(socket, req.url);
void (async () => {
try {
console.log('websocket connected', req.url);

if (req.url) {
await this.handleConnection(socket, req.url);
}
} catch (e) {
console.warn(e);
socket.close();
}
} catch (e) {
console.warn(e);
socket.close();
}
})();
});
}

private handleConnection(socket: WebSocket, Url: string) {
private async handleConnection(socket: WebSocket, Url: string) {
const url = new URL(Url, process.env.SERVER_ENDPOINT);
const watchToken = url.searchParams.get('watchToken') || undefined;
const token = url.searchParams.get('token') || undefined;

const streamPush = streamPushRegexp.exec(url.pathname);
const streamWatch = streamWatchRegexp.exec(url.pathname);

if (streamPush) {
const liveId = Number(streamPush[1]);
if (!token || !watchToken) {
return this.closeConnection(socket, 'Token is required');
}
return this.handleV1Push(socket, liveId, watchToken, token);
try {
await this.handleV1Push(socket, liveId, watchToken, token);
return;
} catch (e) {
console.error(e);
return this.closeConnection(socket, 'An error occurred');
}
} else if (streamWatch) {
const liveId = Number(streamWatch[1]);
if (!token || !watchToken) {
return this.closeConnection(socket, 'Token is required');
}
try {
await this.handleV1Watch(socket, liveId, watchToken, token);
return;
} catch (e) {
console.error(e);
return this.closeConnection(socket, 'An error occurred');
}
}

return this.closeConnection(socket, 'invalid_path');
Expand Down Expand Up @@ -94,6 +119,49 @@ export class Streaming {
socket.send(JSON.stringify({ type: 'ready' }));
}

private async handleV1Watch(
socket: WebSocket,
liveId: number,
watchToken: string,
token: string
) {
const url = `http://localhost:8080/streaming/live/${liveId}_${watchToken}.flv?token=${token}`;
console.log('websocket connected', liveId);
const response = await fetch(url);

if (!response.ok) {
return this.closeConnection(
socket,
`Failed to connect: ${response.status}`
);
}

const reader = response.body?.getReader();
if (!reader) {
return this.closeConnection(socket, 'Failed to get reader');
}

let success = false;
while (!success) {
const { done, value } = await reader.read();
if (done || socket.readyState !== WebSocket.OPEN) {
success = true;
socket.close();
break;
}

socket.send(value, err => {
if (err) {
console.error('[handleV1Watch] failed to send data', err);
this.closeConnection(socket, 'Failed to send data');
success = true;
}
});
}

console.log('websocket closed', liveId);
}

private closeConnection(socket: WebSocket, error?: string) {
if (error) {
socket.send(JSON.stringify({ error }));
Expand Down
5 changes: 3 additions & 2 deletions apps/server/src/controllers/v1/lives/get-url.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Methods } from 'api-types/api/v1/lives/_liveId@number/url';
import { lives } from '../../../models';
import { jwtEdge } from '../../../services/jwt';
import { basePushPlay } from '../../../utils/constants';
import { basePushPlay, basePushPlayWs } from '../../../utils/constants';
import { APIRoute, LiveState } from '../../../utils/types';

type Response = Methods['get']['resBody'];
Expand Down Expand Up @@ -45,7 +45,8 @@ export const getV1LivesUrl: APIRoute<
}

ctx.body = {
flv: `${basePushPlay}/streaming/live/${live.id}_${live.watchToken}.flv?token=${token}`,
flvWs: `${basePushPlayWs}/api/externals/websocket/v1/stream-watch/${live.id}?watchToken=${live.watchToken}&token=${token}`,
flvHttp: `${basePushPlay}/streaming/live/${live.id}_${live.watchToken}.flv?token=${token}`,
hlsHq: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/high/stream.m3u8?token=${token}`,
hlsLq: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/low/stream.m3u8?token=${token}`,
audio: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/audio/stream.m3u8?token=${token}`
Expand Down
5 changes: 5 additions & 0 deletions apps/server/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ export const REDIS_CONNECTION = {
};

export const PROTOCOL = `http${process.env.USE_HTTP ? '' : 's'}`;
export const PROTOCOL_WS = `ws${process.env.USE_HTTP ? '' : 's'}`;

export const basePushStream = `${PROTOCOL}://${process.env.PUSH_DOMAIN || ''}`;
export const enableVideo = !!process.env.VIDEO_DOMAIN;
export const baseVideoStream = `${PROTOCOL}://${
Expand All @@ -15,6 +17,9 @@ export const baseVideoStream = `${PROTOCOL}://${
export const basePushPlay = `${PROTOCOL}://${
process.env.PUSH_CDN_DOMAIN || process.env.PUSH_DOMAIN || ''
}`;
export const basePushPlayWs = `${PROTOCOL_WS}://${
process.env.PUSH_CDN_DOMAIN || process.env.PUSH_DOMAIN || ''
}`;
export const baseVideoPlay = `${PROTOCOL}://${
process.env.VIDEO_CDN_DOMAIN || process.env.VIDEO_DOMAIN || ''
}`;
Expand Down
6 changes: 4 additions & 2 deletions apps/web/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@
"video.cache-deleted": "The video cannot be played because the cache has been deleted. The streamer can restore the recording from the live history.",
"video.recording-deleted": "This live was not recorded.",
"live.player.settings.type": "Change streaming method",
"live.player.settings.type.flv.title": "Source quality",
"live.player.settings.type.flv.note": "Ultra-Low latency (iOS is not supported)",
"live.player.settings.type.flvWs.title": "Source quality",
"live.player.settings.type.flvWs.note": "Ultra-Low latency (iOS is not supported) (Beta)",
"live.player.settings.type.flvHttp.title": "Source quality",
"live.player.settings.type.flvHttp.note": "Low latency (iOS is not supported)",
"live.player.settings.type.hlsHq.title": "High quality",
"live.player.settings.type.hlsHq.note": "Low latency (For iOS)",
"live.player.settings.type.hlsLq.title": "Low quality",
Expand Down
8 changes: 5 additions & 3 deletions apps/web/locales/ja.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,14 @@
"video.cache-deleted": "キャッシュが削除されたため動画は再生できません。配信履歴より復活させる事ができます。",
"video.recording-deleted": "この配信の録画はありません。",
"live.player.settings.type": "再生方式を変更",
"live.player.settings.type.flv.title": "ソース画質",
"live.player.settings.type.flv.note": "超低遅延 (iOS 非対応)",
"live.player.settings.type.flvWs.title": "ソース画質",
"live.player.settings.type.flvWs.note": "超低遅延 (iOS 非対応) (Beta)",
"live.player.settings.type.flvHttp.title": "ソース画質",
"live.player.settings.type.flvHttp.note": "低遅延 (iOS 非対応)",
"live.player.settings.type.hlsHq.title": "高画質",
"live.player.settings.type.hlsHq.note": "低遅延 (iOS 向け)",
"live.player.settings.type.hlsLq.title": "低画質",
"live.player.settings.type.hlsLq.note": "携帯回線向け",
"live.player.settings.type.hlsLq.note": "安定性重視モード (携帯回線向け)",
"live.player.settings.type.audio.title": "音声のみ",
"live.player.settings.type.audio.note": "バックグラウンド再生向け",
"video.player.settings.type.hlsHq.title": "高画質",
Expand Down
18 changes: 12 additions & 6 deletions apps/web/organisms/live/video/controller.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,18 @@ export const Controller: FC<Props> = props => {
>
{(isLive ? livePlayTypes : videoPlayTypes).map(playType => (
<MenuItemOption value={playType.id} key={playType.id}>
{playType.badge && (
<Badge mr="1">{playType.badge}</Badge>
)}

<FormattedMessage
id={`${
isLive ? 'live' : 'video'
}.player.settings.type.${playType.id}.title`}
/>

{playType.badge && (
<Badge ml="1" mb="1">
{playType.badge}
</Badge>
)}

<Text fontSize="xs" color="gray.500">
<FormattedMessage
id={`${
Expand Down Expand Up @@ -297,8 +299,12 @@ const Container = styled.div`

const livePlayTypes: { id: LivePlayType; badge?: string }[] = [
{
id: LivePlayType.Flv,
badge: 'FLV'
id: LivePlayType.FlvWs,
badge: 'FLV-WS'
},
{
id: LivePlayType.FlvHttp,
badge: 'FLV-HTTP'
},
{
id: LivePlayType.HlsHq,
Expand Down
39 changes: 28 additions & 11 deletions apps/web/utils/hooks/use-video-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ import { VideoUrls } from 'api-types/api/v1/videos/_liveId@number';
import type { HlsConfig } from 'hls.js';

export enum LivePlayType {
Flv = 'flv',
FlvHttp = 'flvHttp',
FlvWs = 'flvWs',
HlsHq = 'hlsHq',
HlsLq = 'hlsLq',
Audio = 'audio'
}

const LowLatency = [LivePlayType.Flv, LivePlayType.HlsHq, LivePlayType.Audio];
const LowLatency = [
LivePlayType.FlvHttp,
LivePlayType.FlvWs,
LivePlayType.HlsHq,
LivePlayType.Audio
];

export enum VideoPlayType {
HlsHq = 'hlsHq'
Expand Down Expand Up @@ -42,7 +48,7 @@ export const useVideoStream = <T extends 'live' | 'video'>(
}, [videoTagRef]);

const handleFlv = useCallback(
async (url: string) => {
async (playType: LivePlayType, url: string) => {
if (!videoTagRef.current) {
return;
}
Expand All @@ -61,7 +67,7 @@ export const useVideoStream = <T extends 'live' | 'video'>(

const player = Mpegts.createPlayer(
{
type: 'flv',
type: playType === LivePlayType.FlvWs ? 'mse' : 'flv',
isLive: true,
url
},
Expand Down Expand Up @@ -146,6 +152,10 @@ export const useVideoStream = <T extends 'live' | 'video'>(
player.on(Hls.Events.ERROR, (event, data) => {
console.warn(event, data);

if (!data.fatal) {
return;
}

switch (data.type) {
case Hls.ErrorTypes.NETWORK_ERROR: {
if (data.details === Hls.ErrorDetails.MANIFEST_LOAD_ERROR) {
Expand All @@ -164,12 +174,9 @@ export const useVideoStream = <T extends 'live' | 'video'>(
setTimeout(() => {
player.recoverMediaError();
}, 1000);

break;
default:
if (data.fatal) {
player.destroy();
}
player.destroy();
break;
}
});
Expand All @@ -196,10 +203,20 @@ export const useVideoStream = <T extends 'live' | 'video'>(

if (entityType === 'live') {
const live = url as LiveUrls;
if (playType === LivePlayType.Flv) {
if (playType === LivePlayType.FlvHttp) {
void (async () => {
try {
await handleFlv(playType, live.flvHttp);
} catch (e) {
if (e instanceof FlvNotSupportedError) {
setPlayType(LivePlayType.HlsHq as PlayType<T>);
}
}
})();
} else if (playType === LivePlayType.FlvWs) {
void (async () => {
try {
await handleFlv(live.flv);
await handleFlv(playType, live.flvWs);
} catch (e) {
if (e instanceof FlvNotSupportedError) {
setPlayType(LivePlayType.HlsHq as PlayType<T>);
Expand All @@ -213,7 +230,7 @@ export const useVideoStream = <T extends 'live' | 'video'>(
} else if (playType === LivePlayType.Audio) {
void handleHls(live.audio, playType, true);
} else {
setPlayType(LivePlayType.Flv as PlayType<T>);
setPlayType(LivePlayType.FlvWs as PlayType<T>);
}
} else {
const video = url as VideoUrls;
Expand Down
3 changes: 2 additions & 1 deletion packages/api-types/api/v1/lives/_liveId@number/url/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { AuthorizationHeader } from '../../../../../common/types';

export type LiveUrls = {
flv: string;
flvWs: string;
flvHttp: string;
hlsHq: string;
hlsLq: string;
audio: string;
Expand Down

0 comments on commit 41cd74f

Please sign in to comment.