From 1590490db2a02da411dbf0f3976b0591b0cf9533 Mon Sep 17 00:00:00 2001 From: wukko Date: Fri, 31 Jan 2025 11:22:31 +0600 Subject: [PATCH] web/queue: add a remux worker to saving pipeline, use pipelineResults --- web/src/lib/queen-bee/queue.ts | 25 ++++++++++++++++++++++++- web/src/lib/queen-bee/run-worker.ts | 8 ++++++++ web/src/lib/workers/remux.ts | 1 + 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/web/src/lib/queen-bee/queue.ts b/web/src/lib/queen-bee/queue.ts index 37d2ac11..10a9fef8 100644 --- a/web/src/lib/queen-bee/queue.ts +++ b/web/src/lib/queen-bee/queue.ts @@ -1,3 +1,5 @@ +import mime from "mime"; + import { addItem } from "$lib/state/queen-bee/queue"; import type { CobaltPipelineItem } from "$lib/types/workers"; import type { CobaltLocalProcessingResponse } from "$lib/types/api"; @@ -49,7 +51,10 @@ export const createSavePipeline = (info: CobaltLocalProcessingResponse) => { const parentId = crypto.randomUUID(); const pipeline: CobaltPipelineItem[] = []; - for (const tunnel of info.tunnel) { + // reverse is needed for audio (second item) to be downloaded first + const tunnels = info.tunnel.reverse(); + + for (const tunnel of tunnels) { pipeline.push({ worker: "fetch", workerId: crypto.randomUUID(), @@ -60,6 +65,24 @@ export const createSavePipeline = (info: CobaltLocalProcessingResponse) => { }) } + pipeline.push({ + worker: "remux", + workerId: crypto.randomUUID(), + parentId, + workerArgs: { + ffargs: [ + "-c:v", "copy", + "-c:a", "copy" + ], + output: { + // TODO: return mime type from api to avoid dragging a big ass package into web build + type: mime.getType(info.filename) || undefined, + extension: info.filename.split(".").pop(), + }, + filename: info.filename, + }, + }) + addItem({ id: parentId, state: "waiting", diff --git a/web/src/lib/queen-bee/run-worker.ts b/web/src/lib/queen-bee/run-worker.ts index 4244ef14..4c05072f 100644 --- a/web/src/lib/queen-bee/run-worker.ts +++ b/web/src/lib/queen-bee/run-worker.ts @@ -1,6 +1,7 @@ import RemuxWorker from "$lib/workers/remux?worker"; import FetchWorker from "$lib/workers/fetch?worker"; +import { get } from "svelte/store"; import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue"; @@ -151,6 +152,13 @@ export const startWorker = async ({ worker, workerId, parentId, workerArgs }: Co files = workerArgs.files; } + if (files?.length === 0) { + const parent = get(queue)[parentId]; + if (parent.state === "running" && parent.pipelineResults) { + files = parent.pipelineResults; + } + } + if (files.length > 0 && workerArgs.ffargs && workerArgs.output && workerArgs.filename) { await runRemuxWorker( workerId, diff --git a/web/src/lib/workers/remux.ts b/web/src/lib/workers/remux.ts index 3c7b371c..61f06bfc 100644 --- a/web/src/lib/workers/remux.ts +++ b/web/src/lib/workers/remux.ts @@ -31,6 +31,7 @@ const remux = async (files: File[], args: string[], output: FileInfo, filename: await ff.init(); try { + // probing just the first file in files array (usually audio) for duration progress const file_info = await ff.probe(files[0]).catch((e) => { if (e?.message?.toLowerCase().includes("out of memory")) { console.error("uh oh! out of memory");