stream: fix some memory leaks in internal stream handling (#581)

This commit is contained in:
jj 2024-06-22 12:57:30 +02:00 committed by GitHub
parent 21d5b4b8d4
commit ef97ff06af
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 24 additions and 10 deletions

View file

@ -1,6 +1,5 @@
import { request } from 'undici'; import { request } from 'undici';
import { Readable } from 'node:stream'; import { Readable } from 'node:stream';
import { assert } from 'console';
import { getHeaders, pipe } from './shared.js'; import { getHeaders, pipe } from './shared.js';
import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js'; import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js';
@ -38,19 +37,15 @@ async function* readChunks(streamInfo, size) {
} }
} }
function chunkedStream(streamInfo, size) {
assert(streamInfo.controller instanceof AbortController);
const stream = Readable.from(readChunks(streamInfo, size));
return stream;
}
async function handleYoutubeStream(streamInfo, res) { async function handleYoutubeStream(streamInfo, res) {
const { signal } = streamInfo.controller;
try { try {
const req = await fetch(streamInfo.url, { const req = await fetch(streamInfo.url, {
headers: getHeaders('youtube'), headers: getHeaders('youtube'),
method: 'HEAD', method: 'HEAD',
dispatcher: streamInfo.dispatcher, dispatcher: streamInfo.dispatcher,
signal: streamInfo.controller.signal signal
}); });
streamInfo.url = req.url; streamInfo.url = req.url;
@ -60,7 +55,16 @@ async function handleYoutubeStream(streamInfo, res) {
return res.end(); return res.end();
} }
const stream = chunkedStream(streamInfo, size); const generator = readChunks(streamInfo, size);
const abortGenerator = () => {
generator.return();
signal.removeEventListener('abort', abortGenerator);
}
signal.addEventListener('abort', abortGenerator);
const stream = Readable.from(generator);
for (const headerName of ['content-type', 'content-length']) { for (const headerName of ['content-type', 'content-length']) {
const headerValue = req.headers.get(headerName); const headerValue = req.headers.get(headerName);
@ -69,6 +73,7 @@ async function handleYoutubeStream(streamInfo, res) {
pipe(stream, res, () => res.end()); pipe(stream, res, () => res.end());
} catch { } catch {
signal.abort();
res.end(); res.end();
} }
} }

View file

@ -78,16 +78,25 @@ export function createInternalStream(url, obj = {}) {
} }
const streamID = nanoid(); const streamID = nanoid();
const controller = new AbortController();
internalStreamCache[streamID] = { internalStreamCache[streamID] = {
url, url,
service: obj.service, service: obj.service,
headers: obj.headers, headers: obj.headers,
controller: new AbortController(), controller,
dispatcher dispatcher
}; };
let streamLink = new URL('/api/istream', `http://127.0.0.1:${env.apiPort}`); let streamLink = new URL('/api/istream', `http://127.0.0.1:${env.apiPort}`);
streamLink.searchParams.set('id', streamID); streamLink.searchParams.set('id', streamID);
const cleanup = () => {
destroyInternalStream(streamLink);
controller.signal.removeEventListener('abort', cleanup);
}
controller.signal.addEventListener('abort', cleanup);
return streamLink.toString(); return streamLink.toString();
} }