From 5464574a3e37bb7d06152daa38b221c868cdb209 Mon Sep 17 00:00:00 2001 From: wukko Date: Sat, 1 Feb 2025 23:26:57 +0600 Subject: [PATCH] web/workers: use opfs instead of blobs for better memory management spent almost an entire day figuring this out but it's so worth it --- .../queue/ProcessingQueueItem.svelte | 4 +- web/src/lib/libav.ts | 67 ++++--------------- web/src/lib/queen-bee/queue.ts | 5 +- web/src/lib/queen-bee/run-worker.ts | 20 ++++-- web/src/lib/state/queen-bee/queue.ts | 8 ++- web/src/lib/storage.ts | 53 +++++++++++++++ web/src/lib/types/libav.ts | 4 +- web/src/lib/types/queue.ts | 5 +- web/src/lib/types/storage.ts | 4 ++ web/src/lib/types/workers.ts | 3 +- web/src/lib/workers/fetch.ts | 20 ++++-- web/src/lib/workers/remux.ts | 10 ++- 12 files changed, 124 insertions(+), 79 deletions(-) create mode 100644 web/src/lib/storage.ts create mode 100644 web/src/lib/types/storage.ts diff --git a/web/src/components/queue/ProcessingQueueItem.svelte b/web/src/components/queue/ProcessingQueueItem.svelte index f3da210d..ac6aac65 100644 --- a/web/src/components/queue/ProcessingQueueItem.svelte +++ b/web/src/components/queue/ProcessingQueueItem.svelte @@ -65,7 +65,7 @@
{#if info.state === "done"} - {formatFileSize(info.resultFile?.size)} + {formatFileSize(info.resultFile?.file?.size)} {/if} {#if info.state === "running"} @@ -95,7 +95,7 @@ {#if info.state === "done" && info.resultFile} diff --git a/web/src/lib/libav.ts b/web/src/lib/libav.ts index b6275f26..6beac0c5 100644 --- a/web/src/lib/libav.ts +++ b/web/src/lib/libav.ts @@ -1,3 +1,4 @@ +import { OPFSStorage } from "$lib/storage"; import LibAV, { type LibAV as LibAVInstance } from "@imput/libav.js-remux-cli"; import type { FfprobeData } from "fluent-ffmpeg"; @@ -69,27 +70,18 @@ export default class LibAVWrapper { try { for (let i = 0; i < files.length; i++) { - await libav.mkreadaheadfile(`input${i}`, files[i]); + const file = files[i].file; + + await libav.mkreadaheadfile(`input${i}`, file); ffInputs.push('-i', `input${i}`); } await libav.mkwriterdev(outputName); await libav.mkwriterdev('progress.txt'); - const totalInputSize = files.reduce((a, b) => a + b.size, 0); + const storage = await OPFSStorage.init(); - const MB = 1024 * 1024; - const chunks: Uint8Array[] = []; - const chunkSize = Math.min(512 * MB, totalInputSize); - - // since we expect the output file to be roughly the same size - // as inputs, preallocate its size for the output - for (let toAllocate = totalInputSize; toAllocate > 0; toAllocate -= chunkSize) { - chunks.push(new Uint8Array(chunkSize)); - } - - let actualSize = 0; - libav.onwrite = (name, pos, data) => { + libav.onwrite = async (name, pos, data) => { if (name === 'progress.txt') { try { return this.#emitProgress(data); @@ -98,26 +90,7 @@ export default class LibAVWrapper { } } else if (name !== outputName) return; - const writeEnd = pos + data.length; - if (writeEnd > chunkSize * chunks.length) { - chunks.push(new Uint8Array(chunkSize)); - } - - const chunkIndex = pos / chunkSize | 0; - const offset = pos - (chunkSize * chunkIndex); - - if (offset + data.length > chunkSize) { - chunks[chunkIndex].set( - data.subarray(0, chunkSize - offset), offset - ); - chunks[chunkIndex + 1].set( - data.subarray(chunkSize - offset), 0 - ); - } else { - chunks[chunkIndex].set(data, offset); - } - - actualSize = Math.max(writeEnd, actualSize); + await storage.write(data, pos); }; await libav.ffmpeg([ @@ -130,30 +103,14 @@ export default class LibAVWrapper { outputName ]); - // if we didn't need as much space as we allocated for some reason, - // shrink the buffers so that we don't inflate the file with zeroes - const outputView: Uint8Array[] = []; + const file = await storage.res(); - for (let i = 0; i < chunks.length; ++i) { - outputView.push( - chunks[i].subarray( - 0, Math.min(chunkSize, actualSize) - ) - ); + if (file.size === 0) return; - actualSize -= chunkSize; - if (actualSize <= 0) { - break; - } + return { + file, + type: output.type, } - - const renderBlob = new Blob( - outputView, - { type: output.type } - ); - - if (renderBlob.size === 0) return; - return renderBlob; } finally { try { await libav.unlink(outputName); diff --git a/web/src/lib/queen-bee/queue.ts b/web/src/lib/queen-bee/queue.ts index 10a9fef8..141d800c 100644 --- a/web/src/lib/queen-bee/queue.ts +++ b/web/src/lib/queen-bee/queue.ts @@ -23,7 +23,10 @@ export const createRemuxPipeline = (file: File) => { workerId: crypto.randomUUID(), parentId, workerArgs: { - files: [file], + files: [{ + file, + type: file.type, + }], ffargs: [ "-c", "copy", "-map", "0" diff --git a/web/src/lib/queen-bee/run-worker.ts b/web/src/lib/queen-bee/run-worker.ts index 4c05072f..47746806 100644 --- a/web/src/lib/queen-bee/run-worker.ts +++ b/web/src/lib/queen-bee/run-worker.ts @@ -8,6 +8,7 @@ import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue"; import type { FileInfo } from "$lib/types/libav"; import type { CobaltQueue } from "$lib/types/queue"; import type { CobaltPipelineItem } from "$lib/types/workers"; +import type { CobaltFileReference } from "$lib/types/storage"; const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => { unsubscribe(); @@ -15,7 +16,14 @@ const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.T if (interval) clearInterval(interval); } -export const runRemuxWorker = async (workerId: string, parentId: string, files: File[], args: string[], output: FileInfo, filename: string) => { +export const runRemuxWorker = async ( + workerId: string, + parentId: string, + files: CobaltFileReference[], + args: string[], + output: FileInfo, + filename: string +) => { const worker = new RemuxWorker(); // sometimes chrome refuses to start libav wasm, @@ -86,9 +94,7 @@ export const runRemuxWorker = async (workerId: string, parentId: string, files: return pipelineTaskDone( parentId, workerId, - new File([eventData.render], eventData.filename, { - type: eventData.render.type, - }) + eventData.render, ); } @@ -127,12 +133,12 @@ export const runFetchWorker = async (workerId: string, parentId: string, url: st }) } - if (eventData.file) { + if (eventData.result) { killWorker(worker, unsubscribe); return pipelineTaskDone( parentId, workerId, - eventData.file, + eventData.result, ); } @@ -144,7 +150,7 @@ export const runFetchWorker = async (workerId: string, parentId: string, url: st } export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => { - let files: File[] = []; + let files: CobaltFileReference[] = []; switch (worker) { case "remux": diff --git a/web/src/lib/state/queen-bee/queue.ts b/web/src/lib/state/queen-bee/queue.ts index 1ff45162..5151551b 100644 --- a/web/src/lib/state/queen-bee/queue.ts +++ b/web/src/lib/state/queen-bee/queue.ts @@ -1,8 +1,10 @@ import { readable, type Updater } from "svelte/store"; import { checkTasks } from "$lib/queen-bee/scheduler"; -import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue"; import { clearCurrentTasks, removeWorkerFromQueue } from "$lib/state/queen-bee/current-tasks"; +import type { CobaltFileReference } from "$lib/types/storage"; +import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue"; + let update: (_: Updater) => void; const queue = readable( @@ -39,7 +41,7 @@ export function itemError(id: string, workerId: string, error: string) { checkTasks(); } -export function itemDone(id: string, file: File) { +export function itemDone(id: string, file: CobaltFileReference) { update(queueData => { if (queueData[id]) { if (queueData[id].state === "running" && queueData[id].pipelineResults) { @@ -58,7 +60,7 @@ export function itemDone(id: string, file: File) { checkTasks(); } -export function pipelineTaskDone(id: string, workerId: string, file: File) { +export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileReference) { update(queueData => { if (queueData[id] && queueData[id].state === "running") { queueData[id].pipelineResults = [...queueData[id].pipelineResults || [], file]; diff --git a/web/src/lib/storage.ts b/web/src/lib/storage.ts new file mode 100644 index 00000000..68a5982c --- /dev/null +++ b/web/src/lib/storage.ts @@ -0,0 +1,53 @@ +export class OPFSStorage { + #root; + #handle; + #io; + + constructor(root: FileSystemDirectoryHandle, handle: FileSystemFileHandle, reader: FileSystemSyncAccessHandle) { + this.#root = root; + this.#handle = handle; + this.#io = reader; + } + + static async init() { + const root = await navigator.storage.getDirectory(); + const cobaltDir = await root.getDirectoryHandle('cobalt-processing-data', { create: true }); + const handle = await cobaltDir.getFileHandle(crypto.randomUUID(), { create: true }); + const reader = await handle.createSyncAccessHandle(); + + return new this(cobaltDir, handle, reader); + } + + async res() { + // await for compat with ios 15 + await this.#io.flush(); + await this.#io.close(); + return await this.#handle.getFile(); + } + + read(size: number, offset: number) { + const out = new Uint8Array(size); + const bytesRead = this.#io.read(out, { at: offset }); + + return out.subarray(0, bytesRead); + } + + async write(data: Uint8Array | Int8Array, offset: number) { + const writ = this.#io.write(data, { at: offset }); + + if (data.length !== writ) { + console.log(data.length, writ); + } + + return writ; + } + + async destroy() { + await this.#root.removeEntry(this.#handle.name); + } + + static isAvailable() { + return !!navigator.storage?.getDirectory; + } +} + diff --git a/web/src/lib/types/libav.ts b/web/src/lib/types/libav.ts index 92a07a7d..9871e4cb 100644 --- a/web/src/lib/types/libav.ts +++ b/web/src/lib/types/libav.ts @@ -1,10 +1,12 @@ +import type { CobaltFileReference } from "$lib/types/storage"; + export type FileInfo = { type?: string, extension?: string, } export type RenderParams = { - files: File[], + files: CobaltFileReference[], output: FileInfo, args: string[], } diff --git a/web/src/lib/types/queue.ts b/web/src/lib/types/queue.ts index f50093a4..c09e0d6b 100644 --- a/web/src/lib/types/queue.ts +++ b/web/src/lib/types/queue.ts @@ -1,3 +1,4 @@ +import type { CobaltFileReference } from "$lib/types/storage"; import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers"; export type CobaltQueueItemState = "waiting" | "running" | "done" | "error"; @@ -19,12 +20,12 @@ export type CobaltQueueItemRunning = CobaltQueueBaseItem & { state: "running", runningWorker: string, completedWorkers?: string[], - pipelineResults?: File[], + pipelineResults?: CobaltFileReference[], }; export type CobaltQueueItemDone = CobaltQueueBaseItem & { state: "done", - resultFile: File, + resultFile: CobaltFileReference, }; export type CobaltQueueItemError = CobaltQueueBaseItem & { diff --git a/web/src/lib/types/storage.ts b/web/src/lib/types/storage.ts new file mode 100644 index 00000000..a9bb13a7 --- /dev/null +++ b/web/src/lib/types/storage.ts @@ -0,0 +1,4 @@ +export type CobaltFileReference = { + file: File, + type: string, +} diff --git a/web/src/lib/types/workers.ts b/web/src/lib/types/workers.ts index 6a6e2c98..e42d8ba7 100644 --- a/web/src/lib/types/workers.ts +++ b/web/src/lib/types/workers.ts @@ -1,4 +1,5 @@ import type { FileInfo } from "$lib/types/libav"; +import type { CobaltFileReference } from "$lib/types/storage"; export const resultFileTypes = ["video", "audio", "image"] as const; @@ -12,7 +13,7 @@ export type CobaltWorkerProgress = { } export type CobaltWorkerArgs = { - files?: File[], + files?: CobaltFileReference[], url?: string, ffargs?: string[], output?: FileInfo, diff --git a/web/src/lib/workers/fetch.ts b/web/src/lib/workers/fetch.ts index 0965c51d..aa30a4c4 100644 --- a/web/src/lib/workers/fetch.ts +++ b/web/src/lib/workers/fetch.ts @@ -1,3 +1,5 @@ +import { OPFSStorage } from "$lib/storage"; + const error = (code: string) => { // TODO: return proper errors and code here self.postMessage({ @@ -22,20 +24,21 @@ const fetchFile = async (url: string) => { const totalBytes = contentLength ? parseInt(contentLength, 10) : null; const reader = response.body?.getReader(); + const storage = await OPFSStorage.init(); + if (!reader) { error("no reader"); return self.close(); } let receivedBytes = 0; - const chunks = []; while (true) { const { done, value } = await reader.read(); if (done) break; + await storage.write(value, receivedBytes); receivedBytes += value.length; - chunks.push(value); if (totalBytes) { self.postMessage({ @@ -52,15 +55,22 @@ const fetchFile = async (url: string) => { return self.close(); } - const file = new File(chunks, "file", { type: contentType }); + const file = await storage.res(); + + if (Number(contentLength) !== file.size) { + error("file is not downloaded fully"); + } self.postMessage({ cobaltFetchWorker: { - file + result: { + file, + type: contentType, + } } }); } catch (e) { - console.log(e) + console.log(e); error("error when downloading the file"); return self.close(); } diff --git a/web/src/lib/workers/remux.ts b/web/src/lib/workers/remux.ts index 61f06bfc..1f3f6aff 100644 --- a/web/src/lib/workers/remux.ts +++ b/web/src/lib/workers/remux.ts @@ -1,5 +1,7 @@ import LibAVWrapper from "$lib/libav"; + import type { FileInfo } from "$lib/types/libav"; +import type { CobaltFileReference } from "$lib/types/storage"; const error = (code: string) => { self.postMessage({ @@ -25,14 +27,17 @@ const ff = new LibAVWrapper((progress) => { ff.init(); -const remux = async (files: File[], args: string[], output: FileInfo, filename: string) => { +const remux = async (files: CobaltFileReference[], args: string[], output: FileInfo, filename: string) => { if (!(files && output && args)) return; await ff.init(); try { // probing just the first file in files array (usually audio) for duration progress - const file_info = await ff.probe(files[0]).catch((e) => { + const probeFile = files[0]?.file; + if (!probeFile) return error("couldn't probe one of files"); + + const file_info = await ff.probe(probeFile).catch((e) => { if (e?.message?.toLowerCase().includes("out of memory")) { console.error("uh oh! out of memory"); console.error(e); @@ -87,6 +92,7 @@ const remux = async (files: File[], args: string[], output: FileInfo, filename: }); } catch (e) { console.log(e); + return error("remux.crashed"); } }