From 9001d401dae64579c647cf0f107685b7a8186446 Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Thu, 19 Oct 2023 20:36:05 +0000 Subject: [PATCH 1/8] stream: improve shutdown handling, minor clean up - try to close as many things as possible when shutting down - remove redundant (e.g. `exit` on process when listening for `close`) and straight up useless (`disconnect`) event listeners --- package.json | 1 + src/modules/stream/types.js | 130 ++++++++++++++++-------------------- 2 files changed, 58 insertions(+), 73 deletions(-) diff --git a/package.json b/package.json index 946476c..6f37657 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ }, "homepage": "https://github.com/wukko/cobalt#readme", "dependencies": { + "abort-controller": "3.0.0", "content-disposition-header": "0.6.0", "cors": "^2.8.5", "dotenv": "^16.0.1", diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index e979e34..65df420 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -2,40 +2,48 @@ import { spawn } from "child_process"; import ffmpeg from "ffmpeg-static"; import { ffmpegArgs, genericUserAgent } from "../config.js"; import { getThreads, metadataManager } from "../sub/utils.js"; -import { request } from 'undici'; +import { request } from "undici"; import { create as contentDisposition } from "content-disposition-header"; +import { AbortController } from "abort-controller" -function fail(res) { +function closeResponse(res) { if (!res.headersSent) res.sendStatus(500); return res.destroy(); } export async function streamDefault(streamInfo, res) { + const abortController = new AbortController(); + const shutdown = () => (abortController.abort(), closeResponse(res)); + try { - let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; - res.setHeader('Content-disposition', contentDisposition(streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename)); + const filename = streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename; + res.setHeader('Content-disposition', contentDisposition(filename)); const { body: stream, headers } = await request(streamInfo.urls, { headers: { 'user-agent': genericUserAgent }, + signal: abortController.signal, maxRedirections: 16 }); res.setHeader('content-type', headers['content-type']); res.setHeader('content-length', headers['content-length']); - stream.pipe(res).on('error', () => fail(res)); - stream.on('error', () => fail(res)); - stream.on('aborted', () => fail(res)); - } catch (e) { - fail(res); + stream.on('error', shutdown) + .pipe(res).on('error', shutdown); + } catch { + shutdown(); } } -export async function streamLiveRender(streamInfo, res) { - try { - if (streamInfo.urls.length !== 2) return fail(res); - let { body: audio } = await request(streamInfo.urls[1], { - maxRedirections: 16 +export async function streamLiveRender(streamInfo, res) { + let abortController = new AbortController(), process; + const shutdown = () => (abortController.abort(), process?.kill(), closeResponse(res)); + + try { + if (streamInfo.urls.length !== 2) return shutdown(); + + const { body: audio } = await request(streamInfo.urls[1], { + maxRedirections: 16, signal: abortController.signal }); let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], @@ -51,58 +59,41 @@ export async function streamLiveRender(streamInfo, res) { args = args.concat(ffmpegArgs[format]); if (streamInfo.metadata) args = args.concat(metadataManager(streamInfo.metadata)); args.push('-f', format, 'pipe:4'); - let ffmpegProcess = spawn(ffmpeg, args, { + + process = spawn(ffmpeg, args, { windowsHide: true, stdio: [ 'inherit', 'inherit', 'inherit', 'pipe', 'pipe' ], }); + res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - res.on('error', () => { - ffmpegProcess.kill(); - fail(res); - }); - ffmpegProcess.stdio[4].pipe(res).on('error', () => { - ffmpegProcess.kill(); - fail(res); - }); - audio.pipe(ffmpegProcess.stdio[3]).on('error', () => { - ffmpegProcess.kill(); - fail(res); - }); - - audio.on('error', () => { - ffmpegProcess.kill(); - fail(res); - }); - audio.on('aborted', () => { - ffmpegProcess.kill(); - fail(res); - }); - ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); - ffmpegProcess.on('close', () => ffmpegProcess.kill()); - ffmpegProcess.on('exit', () => ffmpegProcess.kill()); - res.on('finish', () => ffmpegProcess.kill()); - res.on('close', () => ffmpegProcess.kill()); - ffmpegProcess.on('error', () => { - ffmpegProcess.kill(); - fail(res); - }); + audio.on('error', shutdown) + .pipe(process.stdio[3]).on('error', shutdown); - } catch (e) { - fail(res); + process.stdio[4].pipe(res).on('error', shutdown); + process.on('close', shutdown); + res.on('finish', shutdown); + res.on('close', shutdown); + } catch { + shutdown(); } } + export function streamAudioOnly(streamInfo, res) { + let process; + const shutdown = () => (process?.kill(), closeResponse(res)); + try { let args = [ '-loglevel', '-8', '-threads', `${getThreads()}`, '-i', streamInfo.urls ] + if (streamInfo.metadata) { if (streamInfo.metadata.cover) { // currently corrupts the audio args.push('-i', streamInfo.metadata.cover, '-map', '0:a', '-map', '1:0') @@ -113,13 +104,14 @@ export function streamAudioOnly(streamInfo, res) { } else { args.push('-vn') } + let arg = streamInfo.copy ? ffmpegArgs["copy"] : ffmpegArgs["audio"]; args = args.concat(arg); if (ffmpegArgs[streamInfo.audioFormat]) args = args.concat(ffmpegArgs[streamInfo.audioFormat]); args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3'); - const ffmpegProcess = spawn(ffmpeg, args, { + process = spawn(ffmpeg, args, { windowsHide: true, stdio: [ 'inherit', 'inherit', 'inherit', @@ -128,22 +120,20 @@ export function streamAudioOnly(streamInfo, res) { }); res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`)); - ffmpegProcess.stdio[3].pipe(res); - ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); - ffmpegProcess.on('close', () => ffmpegProcess.kill()); - ffmpegProcess.on('exit', () => ffmpegProcess.kill()); - res.on('finish', () => ffmpegProcess.kill()); - res.on('close', () => ffmpegProcess.kill()); - ffmpegProcess.on('error', () => { - ffmpegProcess.kill(); - fail(res); - }); - } catch (e) { - fail(res); + process.stdio[3].pipe(res); + process.on('close', shutdown); + res.on('finish', shutdown); + res.on('close', shutdown); + } catch { + shutdown(); } } + export function streamVideoOnly(streamInfo, res) { + let process; + const shutdown = () => (process?.kill(), closeResponse(res)); + try { let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], args = [ '-loglevel', '-8', @@ -155,7 +145,7 @@ export function streamVideoOnly(streamInfo, res) { if (streamInfo.service === "vimeo" || streamInfo.service === "rutube") args.push('-bsf:a', 'aac_adtstoasc'); if (format === "mp4") args.push('-movflags', 'faststart+frag_keyframe+empty_moov'); args.push('-f', format, 'pipe:3'); - const ffmpegProcess = spawn(ffmpeg, args, { + process = spawn(ffmpeg, args, { windowsHide: true, stdio: [ 'inherit', 'inherit', 'inherit', @@ -164,18 +154,12 @@ export function streamVideoOnly(streamInfo, res) { }); res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - ffmpegProcess.stdio[3].pipe(res); - ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); - ffmpegProcess.on('close', () => ffmpegProcess.kill()); - ffmpegProcess.on('exit', () => ffmpegProcess.kill()); - res.on('finish', () => ffmpegProcess.kill()); - res.on('close', () => ffmpegProcess.kill()); - ffmpegProcess.on('error', () => { - ffmpegProcess.kill(); - fail(res); - }); - } catch (e) { - fail(res); + process.stdio[3].pipe(res); + process.on('close', shutdown); + res.on('finish', shutdown); + res.on('close', shutdown); + } catch { + shutdown(); } } From 1508a0bff48049b92bdb8f1262fefd87c6f2b435 Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Sun, 5 Nov 2023 22:07:34 +0000 Subject: [PATCH 2/8] stream: send SIGKILL after timeout in case the ffmpeg process decides to hang when SIGTERM'd --- src/modules/stream/types.js | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index 65df420..5c7fbdb 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -11,6 +11,14 @@ function closeResponse(res) { return res.destroy(); } +function killProcess(p) { + p?.kill(); + setTimeout(() => { + if (p?.exitCode === null) + p?.kill(9); + }, 5000); +} + export async function streamDefault(streamInfo, res) { const abortController = new AbortController(); const shutdown = () => (abortController.abort(), closeResponse(res)); @@ -37,7 +45,7 @@ export async function streamDefault(streamInfo, res) { export async function streamLiveRender(streamInfo, res) { let abortController = new AbortController(), process; - const shutdown = () => (abortController.abort(), process?.kill(), closeResponse(res)); + const shutdown = () => (abortController.abort(), killProcess(process), closeResponse(res)); try { if (streamInfo.urls.length !== 2) return shutdown(); @@ -85,7 +93,7 @@ export async function streamLiveRender(streamInfo, res) { export function streamAudioOnly(streamInfo, res) { let process; - const shutdown = () => (process?.kill(), closeResponse(res)); + const shutdown = () => (killProcess(process), closeResponse(res)); try { let args = [ @@ -132,7 +140,7 @@ export function streamAudioOnly(streamInfo, res) { export function streamVideoOnly(streamInfo, res) { let process; - const shutdown = () => (process?.kill(), closeResponse(res)); + const shutdown = () => (killProcess(process), closeResponse(res)); try { let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], args = [ From ed646b826db54ad3afd532eec51cdea29b090a08 Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Sun, 5 Nov 2023 22:09:54 +0000 Subject: [PATCH 3/8] stream: wrap abort controller in try-catch --- src/modules/stream/types.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index 5c7fbdb..2c1bece 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -6,6 +6,10 @@ import { request } from "undici"; import { create as contentDisposition } from "content-disposition-header"; import { AbortController } from "abort-controller" +function closeRequest(controller) { + try { controller.abort() } catch {} +} + function closeResponse(res) { if (!res.headersSent) res.sendStatus(500); return res.destroy(); @@ -21,7 +25,7 @@ function killProcess(p) { export async function streamDefault(streamInfo, res) { const abortController = new AbortController(); - const shutdown = () => (abortController.abort(), closeResponse(res)); + const shutdown = () => (closeRequest(abortController), closeResponse(res)); try { const filename = streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename; @@ -45,7 +49,7 @@ export async function streamDefault(streamInfo, res) { export async function streamLiveRender(streamInfo, res) { let abortController = new AbortController(), process; - const shutdown = () => (abortController.abort(), killProcess(process), closeResponse(res)); + const shutdown = () => (closeRequest(abortController), killProcess(process), closeResponse(res)); try { if (streamInfo.urls.length !== 2) return shutdown(); From aabde229ed39bab6701e39d545490fe9f20b0533 Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Sun, 5 Nov 2023 22:12:17 +0000 Subject: [PATCH 4/8] stream: generalize pipe event handling --- src/modules/stream/types.js | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index 2c1bece..8a1afd7 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -23,6 +23,16 @@ function killProcess(p) { }, 5000); } +function pipe(from, to, done) { + from.on('error', done) + .on('close', done); + + to.on('error', done) + .on('close', done); + + from.pipe(to); +} + export async function streamDefault(streamInfo, res) { const abortController = new AbortController(); const shutdown = () => (closeRequest(abortController), closeResponse(res)); @@ -40,8 +50,7 @@ export async function streamDefault(streamInfo, res) { res.setHeader('content-type', headers['content-type']); res.setHeader('content-length', headers['content-length']); - stream.on('error', shutdown) - .pipe(res).on('error', shutdown); + pipe(stream, res, shutdown); } catch { shutdown(); } @@ -83,13 +92,11 @@ export async function streamLiveRender(streamInfo, res) { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - audio.on('error', shutdown) - .pipe(process.stdio[3]).on('error', shutdown); + pipe(audio, process.stdio[3], shutdown); + pipe(process.stdio[4], res, shutdown); - process.stdio[4].pipe(res).on('error', shutdown); process.on('close', shutdown); res.on('finish', shutdown); - res.on('close', shutdown); } catch { shutdown(); } @@ -133,10 +140,8 @@ export function streamAudioOnly(streamInfo, res) { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`)); - process.stdio[3].pipe(res); - process.on('close', shutdown); + pipe(process.stdio[3], res, shutdown); res.on('finish', shutdown); - res.on('close', shutdown); } catch { shutdown(); } @@ -167,10 +172,10 @@ export function streamVideoOnly(streamInfo, res) { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - process.stdio[3].pipe(res); + pipe(process.stdio[3], res, shutdown); + process.on('close', shutdown); res.on('finish', shutdown); - res.on('close', shutdown); } catch { shutdown(); } From 58f7ed7827131aed59e4039e77606d095a5bf9bf Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Sun, 5 Nov 2023 22:16:49 +0000 Subject: [PATCH 5/8] stream: use descriptive variables for i/o for better readability --- src/modules/stream/types.js | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index 8a1afd7..a926098 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -88,12 +88,13 @@ export async function streamLiveRender(streamInfo, res) { 'pipe', 'pipe' ], }); + const [,,, audioInput, muxOutput] = process.stdio; res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - pipe(audio, process.stdio[3], shutdown); - pipe(process.stdio[4], res, shutdown); + pipe(audio, audioInput, shutdown); + pipe(muxOutput, res, shutdown); process.on('close', shutdown); res.on('finish', shutdown); @@ -137,10 +138,11 @@ export function streamAudioOnly(streamInfo, res) { 'pipe' ], }); + const [,,, muxOutput] = process.stdio; res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`)); - pipe(process.stdio[3], res, shutdown); + pipe(muxOutput, res, shutdown); res.on('finish', shutdown); } catch { shutdown(); @@ -169,10 +171,11 @@ export function streamVideoOnly(streamInfo, res) { 'pipe' ], }); + const [,,, muxOutput] = process.stdio; res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - pipe(process.stdio[3], res, shutdown); + pipe(muxOutput, res, shutdown); process.on('close', shutdown); res.on('finish', shutdown); From 33072003bcb83a8f25ebb62143282817948ce7f2 Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Mon, 6 Nov 2023 00:31:44 +0000 Subject: [PATCH 6/8] stream: use strings for signals instead of number hopefully a little more explanatory than "9" --- src/modules/stream/types.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index a926098..38ebc68 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -16,10 +16,10 @@ function closeResponse(res) { } function killProcess(p) { - p?.kill(); + p?.kill('SIGTERM'); setTimeout(() => { if (p?.exitCode === null) - p?.kill(9); + p?.kill('SIGKILL'); }, 5000); } From 758bb8fef7f6322aee60edb9804164e0c3447bb4 Mon Sep 17 00:00:00 2001 From: wukko Date: Mon, 6 Nov 2023 06:44:34 +0600 Subject: [PATCH 7/8] types: added comments --- src/modules/stream/types.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index 38ebc68..dc9579a 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -16,9 +16,11 @@ function closeResponse(res) { } function killProcess(p) { + // ask the process to terminate itself gracefully p?.kill('SIGTERM'); setTimeout(() => { if (p?.exitCode === null) + // brutally murder the process if it didn't quit p?.kill('SIGKILL'); }, 5000); } From b01c9f3e54789de25166115b92cec4f66683f22d Mon Sep 17 00:00:00 2001 From: wukko Date: Mon, 6 Nov 2023 06:53:54 +0600 Subject: [PATCH 8/8] types: make streamVideoOnly more readable --- src/modules/stream/types.js | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index dc9579a..bf6d759 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -90,6 +90,7 @@ export async function streamLiveRender(streamInfo, res) { 'pipe', 'pipe' ], }); + const [,,, audioInput, muxOutput] = process.stdio; res.setHeader('Connection', 'keep-alive'); @@ -140,7 +141,9 @@ export function streamAudioOnly(streamInfo, res) { 'pipe' ], }); + const [,,, muxOutput] = process.stdio; + res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`)); @@ -156,7 +159,7 @@ export function streamVideoOnly(streamInfo, res) { const shutdown = () => (killProcess(process), closeResponse(res)); try { - let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], args = [ + let args = [ '-loglevel', '-8', '-threads', `${getThreads()}`, '-i', streamInfo.urls, @@ -164,8 +167,11 @@ export function streamVideoOnly(streamInfo, res) { ] if (streamInfo.mute) args.push('-an'); if (streamInfo.service === "vimeo" || streamInfo.service === "rutube") args.push('-bsf:a', 'aac_adtstoasc'); + + let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; if (format === "mp4") args.push('-movflags', 'faststart+frag_keyframe+empty_moov'); args.push('-f', format, 'pipe:3'); + process = spawn(ffmpeg, args, { windowsHide: true, stdio: [ @@ -173,7 +179,9 @@ export function streamVideoOnly(streamInfo, res) { 'pipe' ], }); + const [,,, muxOutput] = process.stdio; + res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));