From c0b671e45fbc8e867c69ea0d72f92aa97f94d917 Mon Sep 17 00:00:00 2001 From: wukko Date: Wed, 12 Feb 2025 13:34:52 +0600 Subject: [PATCH] web/queen-bee: move runners to their own files --- web/src/lib/queen-bee/run-worker.ts | 144 +------------------------ web/src/lib/queen-bee/runners/fetch.ts | 51 +++++++++ web/src/lib/queen-bee/runners/remux.ts | 96 +++++++++++++++++ 3 files changed, 152 insertions(+), 139 deletions(-) create mode 100644 web/src/lib/queen-bee/runners/fetch.ts create mode 100644 web/src/lib/queen-bee/runners/remux.ts diff --git a/web/src/lib/queen-bee/run-worker.ts b/web/src/lib/queen-bee/run-worker.ts index 4ee5f0b4..6e876cb3 100644 --- a/web/src/lib/queen-bee/run-worker.ts +++ b/web/src/lib/queen-bee/run-worker.ts @@ -1,152 +1,18 @@ -import RemuxWorker from "$lib/workers/remux?worker"; -import FetchWorker from "$lib/workers/fetch?worker"; - import { get } from "svelte/store"; -import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; -import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue"; +import { queue } from "$lib/state/queen-bee/queue"; + +import { runRemuxWorker } from "$lib/queen-bee/runners/remux"; +import { runFetchWorker } from "$lib/queen-bee/runners/fetch"; -import type { FileInfo } from "$lib/types/libav"; -import type { CobaltQueue } from "$lib/types/queue"; import type { CobaltPipelineItem } from "$lib/types/workers"; import type { CobaltFileReference } from "$lib/types/storage"; -const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => { +export const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => { unsubscribe(); worker.terminate(); if (interval) clearInterval(interval); } -export const runRemuxWorker = async ( - workerId: string, - parentId: string, - files: CobaltFileReference[], - args: string[], - output: FileInfo -) => { - const worker = new RemuxWorker(); - - // sometimes chrome refuses to start libav wasm, - // so we check the health and kill self if it doesn't spawn - - let bumpAttempts = 0; - const startCheck = setInterval(() => { - bumpAttempts++; - - if (bumpAttempts === 8) { - killWorker(worker, unsubscribe, startCheck); - console.error("worker didn't start after 4 seconds, so it was killed"); - - // TODO: proper error code - return itemError(parentId, workerId, "worker didn't start"); - } - }, 500); - - const unsubscribe = queue.subscribe((queue: CobaltQueue) => { - if (!queue[parentId]) { - // TODO: remove logging - console.log("worker's parent is gone, so it killed itself"); - killWorker(worker, unsubscribe, startCheck); - } - }); - - worker.postMessage({ - cobaltRemuxWorker: { - files, - args, - output, - } - }); - - worker.onerror = (e) => { - console.error("remux worker exploded:", e); - killWorker(worker, unsubscribe, startCheck); - - // TODO: proper error code - return itemError(parentId, workerId, "internal error"); - }; - - let totalDuration: number | null = null; - - worker.onmessage = (event) => { - const eventData = event.data.cobaltRemuxWorker; - if (!eventData) return; - - clearInterval(startCheck); - - // temporary debug logging - console.log(JSON.stringify(eventData, null, 2)); - - if (eventData.progress) { - if (eventData.progress.duration) { - totalDuration = eventData.progress.duration; - } - - updateWorkerProgress(workerId, { - percentage: totalDuration ? (eventData.progress.durationProcessed / totalDuration) * 100 : 0, - size: eventData.progress.size, - }) - } - - if (eventData.render) { - killWorker(worker, unsubscribe, startCheck); - return pipelineTaskDone( - parentId, - workerId, - eventData.render, - ); - } - - if (eventData.error) { - killWorker(worker, unsubscribe, startCheck); - return itemError(parentId, workerId, eventData.error); - } - }; -} - -export const runFetchWorker = async (workerId: string, parentId: string, url: string) => { - const worker = new FetchWorker(); - - const unsubscribe = queue.subscribe((queue: CobaltQueue) => { - if (!queue[parentId]) { - // TODO: remove logging - console.log("worker's parent is gone, so it killed itself"); - killWorker(worker, unsubscribe); - } - }); - - worker.postMessage({ - cobaltFetchWorker: { - url - } - }); - - worker.onmessage = (event) => { - const eventData = event.data.cobaltFetchWorker; - if (!eventData) return; - - if (eventData.progress) { - updateWorkerProgress(workerId, { - percentage: eventData.progress, - size: eventData.size, - }) - } - - if (eventData.result) { - killWorker(worker, unsubscribe); - return pipelineTaskDone( - parentId, - workerId, - eventData.result, - ); - } - - if (eventData.error) { - killWorker(worker, unsubscribe); - return itemError(parentId, workerId, eventData.error); - } - } -} - export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => { let files: CobaltFileReference[] = []; diff --git a/web/src/lib/queen-bee/runners/fetch.ts b/web/src/lib/queen-bee/runners/fetch.ts new file mode 100644 index 00000000..8cba7b29 --- /dev/null +++ b/web/src/lib/queen-bee/runners/fetch.ts @@ -0,0 +1,51 @@ +import FetchWorker from "$lib/workers/fetch?worker"; + +import { killWorker } from "$lib/queen-bee/run-worker"; +import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; +import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue"; + +import type { CobaltQueue } from "$lib/types/queue"; + +export const runFetchWorker = async (workerId: string, parentId: string, url: string) => { + const worker = new FetchWorker(); + + const unsubscribe = queue.subscribe((queue: CobaltQueue) => { + if (!queue[parentId]) { + // TODO: remove logging + console.log("worker's parent is gone, so it killed itself"); + killWorker(worker, unsubscribe); + } + }); + + worker.postMessage({ + cobaltFetchWorker: { + url + } + }); + + worker.onmessage = (event) => { + const eventData = event.data.cobaltFetchWorker; + if (!eventData) return; + + if (eventData.progress) { + updateWorkerProgress(workerId, { + percentage: eventData.progress, + size: eventData.size, + }) + } + + if (eventData.result) { + killWorker(worker, unsubscribe); + return pipelineTaskDone( + parentId, + workerId, + eventData.result, + ); + } + + if (eventData.error) { + killWorker(worker, unsubscribe); + return itemError(parentId, workerId, eventData.error); + } + } +} diff --git a/web/src/lib/queen-bee/runners/remux.ts b/web/src/lib/queen-bee/runners/remux.ts new file mode 100644 index 00000000..55a6681d --- /dev/null +++ b/web/src/lib/queen-bee/runners/remux.ts @@ -0,0 +1,96 @@ +import RemuxWorker from "$lib/workers/remux?worker"; + +import { killWorker } from "$lib/queen-bee/run-worker"; +import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; +import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue"; + +import type { FileInfo } from "$lib/types/libav"; +import type { CobaltQueue } from "$lib/types/queue"; +import type { CobaltFileReference } from "$lib/types/storage"; + +export const runRemuxWorker = async ( + workerId: string, + parentId: string, + files: CobaltFileReference[], + args: string[], + output: FileInfo +) => { + const worker = new RemuxWorker(); + + // sometimes chrome refuses to start libav wasm, + // so we check the health and kill self if it doesn't spawn + + let bumpAttempts = 0; + const startCheck = setInterval(() => { + bumpAttempts++; + + if (bumpAttempts === 8) { + killWorker(worker, unsubscribe, startCheck); + console.error("worker didn't start after 4 seconds, so it was killed"); + + // TODO: proper error code + return itemError(parentId, workerId, "worker didn't start"); + } + }, 500); + + const unsubscribe = queue.subscribe((queue: CobaltQueue) => { + if (!queue[parentId]) { + // TODO: remove logging + console.log("worker's parent is gone, so it killed itself"); + killWorker(worker, unsubscribe, startCheck); + } + }); + + worker.postMessage({ + cobaltRemuxWorker: { + files, + args, + output, + } + }); + + worker.onerror = (e) => { + console.error("remux worker exploded:", e); + killWorker(worker, unsubscribe, startCheck); + + // TODO: proper error code + return itemError(parentId, workerId, "internal error"); + }; + + let totalDuration: number | null = null; + + worker.onmessage = (event) => { + const eventData = event.data.cobaltRemuxWorker; + if (!eventData) return; + + clearInterval(startCheck); + + // temporary debug logging + console.log(JSON.stringify(eventData, null, 2)); + + if (eventData.progress) { + if (eventData.progress.duration) { + totalDuration = eventData.progress.duration; + } + + updateWorkerProgress(workerId, { + percentage: totalDuration ? (eventData.progress.durationProcessed / totalDuration) * 100 : 0, + size: eventData.progress.size, + }) + } + + if (eventData.render) { + killWorker(worker, unsubscribe, startCheck); + return pipelineTaskDone( + parentId, + workerId, + eventData.render, + ); + } + + if (eventData.error) { + killWorker(worker, unsubscribe, startCheck); + return itemError(parentId, workerId, eventData.error); + } + }; +}