From 66e58d21ec3e7bdd4f2b38e1e36cfb8d14d6ca6b Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Fri, 26 Apr 2024 11:53:50 +0000 Subject: [PATCH] feat: internal streams --- src/core/api.js | 29 ++++++++++++------ src/modules/stream/internal.js | 24 +++++++++++++++ src/modules/stream/manage.js | 42 ++++++++++++++++++++++++++ src/modules/stream/stream.js | 3 ++ src/modules/stream/types.js | 55 ++++++++++++++++++---------------- 5 files changed, 118 insertions(+), 35 deletions(-) create mode 100644 src/modules/stream/internal.js diff --git a/src/core/api.js b/src/core/api.js index eda3c01..9dd4b1c 100644 --- a/src/core/api.js +++ b/src/core/api.js @@ -11,7 +11,7 @@ import { Bright, Cyan } from "../modules/sub/consoleText.js"; import stream from "../modules/stream/stream.js"; import loc from "../localization/manager.js"; import { generateHmac } from "../modules/sub/crypto.js"; -import { verifyStream } from "../modules/stream/manage.js"; +import { verifyStream, getInternalStream } from "../modules/stream/manage.js"; export function runAPI(express, app, gitCommit, gitBranch, __dirname) { const corsConfig = process.env.CORS_WILDCARD === '0' ? { @@ -123,13 +123,13 @@ export function runAPI(express, app, gitCommit, gitBranch, __dirname) { app.get('/api/:type', (req, res) => { try { + let j; switch (req.params.type) { case 'stream': const q = req.query; const checkQueries = q.t && q.e && q.h && q.s && q.i; const checkBaseLength = q.t.length === 21 && q.e.length === 13; const checkSafeLength = q.h.length === 43 && q.s.length === 43 && q.i.length === 22; - if (checkQueries && checkBaseLength && checkSafeLength) { let streamInfo = verifyStream(q.t, q.h, q.e, q.s, q.i); if (streamInfo.error) { @@ -141,12 +141,23 @@ export function runAPI(express, app, gitCommit, gitBranch, __dirname) { }); } return stream(res, streamInfo); - } else { - let j = apiJSON(0, { - t: "bad request. stream link may be incomplete or corrupted." - }) - return res.status(j.status).json(j.body); - } + } + + j = apiJSON(0, { + t: "bad request. stream link may be incomplete or corrupted." + }) + return res.status(j.status).json(j.body); + case 'istream': + if (!req.ip.endsWith('127.0.0.1')) + return res.sendStatus(403); + if (('' + req.query.t).length !== 21) + return res.sendStatus(400); + + let streamInfo = getInternalStream(req.query.t); + if (!streamInfo) return res.sendStatus(404); + streamInfo.headers = req.headers; + + return stream(res, { type: 'internal', ...streamInfo }); case 'serverInfo': return res.status(200).json({ version: version, @@ -158,7 +169,7 @@ export function runAPI(express, app, gitCommit, gitBranch, __dirname) { startTime: `${startTimestamp}` }); default: - let j = apiJSON(0, { + j = apiJSON(0, { t: "unknown response type" }) return res.status(j.status).json(j.body); diff --git a/src/modules/stream/internal.js b/src/modules/stream/internal.js new file mode 100644 index 0000000..a1c99ff --- /dev/null +++ b/src/modules/stream/internal.js @@ -0,0 +1,24 @@ +import { request } from 'undici' + +export async function internalStream(streamInfo, res) { + try { + const req = await request(streamInfo.url, { + headers: streamInfo.headers, + signal: streamInfo.controller.signal, + maxRedirections: 16 + }); + + res.status(req.statusCode); + + for (const [ name, value ] of Object.entries(req.headers)) + res.setHeader(name, value) + + if (req.statusCode < 200 || req.statusCode > 299) + return res.destroy(); + + req.body.pipe(res); + req.body.on('error', () => res.destroy()); + } catch { + streamInfo.controller.abort(); + } +} \ No newline at end of file diff --git a/src/modules/stream/manage.js b/src/modules/stream/manage.js index d4cb1e6..680fd8f 100644 --- a/src/modules/stream/manage.js +++ b/src/modules/stream/manage.js @@ -4,6 +4,7 @@ import { nanoid } from 'nanoid'; import { decryptStream, encryptStream, generateHmac } from "../sub/crypto.js"; import { streamLifespan } from "../config.js"; +import { strict as assert } from "assert"; const streamNoAccess = { error: "i couldn't verify if you have access to this stream. go back and try again!", @@ -24,6 +25,7 @@ streamCache.on("expired", (key) => { streamCache.del(key); }) +const internalStreamCache = {}; const hmacSalt = randomBytes(64).toString('hex'); export function createStream(obj) { @@ -67,6 +69,34 @@ export function createStream(obj) { return streamLink.toString(); } +export function getInternalStream(id) { + return internalStreamCache[id]; +} + +export function createInternalStream(obj = {}) { + assert(typeof obj.url === 'string'); + + const streamID = nanoid(); + internalStreamCache[streamID] = { + url: obj.url, + controller: new AbortController() + }; + + let streamLink = new URL('/api/istream', `http://127.0.0.1:${process.env.API_PORT}`); + streamLink.searchParams.set('t', streamID); + return streamLink.toString(); +} + +export function destroyInternalStream(url) { + const id = new URL(url).searchParams.get('t'); + assert(id); + + if (internalStreamCache[id]) { + internalStreamCache[id].controller.abort(); + delete internalStreamCache[id]; + } +} + export function verifyStream(id, hmac, exp, secret, iv) { try { const ghmac = generateHmac(`${id},${exp},${iv},${secret}`, hmacSalt); @@ -82,6 +112,18 @@ export function verifyStream(id, hmac, exp, secret, iv) { if (Number(exp) <= new Date().getTime()) return streamNoExist; + if (!streamInfo.originalUrls) { + streamInfo.originalUrls = streamInfo.urls; + } + + if (typeof streamInfo.originalUrls === 'string') { + streamInfo.urls = createInternalStream({ url: streamInfo.originalUrls }); + } else if (Array.isArray(streamInfo.originalUrls)) { + for (const idx in streamInfo.originalUrls) { + streamInfo.originalUrls[idx] = createInternalStream({ url: streamInfo.originalUrls[idx] }); + } + } else throw 'invalid urls'; + return streamInfo; } catch (e) { diff --git a/src/modules/stream/stream.js b/src/modules/stream/stream.js index f254dac..0b9ba42 100644 --- a/src/modules/stream/stream.js +++ b/src/modules/stream/stream.js @@ -1,4 +1,5 @@ import { streamAudioOnly, streamDefault, streamLiveRender, streamVideoOnly, convertToGif } from "./types.js"; +import { internalStream } from './internal.js' export default async function(res, streamInfo) { try { @@ -7,6 +8,8 @@ export default async function(res, streamInfo) { return; } switch (streamInfo.type) { + case "internal": + return await internalStream(streamInfo, res); case "render": await streamLiveRender(streamInfo, res); break; diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index d3e3343..10bd3a6 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -1,10 +1,12 @@ -import { spawn } from "child_process"; -import ffmpeg from "ffmpeg-static"; -import { ffmpegArgs, genericUserAgent } from "../config.js"; -import { metadataManager } from "../sub/utils.js"; import { request } from "undici"; +import ffmpeg from "ffmpeg-static"; +import { spawn } from "child_process"; import { create as contentDisposition } from "content-disposition-header"; +import { metadataManager } from "../sub/utils.js"; +import { destroyInternalStream } from "./manage.js"; +import { ffmpegArgs, genericUserAgent } from "../config.js"; + const defaultHeaders = { 'user-agent': genericUserAgent } @@ -67,7 +69,11 @@ function getCommand(args) { export async function streamDefault(streamInfo, res) { const abortController = new AbortController(); - const shutdown = () => (closeRequest(abortController), closeResponse(res)); + const shutdown = () => ( + closeRequest(abortController), + closeResponse(res), + destroyInternalStream(streamInfo.urls) + ); try { let filename = streamInfo.filename; @@ -91,13 +97,12 @@ export async function streamDefault(streamInfo, res) { } } -export async function streamLiveRender(streamInfo, res) { - let process, abortController = new AbortController(); - +export function streamLiveRender(streamInfo, res) { + let process; const shutdown = () => ( - closeRequest(abortController), killProcess(process), - closeResponse(res) + closeResponse(res), + streamInfo.urls.map(destroyInternalStream) ); const headers = getHeaders(streamInfo.service); @@ -106,19 +111,13 @@ export async function streamLiveRender(streamInfo, res) { try { if (streamInfo.urls.length !== 2) return shutdown(); - const { body: audio } = await request(streamInfo.urls[1], { - headers, - signal: abortController.signal, - maxRedirections: 16 - }); - const format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; let args = [ '-loglevel', '-8', '-headers', rawHeaders, '-i', streamInfo.urls[0], - '-i', 'pipe:3', + '-i', streamInfo.urls[1], '-map', '0:v', '-map', '1:a', ] @@ -129,25 +128,21 @@ export async function streamLiveRender(streamInfo, res) { args = args.concat(metadataManager(streamInfo.metadata)) } - args.push('-f', format, 'pipe:4'); + args.push('-f', format, 'pipe:3'); process = spawn(...getCommand(args), { windowsHide: true, stdio: [ 'inherit', 'inherit', 'inherit', - 'pipe', 'pipe' + 'pipe' ], }); - const [,,, audioInput, muxOutput] = process.stdio; + const [,,, muxOutput] = process.stdio; res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - audio.on('error', shutdown); - audioInput.on('error', shutdown); - - audio.pipe(audioInput); pipe(muxOutput, res, shutdown); process.on('close', shutdown); @@ -159,7 +154,11 @@ export async function streamLiveRender(streamInfo, res) { export function streamAudioOnly(streamInfo, res) { let process; - const shutdown = () => (killProcess(process), closeResponse(res)); + const shutdown = () => ( + killProcess(process), + closeResponse(res), + destroyInternalStream(streamInfo.urls) + ); try { let args = [ @@ -209,7 +208,11 @@ export function streamAudioOnly(streamInfo, res) { export function streamVideoOnly(streamInfo, res) { let process; - const shutdown = () => (killProcess(process), closeResponse(res)); + const shutdown = () => ( + killProcess(process), + closeResponse(res), + destroyInternalStream(streamInfo.urls) + ); try { let args = [