stream/internal: don't abort immediately after close for generic streams (#584)

* stream: move closeRequest to shared functions

* stream: use closeRequest instead of abort() directly

* stream/internal: don't abort immediately after close for generic streams
This commit is contained in:
jj 2024-06-23 17:37:02 +02:00 committed by GitHub
parent a6733ef0cc
commit 33c3c398fc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 14 additions and 11 deletions

View file

@ -1,6 +1,6 @@
import { request } from 'undici'; import { request } from 'undici';
import { Readable } from 'node:stream'; import { Readable } from 'node:stream';
import { getHeaders, pipe } from './shared.js'; import { closeRequest, getHeaders, pipe } from './shared.js';
import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js'; import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js';
const CHUNK_SIZE = BigInt(8e6); // 8 MB const CHUNK_SIZE = BigInt(8e6); // 8 MB
@ -26,7 +26,7 @@ async function* readChunks(streamInfo, size) {
const received = BigInt(chunk.headers['content-length']); const received = BigInt(chunk.headers['content-length']);
if (received < expected / 2n) { if (received < expected / 2n) {
streamInfo.controller.abort(); closeRequest(streamInfo.controller);
} }
for await (const data of chunk.body) { for await (const data of chunk.body) {
@ -39,7 +39,7 @@ async function* readChunks(streamInfo, size) {
async function handleYoutubeStream(streamInfo, res) { async function handleYoutubeStream(streamInfo, res) {
const { signal } = streamInfo.controller; const { signal } = streamInfo.controller;
const cleanup = () => (res.end(), streamInfo.controller.abort()); const cleanup = () => (res.end(), closeRequest(streamInfo.controller));
try { try {
const req = await fetch(streamInfo.url, { const req = await fetch(streamInfo.url, {
@ -80,7 +80,7 @@ async function handleYoutubeStream(streamInfo, res) {
async function handleGenericStream(streamInfo, res) { async function handleGenericStream(streamInfo, res) {
const { signal } = streamInfo.controller; const { signal } = streamInfo.controller;
const cleanup = () => (res.end(), streamInfo.controller.abort()); const cleanup = () => res.end();
try { try {
const req = await request(streamInfo.url, { const req = await request(streamInfo.url, {
@ -94,12 +94,13 @@ async function handleGenericStream(streamInfo, res) {
}); });
res.status(req.statusCode); res.status(req.statusCode);
req.body.on('error', () => {});
for (const [ name, value ] of Object.entries(req.headers)) for (const [ name, value ] of Object.entries(req.headers))
res.setHeader(name, value) res.setHeader(name, value)
if (req.statusCode < 200 || req.statusCode > 299) if (req.statusCode < 200 || req.statusCode > 299)
return res.end(); return cleanup();
if (isHlsRequest(req)) { if (isHlsRequest(req)) {
await handleHlsPlaylist(streamInfo, req, res); await handleHlsPlaylist(streamInfo, req, res);
@ -107,6 +108,7 @@ async function handleGenericStream(streamInfo, res) {
pipe(req.body, res, cleanup); pipe(req.body, res, cleanup);
} }
} catch { } catch {
closeRequest(streamInfo.controller);
cleanup(); cleanup();
} }
} }

View file

@ -5,6 +5,7 @@ import { nanoid } from "nanoid";
import { decryptStream, encryptStream, generateHmac } from "../sub/crypto.js"; import { decryptStream, encryptStream, generateHmac } from "../sub/crypto.js";
import { env } from "../config.js"; import { env } from "../config.js";
import { strict as assert } from "assert"; import { strict as assert } from "assert";
import { closeRequest } from "./shared.js";
// optional dependency // optional dependency
const freebind = env.freebindCIDR && await import('freebind').catch(() => {}); const freebind = env.freebindCIDR && await import('freebind').catch(() => {});
@ -109,7 +110,7 @@ export function destroyInternalStream(url) {
const id = url.searchParams.get('id'); const id = url.searchParams.get('id');
if (internalStreamCache[id]) { if (internalStreamCache[id]) {
internalStreamCache[id].controller.abort(); closeRequest(internalStreamCache[id].controller);
delete internalStreamCache[id]; delete internalStreamCache[id];
} }
} }

View file

@ -16,6 +16,10 @@ const serviceHeaders = {
} }
} }
export function closeRequest(controller) {
try { controller.abort() } catch {}
}
export function closeResponse(res) { export function closeResponse(res) {
if (!res.headersSent) { if (!res.headersSent) {
res.sendStatus(500); res.sendStatus(500);

View file

@ -6,7 +6,7 @@ import { create as contentDisposition } from "content-disposition-header";
import { metadataManager } from "../sub/utils.js"; import { metadataManager } from "../sub/utils.js";
import { destroyInternalStream } from "./manage.js"; import { destroyInternalStream } from "./manage.js";
import { env, ffmpegArgs, hlsExceptions } from "../config.js"; import { env, ffmpegArgs, hlsExceptions } from "../config.js";
import { getHeaders, closeResponse, pipe } from "./shared.js"; import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js";
function toRawHeaders(headers) { function toRawHeaders(headers) {
return Object.entries(headers) return Object.entries(headers)
@ -14,10 +14,6 @@ function toRawHeaders(headers) {
.join(''); .join('');
} }
function closeRequest(controller) {
try { controller.abort() } catch {}
}
function killProcess(p) { function killProcess(p) {
// ask the process to terminate itself gracefully // ask the process to terminate itself gracefully
p?.kill('SIGTERM'); p?.kill('SIGTERM');