web/workers: use opfs instead of blobs for better memory management

spent almost an entire day figuring this out but it's so worth it
wukko 2025-02-01 23:26:57 +06:00
parent 0a8323be54
commit 5464574a3e
12 changed files with 124 additions and 79 deletions
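For context, the core of the change is moving intermediate data out of JavaScript memory (preallocated Uint8Array chunks assembled into a Blob) and into the origin private file system, written through a synchronous access handle inside a worker. Below is a minimal sketch of that write-then-read pattern, assuming a worker context where createSyncAccessHandle() is available; the helper name and chunk array are illustrative and not part of this commit:

// sketch only: stream bytes into a temporary OPFS file instead of holding them
// in memory, then read the result back as a regular File object
async function writeThenRead(chunks: Uint8Array[]): Promise<File> {
    const root = await navigator.storage.getDirectory();
    const dir = await root.getDirectoryHandle('cobalt-processing-data', { create: true });
    const handle = await dir.getFileHandle(crypto.randomUUID(), { create: true });

    // sync access handles are only available inside web workers
    const io = await handle.createSyncAccessHandle();

    let offset = 0;
    for (const chunk of chunks) {
        // positional write; the data lands in OPFS, not in a growing JS buffer
        offset += io.write(chunk, { at: offset });
    }

    await io.flush();
    await io.close();

    // the bytes now live in origin private storage; getFile() exposes them as a File
    return await handle.getFile();
}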


@@ -65,7 +65,7 @@
     <div class="file-status {info.state}">
         {#if info.state === "done"}
-            <IconCheck /> {formatFileSize(info.resultFile?.size)}
+            <IconCheck /> {formatFileSize(info.resultFile?.file?.size)}
         {/if}
         {#if info.state === "running"}
@@ -95,7 +95,7 @@
     {#if info.state === "done" && info.resultFile}
         <button
             class="action-button"
-            on:click={() => download(info.resultFile)}
+            on:click={() => download(info.resultFile.file)}
         >
             <IconDownload />
         </button>


@@ -1,3 +1,4 @@
+import { OPFSStorage } from "$lib/storage";
 import LibAV, { type LibAV as LibAVInstance } from "@imput/libav.js-remux-cli";
 import type { FfprobeData } from "fluent-ffmpeg";
@@ -69,27 +70,18 @@ export default class LibAVWrapper {
         try {
             for (let i = 0; i < files.length; i++) {
-                await libav.mkreadaheadfile(`input${i}`, files[i]);
+                const file = files[i].file;
+                await libav.mkreadaheadfile(`input${i}`, file);
                 ffInputs.push('-i', `input${i}`);
             }
             await libav.mkwriterdev(outputName);
             await libav.mkwriterdev('progress.txt');
-            const totalInputSize = files.reduce((a, b) => a + b.size, 0);
-            const MB = 1024 * 1024;
-            const chunks: Uint8Array[] = [];
-            const chunkSize = Math.min(512 * MB, totalInputSize);
-            // since we expect the output file to be roughly the same size
-            // as inputs, preallocate its size for the output
-            for (let toAllocate = totalInputSize; toAllocate > 0; toAllocate -= chunkSize) {
-                chunks.push(new Uint8Array(chunkSize));
-            }
-            let actualSize = 0;
-            libav.onwrite = (name, pos, data) => {
+            const storage = await OPFSStorage.init();
+            libav.onwrite = async (name, pos, data) => {
                 if (name === 'progress.txt') {
                     try {
                         return this.#emitProgress(data);
@@ -98,26 +90,7 @@ export default class LibAVWrapper {
                     }
                 } else if (name !== outputName) return;
-                const writeEnd = pos + data.length;
-                if (writeEnd > chunkSize * chunks.length) {
-                    chunks.push(new Uint8Array(chunkSize));
-                }
-                const chunkIndex = pos / chunkSize | 0;
-                const offset = pos - (chunkSize * chunkIndex);
-                if (offset + data.length > chunkSize) {
-                    chunks[chunkIndex].set(
-                        data.subarray(0, chunkSize - offset), offset
-                    );
-                    chunks[chunkIndex + 1].set(
-                        data.subarray(chunkSize - offset), 0
-                    );
-                } else {
-                    chunks[chunkIndex].set(data, offset);
-                }
-                actualSize = Math.max(writeEnd, actualSize);
+                await storage.write(data, pos);
             };
             await libav.ffmpeg([
@@ -130,30 +103,14 @@ export default class LibAVWrapper {
                 outputName
             ]);
-            // if we didn't need as much space as we allocated for some reason,
-            // shrink the buffers so that we don't inflate the file with zeroes
-            const outputView: Uint8Array[] = [];
-            for (let i = 0; i < chunks.length; ++i) {
-                outputView.push(
-                    chunks[i].subarray(
-                        0, Math.min(chunkSize, actualSize)
-                    )
-                );
-                actualSize -= chunkSize;
-                if (actualSize <= 0) {
-                    break;
-                }
-            }
-            const renderBlob = new Blob(
-                outputView,
-                { type: output.type }
-            );
-            if (renderBlob.size === 0) return;
-            return renderBlob;
+            const file = await storage.res();
+            if (file.size === 0) return;
+            return {
+                file,
+                type: output.type,
+            }
         } finally {
             try {
                 await libav.unlink(outputName);


@@ -23,7 +23,10 @@ export const createRemuxPipeline = (file: File) => {
         workerId: crypto.randomUUID(),
         parentId,
         workerArgs: {
-            files: [file],
+            files: [{
+                file,
+                type: file.type,
+            }],
             ffargs: [
                 "-c", "copy",
                 "-map", "0"


@@ -8,6 +8,7 @@ import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue";
 import type { FileInfo } from "$lib/types/libav";
 import type { CobaltQueue } from "$lib/types/queue";
 import type { CobaltPipelineItem } from "$lib/types/workers";
+import type { CobaltFileReference } from "$lib/types/storage";
 const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => {
     unsubscribe();
@@ -15,7 +16,14 @@ const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.T
     if (interval) clearInterval(interval);
 }
-export const runRemuxWorker = async (workerId: string, parentId: string, files: File[], args: string[], output: FileInfo, filename: string) => {
+export const runRemuxWorker = async (
+    workerId: string,
+    parentId: string,
+    files: CobaltFileReference[],
+    args: string[],
+    output: FileInfo,
+    filename: string
+) => {
     const worker = new RemuxWorker();
     // sometimes chrome refuses to start libav wasm,
@@ -86,9 +94,7 @@
             return pipelineTaskDone(
                 parentId,
                 workerId,
-                new File([eventData.render], eventData.filename, {
-                    type: eventData.render.type,
-                })
+                eventData.render,
             );
         }
@@ -127,12 +133,12 @@ export const runFetchWorker = async (workerId: string, parentId: string, url: st
             })
         }
-        if (eventData.file) {
+        if (eventData.result) {
             killWorker(worker, unsubscribe);
             return pipelineTaskDone(
                 parentId,
                 workerId,
-                eventData.file,
+                eventData.result,
             );
         }
@@ -144,7 +150,7 @@
 }
 export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => {
-    let files: File[] = [];
+    let files: CobaltFileReference[] = [];
     switch (worker) {
         case "remux":


@@ -1,8 +1,10 @@
 import { readable, type Updater } from "svelte/store";
 import { checkTasks } from "$lib/queen-bee/scheduler";
-import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue";
 import { clearCurrentTasks, removeWorkerFromQueue } from "$lib/state/queen-bee/current-tasks";
+import type { CobaltFileReference } from "$lib/types/storage";
+import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue";
 let update: (_: Updater<CobaltQueue>) => void;
 const queue = readable<CobaltQueue>(
@@ -39,7 +41,7 @@ export function itemError(id: string, workerId: string, error: string) {
     checkTasks();
 }
-export function itemDone(id: string, file: File) {
+export function itemDone(id: string, file: CobaltFileReference) {
     update(queueData => {
         if (queueData[id]) {
             if (queueData[id].state === "running" && queueData[id].pipelineResults) {
@@ -58,7 +60,7 @@ export function itemDone(id: string, file: File) {
     checkTasks();
 }
-export function pipelineTaskDone(id: string, workerId: string, file: File) {
+export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileReference) {
     update(queueData => {
         if (queueData[id] && queueData[id].state === "running") {
             queueData[id].pipelineResults = [...queueData[id].pipelineResults || [], file];

web/src/lib/storage.ts (new file)

@@ -0,0 +1,53 @@
+export class OPFSStorage {
+    #root;
+    #handle;
+    #io;
+
+    constructor(root: FileSystemDirectoryHandle, handle: FileSystemFileHandle, reader: FileSystemSyncAccessHandle) {
+        this.#root = root;
+        this.#handle = handle;
+        this.#io = reader;
+    }
+
+    static async init() {
+        const root = await navigator.storage.getDirectory();
+        const cobaltDir = await root.getDirectoryHandle('cobalt-processing-data', { create: true });
+        const handle = await cobaltDir.getFileHandle(crypto.randomUUID(), { create: true });
+        const reader = await handle.createSyncAccessHandle();
+
+        return new this(cobaltDir, handle, reader);
+    }
+
+    async res() {
+        // await for compat with ios 15
+        await this.#io.flush();
+        await this.#io.close();
+        return await this.#handle.getFile();
+    }
+
+    read(size: number, offset: number) {
+        const out = new Uint8Array(size);
+        const bytesRead = this.#io.read(out, { at: offset });
+        return out.subarray(0, bytesRead);
+    }
+
+    async write(data: Uint8Array | Int8Array, offset: number) {
+        const writ = this.#io.write(data, { at: offset });
+        if (data.length !== writ) {
+            console.log(data.length, writ);
+        }
+        return writ;
+    }
+
+    async destroy() {
+        await this.#root.removeEntry(this.#handle.name);
+    }
+
+    static isAvailable() {
+        return !!navigator.storage?.getDirectory;
+    }
+}
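A hypothetical usage sketch of the class above, mirroring how the workers in this commit drive it (init, positional writes, then res() for the final File); OPFSStorage.isAvailable() can be used to feature-detect support first. The function name and chunk array are illustrative:

// sketch only: how a worker is expected to use OPFSStorage
import { OPFSStorage } from "$lib/storage";

async function collect(chunks: Uint8Array[]): Promise<File | undefined> {
    if (!OPFSStorage.isAvailable()) return;

    const storage = await OPFSStorage.init();

    let offset = 0;
    for (const chunk of chunks) {
        await storage.write(chunk, offset); // write at an explicit byte offset
        offset += chunk.length;
    }

    // flushes and closes the sync handle, then returns the backing File
    const file = await storage.res();

    // storage.destroy() would remove the temporary entry; the workers in this
    // commit keep the file around so the result can still be downloaded
    return file;
}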


@@ -1,10 +1,12 @@
+import type { CobaltFileReference } from "$lib/types/storage";
 export type FileInfo = {
     type?: string,
     extension?: string,
 }
 export type RenderParams = {
-    files: File[],
+    files: CobaltFileReference[],
     output: FileInfo,
     args: string[],
 }


@@ -1,3 +1,4 @@
+import type { CobaltFileReference } from "$lib/types/storage";
 import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers";
 export type CobaltQueueItemState = "waiting" | "running" | "done" | "error";
@@ -19,12 +20,12 @@ export type CobaltQueueItemRunning = CobaltQueueBaseItem & {
     state: "running",
     runningWorker: string,
     completedWorkers?: string[],
-    pipelineResults?: File[],
+    pipelineResults?: CobaltFileReference[],
 };
 export type CobaltQueueItemDone = CobaltQueueBaseItem & {
     state: "done",
-    resultFile: File,
+    resultFile: CobaltFileReference,
 };
 export type CobaltQueueItemError = CobaltQueueBaseItem & {


@@ -0,0 +1,4 @@
+export type CobaltFileReference = {
+    file: File,
+    type: string,
+}


@@ -1,4 +1,5 @@
 import type { FileInfo } from "$lib/types/libav";
+import type { CobaltFileReference } from "$lib/types/storage";
 export const resultFileTypes = ["video", "audio", "image"] as const;
@@ -12,7 +13,7 @@ export type CobaltWorkerProgress = {
 }
 export type CobaltWorkerArgs = {
-    files?: File[],
+    files?: CobaltFileReference[],
     url?: string,
     ffargs?: string[],
     output?: FileInfo,


@@ -1,3 +1,5 @@
+import { OPFSStorage } from "$lib/storage";
 const error = (code: string) => {
     // TODO: return proper errors and code here
     self.postMessage({
@@ -22,20 +24,21 @@ const fetchFile = async (url: string) => {
         const totalBytes = contentLength ? parseInt(contentLength, 10) : null;
         const reader = response.body?.getReader();
+        const storage = await OPFSStorage.init();
         if (!reader) {
             error("no reader");
             return self.close();
         }
         let receivedBytes = 0;
-        const chunks = [];
         while (true) {
             const { done, value } = await reader.read();
             if (done) break;
+            await storage.write(value, receivedBytes);
             receivedBytes += value.length;
-            chunks.push(value);
             if (totalBytes) {
                 self.postMessage({
@@ -52,15 +55,22 @@ const fetchFile = async (url: string) => {
             return self.close();
         }
-        const file = new File(chunks, "file", { type: contentType });
+        const file = await storage.res();
+        if (Number(contentLength) !== file.size) {
+            error("file is not downloaded fully");
+        }
         self.postMessage({
             cobaltFetchWorker: {
-                file
+                result: {
+                    file,
+                    type: contentType,
+                }
             }
         });
     } catch (e) {
-        console.log(e)
+        console.log(e);
         error("error when downloading the file");
         return self.close();
     }


@@ -1,5 +1,7 @@
 import LibAVWrapper from "$lib/libav";
 import type { FileInfo } from "$lib/types/libav";
+import type { CobaltFileReference } from "$lib/types/storage";
 const error = (code: string) => {
     self.postMessage({
@@ -25,14 +27,17 @@ const ff = new LibAVWrapper((progress) => {
 ff.init();
-const remux = async (files: File[], args: string[], output: FileInfo, filename: string) => {
+const remux = async (files: CobaltFileReference[], args: string[], output: FileInfo, filename: string) => {
     if (!(files && output && args)) return;
     await ff.init();
     try {
         // probing just the first file in files array (usually audio) for duration progress
-        const file_info = await ff.probe(files[0]).catch((e) => {
+        const probeFile = files[0]?.file;
+        if (!probeFile) return error("couldn't probe one of files");
+
+        const file_info = await ff.probe(probeFile).catch((e) => {
             if (e?.message?.toLowerCase().includes("out of memory")) {
                 console.error("uh oh! out of memory");
                 console.error(e);
@@ -87,6 +92,7 @@ const remux = async (files: File[], args: string[], output: FileInfo, filename:
         });
     } catch (e) {
         console.log(e);
+        return error("remux.crashed");
     }
 }