Skip to content

Commit

Permalink
Tweak
Browse files Browse the repository at this point in the history
  • Loading branch information
nzws committed May 15, 2024
1 parent 2d35df9 commit 8fe5555
Show file tree
Hide file tree
Showing 15 changed files with 181 additions and 36 deletions.
2 changes: 1 addition & 1 deletion apps/push-serverless/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ RUN apt-get update &&\
curl -fsSL https://deb.nodesource.com/setup_20.x | bash - &&\
# Caddy
curl -1sLf 'https://dl.cloudsmith.io/public/caddy/stable/gpg.key' | gpg --dearmor -o /usr/share/keyrings/caddy-stable-archive-keyring.gpg &&\
curl -1sLf 'https://dl.cloudsmith.io/public/caddy/stable/debian.deb.txt' | tee /etc/apt/sources.list.d/caddy-stable.list &&\
curl -1sLf 'https://dl.cloudsmith.io/public/caddy/stable/debian.deb.txt' | tee /etc/apt/sources.list.d/caddy-stable.list &&\
apt-get update &&\
apt-get install -y nodejs caddy &&\
apt-get clean &&\
Expand Down
62 changes: 60 additions & 2 deletions apps/push-serverless/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,61 @@
# knzklive2/push-serverless
# knzklive2/push-serverless (push v3)

> (Super experimental) KnzkLive (Serverless) Push Agent
> (Super experimental) KnzkLive: Serverless Push Service
⚠ This is a experimental service and sometimes it is exploded.

## Concepts

- 主に fly.io 上で動作させる事を目的としたサーバーレスプッシュサービスの実験です
- プッシュサーバーに必要なプロセスを一つのイメージに全て詰め込み、無理やり一つの Firecracker VM 上で動作させます
- 使用者がいないときはゼロスケールしインフラコストを極限まで抑えることに焦点を当てています
- Fly Proxy によって TLS 接続を生の TCP に変換させる事で、間接的に RTMPS に対応します

## Spec

- 以下のプロセスが一つの Firecracker VM 内で動作します
- Caddy - 静的ファイルの配信とリバースプロキシ
- SRS - RTMP サーバー
- KnzkLive Push Agent - 制御基盤
- FFmpeg - ビデオエンコーダ

```mermaid
graph LR
api["KnzkLive API"]
caddy["Caddy"]
srs["SRS"]
agent["KnzkLive Push Agent"]
ffmpeg["FFmpeg"]
storage["ストレージ"]
streamer(["配信者"])
viewer(["視聴者"])
streamer -- RTMP(S) 映像 --> srs
streamer -- WebSocket 映像 (ブラウザ配信) --> agent
agent <-- 状態制御/認証 --> api
subgraph KnzkLive: Serverless Push Service
agent -- WebSocket 映像 --> ffmpeg
ffmpeg -- RTMP 映像 --> srs
srs <-- 認証 --> agent
agent -- エンコーダ制御 --> ffmpeg
srs -- RTMP 映像 --> ffmpeg
ffmpeg -- HLS --> storage
caddy --> storage
caddy --> srs
end
caddy -- FLV (低遅延) --> viewer
caddy -- HLS --> viewer
```

## Limitations

