feat: internal streams

This commit is contained in:
dumbmoron 2024-04-26 11:53:50 +00:00
parent ff93f7e42f
commit 66e58d21ec
No known key found for this signature in database
5 changed files with 118 additions and 35 deletions

View file

@ -11,7 +11,7 @@ import { Bright, Cyan } from "../modules/sub/consoleText.js";
import stream from "../modules/stream/stream.js"; import stream from "../modules/stream/stream.js";
import loc from "../localization/manager.js"; import loc from "../localization/manager.js";
import { generateHmac } from "../modules/sub/crypto.js"; import { generateHmac } from "../modules/sub/crypto.js";
import { verifyStream } from "../modules/stream/manage.js"; import { verifyStream, getInternalStream } from "../modules/stream/manage.js";
export function runAPI(express, app, gitCommit, gitBranch, __dirname) { export function runAPI(express, app, gitCommit, gitBranch, __dirname) {
const corsConfig = process.env.CORS_WILDCARD === '0' ? { const corsConfig = process.env.CORS_WILDCARD === '0' ? {
@ -123,13 +123,13 @@ export function runAPI(express, app, gitCommit, gitBranch, __dirname) {
app.get('/api/:type', (req, res) => { app.get('/api/:type', (req, res) => {
try { try {
let j;
switch (req.params.type) { switch (req.params.type) {
case 'stream': case 'stream':
const q = req.query; const q = req.query;
const checkQueries = q.t && q.e && q.h && q.s && q.i; const checkQueries = q.t && q.e && q.h && q.s && q.i;
const checkBaseLength = q.t.length === 21 && q.e.length === 13; const checkBaseLength = q.t.length === 21 && q.e.length === 13;
const checkSafeLength = q.h.length === 43 && q.s.length === 43 && q.i.length === 22; const checkSafeLength = q.h.length === 43 && q.s.length === 43 && q.i.length === 22;
if (checkQueries && checkBaseLength && checkSafeLength) { if (checkQueries && checkBaseLength && checkSafeLength) {
let streamInfo = verifyStream(q.t, q.h, q.e, q.s, q.i); let streamInfo = verifyStream(q.t, q.h, q.e, q.s, q.i);
if (streamInfo.error) { if (streamInfo.error) {
@ -141,12 +141,23 @@ export function runAPI(express, app, gitCommit, gitBranch, __dirname) {
}); });
} }
return stream(res, streamInfo); return stream(res, streamInfo);
} else {
let j = apiJSON(0, {
t: "bad request. stream link may be incomplete or corrupted."
})
return res.status(j.status).json(j.body);
} }
j = apiJSON(0, {
t: "bad request. stream link may be incomplete or corrupted."
})
return res.status(j.status).json(j.body);
case 'istream':
if (!req.ip.endsWith('127.0.0.1'))
return res.sendStatus(403);
if (('' + req.query.t).length !== 21)
return res.sendStatus(400);
let streamInfo = getInternalStream(req.query.t);
if (!streamInfo) return res.sendStatus(404);
streamInfo.headers = req.headers;
return stream(res, { type: 'internal', ...streamInfo });
case 'serverInfo': case 'serverInfo':
return res.status(200).json({ return res.status(200).json({
version: version, version: version,
@ -158,7 +169,7 @@ export function runAPI(express, app, gitCommit, gitBranch, __dirname) {
startTime: `${startTimestamp}` startTime: `${startTimestamp}`
}); });
default: default:
let j = apiJSON(0, { j = apiJSON(0, {
t: "unknown response type" t: "unknown response type"
}) })
return res.status(j.status).json(j.body); return res.status(j.status).json(j.body);

View file

@ -0,0 +1,24 @@
import { request } from 'undici'
export async function internalStream(streamInfo, res) {
try {
const req = await request(streamInfo.url, {
headers: streamInfo.headers,
signal: streamInfo.controller.signal,
maxRedirections: 16
});
res.status(req.statusCode);
for (const [ name, value ] of Object.entries(req.headers))
res.setHeader(name, value)
if (req.statusCode < 200 || req.statusCode > 299)
return res.destroy();
req.body.pipe(res);
req.body.on('error', () => res.destroy());
} catch {
streamInfo.controller.abort();
}
}

View file

@ -4,6 +4,7 @@ import { nanoid } from 'nanoid';
import { decryptStream, encryptStream, generateHmac } from "../sub/crypto.js"; import { decryptStream, encryptStream, generateHmac } from "../sub/crypto.js";
import { streamLifespan } from "../config.js"; import { streamLifespan } from "../config.js";
import { strict as assert } from "assert";
const streamNoAccess = { const streamNoAccess = {
error: "i couldn't verify if you have access to this stream. go back and try again!", error: "i couldn't verify if you have access to this stream. go back and try again!",
@ -24,6 +25,7 @@ streamCache.on("expired", (key) => {
streamCache.del(key); streamCache.del(key);
}) })
const internalStreamCache = {};
const hmacSalt = randomBytes(64).toString('hex'); const hmacSalt = randomBytes(64).toString('hex');
export function createStream(obj) { export function createStream(obj) {
@ -67,6 +69,34 @@ export function createStream(obj) {
return streamLink.toString(); return streamLink.toString();
} }
export function getInternalStream(id) {
return internalStreamCache[id];
}
export function createInternalStream(obj = {}) {
assert(typeof obj.url === 'string');
const streamID = nanoid();
internalStreamCache[streamID] = {
url: obj.url,
controller: new AbortController()
};
let streamLink = new URL('/api/istream', `http://127.0.0.1:${process.env.API_PORT}`);
streamLink.searchParams.set('t', streamID);
return streamLink.toString();
}
export function destroyInternalStream(url) {
const id = new URL(url).searchParams.get('t');
assert(id);
if (internalStreamCache[id]) {
internalStreamCache[id].controller.abort();
delete internalStreamCache[id];
}
}
export function verifyStream(id, hmac, exp, secret, iv) { export function verifyStream(id, hmac, exp, secret, iv) {
try { try {
const ghmac = generateHmac(`${id},${exp},${iv},${secret}`, hmacSalt); const ghmac = generateHmac(`${id},${exp},${iv},${secret}`, hmacSalt);
@ -82,6 +112,18 @@ export function verifyStream(id, hmac, exp, secret, iv) {
if (Number(exp) <= new Date().getTime()) if (Number(exp) <= new Date().getTime())
return streamNoExist; return streamNoExist;
if (!streamInfo.originalUrls) {
streamInfo.originalUrls = streamInfo.urls;
}
if (typeof streamInfo.originalUrls === 'string') {
streamInfo.urls = createInternalStream({ url: streamInfo.originalUrls });
} else if (Array.isArray(streamInfo.originalUrls)) {
for (const idx in streamInfo.originalUrls) {
streamInfo.originalUrls[idx] = createInternalStream({ url: streamInfo.originalUrls[idx] });
}
} else throw 'invalid urls';
return streamInfo; return streamInfo;
} }
catch (e) { catch (e) {

View file

@ -1,4 +1,5 @@
import { streamAudioOnly, streamDefault, streamLiveRender, streamVideoOnly, convertToGif } from "./types.js"; import { streamAudioOnly, streamDefault, streamLiveRender, streamVideoOnly, convertToGif } from "./types.js";
import { internalStream } from './internal.js'
export default async function(res, streamInfo) { export default async function(res, streamInfo) {
try { try {
@ -7,6 +8,8 @@ export default async function(res, streamInfo) {
return; return;
} }
switch (streamInfo.type) { switch (streamInfo.type) {
case "internal":
return await internalStream(streamInfo, res);
case "render": case "render":
await streamLiveRender(streamInfo, res); await streamLiveRender(streamInfo, res);
break; break;

View file

@ -1,10 +1,12 @@
import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static";
import { ffmpegArgs, genericUserAgent } from "../config.js";
import { metadataManager } from "../sub/utils.js";
import { request } from "undici"; import { request } from "undici";
import ffmpeg from "ffmpeg-static";
import { spawn } from "child_process";
import { create as contentDisposition } from "content-disposition-header"; import { create as contentDisposition } from "content-disposition-header";
import { metadataManager } from "../sub/utils.js";
import { destroyInternalStream } from "./manage.js";
import { ffmpegArgs, genericUserAgent } from "../config.js";
const defaultHeaders = { const defaultHeaders = {
'user-agent': genericUserAgent 'user-agent': genericUserAgent
} }
@ -67,7 +69,11 @@ function getCommand(args) {
export async function streamDefault(streamInfo, res) { export async function streamDefault(streamInfo, res) {
const abortController = new AbortController(); const abortController = new AbortController();
const shutdown = () => (closeRequest(abortController), closeResponse(res)); const shutdown = () => (
closeRequest(abortController),
closeResponse(res),
destroyInternalStream(streamInfo.urls)
);
try { try {
let filename = streamInfo.filename; let filename = streamInfo.filename;
@ -91,13 +97,12 @@ export async function streamDefault(streamInfo, res) {
} }
} }
export async function streamLiveRender(streamInfo, res) { export function streamLiveRender(streamInfo, res) {
let process, abortController = new AbortController(); let process;
const shutdown = () => ( const shutdown = () => (
closeRequest(abortController),
killProcess(process), killProcess(process),
closeResponse(res) closeResponse(res),
streamInfo.urls.map(destroyInternalStream)
); );
const headers = getHeaders(streamInfo.service); const headers = getHeaders(streamInfo.service);
@ -106,19 +111,13 @@ export async function streamLiveRender(streamInfo, res) {
try { try {
if (streamInfo.urls.length !== 2) return shutdown(); if (streamInfo.urls.length !== 2) return shutdown();
const { body: audio } = await request(streamInfo.urls[1], {
headers,
signal: abortController.signal,
maxRedirections: 16
});
const format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; const format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
let args = [ let args = [
'-loglevel', '-8', '-loglevel', '-8',
'-headers', rawHeaders, '-headers', rawHeaders,
'-i', streamInfo.urls[0], '-i', streamInfo.urls[0],
'-i', 'pipe:3', '-i', streamInfo.urls[1],
'-map', '0:v', '-map', '0:v',
'-map', '1:a', '-map', '1:a',
] ]
@ -129,25 +128,21 @@ export async function streamLiveRender(streamInfo, res) {
args = args.concat(metadataManager(streamInfo.metadata)) args = args.concat(metadataManager(streamInfo.metadata))
} }
args.push('-f', format, 'pipe:4'); args.push('-f', format, 'pipe:3');
process = spawn(...getCommand(args), { process = spawn(...getCommand(args), {
windowsHide: true, windowsHide: true,
stdio: [ stdio: [
'inherit', 'inherit', 'inherit', 'inherit', 'inherit', 'inherit',
'pipe', 'pipe' 'pipe'
], ],
}); });
const [,,, audioInput, muxOutput] = process.stdio; const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
audio.on('error', shutdown);
audioInput.on('error', shutdown);
audio.pipe(audioInput);
pipe(muxOutput, res, shutdown); pipe(muxOutput, res, shutdown);
process.on('close', shutdown); process.on('close', shutdown);
@ -159,7 +154,11 @@ export async function streamLiveRender(streamInfo, res) {
export function streamAudioOnly(streamInfo, res) { export function streamAudioOnly(streamInfo, res) {
let process; let process;
const shutdown = () => (killProcess(process), closeResponse(res)); const shutdown = () => (
killProcess(process),
closeResponse(res),
destroyInternalStream(streamInfo.urls)
);
try { try {
let args = [ let args = [
@ -209,7 +208,11 @@ export function streamAudioOnly(streamInfo, res) {
export function streamVideoOnly(streamInfo, res) { export function streamVideoOnly(streamInfo, res) {
let process; let process;
const shutdown = () => (killProcess(process), closeResponse(res)); const shutdown = () => (
killProcess(process),
closeResponse(res),
destroyInternalStream(streamInfo.urls)
);
try { try {
let args = [ let args = [