web/queen-bee: move runners to their own files

This commit is contained in:
wukko 2025-02-12 13:34:52 +06:00
parent 564fc65297
commit c0b671e45f
No known key found for this signature in database
GPG key ID: 3E30B3F26C7B4AA2
3 changed files with 152 additions and 139 deletions

View file

@ -1,152 +1,18 @@
import RemuxWorker from "$lib/workers/remux?worker";
import FetchWorker from "$lib/workers/fetch?worker";
import { get } from "svelte/store"; import { get } from "svelte/store";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; import { queue } from "$lib/state/queen-bee/queue";
import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue";
import { runRemuxWorker } from "$lib/queen-bee/runners/remux";
import { runFetchWorker } from "$lib/queen-bee/runners/fetch";
import type { FileInfo } from "$lib/types/libav";
import type { CobaltQueue } from "$lib/types/queue";
import type { CobaltPipelineItem } from "$lib/types/workers"; import type { CobaltPipelineItem } from "$lib/types/workers";
import type { CobaltFileReference } from "$lib/types/storage"; import type { CobaltFileReference } from "$lib/types/storage";
const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => { export const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => {
unsubscribe(); unsubscribe();
worker.terminate(); worker.terminate();
if (interval) clearInterval(interval); if (interval) clearInterval(interval);
} }
export const runRemuxWorker = async (
workerId: string,
parentId: string,
files: CobaltFileReference[],
args: string[],
output: FileInfo
) => {
const worker = new RemuxWorker();
// sometimes chrome refuses to start libav wasm,
// so we check the health and kill self if it doesn't spawn
let bumpAttempts = 0;
const startCheck = setInterval(() => {
bumpAttempts++;
if (bumpAttempts === 8) {
killWorker(worker, unsubscribe, startCheck);
console.error("worker didn't start after 4 seconds, so it was killed");
// TODO: proper error code
return itemError(parentId, workerId, "worker didn't start");
}
}, 500);
const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe, startCheck);
}
});
worker.postMessage({
cobaltRemuxWorker: {
files,
args,
output,
}
});
worker.onerror = (e) => {
console.error("remux worker exploded:", e);
killWorker(worker, unsubscribe, startCheck);
// TODO: proper error code
return itemError(parentId, workerId, "internal error");
};
let totalDuration: number | null = null;
worker.onmessage = (event) => {
const eventData = event.data.cobaltRemuxWorker;
if (!eventData) return;
clearInterval(startCheck);
// temporary debug logging
console.log(JSON.stringify(eventData, null, 2));
if (eventData.progress) {
if (eventData.progress.duration) {
totalDuration = eventData.progress.duration;
}
updateWorkerProgress(workerId, {
percentage: totalDuration ? (eventData.progress.durationProcessed / totalDuration) * 100 : 0,
size: eventData.progress.size,
})
}
if (eventData.render) {
killWorker(worker, unsubscribe, startCheck);
return pipelineTaskDone(
parentId,
workerId,
eventData.render,
);
}
if (eventData.error) {
killWorker(worker, unsubscribe, startCheck);
return itemError(parentId, workerId, eventData.error);
}
};
}
export const runFetchWorker = async (workerId: string, parentId: string, url: string) => {
const worker = new FetchWorker();
const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe);
}
});
worker.postMessage({
cobaltFetchWorker: {
url
}
});
worker.onmessage = (event) => {
const eventData = event.data.cobaltFetchWorker;
if (!eventData) return;
if (eventData.progress) {
updateWorkerProgress(workerId, {
percentage: eventData.progress,
size: eventData.size,
})
}
if (eventData.result) {
killWorker(worker, unsubscribe);
return pipelineTaskDone(
parentId,
workerId,
eventData.result,
);
}
if (eventData.error) {
killWorker(worker, unsubscribe);
return itemError(parentId, workerId, eventData.error);
}
}
}
export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => { export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => {
let files: CobaltFileReference[] = []; let files: CobaltFileReference[] = [];

View file

@ -0,0 +1,51 @@
import FetchWorker from "$lib/workers/fetch?worker";
import { killWorker } from "$lib/queen-bee/run-worker";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue";
import type { CobaltQueue } from "$lib/types/queue";
export const runFetchWorker = async (workerId: string, parentId: string, url: string) => {
const worker = new FetchWorker();
const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe);
}
});
worker.postMessage({
cobaltFetchWorker: {
url
}
});
worker.onmessage = (event) => {
const eventData = event.data.cobaltFetchWorker;
if (!eventData) return;
if (eventData.progress) {
updateWorkerProgress(workerId, {
percentage: eventData.progress,
size: eventData.size,
})
}
if (eventData.result) {
killWorker(worker, unsubscribe);
return pipelineTaskDone(
parentId,
workerId,
eventData.result,
);
}
if (eventData.error) {
killWorker(worker, unsubscribe);
return itemError(parentId, workerId, eventData.error);
}
}
}

View file

@ -0,0 +1,96 @@
import RemuxWorker from "$lib/workers/remux?worker";
import { killWorker } from "$lib/queen-bee/run-worker";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue";
import type { FileInfo } from "$lib/types/libav";
import type { CobaltQueue } from "$lib/types/queue";
import type { CobaltFileReference } from "$lib/types/storage";
export const runRemuxWorker = async (
workerId: string,
parentId: string,
files: CobaltFileReference[],
args: string[],
output: FileInfo
) => {
const worker = new RemuxWorker();
// sometimes chrome refuses to start libav wasm,
// so we check the health and kill self if it doesn't spawn
let bumpAttempts = 0;
const startCheck = setInterval(() => {
bumpAttempts++;
if (bumpAttempts === 8) {
killWorker(worker, unsubscribe, startCheck);
console.error("worker didn't start after 4 seconds, so it was killed");
// TODO: proper error code
return itemError(parentId, workerId, "worker didn't start");
}
}, 500);
const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe, startCheck);
}
});
worker.postMessage({
cobaltRemuxWorker: {
files,
args,
output,
}
});
worker.onerror = (e) => {
console.error("remux worker exploded:", e);
killWorker(worker, unsubscribe, startCheck);
// TODO: proper error code
return itemError(parentId, workerId, "internal error");
};
let totalDuration: number | null = null;
worker.onmessage = (event) => {
const eventData = event.data.cobaltRemuxWorker;
if (!eventData) return;
clearInterval(startCheck);
// temporary debug logging
console.log(JSON.stringify(eventData, null, 2));
if (eventData.progress) {
if (eventData.progress.duration) {
totalDuration = eventData.progress.duration;
}
updateWorkerProgress(workerId, {
percentage: totalDuration ? (eventData.progress.durationProcessed / totalDuration) * 100 : 0,
size: eventData.progress.size,
})
}
if (eventData.render) {
killWorker(worker, unsubscribe, startCheck);
return pipelineTaskDone(
parentId,
workerId,
eventData.render,
);
}
if (eventData.error) {
killWorker(worker, unsubscribe, startCheck);
return itemError(parentId, workerId, eventData.error);
}
};
}