- 水平方向へのスケーリングは行えません
- 現状の設計だと根本的にできないので、v4 作るときは設計からやり直したさがある
6 changes: 5 additions & 1 deletion apps/push-serverless/src/controllers/externals/thumbnail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ export const apiExternalThumbnail: Middleware = async ctx => {

try {
const buffer = await readFile(path);
await fetch(body.signedUploadUrl, {
const res = await fetch(body.signedUploadUrl, {
method: 'PUT',
body: buffer
});
console.log('Uploaded thumbnail', res.status, await res.text());
if (!res.ok) {
throw new Error('Failed to upload thumbnail');
}
} catch (e) {
ctx.status = 500;
ctx.body = {
Expand Down
1 change: 1 addition & 0 deletions apps/push-serverless/src/services/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class Action {
});

setTimeout(() => {
void encoder.encodeToHighQualityHls();
void encoder.encodeToLowQualityHls();
void encoder.encodeToAudio();

Expand Down
50 changes: 50 additions & 0 deletions apps/push-serverless/src/services/encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,56 @@ export class Encoder {
await rm(this.persistentDir, { recursive: true });
}

async encodeToHighQualityHls() {
if (this.streams.find(s => s.type === 'high')) {
return;
}

const idx = Math.round(Date.now() / 1000);
const path = await this.cleanupDirectory('high');

const stream = ffmpeg(this.rtmp)
.audioCodec('copy')
.videoCodec('copy')
.autopad()
.format('hls')
.outputOptions([
'-g 30',
'-hls_time 1',
'-hls_list_size 10',
'-hls_flags delete_segments+omit_endlist',
`-hls_segment_filename ${path}/${idx}-%d.ts`
])
.output(`${path}/stream.m3u8`)
.inputOptions(['-re', '-preset', 'ultrafast', '-tune', 'zerolatency']);

stream.on('start', (cmd: string) => {
console.log('Start HQ-HLS', this.liveId, cmd);
});

stream.on('error', err => {
console.warn('Error HQ-HLS', this.liveId, err);
});

stream.on('end', () => {
console.log('End HQ-HLS', this.liveId);
this.streams = this.streams.filter(s => s.stream !== stream);
});

stream.run();
this.streams.push({
type: 'high',
stream,
onRequestClose() {
stream.kill('SIGKILL');

return Promise.resolve();
}
});

return stream;
}

async encodeToLowQualityHls() {
if (this.streams.find(s => s.type === 'low')) {
return;
Expand Down
10 changes: 0 additions & 10 deletions apps/push-serverless/vm/usr/local/srs/conf/knzklive.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,4 @@ vhost __defaultVhost__ {
mount [vhost]/streaming/[app]/[stream].flv;
hstrs on;
}

hls {
enabled on;
hls_fragment 1;
hls_window 10;
hls_wait_keyframe off;
hls_path /tmp/knzklive/static;
hls_m3u8_file [app]/[stream]/source/stream.m3u8;
hls_ts_file [app]/[stream]/source/[seq]-[timestamp].ts;
}
}
2 changes: 1 addition & 1 deletion apps/server/src/controllers/v1/lives/get-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export const getV1LivesUrl: APIRoute<

ctx.body = {
flv: `${basePushPlay}/streaming/live/${live.id}_${live.watchToken}.flv?token=${token}`,
hlsHq: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/source/stream.m3u8?token=${token}`,
hlsHq: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/high/stream.m3u8?token=${token}`,
hlsLq: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/low/stream.m3u8?token=${token}`,
audio: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/audio/stream.m3u8?token=${token}`
};
Expand Down
7 changes: 4 additions & 3 deletions apps/server/src/controllers/v1/streams/get-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { Methods } from 'api-types/api/v1/streams/_liveId@number/url';
import { jwtEdge } from '../../../services/jwt';
import {
getPushStreamKey,
getPushUrl,
getPushWebsocketUrl
getPushWebsocketUrl,
pushDomain
} from '../../../utils/constants';
import { APIRoute, LiveState } from '../../../utils/types';

Expand All @@ -21,7 +21,8 @@ export const getV1StreamsUrl: APIRoute<

ctx.body = {
rtmp: {
url: getPushUrl(),
unsecure_url: `rtmp://${pushDomain}/live`,
secure_url: `rtmps://${pushDomain}:1936/live`,
streamKey: getPushStreamKey(live.id, token, live.watchToken || undefined)
},
websocket: {
Expand Down
4 changes: 3 additions & 1 deletion apps/server/src/services/jwt/_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class JWT {
payload: jose.JWTPayload,
expirationTime: string
): Promise<string> {
const { privateKey, publicKey } = await this.getKey();
const { privateKey } = await this.getKey();

const jwt = await new jose.SignJWT(payload)
.setProtectedHeader({ alg })
Expand All @@ -66,6 +66,7 @@ export class JWT {
.setExpirationTime(expirationTime)
.sign(privateKey);

/*
try {
const { payload } = await jose.jwtVerify(jwt, publicKey, {
issuer: ISSUER,
Expand All @@ -75,6 +76,7 @@ export class JWT {
} catch (e) {
logDebug(this.subject, 'sign test failed', e);
}
*/

return jwt;
}
Expand Down
4 changes: 2 additions & 2 deletions apps/server/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ export const baseVideoPlay = `${PROTOCOL}://${
process.env.VIDEO_CDN_DOMAIN || process.env.VIDEO_DOMAIN || ''
}`;
export const frontendUrl = process.env.FRONTEND_ENDPOINT || '';
export const getPushUrl = () =>
`rtmps://${(process.env.PUSH_DOMAIN || '').split(':')[0]}:1936/live`;
export const pushDomain = (process.env.PUSH_DOMAIN || '').split(':')[0];

export const getPushStreamKey = (
liveId: number,
pushToken: string,
Expand Down
6 changes: 3 additions & 3 deletions apps/web/locales/ja.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@
"video.recording-deleted": "この配信の録画はありません。",
"live.player.settings.type": "再生方式を変更",
"live.player.settings.type.flv.title": "ソース画質",
"live.player.settings.type.flv.note": "低遅延 (iOS 非対応)",
"live.player.settings.type.flv.note": "超低遅延 (iOS 非対応)",
"live.player.settings.type.hlsHq.title": "高画質",
"live.player.settings.type.hlsHq.note": "遅延あり、iOS 向け",
"live.player.settings.type.hlsHq.note": "低遅延 (iOS 向け)",
"live.player.settings.type.hlsLq.title": "低画質",
"live.player.settings.type.hlsLq.note": "高遅延、携帯回線向け",
"live.player.settings.type.hlsLq.note": "携帯回線向け",
"live.player.settings.type.audio.title": "音声のみ",
"live.player.settings.type.audio.note": "バックグラウンド再生向け",
"video.player.settings.type.hlsHq.title": "高画質",
Expand Down
2 changes: 1 addition & 1 deletion apps/web/organisms/live/admin/live-info-modal/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export const LiveInfoModal: FC<Props> = ({
);
const [hashTag, setHashTag] = useState(live?.hashtag || '');
const [sensitive, setSensitive] = useState(live?.sensitive || false);
const [isRecording, setIsRecording] = useState(live?.isRecording ?? true);
const [isRecording, setIsRecording] = useState(live?.isRecording ?? false);
const [preferThumbnailType, setPreferThumbnailType] = useState(
live?.config?.preferThumbnailType || 'generate'
);
Expand Down
54 changes: 44 additions & 10 deletions apps/web/organisms/live/admin/push-key.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import {
Button,
FormControl,
FormLabel,
HStack,
Input,
InputGroup,
VStack,
useToast
} from '@chakra-ui/react';
import { FC, FocusEvent, Fragment, useCallback, useState } from 'react';
Expand All @@ -17,7 +17,8 @@ type Props = {
};

type Rtmp = {
url: string;
unsecure_url: string;
secure_url: string | undefined;
streamKey: string;
};

Expand Down Expand Up @@ -55,18 +56,38 @@ export const PushKey: FC<Props> = ({ liveId }) => {
})();
}, [token, liveId]);

const handleUrlFocus = useCallback(
const handleUnsecureUrlFocus = useCallback(
(e: FocusEvent<HTMLInputElement>) => {
if (!rtmp) {
return;
}

void (async () => {
e.target.select();
await navigator.clipboard.writeText(rtmp.url);
await navigator.clipboard.writeText(rtmp.unsecure_url);

toast({
title: 'サーバーURL をコピーしました',
title: 'サーバーURL (RTMP) をコピーしました',
status: 'success',
isClosable: true
});
})();
},
[rtmp, toast]
);

const handleSecureUrlFocus = useCallback(
(e: FocusEvent<HTMLInputElement>) => {
if (!rtmp) {
return;
}

void (async () => {
e.target.select();
await navigator.clipboard.writeText(rtmp.secure_url || '');

toast({
title: 'サーバーURL (RTMPS) をコピーしました',
status: 'success',
isClosable: true
});
Expand Down Expand Up @@ -108,15 +129,28 @@ export const PushKey: FC<Props> = ({ liveId }) => {
</Button>
)}
{rtmp && (
<HStack spacing={4}>
<VStack gap={4}>
<FormControl>
<FormLabel>サーバーURL (RTMPS: 推奨)</FormLabel>

<InputGroup size="md">
<Input
type="text"
value={rtmp.secure_url}
onFocus={handleSecureUrlFocus}
readOnly
/>
</InputGroup>
</FormControl>

<FormControl>
<FormLabel>サーバーURL</FormLabel>
<FormLabel>サーバーURL (RTMP: IPv6 only)</FormLabel>

<InputGroup size="md">
<Input
type="text"
value={rtmp.url}
onFocus={handleUrlFocus}
value={rtmp.unsecure_url}
onFocus={handleUnsecureUrlFocus}
readOnly
/>
</InputGroup>
Expand All @@ -132,7 +166,7 @@ export const PushKey: FC<Props> = ({ liveId }) => {
readOnly
/>
</FormControl>
</HStack>
</VStack>
)}
</Fragment>
);
Expand Down
4 changes: 4 additions & 0 deletions apps/web/utils/hooks/use-video-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ export const useVideoStream = <T extends 'live' | 'video'>(
player.attachMediaElement(videoTagRef.current);
player.load();
} catch (e) {
if (e instanceof FlvNotSupportedError) {
throw e;
}

setError(e);
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ export type Methods = {

resBody: {
rtmp: {
url: string;
unsecure_url: string;
secure_url: string | undefined;
streamKey: string;
};
websocket: {
Expand Down

0 comments on commit 8fe5555

Please sign in to comment.