From 18acad19b931418c343a14cf88656684827b0e02 Mon Sep 17 00:00:00 2001 From: jj Date: Wed, 30 Oct 2024 19:06:46 +0000 Subject: [PATCH] api: implement redis/memory store for cache --- api/package.json | 1 + api/src/store/base-store.js | 39 +++++++++++++++++++ api/src/store/memory-store.js | 72 +++++++++++++++++++++++++++++++++++ api/src/store/redis-store.js | 62 ++++++++++++++++++++++++++++++ api/src/store/store.js | 10 +++++ pnpm-lock.yaml | 15 ++++++++ 6 files changed, 199 insertions(+) create mode 100644 api/src/store/base-store.js create mode 100644 api/src/store/memory-store.js create mode 100644 api/src/store/redis-store.js create mode 100644 api/src/store/store.js diff --git a/api/package.json b/api/package.json index ef70f2e4..6867c37d 100644 --- a/api/package.json +++ b/api/package.json @@ -24,6 +24,7 @@ }, "homepage": "https://github.com/imputnet/cobalt#readme", "dependencies": { + "@datastructures-js/priority-queue": "^6.3.1", "@imput/version-info": "workspace:^", "content-disposition-header": "0.6.0", "cors": "^2.8.5", diff --git a/api/src/store/base-store.js b/api/src/store/base-store.js new file mode 100644 index 00000000..3f81d8d9 --- /dev/null +++ b/api/src/store/base-store.js @@ -0,0 +1,39 @@ +const _stores = new Set(); + +export class Store { + id; + + constructor(name) { + name = name.toUpperCase(); + + if (_stores.has(name)) + throw `${name} store already exists`; + _stores.add(name); + + this.id = name; + } + + async _get(_key) { throw "needs implementation" } + async get(key) { + if (typeof key !== 'string') { + key = key.toString(); + } + + const val = await this._get(key); + if (val === null) + return null; + + return val; + } + + async _set(_key, _val, _exp_sec = -1) { throw "needs implementation" } + set(key, val, exp_sec = -1) { + if (typeof key !== 'string') { + key = key.toString(); + } + + exp_sec = Math.round(exp_sec); + + return this._set(key, val, exp_sec); + } +}; diff --git a/api/src/store/memory-store.js b/api/src/store/memory-store.js new file mode 100644 index 00000000..07e94ec2 --- /dev/null +++ b/api/src/store/memory-store.js @@ -0,0 +1,72 @@ +import { MinPriorityQueue } from '@datastructures-js/priority-queue' +import { Store } from './base-store.js'; + +// minimum delay between sweeps to avoid repeatedly +// sweeping entries close in proximity one by one. +const MIN_THRESHOLD_MS = 2500; + +export default class MemoryStore extends Store { + #store = new Map(); + #timeouts = new MinPriorityQueue/*<{ t: number, k: unknown }>*/((obj) => obj.t); + #nextSweep = { id: null, t: null }; + + constructor(name) { + super(name); + } + + async _get(key) { + const val = this.#store.get(key); + + return val === undefined ? null : val; + } + + async _set(key, val, exp_sec = -1) { + if (this.#store.has(key)) { + this.#timeouts.remove(o => o.k === key); + } + + if (exp_sec > 0) { + const exp = 1000 * exp_sec; + const timeout_at = +new Date() + exp; + + this.#timeouts.enqueue({ k: key, t: timeout_at }); + } + + this.#store.set(key, val); + this.#reschedule(); + } + + #reschedule() { + const current_time = new Date().getTime(); + const time = this.#timeouts.front()?.t; + if (!time) { + return; + } else if (time < current_time) { + return this.#sweepNow(); + } + + const sweep = this.#nextSweep; + if (sweep.id === null || sweep.t > time) { + if (sweep.id) { + clearTimeout(sweep.id); + } + + sweep.t = time; + sweep.id = setTimeout( + () => this.#sweepNow(), + Math.max(MIN_THRESHOLD_MS, time - current_time) + ); + } + } + + #sweepNow() { + while (this.#timeouts.front()?.t < new Date().getTime()) { + const item = this.#timeouts.dequeue(); + this.#store.delete(item.k); + } + + this.#nextSweep.id = null; + this.#nextSweep.t = null; + this.#reschedule(); + } +} diff --git a/api/src/store/redis-store.js b/api/src/store/redis-store.js new file mode 100644 index 00000000..cce158e0 --- /dev/null +++ b/api/src/store/redis-store.js @@ -0,0 +1,62 @@ +import { commandOptions, createClient } from "redis"; +import { env } from "../config.js"; +import { Store } from "./base-store.js"; + +export default class RedisStore extends Store { + #client = createClient({ + url: env.redisURL, + }); + #connected; + + constructor(name) { + super(name); + this.#connected = this.#client.connect(); + } + + #keyOf(key) { + return this.id + '_' + key; + } + + async _get(key) { + await this.#connected; + + const data = await this.#client.hGetAll( + commandOptions({ returnBuffers: true }), + this.#keyOf(key) + ); + + if (!data.d) { + return null; + } + + const type = data.t; + if (type && type[0] === 'b'.charCodeAt()) + return data.d; + else + return JSON.parse(data.d); + } + + async _set(key, val, exp_sec = -1) { + await this.#connected; + + const out = { d: val }; + if (val instanceof Buffer) { + out.t = 'b'; + } else { + out.d = JSON.stringify(val); + } + + await this.#client.hSet( + this.#keyOf(key), + out + ); + + if (exp_sec > 0) { + await this.#client.hExpire( + this.#keyOf(key), + Object.keys(out), + exp_sec + ); + } + } +} diff --git a/api/src/store/store.js b/api/src/store/store.js new file mode 100644 index 00000000..e268d88d --- /dev/null +++ b/api/src/store/store.js @@ -0,0 +1,10 @@ +import { env } from '../config.js'; + +let _export; +if (env.redisURL) { + _export = await import('./redis-store.js'); +} else { + _export = await import('./memory-store.js'); +} + +export default _export.default; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d96b6642..3f095749 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -10,6 +10,9 @@ importers: api: dependencies: + '@datastructures-js/priority-queue': + specifier: ^6.3.1 + version: 6.3.1 '@imput/version-info': specifier: workspace:^ version: link:../packages/version-info @@ -188,6 +191,12 @@ packages: '@bufbuild/protobuf@2.1.0': resolution: {integrity: sha512-+2Mx67Y3skJ4NCD/qNSdBJNWtu6x6Qr53jeNg+QcwiL6mt0wK+3jwHH2x1p7xaYH6Ve2JKOVn0OxU35WsmqI9A==} + '@datastructures-js/heap@4.3.3': + resolution: {integrity: sha512-UcUu/DLh/aM4W3C8zZfwxxm6/6FIZUlm3mcAXuNOCa6Aj4iizNvNXQyb8DjZQH2jKSQbMRyNlngP6TPimuGjpQ==} + + '@datastructures-js/priority-queue@6.3.1': + resolution: {integrity: sha512-eoxkWql/j0VJ0UFMFTpnyJz4KbEEVQ6aZ/JuJUgenu0Im4tYKylAycNGsYCHGXiVNEd7OKGVwfx1Ac3oYkuu7A==} + '@derhuerst/http-basic@8.2.4': resolution: {integrity: sha512-F9rL9k9Xjf5blCz8HsJRO4diy111cayL2vkY2XE4r4t3n0yPXVYy3KD3nJ1qbrSn9743UWSXH4IwuCa/HWlGFw==} engines: {node: '>=6.0.0'} @@ -2355,6 +2364,12 @@ snapshots: '@bufbuild/protobuf@2.1.0': {} + '@datastructures-js/heap@4.3.3': {} + + '@datastructures-js/priority-queue@6.3.1': + dependencies: + '@datastructures-js/heap': 4.3.3 + '@derhuerst/http-basic@8.2.4': dependencies: caseless: 0.12.0