cobalt/src/modules/stream/internal.js

103 lines
2.9 KiB
JavaScript
Raw Normal View History

2024-04-27 13:51:12 +01:00
import { request } from 'undici';
import { Readable } from 'node:stream';
import { assert } from 'console';
import { getHeaders, pipe } from './shared.js';
const CHUNK_SIZE = BigInt(8e6); // 8 MB
const min = (a, b) => a < b ? a : b;
async function* readChunks(streamInfo, size) {
let read = 0n;
while (read < size) {
if (streamInfo.controller.signal.aborted) {
throw new Error("controller aborted");
}
const chunk = await request(streamInfo.url, {
headers: {
...getHeaders('youtube'),
Range: `bytes=${read}-${read + CHUNK_SIZE}`
},
dispatcher: streamInfo.dispatcher,
signal: streamInfo.controller.signal
});
const expected = min(CHUNK_SIZE, size - read);
const received = BigInt(chunk.headers['content-length']);
if (received < expected / 2n) {
streamInfo.controller.abort();
}
for await (const data of chunk.body) {
yield data;
}
read += received;
}
}
function chunkedStream(streamInfo, size) {
assert(streamInfo.controller instanceof AbortController);
const stream = Readable.from(readChunks(streamInfo, size));
return stream;
}
async function handleYoutubeStream(streamInfo, res) {
try {
const req = await fetch(streamInfo.url, {
headers: getHeaders('youtube'),
method: 'HEAD',
dispatcher: streamInfo.dispatcher,
signal: streamInfo.controller.signal
});
streamInfo.url = req.url;
const size = BigInt(req.headers.get('content-length'));
if (req.status !== 200 || !size) {
return res.end();
}
const stream = chunkedStream(streamInfo, size);
for (const headerName of ['content-type', 'content-length']) {
const headerValue = req.headers.get(headerName);
if (headerValue) res.setHeader(headerName, headerValue);
}
pipe(stream, res, () => res.end());
} catch {
res.end();
}
}
2024-04-26 12:53:50 +01:00
export async function internalStream(streamInfo, res) {
if (streamInfo.service === 'youtube') {
return handleYoutubeStream(streamInfo, res);
}
2024-04-26 12:53:50 +01:00
try {
const req = await request(streamInfo.url, {
headers: {
...streamInfo.headers,
host: undefined
},
dispatcher: streamInfo.dispatcher,
2024-04-26 12:53:50 +01:00
signal: streamInfo.controller.signal,
maxRedirections: 16
});
res.status(req.statusCode);
for (const [ name, value ] of Object.entries(req.headers))
res.setHeader(name, value)
if (req.statusCode < 200 || req.statusCode > 299)
return res.end();
2024-04-26 12:53:50 +01:00
pipe(req.body, res, () => res.end());
2024-04-26 12:53:50 +01:00
} catch {
streamInfo.controller.abort();
}
}