api: implement redis/memory store for cache

This commit is contained in:
jj 2024-10-30 19:06:46 +00:00
parent 5e92b649a3
commit 18acad19b9
No known key found for this signature in database
6 changed files with 199 additions and 0 deletions

View file

@ -24,6 +24,7 @@
},
"homepage": "https://github.com/imputnet/cobalt#readme",
"dependencies": {
"@datastructures-js/priority-queue": "^6.3.1",
"@imput/version-info": "workspace:^",
"content-disposition-header": "0.6.0",
"cors": "^2.8.5",

View file

@ -0,0 +1,39 @@
const _stores = new Set();
export class Store {
id;
constructor(name) {
name = name.toUpperCase();
if (_stores.has(name))
throw `${name} store already exists`;
_stores.add(name);
this.id = name;
}
async _get(_key) { throw "needs implementation" }
async get(key) {
if (typeof key !== 'string') {
key = key.toString();
}
const val = await this._get(key);
if (val === null)
return null;
return val;
}
async _set(_key, _val, _exp_sec = -1) { throw "needs implementation" }
set(key, val, exp_sec = -1) {
if (typeof key !== 'string') {
key = key.toString();
}
exp_sec = Math.round(exp_sec);
return this._set(key, val, exp_sec);
}
};

View file

@ -0,0 +1,72 @@
import { MinPriorityQueue } from '@datastructures-js/priority-queue'
import { Store } from './base-store.js';
// minimum delay between sweeps to avoid repeatedly
// sweeping entries close in proximity one by one.
const MIN_THRESHOLD_MS = 2500;
export default class MemoryStore extends Store {
#store = new Map();
#timeouts = new MinPriorityQueue/*<{ t: number, k: unknown }>*/((obj) => obj.t);
#nextSweep = { id: null, t: null };
constructor(name) {
super(name);
}
async _get(key) {
const val = this.#store.get(key);
return val === undefined ? null : val;
}
async _set(key, val, exp_sec = -1) {
if (this.#store.has(key)) {
this.#timeouts.remove(o => o.k === key);
}
if (exp_sec > 0) {
const exp = 1000 * exp_sec;
const timeout_at = +new Date() + exp;
this.#timeouts.enqueue({ k: key, t: timeout_at });
}
this.#store.set(key, val);
this.#reschedule();
}
#reschedule() {
const current_time = new Date().getTime();
const time = this.#timeouts.front()?.t;
if (!time) {
return;
} else if (time < current_time) {
return this.#sweepNow();
}
const sweep = this.#nextSweep;
if (sweep.id === null || sweep.t > time) {
if (sweep.id) {
clearTimeout(sweep.id);
}
sweep.t = time;
sweep.id = setTimeout(
() => this.#sweepNow(),
Math.max(MIN_THRESHOLD_MS, time - current_time)
);
}
}
#sweepNow() {
while (this.#timeouts.front()?.t < new Date().getTime()) {
const item = this.#timeouts.dequeue();
this.#store.delete(item.k);
}
this.#nextSweep.id = null;
this.#nextSweep.t = null;
this.#reschedule();
}
}

View file

@ -0,0 +1,62 @@
import { commandOptions, createClient } from "redis";
import { env } from "../config.js";
import { Store } from "./base-store.js";
export default class RedisStore extends Store {
#client = createClient({
url: env.redisURL,
});
#connected;
constructor(name) {
super(name);
this.#connected = this.#client.connect();
}
#keyOf(key) {
return this.id + '_' + key;
}
async _get(key) {
await this.#connected;
const data = await this.#client.hGetAll(
commandOptions({ returnBuffers: true }),
this.#keyOf(key)
);
if (!data.d) {
return null;
}
const type = data.t;
if (type && type[0] === 'b'.charCodeAt())
return data.d;
else
return JSON.parse(data.d);
}
async _set(key, val, exp_sec = -1) {
await this.#connected;
const out = { d: val };
if (val instanceof Buffer) {
out.t = 'b';
} else {
out.d = JSON.stringify(val);
}
await this.#client.hSet(
this.#keyOf(key),
out
);
if (exp_sec > 0) {
await this.#client.hExpire(
this.#keyOf(key),
Object.keys(out),
exp_sec
);
}
}
}

10
api/src/store/store.js Normal file
View file

@ -0,0 +1,10 @@
import { env } from '../config.js';
let _export;
if (env.redisURL) {
_export = await import('./redis-store.js');
} else {
_export = await import('./memory-store.js');
}
export default _export.default;

View file

@ -10,6 +10,9 @@ importers:
api:
dependencies:
'@datastructures-js/priority-queue':
specifier: ^6.3.1
version: 6.3.1
'@imput/version-info':
specifier: workspace:^
version: link:../packages/version-info
@ -188,6 +191,12 @@ packages:
'@bufbuild/protobuf@2.1.0':
resolution: {integrity: sha512-+2Mx67Y3skJ4NCD/qNSdBJNWtu6x6Qr53jeNg+QcwiL6mt0wK+3jwHH2x1p7xaYH6Ve2JKOVn0OxU35WsmqI9A==}
'@datastructures-js/heap@4.3.3':
resolution: {integrity: sha512-UcUu/DLh/aM4W3C8zZfwxxm6/6FIZUlm3mcAXuNOCa6Aj4iizNvNXQyb8DjZQH2jKSQbMRyNlngP6TPimuGjpQ==}
'@datastructures-js/priority-queue@6.3.1':
resolution: {integrity: sha512-eoxkWql/j0VJ0UFMFTpnyJz4KbEEVQ6aZ/JuJUgenu0Im4tYKylAycNGsYCHGXiVNEd7OKGVwfx1Ac3oYkuu7A==}
'@derhuerst/http-basic@8.2.4':
resolution: {integrity: sha512-F9rL9k9Xjf5blCz8HsJRO4diy111cayL2vkY2XE4r4t3n0yPXVYy3KD3nJ1qbrSn9743UWSXH4IwuCa/HWlGFw==}
engines: {node: '>=6.0.0'}
@ -2355,6 +2364,12 @@ snapshots:
'@bufbuild/protobuf@2.1.0': {}
'@datastructures-js/heap@4.3.3': {}
'@datastructures-js/priority-queue@6.3.1':
dependencies:
'@datastructures-js/heap': 4.3.3
'@derhuerst/http-basic@8.2.4':
dependencies:
caseless: 0.12.0