Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge master to latest #2149

Merged
merged 10 commits into from
Apr 24, 2024
14 changes: 12 additions & 2 deletions packages/api/src/compile-schemas.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const data = _.merge({}, apiData, dbData);
const ajv = new Ajv({ sourceCode: true });

const index = [];
const types = [];
let types = [];

for (const [name, schema] of Object.entries(data.components.schemas)) {
schema.title = name;
Expand All @@ -72,7 +72,17 @@ const data = _.merge({}, apiData, dbData);
const indexPath = path.resolve(validatorDir, "index.js");
write(indexPath, indexStr);

const typeStr = types.join("\n");
const typeDefinition = `export type InputCreatorId =
| {
type: "unverified";
value: string;
}
| string;`;

let typeStr = types.join("\n\n");
const cleanedTypeStr = typeStr.split(typeDefinition).join("");
typeStr = `${cleanedTypeStr.trim()}\n\n${typeDefinition}`;

const typePath = path.resolve(schemaDir, "types.d.ts");
write(typePath, typeStr);
})().catch((err) => {
Expand Down
9 changes: 8 additions & 1 deletion packages/api/src/controllers/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,13 @@ function parseFiltersRaw(fieldsMap: FieldsMap, val: string): SQLStatement[] {
q.push(sql``.append(fv).append(sql` = ${filter.value}`));
} else if (fv.val) {
if (fv.type === "boolean") {
if (typeof filter.value !== "boolean") {
throw new Error(
`expected boolean value for field "${
filter.id
}", got: ${JSON.stringify(filter.value)}`
);
}
q.push(
sql``.append(
`coalesce((${fv.val})::boolean, FALSE) IS ${
Expand Down Expand Up @@ -653,7 +660,7 @@ export const triggerCatalystPullStart =
url.searchParams.set("lon", lon.toString());
playbackUrl = url.toString();
console.log(
`triggering catalyst pull start for streamId=${stream.id} playbackId=${stream.playbackId} lat=${lat} lon=${lon} pullRegion=${stream.pullRegion}`
`triggering catalyst pull start for streamId=${stream.id} playbackId=${stream.playbackId} lat=${lat} lon=${lon} pullRegion=${stream.pullRegion}, playbackUrl=${playbackUrl}`
);
}

Expand Down
11 changes: 9 additions & 2 deletions packages/api/src/controllers/playback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,16 @@ app.get("/:id", async (req, res) => {
res.status(501);
return res.json({ errors: ["Ingest not configured"] });
}
const ingest = ingests[0].base;

let ingest = ingests[0].base;
let { id } = req.params;

if (
(id === "1ba7nrr34rbjl4bb" || req.user?.directPlayback) &&
ingests[0].baseDirect
) {
ingest = ingests[0].baseDirect;
}

const withRecordings = req.query.recordings === "true";

const origin = req.headers["origin"] ?? "";
Expand Down
50 changes: 48 additions & 2 deletions packages/api/src/controllers/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
import serverPromise, { TestServer } from "../test-server";
import { semaphore, sleep } from "../util";
import { generateUniquePlaybackId } from "./generate-keys";
import { extractRegionFrom } from "./stream";
import { extractUrlFrom, extractRegionFrom } from "./stream";

const uuidRegex = /[0-9a-f]+(-[0-9a-f]+){4}/;

Expand Down Expand Up @@ -550,13 +550,33 @@ describe("controllers/stream", () => {
const stream = await res.json();

// Mark stream as active
await db.stream.update(stream.id, { isActive: true });
await db.stream.update(stream.id, {
isActive: true,
lastSeen: Date.now(),
});

// Requesting pull lock should fail, because the stream is active (so it should be replicated instead of being pulled)
const reslockPull = await client.post(`/stream/${stream.id}/lockPull`);
expect(reslockPull.status).toBe(423);
});

it("should still lock pull for an active stream that got lost", async () => {
// Create stream pull
const res = await client.put("/stream/pull", postMockPullStream);
expect(res.status).toBe(201);
const stream = await res.json();

// Mark stream as active
await db.stream.update(stream.id, {
isActive: true,
lastSeen: Date.now() - 24 * 60 * 60 * 1000,
});

// Requesting pull lock should work, because the stream is not actually active (outdated lastSeen)
const reslockPull = await client.post(`/stream/${stream.id}/lockPull`);
expect(reslockPull.status).toBe(204);
});

it("should not lock pull for already locked pull", async () => {
// Create stream pull
const res = await client.put("/stream/pull", postMockPullStream);
Expand Down Expand Up @@ -713,6 +733,32 @@ describe("controllers/stream", () => {
const document = await db.stream.get(stream.id);
expect(db.stream.addDefaultFields(document)).toEqual(updatedStream);
});
it("should extract host from redirected playback url", async () => {
expect(
extractUrlFrom(
"https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8"
)
).toBe("https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+");
expect(
extractUrlFrom(
"https://mos2-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8"
)
).toBe(
"https://mos2-prod-catalyst-0.lp-playback.studio:443/hls/video+"
);
expect(
extractUrlFrom(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+not-used-playback/index.m3u8"
)
).toBe(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+"
);
expect(
extractUrlFrom(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+other-playback/index.m3u8"
)
).toBe(null);
});
it("should extract region from redirected playback url", async () => {
expect(
extractRegionFrom(
Expand Down
62 changes: 46 additions & 16 deletions packages/api/src/controllers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import logger from "../logger";
import { authorizer } from "../middleware";
import { validatePost } from "../middleware";
import { geolocateMiddleware } from "../middleware";
import { fetchWithTimeout } from "../util";
import { fetchWithTimeoutAndRedirects } from "../util";
import { CliArgs } from "../parse-cli";
import {
DetectionWebhookPayload,
Expand Down Expand Up @@ -236,12 +236,12 @@ async function triggerManyIdleStreamsWebhook(ids: string[], queue: Queue) {
);
}

async function resolvePullRegion(
async function resolvePullUrlAndRegion(
stream: NewStreamPayload,
ingest: string
): Promise<string> {
): Promise<{ pullUrl: string; pullRegion: string }> {
if (process.env.NODE_ENV === "test") {
return null;
return { pullUrl: null, pullRegion: null };
}
const url = new URL(
pathJoin(ingest, `hls`, "not-used-playback", `index.m3u8`)
Expand All @@ -253,13 +253,23 @@ async function resolvePullRegion(
}
const playbackUrl = url.toString();
// Send any playback request to catalyst-api, which effectively resolves the region using MistUtilLoad
const response = await fetchWithTimeout(playbackUrl, { redirect: "manual" });
if (response.status < 300 || response.status >= 400) {
// not a redirect response, so we can't determine the region
const response = await fetchWithTimeoutAndRedirects(playbackUrl, {});
if (response.status !== 200) {
// not a correct status code, so we can't determine the region/host
return null;
}
const redirectUrl = response.headers.get("location");
return extractRegionFrom(redirectUrl);
return {
pullUrl: extractUrlFrom(response.url),
pullRegion: extractRegionFrom(response.url),
};
}

// Extracts Mist URL from redirected node URL, e.g. "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8"
export function extractUrlFrom(playbackUrl: string): string {
const hostRegex =
/(https?:\/\/.+-\w+-catalyst.+\/hls\/.+)not-used-playback\/index.m3u8/;
const matches = playbackUrl.match(hostRegex);
return matches ? matches[1] : null;
}

// Extracts region from redirected node URL, e.g. "sto" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8"
Expand Down Expand Up @@ -379,7 +389,8 @@ const fieldsMap: FieldsMap = {
val: `stream.data->'transcodedSegmentsDuration'`,
type: "real",
},
isHealthy: { val: `stream.data->'isHealthy'`, type: "boolean" },
// isHealthy field is sometimes JSON-`null` so we query it as a string (->>)
isHealthy: { val: `stream.data->>'isHealthy'`, type: "boolean" },
};

app.get("/", authorizer({}), async (req, res) => {
Expand Down Expand Up @@ -1085,7 +1096,10 @@ app.put(
}
const streamExisted = streams.length === 1;

const pullRegion = await resolvePullRegion(rawPayload, ingest);
const { pullUrl, pullRegion } = await resolvePullUrlAndRegion(
rawPayload,
ingest
);

let stream: DBStream;
if (!streamExisted) {
Expand Down Expand Up @@ -1122,8 +1136,12 @@ app.put(
await triggerCatalystStreamUpdated(req, stream.playbackId);
}

// If pullHost was resolved, then stick to that host for triggering Catalyst pull start
const playbackUrl = pullUrl
? pathJoin(pullUrl + stream.playbackId, `index.m3u8`)
: getHLSPlaybackUrl(ingest, stream);
if (!stream.isActive || streamExisted) {
await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream));
await triggerCatalystPullStart(stream, playbackUrl);
}

res.status(streamExisted ? 200 : 201);
Expand Down Expand Up @@ -1153,12 +1171,24 @@ app.post("/:id/lockPull", authorizer({ anyAdmin: true }), async (req, res) => {
return res.json({ errors: ["not found"] });
}

// We have an issue that some of the streams/sessions are not marked as inactive when they should be.
// This is a workaround to clean up the stream in the background
const doingActiveCleanup = activeCleanupOne(
req.config,
stream,
req.queue,
await getIngestBase(req)
);

// the `isActive` field is only cleared later in background, so we ignore it
// in the query below in case we triggered an active cleanup logic above.
const leaseDeadline = Date.now() - leaseTimeout;
const updateRes = await db.stream.update(
[
sql`id = ${stream.id}`,
sql`(data->>'pullLockedBy' = ${host} OR (COALESCE((data->>'pullLockedAt')::bigint,0) < ${
Date.now() - leaseTimeout
} AND COALESCE((data->>'isActive')::boolean,FALSE) = FALSE))`,
doingActiveCleanup
? sql`(data->>'pullLockedBy' = ${host} OR (COALESCE((data->>'pullLockedAt')::bigint,0) < ${leaseDeadline}))`
: sql`(data->>'pullLockedBy' = ${host} OR (COALESCE((data->>'pullLockedAt')::bigint,0) < ${leaseDeadline} AND COALESCE((data->>'isActive')::boolean,FALSE) = FALSE))`,
],
{ pullLockedAt: Date.now(), pullLockedBy: host },
{ throwIfEmpty: false }
Expand All @@ -1169,7 +1199,7 @@ app.post("/:id/lockPull", authorizer({ anyAdmin: true }), async (req, res) => {
return;
}
logger.info(
`/lockPull failed for stream=${id}, isActive=${stream.isActive}, pullLockedBy=${stream.pullLockedBy}, pullLockedAt=${stream.pullLockedAt}`
`/lockPull failed for stream=${id}, isActive=${stream.isActive}, lastSeen=${stream.lastSeen}, pullLockedBy=${stream.pullLockedBy}, pullLockedAt=${stream.pullLockedAt}`
);
res.status(423).end();
});
Expand Down
13 changes: 12 additions & 1 deletion packages/api/src/controllers/user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@
res.json(user);
});

app.patch("/:id", authorizer({}), async (req, res) => {
app.patch("/:id/email", authorizer({}), async (req, res) => {
Dismissed Show dismissed Hide dismissed
const { email } = req.body;
const userId = req.user.id;

Expand Down Expand Up @@ -731,6 +731,17 @@
}
);

app.patch("/:id", authorizer({ anyAdmin: true }), async (req, res) => {
Dismissed Show dismissed Hide dismissed
const { id } = req.params;
const { directPlayback } = req.body;

if (typeof directPlayback !== "undefined") {
await db.user.update(id, { directPlayback });
}

res.status(204).end();
});

app.post("/token", validatePost("user"), async (req, res) => {
const user = await findUserByEmail(req.body.email);
const [hashedPassword] = await hash(req.body.password, user.salt);
Expand Down
Loading
Loading