mirror of
https://github.com/wukko/cobalt.git
synced 2025-01-22 18:56:21 +01:00
api/cookie: implement cluster synchronization
This commit is contained in:
parent
42ec28a642
commit
d48cc8fc07
2 changed files with 78 additions and 2 deletions
|
@ -38,3 +38,34 @@ export const broadcast = (message) => {
|
||||||
worker.send(message);
|
worker.send(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const send = (message) => {
|
||||||
|
if (!isCluster) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cluster.isPrimary) {
|
||||||
|
return broadcast(message);
|
||||||
|
} else {
|
||||||
|
return process.send(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const waitFor = (key) => {
|
||||||
|
return new Promise(resolve => {
|
||||||
|
const listener = (message) => {
|
||||||
|
if (key in message) {
|
||||||
|
process.off('message', listener);
|
||||||
|
return resolve(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
process.on('message', listener);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export const mainOnMessage = (cb) => {
|
||||||
|
for (const worker of Object.values(cluster.workers)) {
|
||||||
|
worker.on('message', cb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,8 @@ import Cookie from './cookie.js';
|
||||||
import { readFile, writeFile } from 'fs/promises';
|
import { readFile, writeFile } from 'fs/promises';
|
||||||
import { Green, Yellow } from '../../misc/console-text.js';
|
import { Green, Yellow } from '../../misc/console-text.js';
|
||||||
import { parse as parseSetCookie, splitCookiesString } from 'set-cookie-parser';
|
import { parse as parseSetCookie, splitCookiesString } from 'set-cookie-parser';
|
||||||
|
import * as cluster from '../../misc/cluster.js';
|
||||||
|
import { isCluster } from '../../config.js';
|
||||||
|
|
||||||
const WRITE_INTERVAL = 60000;
|
const WRITE_INTERVAL = 60000;
|
||||||
let cookies = {}, dirty = false, intervalId;
|
let cookies = {}, dirty = false, intervalId;
|
||||||
|
@ -16,18 +18,55 @@ function writeChanges(cookiePath) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
export const setup = async (cookiePath) => {
|
const setupMain = async (cookiePath) => {
|
||||||
try {
|
try {
|
||||||
cookies = await readFile(cookiePath, 'utf8');
|
cookies = await readFile(cookiePath, 'utf8');
|
||||||
cookies = JSON.parse(cookies);
|
cookies = JSON.parse(cookies);
|
||||||
intervalId = setInterval(() => writeChanges(cookiePath), WRITE_INTERVAL);
|
intervalId = setInterval(() => writeChanges(cookiePath), WRITE_INTERVAL);
|
||||||
console.log(`${Green('[✓]')} cookies loaded successfully!`)
|
|
||||||
|
cluster.broadcast({ cookies });
|
||||||
|
|
||||||
|
console.log(`${Green('[✓]')} cookies loaded successfully!`);
|
||||||
} catch(e) {
|
} catch(e) {
|
||||||
console.error(`${Yellow('[!]')} failed to load cookies.`);
|
console.error(`${Yellow('[!]')} failed to load cookies.`);
|
||||||
console.error('error:', e);
|
console.error('error:', e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const setupWorker = async () => {
|
||||||
|
cookies = (await cluster.waitFor('cookies')).cookies;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const setup = async (cookiePath) => {
|
||||||
|
if (cluster.isPrimary) {
|
||||||
|
await setupMain(cookiePath);
|
||||||
|
} else if (cluster.isWorker) {
|
||||||
|
await setupWorker();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isCluster) {
|
||||||
|
const messageHandler = (message) => {
|
||||||
|
if ('cookieUpdate' in message) {
|
||||||
|
const { cookieUpdate } = message;
|
||||||
|
|
||||||
|
if (cluster.isPrimary) {
|
||||||
|
dirty = true;
|
||||||
|
cluster.broadcast({ cookieUpdate });
|
||||||
|
}
|
||||||
|
|
||||||
|
const { service, idx, cookie } = cookieUpdate;
|
||||||
|
cookies[service][idx] = cookie;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cluster.isPrimary) {
|
||||||
|
cluster.mainOnMessage(messageHandler);
|
||||||
|
} else {
|
||||||
|
process.on('message', messageHandler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export function getCookie(service) {
|
export function getCookie(service) {
|
||||||
if (!cookies[service] || !cookies[service].length) return;
|
if (!cookies[service] || !cookies[service].length) return;
|
||||||
|
|
||||||
|
@ -38,6 +77,7 @@ export function getCookie(service) {
|
||||||
cookies[service][idx] = Cookie.fromString(cookie);
|
cookies[service][idx] = Cookie.fromString(cookie);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cookies[service][idx].meta = { service, idx };
|
||||||
return cookies[service][idx];
|
return cookies[service][idx];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,6 +90,10 @@ export function updateCookieValues(cookie, values) {
|
||||||
|
|
||||||
if (changed) {
|
if (changed) {
|
||||||
dirty = true;
|
dirty = true;
|
||||||
|
if (isCluster && cookie.meta) {
|
||||||
|
const message = { cookieUpdate: { ...cookie.meta, cookie } };
|
||||||
|
cluster.send(message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return changed;
|
return changed;
|
||||||
|
@ -65,5 +109,6 @@ export function updateCookie(cookie, headers) {
|
||||||
|
|
||||||
cookie.unset(parsed.filter(c => c.expires < new Date()).map(c => c.name));
|
cookie.unset(parsed.filter(c => c.expires < new Date()).map(c => c.name));
|
||||||
parsed.filter(c => !c.expires || c.expires > new Date()).forEach(c => values[c.name] = c.value);
|
parsed.filter(c => !c.expires || c.expires > new Date()).forEach(c => values[c.name] = c.value);
|
||||||
|
|
||||||
updateCookieValues(cookie, values);
|
updateCookieValues(cookie, values);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue