diff --git a/api/src/stream/internal-hls.js b/api/src/stream/internal-hls.js index 61f3d361..e4416288 100644 --- a/api/src/stream/internal-hls.js +++ b/api/src/stream/internal-hls.js @@ -1,5 +1,6 @@ import HLS from "hls-parser"; import { createInternalStream } from "./manage.js"; +import { request } from "undici"; function getURL(url) { try { @@ -74,3 +75,59 @@ export async function handleHlsPlaylist(streamInfo, req, res) { res.send(hlsPlaylist); } + +async function getSegmentSize(url, config) { + const segmentResponse = await request(url, { + ...config, + throwOnError: true + }); + + if (segmentResponse.headers['content-length']) { + segmentResponse.body.dump(); + return +segmentResponse.headers['content-length']; + } + + // if the response does not have a content-length + // header, we have to compute it ourselves + let size = 0; + + for await (const data of segmentResponse.body) { + size += data.length; + } + + return size; +} + +export async function probeInternalHLSTunnel(streamInfo) { + const { url, headers, dispatcher, signal } = streamInfo; + + // remove all falsy headers + Object.keys(headers).forEach(key => { + if (!headers[key]) delete headers[key]; + }); + + const config = { headers, dispatcher, signal, maxRedirections: 16 }; + + const manifestResponse = await fetch(url, config); + + const manifest = HLS.parse(await manifestResponse.text()); + if (manifest.segments.length === 0) + return -1; + + const segmentSamples = await Promise.all( + Array(5).fill().map(async () => { + const manifestIdx = Math.floor(Math.random() * manifest.segments.length); + const randomSegment = manifest.segments[manifestIdx]; + if (!randomSegment.uri) + throw "segment is missing URI"; + + const segmentSize = await getSegmentSize(randomSegment.uri, config) / randomSegment.duration; + return segmentSize; + }) + ); + + const averageBitrate = segmentSamples.reduce((a, b) => a + b) / segmentSamples.length; + const totalDuration = manifest.segments.reduce((acc, segment) => acc + segment.duration, 0); + + return averageBitrate * totalDuration; +} diff --git a/api/src/stream/internal.js b/api/src/stream/internal.js index c32a3692..2cfc990c 100644 --- a/api/src/stream/internal.js +++ b/api/src/stream/internal.js @@ -1,7 +1,7 @@ import { request } from "undici"; import { Readable } from "node:stream"; import { closeRequest, getHeaders, pipe } from "./shared.js"; -import { handleHlsPlaylist, isHlsResponse } from "./internal-hls.js"; +import { handleHlsPlaylist, isHlsResponse, probeInternalHLSTunnel } from "./internal-hls.js"; const CHUNK_SIZE = BigInt(8e6); // 8 MB const min = (a, b) => a < b ? a : b; @@ -130,3 +130,40 @@ export function internalStream(streamInfo, res) { return handleGenericStream(streamInfo, res); } + +export async function probeInternalTunnel(streamInfo) { + try { + const signal = AbortSignal.timeout(3000); + const headers = { + ...Object.fromEntries(streamInfo.headers || []), + ...getHeaders(streamInfo.service), + host: undefined, + range: undefined + }; + + if (streamInfo.isHLS) { + return probeInternalHLSTunnel({ + ...streamInfo, + signal, + headers + }); + } + + const response = await request(streamInfo.url, { + method: 'HEAD', + headers, + dispatcher: streamInfo.dispatcher, + signal, + maxRedirections: 16 + }); + + if (response.statusCode !== 200) + throw "status is not 200 OK"; + + const size = +response.headers['content-length']; + if (isNaN(size)) + throw "content-length is not a number"; + + return size; + } catch {} +} diff --git a/api/src/stream/manage.js b/api/src/stream/manage.js index a090ca22..10d25384 100644 --- a/api/src/stream/manage.js +++ b/api/src/stream/manage.js @@ -72,6 +72,16 @@ export function getInternalTunnel(id) { return internalStreamCache.get(id); } +export function getInternalTunnelFromURL(url) { + url = new URL(url); + if (url.hostname !== '127.0.0.1') { + return; + } + + const id = url.searchParams.get('id'); + return getInternalTunnel(id); +} + export function createInternalStream(url, obj = {}) { assert(typeof url === 'string'); diff --git a/api/src/stream/shared.js b/api/src/stream/shared.js index 65af03f0..9a2b6779 100644 --- a/api/src/stream/shared.js +++ b/api/src/stream/shared.js @@ -1,5 +1,7 @@ import { genericUserAgent } from "../config.js"; import { vkClientAgent } from "../processing/services/vk.js"; +import { getInternalTunnelFromURL } from "./manage.js"; +import { probeInternalTunnel } from "./internal.js"; const defaultHeaders = { 'user-agent': genericUserAgent @@ -47,3 +49,40 @@ export function pipe(from, to, done) { from.pipe(to); } + +export async function estimateTunnelLength(streamInfo, multiplier = 1) { + let urls = streamInfo.urls; + if (!Array.isArray(urls)) { + urls = [ urls ]; + } + + const internalTunnels = urls.map(getInternalTunnelFromURL); + if (internalTunnels.some(t => !t)) + return -1; + + const sizes = await Promise.all(internalTunnels.map(probeInternalTunnel)); + const estimatedSize = sizes.reduce( + // if one of the sizes is missing, let's just make a very + // bold guess that it's the same size as the existing one + (acc, cur) => cur <= 0 ? acc * 2 : acc + cur, + 0 + ); + + if (isNaN(estimatedSize) || estimatedSize <= 0) { + return -1; + } + + return Math.floor(estimatedSize * multiplier); +} + +export function estimateAudioMultiplier(streamInfo) { + if (streamInfo.audioFormat === 'wav') { + return 1411 / 128; + } + + if (streamInfo.audioCopy) { + return 1; + } + + return streamInfo.audioBitrate / 128; +} diff --git a/api/src/stream/types.js b/api/src/stream/types.js index 0a4e2d47..688e6eef 100644 --- a/api/src/stream/types.js +++ b/api/src/stream/types.js @@ -1,4 +1,4 @@ -import { Agent, request } from "undici"; +import { Agent, request, stream } from "undici"; import ffmpeg from "ffmpeg-static"; import { spawn } from "child_process"; import { create as contentDisposition } from "content-disposition-header"; @@ -6,7 +6,7 @@ import { create as contentDisposition } from "content-disposition-header"; import { env } from "../config.js"; import { destroyInternalStream } from "./manage.js"; import { hlsExceptions } from "../processing/service-config.js"; -import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js"; +import { getHeaders, closeRequest, closeResponse, pipe, estimateTunnelLength, estimateAudioMultiplier } from "./shared.js"; const ffmpegArgs = { webm: ["-c:v", "copy", "-c:a", "copy"], @@ -73,6 +73,7 @@ const proxy = async (streamInfo, res) => { try { res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin'); res.setHeader('Content-disposition', contentDisposition(streamInfo.filename)); + res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo)); const { body: stream, headers, statusCode } = await request(streamInfo.urls, { headers: { @@ -98,7 +99,7 @@ const proxy = async (streamInfo, res) => { } } -const merge = (streamInfo, res) => { +const merge = async (streamInfo, res) => { let process; const shutdown = () => ( killProcess(process), @@ -112,7 +113,7 @@ const merge = (streamInfo, res) => { try { if (streamInfo.urls.length !== 2) return shutdown(); - const format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; + const format = streamInfo.filename.split('.').pop(); let args = [ '-loglevel', '-8', @@ -152,6 +153,7 @@ const merge = (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo)); pipe(muxOutput, res, shutdown); @@ -162,7 +164,7 @@ const merge = (streamInfo, res) => { } } -const remux = (streamInfo, res) => { +const remux = async (streamInfo, res) => { let process; const shutdown = () => ( killProcess(process), @@ -196,7 +198,7 @@ const remux = (streamInfo, res) => { args.push('-bsf:a', 'aac_adtstoasc'); } - let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; + let format = streamInfo.filename.split('.').pop(); if (format === "mp4") { args.push('-movflags', 'faststart+frag_keyframe+empty_moov') } @@ -215,6 +217,7 @@ const remux = (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo)); pipe(muxOutput, res, shutdown); @@ -225,7 +228,7 @@ const remux = (streamInfo, res) => { } } -const convertAudio = (streamInfo, res) => { +const convertAudio = async (streamInfo, res) => { let process; const shutdown = () => ( killProcess(process), @@ -284,6 +287,13 @@ const convertAudio = (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.setHeader( + 'Estimated-Content-Length', + await estimateTunnelLength( + streamInfo, + estimateAudioMultiplier(streamInfo) * 1.1 + ) + ); pipe(muxOutput, res, shutdown); res.on('finish', shutdown); @@ -292,7 +302,7 @@ const convertAudio = (streamInfo, res) => { } } -const convertGif = (streamInfo, res) => { +const convertGif = async (streamInfo, res) => { let process; const shutdown = () => (killProcess(process), closeResponse(res)); @@ -321,6 +331,7 @@ const convertGif = (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo, 60)); pipe(muxOutput, res, shutdown);