api/stream: add Estimated-Content-Length header to tunnels

present where Content-Length cannot be accurately calculated,
pure proxy streams do not have this header and instead have
the accurate Content-Length one.
This commit is contained in:
jj 2024-12-16 17:07:30 +00:00
parent 11388cb418
commit 05e0f031ed
No known key found for this signature in database
5 changed files with 163 additions and 9 deletions

View file

@ -1,5 +1,6 @@
import HLS from "hls-parser";
import { createInternalStream } from "./manage.js";
import { request } from "undici";
function getURL(url) {
try {
@ -74,3 +75,59 @@ export async function handleHlsPlaylist(streamInfo, req, res) {
res.send(hlsPlaylist);
}
async function getSegmentSize(url, config) {
const segmentResponse = await request(url, {
...config,
throwOnError: true
});
if (segmentResponse.headers['content-length']) {
segmentResponse.body.dump();
return +segmentResponse.headers['content-length'];
}
// if the response does not have a content-length
// header, we have to compute it ourselves
let size = 0;
for await (const data of segmentResponse.body) {
size += data.length;
}
return size;
}
export async function probeInternalHLSTunnel(streamInfo) {
const { url, headers, dispatcher, signal } = streamInfo;
// remove all falsy headers
Object.keys(headers).forEach(key => {
if (!headers[key]) delete headers[key];
});
const config = { headers, dispatcher, signal, maxRedirections: 16 };
const manifestResponse = await fetch(url, config);
const manifest = HLS.parse(await manifestResponse.text());
if (manifest.segments.length === 0)
return -1;
const segmentSamples = await Promise.all(
Array(5).fill().map(async () => {
const manifestIdx = Math.floor(Math.random() * manifest.segments.length);
const randomSegment = manifest.segments[manifestIdx];
if (!randomSegment.uri)
throw "segment is missing URI";
const segmentSize = await getSegmentSize(randomSegment.uri, config) / randomSegment.duration;
return segmentSize;
})
);
const averageBitrate = segmentSamples.reduce((a, b) => a + b) / segmentSamples.length;
const totalDuration = manifest.segments.reduce((acc, segment) => acc + segment.duration, 0);
return averageBitrate * totalDuration;
}

View file

@ -1,7 +1,7 @@
import { request } from "undici";
import { Readable } from "node:stream";
import { closeRequest, getHeaders, pipe } from "./shared.js";
import { handleHlsPlaylist, isHlsResponse } from "./internal-hls.js";
import { handleHlsPlaylist, isHlsResponse, probeInternalHLSTunnel } from "./internal-hls.js";
const CHUNK_SIZE = BigInt(8e6); // 8 MB
const min = (a, b) => a < b ? a : b;
@ -130,3 +130,40 @@ export function internalStream(streamInfo, res) {
return handleGenericStream(streamInfo, res);
}
export async function probeInternalTunnel(streamInfo) {
try {
const signal = AbortSignal.timeout(3000);
const headers = {
...Object.fromEntries(streamInfo.headers || []),
...getHeaders(streamInfo.service),
host: undefined,
range: undefined
};
if (streamInfo.isHLS) {
return probeInternalHLSTunnel({
...streamInfo,
signal,
headers
});
}
const response = await request(streamInfo.url, {
method: 'HEAD',
headers,
dispatcher: streamInfo.dispatcher,
signal,
maxRedirections: 16
});
if (response.statusCode !== 200)
throw "status is not 200 OK";
const size = +response.headers['content-length'];
if (isNaN(size))
throw "content-length is not a number";
return size;
} catch {}
}

View file

@ -72,6 +72,16 @@ export function getInternalTunnel(id) {
return internalStreamCache.get(id);
}
export function getInternalTunnelFromURL(url) {
url = new URL(url);
if (url.hostname !== '127.0.0.1') {
return;
}
const id = url.searchParams.get('id');
return getInternalTunnel(id);
}
export function createInternalStream(url, obj = {}) {
assert(typeof url === 'string');

View file

@ -1,5 +1,7 @@
import { genericUserAgent } from "../config.js";
import { vkClientAgent } from "../processing/services/vk.js";
import { getInternalTunnelFromURL } from "./manage.js";
import { probeInternalTunnel } from "./internal.js";
const defaultHeaders = {
'user-agent': genericUserAgent
@ -47,3 +49,40 @@ export function pipe(from, to, done) {
from.pipe(to);
}
export async function estimateTunnelLength(streamInfo, multiplier = 1) {
let urls = streamInfo.urls;
if (!Array.isArray(urls)) {
urls = [ urls ];
}
const internalTunnels = urls.map(getInternalTunnelFromURL);
if (internalTunnels.some(t => !t))
return -1;
const sizes = await Promise.all(internalTunnels.map(probeInternalTunnel));
const estimatedSize = sizes.reduce(
// if one of the sizes is missing, let's just make a very
// bold guess that it's the same size as the existing one
(acc, cur) => cur <= 0 ? acc * 2 : acc + cur,
0
);
if (isNaN(estimatedSize) || estimatedSize <= 0) {
return -1;
}
return Math.floor(estimatedSize * multiplier);
}
export function estimateAudioMultiplier(streamInfo) {
if (streamInfo.audioFormat === 'wav') {
return 1411 / 128;
}
if (streamInfo.audioCopy) {
return 1;
}
return streamInfo.audioBitrate / 128;
}

View file

@ -1,4 +1,4 @@
import { Agent, request } from "undici";
import { Agent, request, stream } from "undici";
import ffmpeg from "ffmpeg-static";
import { spawn } from "child_process";
import { create as contentDisposition } from "content-disposition-header";
@ -6,7 +6,7 @@ import { create as contentDisposition } from "content-disposition-header";
import { env } from "../config.js";
import { destroyInternalStream } from "./manage.js";
import { hlsExceptions } from "../processing/service-config.js";
import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js";
import { getHeaders, closeRequest, closeResponse, pipe, estimateTunnelLength, estimateAudioMultiplier } from "./shared.js";
const ffmpegArgs = {
webm: ["-c:v", "copy", "-c:a", "copy"],
@ -73,6 +73,7 @@ const proxy = async (streamInfo, res) => {
try {
res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin');
res.setHeader('Content-disposition', contentDisposition(streamInfo.filename));
res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo));
const { body: stream, headers, statusCode } = await request(streamInfo.urls, {
headers: {
@ -98,7 +99,7 @@ const proxy = async (streamInfo, res) => {
}
}
const merge = (streamInfo, res) => {
const merge = async (streamInfo, res) => {
let process;
const shutdown = () => (
killProcess(process),
@ -112,7 +113,7 @@ const merge = (streamInfo, res) => {
try {
if (streamInfo.urls.length !== 2) return shutdown();
const format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
const format = streamInfo.filename.split('.').pop();
let args = [
'-loglevel', '-8',
@ -152,6 +153,7 @@ const merge = (streamInfo, res) => {
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo));
pipe(muxOutput, res, shutdown);
@ -162,7 +164,7 @@ const merge = (streamInfo, res) => {
}
}
const remux = (streamInfo, res) => {
const remux = async (streamInfo, res) => {
let process;
const shutdown = () => (
killProcess(process),
@ -196,7 +198,7 @@ const remux = (streamInfo, res) => {
args.push('-bsf:a', 'aac_adtstoasc');
}
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
let format = streamInfo.filename.split('.').pop();
if (format === "mp4") {
args.push('-movflags', 'faststart+frag_keyframe+empty_moov')
}
@ -215,6 +217,7 @@ const remux = (streamInfo, res) => {
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo));
pipe(muxOutput, res, shutdown);
@ -225,7 +228,7 @@ const remux = (streamInfo, res) => {
}
}
const convertAudio = (streamInfo, res) => {
const convertAudio = async (streamInfo, res) => {
let process;
const shutdown = () => (
killProcess(process),
@ -284,6 +287,13 @@ const convertAudio = (streamInfo, res) => {
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
res.setHeader(
'Estimated-Content-Length',
await estimateTunnelLength(
streamInfo,
estimateAudioMultiplier(streamInfo) * 1.1
)
);
pipe(muxOutput, res, shutdown);
res.on('finish', shutdown);
@ -292,7 +302,7 @@ const convertAudio = (streamInfo, res) => {
}
}
const convertGif = (streamInfo, res) => {
const convertGif = async (streamInfo, res) => {
let process;
const shutdown = () => (killProcess(process), closeResponse(res));
@ -321,6 +331,7 @@ const convertGif = (streamInfo, res) => {
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo, 60));
pipe(muxOutput, res, shutdown);