api/stream: implement itunnel transplants

This commit is contained in:
jj 2025-01-20 15:55:26 +00:00
parent c07940bfa4
commit 600c769141
No known key found for this signature in database
3 changed files with 83 additions and 4 deletions

View file

@ -29,3 +29,7 @@ export function splitFilenameExtension(filename) {
return [ parts.join('.'), ext ] return [ parts.join('.'), ext ]
} }
} }
export function zip(a, b) {
return a.map((value, i) => [ value, b[i] ]);
}

View file

@ -7,7 +7,7 @@ const CHUNK_SIZE = BigInt(8e6); // 8 MB
const min = (a, b) => a < b ? a : b; const min = (a, b) => a < b ? a : b;
async function* readChunks(streamInfo, size) { async function* readChunks(streamInfo, size) {
let read = 0n; let read = 0n, chunksSinceTransplant = 0;
while (read < size) { while (read < size) {
if (streamInfo.controller.signal.aborted) { if (streamInfo.controller.signal.aborted) {
throw new Error("controller aborted"); throw new Error("controller aborted");
@ -22,6 +22,16 @@ async function* readChunks(streamInfo, size) {
signal: streamInfo.controller.signal signal: streamInfo.controller.signal
}); });
if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) {
chunksSinceTransplant = 0;
try {
await streamInfo.transplant(streamInfo.dispatcher);
continue;
} catch {}
}
chunksSinceTransplant++;
const expected = min(CHUNK_SIZE, size - read); const expected = min(CHUNK_SIZE, size - read);
const received = BigInt(chunk.headers['content-length']); const received = BigInt(chunk.headers['content-length']);

View file

@ -9,6 +9,7 @@ import { env } from "../config.js";
import { closeRequest } from "./shared.js"; import { closeRequest } from "./shared.js";
import { decryptStream, encryptStream } from "../misc/crypto.js"; import { decryptStream, encryptStream } from "../misc/crypto.js";
import { hashHmac } from "../security/secrets.js"; import { hashHmac } from "../security/secrets.js";
import { zip } from "../misc/utils.js";
// optional dependency // optional dependency
const freebind = env.freebindCIDR && await import('freebind').catch(() => {}); const freebind = env.freebindCIDR && await import('freebind').catch(() => {});
@ -40,7 +41,7 @@ export function createStream(obj) {
audioFormat: obj.audioFormat, audioFormat: obj.audioFormat,
isHLS: obj.isHLS || false, isHLS: obj.isHLS || false,
originalRequest: obj.parameters originalRequest: obj.originalRequest
}; };
// FIXME: this is now a Promise, but it is not awaited // FIXME: this is now a Promise, but it is not awaited
@ -101,6 +102,7 @@ export function createInternalStream(url, obj = {}) {
controller, controller,
dispatcher, dispatcher,
isHLS: obj.isHLS, isHLS: obj.isHLS,
transplant: obj.transplant
}); });
let streamLink = new URL('/itunnel', `http://127.0.0.1:${env.tunnelPort}`); let streamLink = new URL('/itunnel', `http://127.0.0.1:${env.tunnelPort}`);
@ -116,13 +118,17 @@ export function createInternalStream(url, obj = {}) {
return streamLink.toString(); return streamLink.toString();
} }
export function destroyInternalStream(url) { function getInternalTunnelId(url) {
url = new URL(url); url = new URL(url);
if (url.hostname !== '127.0.0.1') { if (url.hostname !== '127.0.0.1') {
return; return;
} }
const id = url.searchParams.get('id'); return url.searchParams.get('id');
}
export function destroyInternalStream(url) {
const id = getInternalTunnelId(url);
if (internalStreamCache.has(id)) { if (internalStreamCache.has(id)) {
closeRequest(getInternalStream(id)?.controller); closeRequest(getInternalStream(id)?.controller);
@ -130,9 +136,68 @@ export function destroyInternalStream(url) {
} }
} }
const transplantInternalTunnels = function(tunnelUrls, transplantUrls) {
if (tunnelUrls.length !== transplantUrls.length) {
return;
}
for (const [ tun, url ] of zip(tunnelUrls, transplantUrls)) {
const id = getInternalTunnelId(tun);
const itunnel = getInternalStream(id);
if (!itunnel) continue;
itunnel.url = url;
}
}
const transplantTunnel = async function (dispatcher) {
if (this.pendingTransplant) {
await this.pendingTransplant;
return;
}
let finished;
this.pendingTransplant = new Promise(r => finished = r);
try {
const handler = await import(`../processing/services/${this.service}.js`);
const response = await handler.default({
...this.originalRequest,
dispatcher
});
if (!response.urls) {
return;
}
response.urls = [response.urls].flat();
if (this.originalRequest.isAudioOnly && response.urls.length > 1) {
response.urls = [response.urls[1]];
} else if (this.originalRequest.isAudioMuted) {
response.urls = [response.urls[0]];
}
const tunnels = [this.urls].flat();
if (tunnels.length !== response.urls.length) {
return;
}
transplantInternalTunnels(tunnels, response.urls);
}
catch {}
finally {
finished();
delete this.pendingTransplant;
}
}
function wrapStream(streamInfo) { function wrapStream(streamInfo) {
const url = streamInfo.urls; const url = streamInfo.urls;
if (streamInfo.originalRequest) {
streamInfo.transplant = transplantTunnel.bind(streamInfo);
}
if (typeof url === 'string') { if (typeof url === 'string') {
streamInfo.urls = createInternalStream(url, streamInfo); streamInfo.urls = createInternalStream(url, streamInfo);
} else if (Array.isArray(url)) { } else if (Array.isArray(url)) {