From 6fe149b6dc9b5ed1670f3cd58a5bb52464ec4063 Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Mon, 16 Dec 2024 17:19:15 +0100 Subject: [PATCH 1/9] Multi-Tier caching in the tokens service --- packages/services/storage/src/tokens.ts | 16 +- packages/services/tokens/package.json | 1 + packages/services/tokens/src/api.ts | 20 +- packages/services/tokens/src/cache.ts | 290 ------------------ packages/services/tokens/src/helpers.ts | 17 - packages/services/tokens/src/index.ts | 18 +- .../services/tokens/src/multi-tier-storage.ts | 271 ++++++++++++++++ pnpm-lock.yaml | 47 +-- 8 files changed, 325 insertions(+), 355 deletions(-) delete mode 100644 packages/services/tokens/src/cache.ts create mode 100644 packages/services/tokens/src/multi-tier-storage.ts diff --git a/packages/services/storage/src/tokens.ts b/packages/services/storage/src/tokens.ts index cee3f294d7..dfb521db6e 100644 --- a/packages/services/storage/src/tokens.ts +++ b/packages/services/storage/src/tokens.ts @@ -75,12 +75,16 @@ export async function createTokenStorage( `, ); }, - async deleteToken({ token }: { token: string }) { - await pool.query( - sql` - UPDATE tokens SET deleted_at = NOW() WHERE token = ${token} - `, - ); + async deleteToken(params: { token: string; postDeletionTransaction: () => Promise }) { + await pool.transaction(async t => { + await t.query(sql` + UPDATE tokens + SET deleted_at = NOW() + WHERE token = ${params.token} + `); + + await params.postDeletionTransaction(); + }); }, async touchTokens({ tokens }: { tokens: Array<{ token: string; date: Date }> }) { await pool.query(sql` diff --git a/packages/services/tokens/package.json b/packages/services/tokens/package.json index 6d89ce7d82..87b587b9a8 100644 --- a/packages/services/tokens/package.json +++ b/packages/services/tokens/package.json @@ -17,6 +17,7 @@ "dotenv": "16.4.5", "fastify": "4.28.1", "ioredis": "5.4.1", + "lru-cache": "11.0.2", "ms": "2.1.3", "p-timeout": "6.1.3", "pino-pretty": "11.3.0", diff --git a/packages/services/tokens/src/api.ts b/packages/services/tokens/src/api.ts index ef2342ae2c..07f59fc061 100644 --- a/packages/services/tokens/src/api.ts +++ b/packages/services/tokens/src/api.ts @@ -5,8 +5,8 @@ import { z } from 'zod'; import { createErrorHandler, handleTRPCError, maskToken, metrics } from '@hive/service-common'; import type { inferRouterInputs, inferRouterOutputs } from '@trpc/server'; import { initTRPC } from '@trpc/server'; -import { useCache } from './cache'; import { cacheHits, cacheMisses } from './metrics'; +import { Storage } from './multi-tier-storage'; const httpRequests = new metrics.Counter({ name: 'tokens_http_requests', @@ -43,7 +43,7 @@ function generateToken() { export type Context = { req: FastifyRequest; errorHandler: ReturnType; - getStorage: ReturnType['getStorage']; + storage: Storage; tokenReadFailuresCache: LruType; }; @@ -72,9 +72,7 @@ export const tokensApiRouter = t.router({ ) .query(async ({ ctx, input }) => { try { - const storage = await ctx.getStorage(); - - return await storage.readTarget(input.targetId); + return await ctx.storage.readTarget(input.targetId); } catch (error) { ctx.errorHandler('Failed to get tokens of a target', error as Error); @@ -91,8 +89,7 @@ export const tokensApiRouter = t.router({ ) .mutation(async ({ ctx, input }) => { try { - const storage = await ctx.getStorage(); - await storage.invalidateTokens(input.tokens); + await ctx.storage.invalidateTokens(input.tokens); return true; } catch (error) { @@ -116,9 +113,8 @@ export const tokensApiRouter = t.router({ .mutation(async ({ ctx, input }) => { try { const { target, project, organization, name, scopes } = input; - const storage = await ctx.getStorage(); const token = generateToken(); - const result = await storage.writeToken({ + const result = await ctx.storage.writeToken({ name, target, project, @@ -149,8 +145,7 @@ export const tokensApiRouter = t.router({ .mutation(async ({ ctx, input }) => { try { const hashed_token = input.token; - const storage = await ctx.getStorage(); - await storage.deleteToken(hashed_token); + await ctx.storage.deleteToken(hashed_token); return true; } catch (error) { @@ -180,8 +175,7 @@ export const tokensApiRouter = t.router({ } try { - const storage = await ctx.getStorage(); - const result = await storage.readToken(hash); + const result = await ctx.storage.readToken(hash); // removes the token from the failures cache (in case the value expired) ctx.tokenReadFailuresCache.delete(hash); diff --git a/packages/services/tokens/src/cache.ts b/packages/services/tokens/src/cache.ts deleted file mode 100644 index 1660b2d3cf..0000000000 --- a/packages/services/tokens/src/cache.ts +++ /dev/null @@ -1,290 +0,0 @@ -import type { FastifyBaseLogger } from 'fastify'; -import type { Redis } from 'ioredis'; -import ms from 'ms'; -import LRU from 'tiny-lru'; -import { traceInlineSync } from '@hive/service-common'; -import { ConnectionError } from '@hive/storage'; -import { atomic, until, useActionTracker } from './helpers'; -import { cacheHits, cacheInvalidations, cacheMisses } from './metrics'; -import type { Storage, StorageItem } from './storage'; - -function generateKey(hashedToken: string) { - return `tokens:cache:${hashedToken}`; -} - -interface CacheStorage extends Omit { - invalidateTokens(hashedTokens: string[]): Promise; - shouldCacheError(error: unknown): boolean; -} - -const TTLSeconds = { - /** - * TTL for tokens that don't exist in the DB. - */ - notFound: 60, // seconds - /** - * TTL for tokens that exist in the DB. - */ - found: 60 * 5, // 5 minutes - /** - * TTL for tokens in the in-memory cache. - * Helps to reduce the traffic that goes to Redis (as we read the token from in-memory cache first) and in case Redis is down. - */ - inMemory: 60, // seconds -}; - -function useSafeRedis(redis: Redis, logger: FastifyBaseLogger) { - const cache = LRU(1000, TTLSeconds.inMemory * 1000 /* s -> ms */); - - // Purge the cache when redis is ready (when it reconnects or when it starts) - redis.on('ready', () => { - logger.info('Redis is ready, purging the in-memory cache'); - cache.clear(); - }); - - return { - async get(key: string) { - const cached = cache.get(key); - - if (cached) { - return cached; - } - - if (redis.status === 'ready') { - return redis.get(key); - } - - logger.warn('Redis is not ready, skipping GET'); - return cache.get(key); - }, - async del(keys: string[]) { - for (const key of keys) { - cache.delete(key); - } - - if (redis.status === 'ready') { - if (keys.length > 0) { - await redis.del(...keys); - } - } else { - logger.warn('Redis is not ready, skipping DEL'); - } - }, - async setex(key: string, ttl: number, value: string) { - cache.set(key, value); - - if (redis.status === 'ready') { - await redis.setex(key, ttl, value); - } else { - logger.warn('Redis is not ready, skipping SETEX'); - } - }, - }; -} - -// Cache is a wrapper around the storage that adds a cache layer. -// It also handles invalidation of the cache. -// It also handles the "touch" logic to mark tokens as used and update the "lastUsedAt" column in PG. -// Without the cache we would hit the DB for every request, with the cache we hit it only once (until a token is invalidated). -export function useCache( - storagePromise: Promise, - redisInstance: Redis, - logger: FastifyBaseLogger, -): { - start(): Promise; - stop(): Promise; - readiness(): Promise; - getStorage(): Promise; -} { - let started = false; - let cachedStoragePromise: Promise | null = null; - - function getStorage() { - if (!cachedStoragePromise) { - cachedStoragePromise = create(); - } - - return cachedStoragePromise; - } - - const tracker = useActionTracker(); - const redis = useSafeRedis(redisInstance, logger); - - async function create() { - const storage = await storagePromise; - const touch = useTokenTouchScheduler(storage, logger); - - // When there's a new token or a token was removed we need to invalidate the cache - async function invalidateTokens(hashedTokens: string[]) { - cacheInvalidations.inc(1); - - if (hashedTokens.length > 0) { - await redis.del(hashedTokens.map(generateKey)); - } - } - - // Thanks to the `atomic` function, every call to this function will only be executed once and Promise will be shared. - // This is important because we don't want to make multiple requests to the DB for the same token, at the same time. - const readTokenFromStorage = atomic(async function _readToken(hashedToken: string) { - const item = await storage.readToken(hashedToken); - - if (!item) { - // If the token doesn't exist in the DB we still want to cache it for a short period of time to avoid hitting the DB again and again. - await redis.setex(generateKey(hashedToken), TTLSeconds.notFound, JSON.stringify(null)); - } else { - await redis.setex(generateKey(hashedToken), TTLSeconds.found, JSON.stringify(item)); - } - - return item; - }); - - // Thanks to the `atomic` function, every call to this function will only be executed once and Promise will be shared. - // This is important because we don't want to make multiple requests to Redis for the same token, at the same time. - const readTokenFromRedis = atomic(async function _readToken( - hashedToken: string, - ): Promise { - const item = await redis.get(generateKey(hashedToken)); - - if (typeof item === 'string') { - return JSON.parse(item); - } - - return; - }); - - const cachedStorage: CacheStorage = { - destroy() { - return storage.destroy(); - }, - isReady() { - return storage.isReady(); - }, - shouldCacheError(error) { - return !(error instanceof ConnectionError); - }, - invalidateTokens(hashedTokens) { - return invalidateTokens(hashedTokens); - }, - readTarget(target) { - return storage.readTarget(target); - }, - async readToken(hashedToken) { - const cached = await readTokenFromRedis(hashedToken); - - if (typeof cached !== 'undefined') { - cacheHits.inc(1); - // mark as used - touch.schedule(hashedToken); - return cached; - } - - cacheMisses.inc(1); - - const item = await readTokenFromStorage(hashedToken); - - if (!item) { - return null; - } - - touch.schedule(hashedToken); // mark as used - - return item; - }, - writeToken: tracker.wrap(async item => { - logger.debug('Writing token (target=%s)', item.target); - const result = await storage.writeToken(item); - - return result; - }), - deleteToken: tracker.wrap(async hashedToken => { - await redis.del([generateKey(hashedToken)]); - - return storage.deleteToken(hashedToken); - }), - }; - - started = true; - - return cachedStorage; - } - - async function start() { - await getStorage(); - } - - async function stop() { - logger.info('Started Tokens shutdown...'); - started = false; - - // Wait for all the pending operations to finish - await until(tracker.idle, 10_000).catch(error => { - logger.error('Failed to wait for tokens being idle', error); - }); - - if (cachedStoragePromise) { - await (await cachedStoragePromise).destroy(); - } - - // Wait for Redis to finish all the pending operations - await redisInstance.quit(); - - process.exit(0); - } - - async function readiness() { - return ( - started && - (redisInstance.status === 'ready' || redisInstance.status === 'reconnecting') && - (await (await getStorage()).isReady()) - ); - } - - return { - start, - stop, - readiness, - getStorage, - }; -} - -function useTokenTouchScheduler(storage: Storage, logger: FastifyBaseLogger) { - const scheduledTokens = new Map(); - - /** - * Mark token as used - */ - function schedule(hashedToken: string): void { - const now = new Date(); - scheduledTokens.set(hashedToken, now); - } - - // updated every 10m - const interval = setInterval( - traceInlineSync('Touch Tokens', {}, () => { - if (!scheduledTokens.size) { - return; - } - - const tokens = Array.from(scheduledTokens.entries()).map(([token, date]) => ({ - token, - date, - })); - scheduledTokens.clear(); - - logger.debug(`Touch ${tokens.length} tokens`); - storage.touchTokens(tokens).catch(error => { - logger.error(error); - }); - }), - ms('60s'), - ); - - function dispose() { - clearInterval(interval); - } - - return { - schedule, - dispose, - }; -} diff --git a/packages/services/tokens/src/helpers.ts b/packages/services/tokens/src/helpers.ts index 1ba726dbbe..0671971069 100644 --- a/packages/services/tokens/src/helpers.ts +++ b/packages/services/tokens/src/helpers.ts @@ -1,22 +1,5 @@ import pTimeout from 'p-timeout'; -const requestsInFlight = new Map>(); - -export function atomic(fn: (arg: A) => Promise): (arg: A) => Promise { - return function atomicWrapper(arg) { - if (requestsInFlight.has(arg)) { - return requestsInFlight.get(arg)!; - } - - const promise = fn(arg); - requestsInFlight.set(arg, promise); - - return promise.finally(() => { - requestsInFlight.delete(arg); - }); - }; -} - // It's used to track the number of requests that are in flight. // This is important because we don't want to kill the pod when `DELETE` or `POST` action is in progress. export function useActionTracker() { diff --git a/packages/services/tokens/src/index.ts b/packages/services/tokens/src/index.ts index 73f2ce1ac4..e3a7dc9bf1 100644 --- a/packages/services/tokens/src/index.ts +++ b/packages/services/tokens/src/index.ts @@ -18,9 +18,8 @@ import { } from '@hive/service-common'; import * as Sentry from '@sentry/node'; import { Context, tokensApiRouter } from './api'; -import { useCache } from './cache'; import { env } from './environment'; -import { createStorage } from './storage'; +import { createStorage } from './multi-tier-storage'; export async function main() { let tracing: TracingInstance | undefined; @@ -83,10 +82,11 @@ export async function main() { tls: env.redis.tlsEnabled ? {} : undefined, }); - const { start, stop, readiness, getStorage } = useCache( - createStorage(env.postgres, tracing ? [tracing.instrumentSlonik()] : []), + const storage = await createStorage( + env.postgres, redis, server.log, + tracing ? [tracing.instrumentSlonik()] : [], ); const stopHeartbeats = env.heartbeat @@ -95,14 +95,14 @@ export async function main() { endpoint: env.heartbeat.endpoint, intervalInMS: 20_000, onError: e => server.log.error(e, `Heartbeat failed with error`), - isReady: readiness, + isReady: storage.isReady, }) : startHeartbeats({ enabled: false }); async function shutdown() { stopHeartbeats(); await server.close(); - await stop(); + await storage.close(); } try { @@ -146,7 +146,7 @@ export async function main() { return { req, errorHandler, - getStorage, + storage, tokenReadFailuresCache, }; }, @@ -164,7 +164,7 @@ export async function main() { method: ['GET', 'HEAD'], url: '/_readiness', async handler(_, res) { - const isReady = await readiness(); + const isReady = await storage.isReady(); reportReadiness(isReady); void res.status(isReady ? 200 : 400).send(); }, @@ -178,8 +178,6 @@ export async function main() { port: env.http.port, host: '::', }); - - await start(); } catch (error) { server.log.fatal(error); Sentry.captureException(error, { diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts new file mode 100644 index 0000000000..f203d60f8e --- /dev/null +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -0,0 +1,271 @@ +import type { FastifyBaseLogger } from 'fastify'; +import type { Redis } from 'ioredis'; +import { LRUCache } from 'lru-cache'; +import ms from 'ms'; +import { createConnectionString, createTokenStorage, Interceptor, tokens } from '@hive/storage'; +import { until, useActionTracker } from './helpers'; +import { cacheHits, cacheInvalidations, cacheMisses } from './metrics'; + +type CacheEntry = StorageItem | 'not-found'; + +interface StorageItem { + token: string; + name: string; + tokenAlias: string; + date: string; + lastUsedAt: string; + organization: string; + project: string; + target: string; + scopes: readonly string[]; +} + +export interface Storage { + close(): Promise; + isReady(): Promise; + readTarget(targetId: string): Promise; + readToken(hashedToken: string): Promise; + writeToken(item: Omit): Promise; + deleteToken(hashedToken: string): Promise; + invalidateTokens(hashedTokens: string[]): Promise; +} + +export async function createStorage( + config: Parameters[0], + redis: Redis, + logger: FastifyBaseLogger, + additionalInterceptors: Interceptor[], +): Promise { + const tracker = useActionTracker(); + const connectionString = createConnectionString(config); + const db = await createTokenStorage(connectionString, 5, additionalInterceptors.concat([{}])); + const touch = tokenTouchScheduler(logger, async tokens => { + try { + await db.touchTokens({ tokens }); + } catch (error) { + logger.error('Failed to touch tokens', error); + } + }); + const cache = new LRUCache({ + max: 1000, + ttl: ms('5m'), + // Allow to return stale data if the fetchMethod is slow + allowStale: false, + // Don't delete the cache entry if the fetchMethod fails + noDeleteOnFetchRejection: true, + // Allow to return stale data if the fetchMethod fails. + // The rejection reason won't be logged though. + allowStaleOnFetchRejection: true, + // If a cache entry is stale or missing, this method is called + // to fill the cache with fresh data. + // This method is called only once per cache key, + // even if multiple requests are waiting for it. + async fetchMethod(hashedToken) { + // Nothing fresh in the in-memory cache, let's check Redis + let redisData: string | null = null; + + if (redis.status === 'ready') { + redisData = await redis.get(hashedToken).catch(error => { + logger.error('Failed to read token from Redis', error); + return null; + }); + } else { + // TODO: what if redis is not ready? Should we call the DB or return null? + logger.debug('Redis is not ready, skipping cache read'); + } + + if (redisData) { + return JSON.parse(redisData) as CacheEntry; + } + + // Nothing in Redis, let's check the DB + const dbResult = await db.getToken({ token: hashedToken }).catch(error => { + // If the DB is down, we log the error, and we throw exception. + // This will cause the cache to return stale data. + // This may have a performance impact (more calls to Db), but it won't break the system. + logger.error('Failed to read token from the DB', error); + return Promise.reject(error); + }); + + const cacheEntry = dbResult ? transformToken(dbResult) : 'not-found'; + + // Write to Redis, so the next time we can read it from there + await setInRedis(redis, hashedToken, cacheEntry).catch(error => { + logger.error( + 'Failed to write token to Redis, but it was written to the in-memory cache', + error, + ); + }); + + return cacheEntry; + }, + }); + + return { + async close() { + // Wait for all the pending operations to finish + await until(tracker.idle, 10_000).catch(error => { + logger.error('Failed to wait for tokens being idle', error); + }); + await db.destroy(); + // Wait for Redis to finish all the pending operations + await redis.quit(); + }, + isReady: atomic(async () => { + if (redis.status === 'ready' || redis.status === 'reconnecting') { + return db.isReady(); + } + return false; + }), + async readTarget(target) { + const tokens = await db.getTokens({ target }); + return tokens.map(transformToken); + }, + async readToken(hashedToken) { + const data = await cache.fetch(hashedToken); + + if (!data) { + // Looked up in all layers, and the token is not found + return null; + } + + if (data === 'not-found') { + return null; + } + + touch.schedule(hashedToken); + return data; + }, + writeToken: tracker.wrap(async item => { + const result = await db.createToken(item); + + // We don't want to write to in-memory cache, + // as the token may not be used immediately, or at all. + // We write to Redis, so in case the token is used, + // we reuse it for the in-memory cache, without hitting the DB. + const hashedToken = result.token; + const cacheEntry = transformToken(result); + + // Write to Redis gracefully. If it fails, we log the error, but we don't throw. + // The token won't be in Redis cache, but it will be possible to retrieve it from the DB. + // It will affect performance slightly, but it won't break the system. + try { + await setInRedis(redis, hashedToken, cacheEntry); + } catch (error) { + logger.error('Failed to write token to Redis, but it was created in the DB', error); + } + + return cacheEntry; + }), + deleteToken: tracker.wrap(async hashedToken => { + await db.deleteToken({ + token: hashedToken, + async postDeletionTransaction() { + // delete from Redis when the token is deleted from the DB + await redis.del(generateRedisKey(hashedToken)); + // only then delete from the in-memory cache. + // The other replicas will purge the token from + // their own in-memory caches on their own pace (ttl) + cache.delete(hashedToken); + }, + }); + }), + invalidateTokens: tracker.wrap(async hashedTokens => { + if (hashedTokens.length === 0) { + return; + } + + cacheInvalidations.inc(); + + await redis.del(hashedTokens.map(generateRedisKey)); + for (const hashedToken of hashedTokens) { + cache.delete(hashedToken); + } + }), + }; +} + +function transformToken(item: tokens): StorageItem { + return { + token: item.token, + tokenAlias: item.token_alias, + name: item.name, + date: item.created_at as any, + lastUsedAt: item.last_used_at as any, + organization: item.organization_id, + project: item.project_id, + target: item.target_id, + scopes: item.scopes || [], + }; +} + +function generateRedisKey(hashedToken: string) { + // bump the version when the cache format changes + return `tokens:cache:v2:${hashedToken}`; +} + +const promisesInFlight = new Map>(); +function atomic(fn: () => Promise): () => Promise { + const uniqueString = Math.random().toString(36).slice(2) + Date.now().toString(36); + + return function atomicWrapper() { + if (promisesInFlight.has(uniqueString)) { + return promisesInFlight.get(uniqueString)!; + } + + const promise = fn(); + promisesInFlight.set(uniqueString, promise); + + return promise.finally(() => { + promisesInFlight.delete(uniqueString); + }); + }; +} + +async function setInRedis(redis: Redis, hashedToken: string, cacheEntry: CacheEntry) { + if (redis.status !== 'ready') { + return; + } + + const redisKey = generateRedisKey(hashedToken); + await redis.setex(redisKey, ms('24h') / 1000, JSON.stringify(cacheEntry)); +} + +function tokenTouchScheduler( + logger: FastifyBaseLogger, + onTouch: (tokens: Array<{ token: string; date: Date }>) => Promise, +) { + const scheduledTokens = new Map(); + + /** + * Mark token as used + */ + function schedule(hashedToken: string): void { + const now = new Date(); + scheduledTokens.set(hashedToken, now); + } + + const interval = setInterval(() => { + if (!scheduledTokens.size) { + return; + } + + const tokens = Array.from(scheduledTokens.entries()).map(([token, date]) => ({ + token, + date, + })); + scheduledTokens.clear(); + + logger.debug(`Touch ${tokens.length} tokens`); + void onTouch(tokens); + }, ms('60s')); + + function dispose() { + clearInterval(interval); + } + + return { + schedule, + dispose, + }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3501d23123..bb3b672640 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1450,6 +1450,9 @@ importers: ioredis: specifier: 5.4.1 version: 5.4.1 + lru-cache: + specifier: 11.0.2 + version: 11.0.2 ms: specifier: 2.1.3 version: 2.1.3 @@ -11926,6 +11929,10 @@ packages: resolution: {integrity: sha512-2bIM8x+VAf6JT4bKAljS1qUWgMsqZRPGJS6FSahIMPVvctcNhyVp7AJu7quxOW9jwkryBReKZY5tY5JYv2n/7Q==} engines: {node: 14 || >=16.14} + lru-cache@11.0.2: + resolution: {integrity: sha512-123qHRfJBmo2jXDbo/a5YOQrJoHF/GNQTLzQ5+IdK5pWpceK17yRc6ozlWd25FxvGKQbIUs91fDFkXmDHTKcyA==} + engines: {node: 20 || >=22} + lru-cache@4.1.5: resolution: {integrity: sha512-sWZlbEP2OsHNkXrMl5GYk/jKk70MBng6UU4YI/qGDYbgf6YbP4EvmqISbXCoJiRKs+1bSpFHVgQxvJ17F2li5g==} @@ -16228,8 +16235,8 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0) - '@aws-sdk/client-sts': 3.596.0 + '@aws-sdk/client-sso-oidc': 3.596.0 + '@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0) '@aws-sdk/core': 3.592.0 '@aws-sdk/credential-provider-node': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0) '@aws-sdk/middleware-host-header': 3.577.0 @@ -16336,11 +16343,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.596.0(@aws-sdk/client-sts@3.596.0)': + '@aws-sdk/client-sso-oidc@3.596.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sts': 3.596.0 + '@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0) '@aws-sdk/core': 3.592.0 '@aws-sdk/credential-provider-node': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0) '@aws-sdk/middleware-host-header': 3.577.0 @@ -16379,7 +16386,6 @@ snapshots: '@smithy/util-utf8': 3.0.0 tslib: 2.8.1 transitivePeerDependencies: - - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso-oidc@3.693.0(@aws-sdk/client-sts@3.693.0)': @@ -16513,11 +16519,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sts@3.596.0': + '@aws-sdk/client-sts@3.596.0(@aws-sdk/client-sso-oidc@3.596.0)': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0) + '@aws-sdk/client-sso-oidc': 3.596.0 '@aws-sdk/core': 3.592.0 '@aws-sdk/credential-provider-node': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0) '@aws-sdk/middleware-host-header': 3.577.0 @@ -16556,6 +16562,7 @@ snapshots: '@smithy/util-utf8': 3.0.0 tslib: 2.8.1 transitivePeerDependencies: + - '@aws-sdk/client-sso-oidc' - aws-crt '@aws-sdk/client-sts@3.693.0': @@ -16669,7 +16676,7 @@ snapshots: '@aws-sdk/credential-provider-ini@3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0)': dependencies: - '@aws-sdk/client-sts': 3.596.0 + '@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0) '@aws-sdk/credential-provider-env': 3.587.0 '@aws-sdk/credential-provider-http': 3.596.0 '@aws-sdk/credential-provider-process': 3.587.0 @@ -16788,7 +16795,7 @@ snapshots: '@aws-sdk/credential-provider-web-identity@3.587.0(@aws-sdk/client-sts@3.596.0)': dependencies: - '@aws-sdk/client-sts': 3.596.0 + '@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0) '@aws-sdk/types': 3.577.0 '@smithy/property-provider': 3.1.8 '@smithy/types': 3.6.0 @@ -16963,7 +16970,7 @@ snapshots: '@aws-sdk/token-providers@3.587.0(@aws-sdk/client-sso-oidc@3.596.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0) + '@aws-sdk/client-sso-oidc': 3.596.0 '@aws-sdk/types': 3.577.0 '@smithy/property-provider': 3.1.8 '@smithy/shared-ini-file-loader': 3.1.9 @@ -23391,8 +23398,8 @@ snapshots: '@typescript-eslint/parser': 7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3) eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva) eslint-config-prettier: 9.1.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) - eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) - eslint-plugin-import: 2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) + eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) + eslint-plugin-import: 2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) eslint-plugin-jsonc: 2.11.1(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) eslint-plugin-jsx-a11y: 6.8.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) eslint-plugin-mdx: 3.0.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) @@ -26100,13 +26107,13 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)): + eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)): dependencies: debug: 4.3.7(supports-color@8.1.1) enhanced-resolve: 5.17.1 eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva) - eslint-module-utils: 2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) - eslint-plugin-import: 2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) + eslint-module-utils: 2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) + eslint-plugin-import: 2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) fast-glob: 3.3.2 get-tsconfig: 4.7.5 is-core-module: 2.13.1 @@ -26137,14 +26144,14 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-module-utils@2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)): + eslint-module-utils@2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)): dependencies: debug: 3.2.7(supports-color@8.1.1) optionalDependencies: '@typescript-eslint/parser': 7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3) eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva) eslint-import-resolver-node: 0.3.9 - eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) + eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) transitivePeerDependencies: - supports-color @@ -26160,7 +26167,7 @@ snapshots: eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva) eslint-compat-utils: 0.1.2(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) - eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)): + eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)): dependencies: array-includes: 3.1.7 array.prototype.findlastindex: 1.2.3 @@ -26170,7 +26177,7 @@ snapshots: doctrine: 2.1.0 eslint: 8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva) eslint-import-resolver-node: 0.3.9 - eslint-module-utils: 2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-plugin-import@2.29.1(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)))(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) + eslint-module-utils: 2.8.0(@typescript-eslint/parser@7.18.0(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva))(typescript@5.6.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.1(patch_hash=fjbpfrtrjd6idngyeqxnwopfva)) hasown: 2.0.0 is-core-module: 2.13.1 is-glob: 4.0.3 @@ -28642,6 +28649,8 @@ snapshots: lru-cache@10.2.0: {} + lru-cache@11.0.2: {} + lru-cache@4.1.5: dependencies: pseudomap: 1.0.2 From e01e2eb98e52ce4a769bdba2f0559cb617583779 Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Mon, 16 Dec 2024 22:00:54 +0100 Subject: [PATCH 2/9] improvements --- packages/services/tokens/src/helpers.ts | 48 ++++++++- .../services/tokens/src/multi-tier-storage.ts | 99 +++++++++++++------ packages/services/tokens/src/storage.ts | 79 --------------- 3 files changed, 112 insertions(+), 114 deletions(-) delete mode 100644 packages/services/tokens/src/storage.ts diff --git a/packages/services/tokens/src/helpers.ts b/packages/services/tokens/src/helpers.ts index 0671971069..385b70d247 100644 --- a/packages/services/tokens/src/helpers.ts +++ b/packages/services/tokens/src/helpers.ts @@ -1,7 +1,41 @@ import pTimeout from 'p-timeout'; -// It's used to track the number of requests that are in flight. -// This is important because we don't want to kill the pod when `DELETE` or `POST` action is in progress. +const atomicPromisesInFlight = new Map>(); + +/** + * This function is used to share execution across multiple calls of the same function. + * It's useful when you have a function that can be called multiple times in a short period of time, + * but you want to execute it only once. + * + * Once the execution is finished, the function will be available for the next call. + * + * @param fn - Function that should be executed only once per its execution period. + * @returns Function that will execute the original function only once. + */ +export function atomic(fn: () => Promise): () => Promise { + // Generate a unique string for each call of `atomic` function to prevent collisions. + const uniqueId = Math.random().toString(36).slice(2) + Date.now().toString(36); + + return function atomicWrapper() { + const existing = atomicPromisesInFlight.get(uniqueId); + if (existing) { + return existing; + } + + const promise = fn(); + atomicPromisesInFlight.set(uniqueId, promise); + + return promise.finally(() => { + atomicPromisesInFlight.delete(uniqueId); + }); + }; +} + +/** + * It's used to track the number of requests that are in flight. + * This is important because we don't want to kill the pod when + * state mutating requests are in progress. + */ export function useActionTracker() { let actionsInProgress = 0; @@ -26,11 +60,17 @@ export function useActionTracker() { }; } -export function until(fn: () => boolean, timeout: number): Promise { +/** + * This function is used to wait until the condition is met or the timeout is reached. + * + * @param conditionFn - function to check the condition + * @param timeout - timeout in milliseconds + */ +export function until(conditionFn: () => boolean, timeout: number): Promise { return pTimeout( new Promise(resolve => { const interval = setInterval(() => { - if (fn()) { + if (conditionFn()) { clearInterval(interval); resolve(); } diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts index f203d60f8e..f050e342df 100644 --- a/packages/services/tokens/src/multi-tier-storage.ts +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -3,7 +3,8 @@ import type { Redis } from 'ioredis'; import { LRUCache } from 'lru-cache'; import ms from 'ms'; import { createConnectionString, createTokenStorage, Interceptor, tokens } from '@hive/storage'; -import { until, useActionTracker } from './helpers'; +import { captureException, captureMessage } from '@sentry/node'; +import { atomic, until, useActionTracker } from './helpers'; import { cacheHits, cacheInvalidations, cacheMisses } from './metrics'; type CacheEntry = StorageItem | 'not-found'; @@ -30,6 +31,17 @@ export interface Storage { invalidateTokens(hashedTokens: string[]): Promise; } +const cacheConfig = { + inMemory: { + maxEntries: 1000, + ttlInMs: ms('5m'), + }, + redis: { + ttlInMs: ms('24h') / 1000, + }, + tokenTouchIntervalInMs: ms('60s'), +} as const; + export async function createStorage( config: Parameters[0], redis: Redis, @@ -38,7 +50,7 @@ export async function createStorage( ): Promise { const tracker = useActionTracker(); const connectionString = createConnectionString(config); - const db = await createTokenStorage(connectionString, 5, additionalInterceptors.concat([{}])); + const db = await createTokenStorage(connectionString, 5, additionalInterceptors); const touch = tokenTouchScheduler(logger, async tokens => { try { await db.touchTokens({ tokens }); @@ -47,8 +59,8 @@ export async function createStorage( } }); const cache = new LRUCache({ - max: 1000, - ttl: ms('5m'), + max: cacheConfig.inMemory.maxEntries, + ttl: cacheConfig.inMemory.ttlInMs, // Allow to return stale data if the fetchMethod is slow allowStale: false, // Don't delete the cache entry if the fetchMethod fails @@ -66,12 +78,19 @@ export async function createStorage( if (redis.status === 'ready') { redisData = await redis.get(hashedToken).catch(error => { - logger.error('Failed to read token from Redis', error); + handleStorageError({ + logger, + error, + logMsg: 'Failed to read token from Redis', + tier: 'redis', + action: 'fetch', + }); return null; }); } else { // TODO: what if redis is not ready? Should we call the DB or return null? - logger.debug('Redis is not ready, skipping cache read'); + logger.warn('Redis is not ready, skipping cache read'); + captureMessage('Redis was not available as secondary cache', 'warning'); } if (redisData) { @@ -83,7 +102,13 @@ export async function createStorage( // If the DB is down, we log the error, and we throw exception. // This will cause the cache to return stale data. // This may have a performance impact (more calls to Db), but it won't break the system. - logger.error('Failed to read token from the DB', error); + handleStorageError({ + logger, + error, + logMsg: 'Failed to read token from the Db', + tier: 'db', + action: 'fetch', + }); return Promise.reject(error); }); @@ -91,10 +116,13 @@ export async function createStorage( // Write to Redis, so the next time we can read it from there await setInRedis(redis, hashedToken, cacheEntry).catch(error => { - logger.error( - 'Failed to write token to Redis, but it was written to the in-memory cache', + handleStorageError({ + logger, error, - ); + logMsg: 'Failed to write token to Redis, but it was written to the in-memory cache', + tier: 'redis', + action: 'set', + }); }); return cacheEntry; @@ -110,6 +138,7 @@ export async function createStorage( await db.destroy(); // Wait for Redis to finish all the pending operations await redis.quit(); + touch.dispose(); }, isReady: atomic(async () => { if (redis.status === 'ready' || redis.status === 'reconnecting') { @@ -152,7 +181,13 @@ export async function createStorage( try { await setInRedis(redis, hashedToken, cacheEntry); } catch (error) { - logger.error('Failed to write token to Redis, but it was created in the DB', error); + handleStorageError({ + logger, + error, + logMsg: 'Failed to write token to Redis, but it was created in the Db', + tier: 'redis', + action: 'set', + }); } return cacheEntry; @@ -204,31 +239,17 @@ function generateRedisKey(hashedToken: string) { return `tokens:cache:v2:${hashedToken}`; } -const promisesInFlight = new Map>(); -function atomic(fn: () => Promise): () => Promise { - const uniqueString = Math.random().toString(36).slice(2) + Date.now().toString(36); - - return function atomicWrapper() { - if (promisesInFlight.has(uniqueString)) { - return promisesInFlight.get(uniqueString)!; - } - - const promise = fn(); - promisesInFlight.set(uniqueString, promise); - - return promise.finally(() => { - promisesInFlight.delete(uniqueString); - }); - }; -} - async function setInRedis(redis: Redis, hashedToken: string, cacheEntry: CacheEntry) { if (redis.status !== 'ready') { return; } const redisKey = generateRedisKey(hashedToken); - await redis.setex(redisKey, ms('24h') / 1000, JSON.stringify(cacheEntry)); + await redis.setex( + redisKey, + Math.ceil(cacheConfig.redis.ttlInMs / 1000), + JSON.stringify(cacheEntry), + ); } function tokenTouchScheduler( @@ -258,7 +279,7 @@ function tokenTouchScheduler( logger.debug(`Touch ${tokens.length} tokens`); void onTouch(tokens); - }, ms('60s')); + }, cacheConfig.tokenTouchIntervalInMs); function dispose() { clearInterval(interval); @@ -269,3 +290,19 @@ function tokenTouchScheduler( dispose, }; } + +async function handleStorageError(params: { + logger: FastifyBaseLogger; + error: unknown; + logMsg: string; + tier: 'redis' | 'db'; + action: 'fetch' | 'set'; +}) { + params.logger.error(params.logMsg, params.error); + captureException(params.error, { + tags: { + storageTier: params.tier, + storageAction: params.action, + }, + }); +} diff --git a/packages/services/tokens/src/storage.ts b/packages/services/tokens/src/storage.ts deleted file mode 100644 index cee837bccb..0000000000 --- a/packages/services/tokens/src/storage.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { createConnectionString, createTokenStorage, Interceptor, tokens } from '@hive/storage'; - -export interface StorageItem { - token: string; - name: string; - tokenAlias: string; - date: string; - lastUsedAt: string; - organization: string; - project: string; - target: string; - scopes: readonly string[]; -} - -export interface Storage { - destroy(): Promise; - isReady(): Promise; - readTarget(targetId: string): Promise; - readToken(hashedToken: string): Promise; - writeToken(item: Omit): Promise; - deleteToken(hashedToken: string): Promise; - touchTokens(tokens: Array<{ token: string; date: Date }>): Promise; -} - -export async function createStorage( - config: Parameters[0], - additionalInterceptors: Interceptor[], -): Promise { - const connectionString = createConnectionString(config); - const db = await createTokenStorage(connectionString, 5, additionalInterceptors); - - function transformToken(item: tokens): StorageItem { - return { - token: item.token, - tokenAlias: item.token_alias, - name: item.name, - date: item.created_at as any, - lastUsedAt: item.last_used_at as any, - organization: item.organization_id, - project: item.project_id, - target: item.target_id, - scopes: item.scopes || [], - }; - } - - return { - destroy() { - return db.destroy(); - }, - isReady() { - return db.isReady(); - }, - async readTarget(target) { - const tokens = await db.getTokens({ target }); - - return tokens.map(transformToken); - }, - async readToken(hashed_token) { - const result = await db.getToken({ token: hashed_token }); - - if (!result) { - return null; - } - - return transformToken(result); - }, - async writeToken(item) { - const result = await db.createToken(item); - - return transformToken(result); - }, - async deleteToken(hashed_token) { - return db.deleteToken({ token: hashed_token }); - }, - touchTokens(tokens) { - return db.touchTokens({ tokens }); - }, - }; -} From 3f2c87e4bf738403887fc9fbeb4953b4564bc2ae Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Tue, 17 Dec 2024 09:55:49 +0100 Subject: [PATCH 3/9] ok, decision made --- packages/services/tokens/src/multi-tier-storage.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts index f050e342df..46c597d55e 100644 --- a/packages/services/tokens/src/multi-tier-storage.ts +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -88,8 +88,11 @@ export async function createStorage( return null; }); } else { - // TODO: what if redis is not ready? Should we call the DB or return null? - logger.warn('Redis is not ready, skipping cache read'); + // If redis is not ready, we fallback to the Db. + // This will put more load on the Db, but it won't break the usage reporting. + // It's a temporary state, as fetched value will be written to in-memory cache, + // and to Redis - when it's back online. + logger.warn('Redis is not ready, falling back to Db'); captureMessage('Redis was not available as secondary cache', 'warning'); } From 7c20700f1d75b1f93b1c9f923dbbae5ad92a9c12 Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Tue, 17 Dec 2024 10:43:46 +0100 Subject: [PATCH 4/9] Add a fallback to Redis when Db is down - to serve stale data --- .../services/tokens/src/multi-tier-storage.ts | 124 ++++++++++++++---- 1 file changed, 99 insertions(+), 25 deletions(-) diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts index 46c597d55e..7e0724e1f5 100644 --- a/packages/services/tokens/src/multi-tier-storage.ts +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -37,7 +37,8 @@ const cacheConfig = { ttlInMs: ms('5m'), }, redis: { - ttlInMs: ms('24h') / 1000, + ttlInMs: ms('1h'), + staleTtlInMs: ms('24h'), }, tokenTouchIntervalInMs: ms('60s'), } as const; @@ -72,7 +73,7 @@ export async function createStorage( // to fill the cache with fresh data. // This method is called only once per cache key, // even if multiple requests are waiting for it. - async fetchMethod(hashedToken) { + async fetchMethod(hashedToken, staleCacheEntry) { // Nothing fresh in the in-memory cache, let's check Redis let redisData: string | null = null; @@ -101,21 +102,57 @@ export async function createStorage( } // Nothing in Redis, let's check the DB - const dbResult = await db.getToken({ token: hashedToken }).catch(error => { - // If the DB is down, we log the error, and we throw exception. - // This will cause the cache to return stale data. - // This may have a performance impact (more calls to Db), but it won't break the system. - handleStorageError({ - logger, - error, - logMsg: 'Failed to read token from the Db', - tier: 'db', - action: 'fetch', - }); - return Promise.reject(error); - }); + const cacheEntry: CacheEntry = await db + .getToken({ token: hashedToken }) + .then(dbResult => { + if (!dbResult) { + return 'not-found' as CacheEntry; + } + + return transformToken(dbResult); + }) + .catch(async error => { + // If the DB is down, we log the error, and we throw exception. + // This will cause the cache to return stale data. + // This may have a performance impact (more calls to Db), but it won't break the system. + handleStorageError({ + logger, + error, + logMsg: 'Failed to read token from the Db', + tier: 'db', + action: 'fetch', + }); - const cacheEntry = dbResult ? transformToken(dbResult) : 'not-found'; + if (staleCacheEntry) { + logger.warn('Failed to read token from the Db, returning stale data'); + throw error; + } + + if (redis.status !== 'ready') { + logger.warn('Redis is not ready, cannot read stale data from it'); + throw error; + } + + const staleRedisData = await redis + .get(generateStaleRedisKey(hashedToken)) + .catch(error => { + handleStorageError({ + logger, + error, + logMsg: 'Failed to read token from Redis (stale)', + tier: 'redis-stale', + action: 'fetch', + }); + return null; + }); + + if (!staleRedisData) { + logger.debug('No stale data in Redis'); + throw error; + } + + return JSON.parse(staleRedisData) as CacheEntry; + }); // Write to Redis, so the next time we can read it from there await setInRedis(redis, hashedToken, cacheEntry).catch(error => { @@ -200,7 +237,7 @@ export async function createStorage( token: hashedToken, async postDeletionTransaction() { // delete from Redis when the token is deleted from the DB - await redis.del(generateRedisKey(hashedToken)); + await redis.del([generateRedisKey(hashedToken), generateStaleRedisKey(hashedToken)]); // only then delete from the in-memory cache. // The other replicas will purge the token from // their own in-memory caches on their own pace (ttl) @@ -215,7 +252,9 @@ export async function createStorage( cacheInvalidations.inc(); - await redis.del(hashedTokens.map(generateRedisKey)); + await redis.del( + hashedTokens.map(generateRedisKey).concat(hashedTokens.map(generateStaleRedisKey)), + ); for (const hashedToken of hashedTokens) { cache.delete(hashedToken); } @@ -242,17 +281,52 @@ function generateRedisKey(hashedToken: string) { return `tokens:cache:v2:${hashedToken}`; } +function generateStaleRedisKey(hashedToken: string) { + // bump the version when the cache format changes + return `tokens:stale-cache:v2:${hashedToken}`; +} + async function setInRedis(redis: Redis, hashedToken: string, cacheEntry: CacheEntry) { if (redis.status !== 'ready') { return; } - const redisKey = generateRedisKey(hashedToken); - await redis.setex( - redisKey, - Math.ceil(cacheConfig.redis.ttlInMs / 1000), - JSON.stringify(cacheEntry), - ); + const stringifiedCacheEntry = JSON.stringify(cacheEntry); + + const results = await redis + .pipeline() + .setex( + generateRedisKey(hashedToken), + Math.ceil(cacheConfig.redis.ttlInMs / 1000), + stringifiedCacheEntry, + ) + .setex( + generateStaleRedisKey(hashedToken), + Math.ceil(cacheConfig.redis.staleTtlInMs / 1000), + stringifiedCacheEntry, + ) + .exec(); + + if (!results?.length) { + return; + } + + const errors: Error[] = []; + + for (const [error] of results) { + if (error instanceof Error) { + errors.push(error); + } + } + + if (errors.length) { + throw new Error(errors.map(e => e.message).join('\n'), { + cause: { + message: 'SETEX Pipeline Failure', + errors, + }, + }); + } } function tokenTouchScheduler( @@ -298,7 +372,7 @@ async function handleStorageError(params: { logger: FastifyBaseLogger; error: unknown; logMsg: string; - tier: 'redis' | 'db'; + tier: 'redis' | 'redis-stale' | 'db'; action: 'fetch' | 'set'; }) { params.logger.error(params.logMsg, params.error); From 470b5c19b2c084b005d206014ce4a7a0b9af3345 Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Tue, 17 Dec 2024 11:24:38 +0100 Subject: [PATCH 5/9] it's not async --- packages/services/tokens/src/multi-tier-storage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts index 7e0724e1f5..bf81104fc7 100644 --- a/packages/services/tokens/src/multi-tier-storage.ts +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -368,7 +368,7 @@ function tokenTouchScheduler( }; } -async function handleStorageError(params: { +function handleStorageError(params: { logger: FastifyBaseLogger; error: unknown; logMsg: string; From 2cc37244e1922fe522e0a5100154b040c37ee516 Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Tue, 17 Dec 2024 14:42:10 +0100 Subject: [PATCH 6/9] ok --- packages/services/tokens/.env.template | 1 + packages/services/tokens/src/api.ts | 2 +- .../services/tokens/src/multi-tier-storage.ts | 134 +++++++++--------- 3 files changed, 70 insertions(+), 67 deletions(-) diff --git a/packages/services/tokens/.env.template b/packages/services/tokens/.env.template index 89aebec59e..e950d62275 100644 --- a/packages/services/tokens/.env.template +++ b/packages/services/tokens/.env.template @@ -8,3 +8,4 @@ REDIS_PORT="6379" REDIS_PASSWORD="" PORT=6001 OPENTELEMETRY_COLLECTOR_ENDPOINT="" +LOG_LEVEL="debug" \ No newline at end of file diff --git a/packages/services/tokens/src/api.ts b/packages/services/tokens/src/api.ts index 07f59fc061..acc76704ca 100644 --- a/packages/services/tokens/src/api.ts +++ b/packages/services/tokens/src/api.ts @@ -175,7 +175,7 @@ export const tokensApiRouter = t.router({ } try { - const result = await ctx.storage.readToken(hash); + const result = await ctx.storage.readToken(hash, alias); // removes the token from the failures cache (in case the value expired) ctx.tokenReadFailuresCache.delete(hash); diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts index bf81104fc7..cf72188e48 100644 --- a/packages/services/tokens/src/multi-tier-storage.ts +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -25,7 +25,7 @@ export interface Storage { close(): Promise; isReady(): Promise; readTarget(targetId: string): Promise; - readToken(hashedToken: string): Promise; + readToken(hashedToken: string, maskedToken: string): Promise; writeToken(item: Omit): Promise; deleteToken(hashedToken: string): Promise; invalidateTokens(hashedTokens: string[]): Promise; @@ -46,20 +46,26 @@ const cacheConfig = { export async function createStorage( config: Parameters[0], redis: Redis, - logger: FastifyBaseLogger, + serverLogger: FastifyBaseLogger, additionalInterceptors: Interceptor[], ): Promise { const tracker = useActionTracker(); const connectionString = createConnectionString(config); const db = await createTokenStorage(connectionString, 5, additionalInterceptors); - const touch = tokenTouchScheduler(logger, async tokens => { + const touch = tokenTouchScheduler(serverLogger, async tokens => { try { await db.touchTokens({ tokens }); } catch (error) { - logger.error('Failed to touch tokens', error); + serverLogger.error('Failed to touch tokens', error); } }); - const cache = new LRUCache({ + const cache = new LRUCache< + string, + CacheEntry, + { + maskedToken: string; + } + >({ max: cacheConfig.inMemory.maxEntries, ttl: cacheConfig.inMemory.ttlInMs, // Allow to return stale data if the fetchMethod is slow @@ -73,12 +79,14 @@ export async function createStorage( // to fill the cache with fresh data. // This method is called only once per cache key, // even if multiple requests are waiting for it. - async fetchMethod(hashedToken, staleCacheEntry) { + async fetchMethod(hashedToken, _staleEntry, { context }) { // Nothing fresh in the in-memory cache, let's check Redis + + const logger = serverLogger.child({ maskedToken: context.maskedToken }); let redisData: string | null = null; if (redis.status === 'ready') { - redisData = await redis.get(hashedToken).catch(error => { + redisData = await redis.get(generateRedisKey(hashedToken)).catch(error => { handleStorageError({ logger, error, @@ -98,74 +106,64 @@ export async function createStorage( } if (redisData) { + logger.debug('Returning fresh data from Redis'); return JSON.parse(redisData) as CacheEntry; } - // Nothing in Redis, let's check the DB - const cacheEntry: CacheEntry = await db - .getToken({ token: hashedToken }) - .then(dbResult => { - if (!dbResult) { - return 'not-found' as CacheEntry; - } - - return transformToken(dbResult); - }) - .catch(async error => { - // If the DB is down, we log the error, and we throw exception. - // This will cause the cache to return stale data. - // This may have a performance impact (more calls to Db), but it won't break the system. + try { + // Nothing in Redis, let's check the DB + const dbResult = await db.getToken({ token: hashedToken }); + const cacheEntry = dbResult ? transformToken(dbResult) : 'not-found'; + + // Write to Redis, so the next time we can read it from there + await setInRedis(redis, hashedToken, cacheEntry).catch(error => { handleStorageError({ logger, error, - logMsg: 'Failed to read token from the Db', - tier: 'db', - action: 'fetch', + logMsg: 'Failed to write token to Redis, but it was written to the in-memory cache', + tier: 'redis', + action: 'set', }); - - if (staleCacheEntry) { - logger.warn('Failed to read token from the Db, returning stale data'); - throw error; - } - - if (redis.status !== 'ready') { - logger.warn('Redis is not ready, cannot read stale data from it'); - throw error; - } - - const staleRedisData = await redis - .get(generateStaleRedisKey(hashedToken)) - .catch(error => { - handleStorageError({ - logger, - error, - logMsg: 'Failed to read token from Redis (stale)', - tier: 'redis-stale', - action: 'fetch', - }); - return null; - }); - - if (!staleRedisData) { - logger.debug('No stale data in Redis'); - throw error; - } - - return JSON.parse(staleRedisData) as CacheEntry; }); - - // Write to Redis, so the next time we can read it from there - await setInRedis(redis, hashedToken, cacheEntry).catch(error => { + } catch (error) { + // If the DB is down, we log the error, and we throw exception. + // This will cause the cache to return stale data. + // This may have a performance impact (more calls to Db), but it won't break the system. handleStorageError({ logger, error, - logMsg: 'Failed to write token to Redis, but it was written to the in-memory cache', - tier: 'redis', - action: 'set', + logMsg: 'Failed to read token from the Db', + tier: 'db', + action: 'fetch', }); - }); - return cacheEntry; + if (redis.status !== 'ready') { + logger.warn('Redis is not ready, cannot read stale data from it'); + throw error; + } + + const staleRedisData = await redis.get(generateStaleRedisKey(hashedToken)).catch(error => { + handleStorageError({ + logger, + error, + logMsg: 'Failed to read token from Redis (stale)', + tier: 'redis-stale', + action: 'fetch', + }); + return null; + }); + + if (!staleRedisData) { + logger.debug('No stale data in Redis'); + throw error; + } + + logger.debug('Returning stale data from Redis'); + // Stale data will be cached in the in-memory cache only, as it's not fresh. + return JSON.parse(staleRedisData) as CacheEntry; + } + + throw new Error('Unexpected code path'); }, }); @@ -173,7 +171,7 @@ export async function createStorage( async close() { // Wait for all the pending operations to finish await until(tracker.idle, 10_000).catch(error => { - logger.error('Failed to wait for tokens being idle', error); + serverLogger.error('Failed to wait for tokens being idle', error); }); await db.destroy(); // Wait for Redis to finish all the pending operations @@ -190,8 +188,12 @@ export async function createStorage( const tokens = await db.getTokens({ target }); return tokens.map(transformToken); }, - async readToken(hashedToken) { - const data = await cache.fetch(hashedToken); + async readToken(hashedToken, maskedToken) { + const data = await cache.fetch(hashedToken, { + context: { + maskedToken, + }, + }); if (!data) { // Looked up in all layers, and the token is not found @@ -222,7 +224,7 @@ export async function createStorage( await setInRedis(redis, hashedToken, cacheEntry); } catch (error) { handleStorageError({ - logger, + logger: serverLogger, error, logMsg: 'Failed to write token to Redis, but it was created in the Db', tier: 'redis', From 0a7028194efd1a6f70afefe0655c4bb72623bbb2 Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Tue, 17 Dec 2024 15:45:06 +0100 Subject: [PATCH 7/9] metrics --- packages/services/tokens/src/api.ts | 7 ++-- packages/services/tokens/src/metrics.ts | 34 ++++++++++++++----- .../services/tokens/src/multi-tier-storage.ts | 20 +++++++---- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/packages/services/tokens/src/api.ts b/packages/services/tokens/src/api.ts index acc76704ca..5600c49253 100644 --- a/packages/services/tokens/src/api.ts +++ b/packages/services/tokens/src/api.ts @@ -5,7 +5,7 @@ import { z } from 'zod'; import { createErrorHandler, handleTRPCError, maskToken, metrics } from '@hive/service-common'; import type { inferRouterInputs, inferRouterOutputs } from '@trpc/server'; import { initTRPC } from '@trpc/server'; -import { cacheHits, cacheMisses } from './metrics'; +import { recordTokenRead } from './metrics'; import { Storage } from './multi-tier-storage'; const httpRequests = new metrics.Counter({ @@ -170,13 +170,12 @@ export const tokensApiRouter = t.router({ const cachedFailure = ctx.tokenReadFailuresCache.get(hash); if (cachedFailure) { - cacheHits.inc(1); throw new Error(cachedFailure); } try { const result = await ctx.storage.readToken(hash, alias); - + recordTokenRead(result ? 200 : 404); // removes the token from the failures cache (in case the value expired) ctx.tokenReadFailuresCache.delete(hash); @@ -187,8 +186,8 @@ export const tokensApiRouter = t.router({ // set token read as failure // so we don't try to read it again for next X minutes ctx.tokenReadFailuresCache.set(hash, (error as Error).message); - cacheMisses.inc(1); + recordTokenRead(500); throw error; } }), diff --git a/packages/services/tokens/src/metrics.ts b/packages/services/tokens/src/metrics.ts index 5389c0076d..ad7f55f560 100644 --- a/packages/services/tokens/src/metrics.ts +++ b/packages/services/tokens/src/metrics.ts @@ -1,16 +1,32 @@ +import type { LRUCache } from 'lru-cache'; import { metrics } from '@hive/service-common'; -export const cacheHits = new metrics.Counter({ - name: 'tokens_cache_hits', - help: 'Number of cache hits', +const tokenReads = new metrics.Counter({ + name: 'tokens_reads', + help: 'Number of token reads', + labelNames: ['status'], }); -export const cacheMisses = new metrics.Counter({ - name: 'tokens_cache_misses', - help: 'Number of cache misses', +const cacheReads = new metrics.Counter({ + name: 'tokens_cache_reads', + help: 'Number of cache reads', + labelNames: ['status'], }); -export const cacheInvalidations = new metrics.Counter({ - name: 'tokens_cache_invalidations', - help: 'Number of cache invalidations', +const cacheFills = new metrics.Counter({ + name: 'tokens_cache_fills', + help: 'Number of cache fills', + labelNames: ['source'], }); + +export function recordCacheRead(status: NonNullable['fetch']>) { + cacheReads.inc({ status }); +} + +export function recordCacheFill(source: 'db' | 'redis-fresh' | 'redis-stale') { + cacheFills.inc({ source }); +} + +export function recordTokenRead(status: 200 | 500 | 404) { + tokenReads.inc({ status }); +} diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts index cf72188e48..afe84e7f95 100644 --- a/packages/services/tokens/src/multi-tier-storage.ts +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -5,7 +5,7 @@ import ms from 'ms'; import { createConnectionString, createTokenStorage, Interceptor, tokens } from '@hive/storage'; import { captureException, captureMessage } from '@sentry/node'; import { atomic, until, useActionTracker } from './helpers'; -import { cacheHits, cacheInvalidations, cacheMisses } from './metrics'; +import { recordCacheFill, recordCacheRead } from './metrics'; type CacheEntry = StorageItem | 'not-found'; @@ -106,6 +106,7 @@ export async function createStorage( } if (redisData) { + recordCacheFill('redis-fresh'); logger.debug('Returning fresh data from Redis'); return JSON.parse(redisData) as CacheEntry; } @@ -114,6 +115,7 @@ export async function createStorage( // Nothing in Redis, let's check the DB const dbResult = await db.getToken({ token: hashedToken }); const cacheEntry = dbResult ? transformToken(dbResult) : 'not-found'; + recordCacheFill('db'); // Write to Redis, so the next time we can read it from there await setInRedis(redis, hashedToken, cacheEntry).catch(error => { @@ -158,6 +160,7 @@ export async function createStorage( throw error; } + recordCacheFill('redis-stale'); logger.debug('Returning stale data from Redis'); // Stale data will be cached in the in-memory cache only, as it's not fresh. return JSON.parse(staleRedisData) as CacheEntry; @@ -189,12 +192,19 @@ export async function createStorage( return tokens.map(transformToken); }, async readToken(hashedToken, maskedToken) { + const status: LRUCache.Status = {}; + const context = { maskedToken, source: 'in-memory' }; const data = await cache.fetch(hashedToken, { - context: { - maskedToken, - }, + context, + status, }); + if (status.fetch) { + recordCacheRead(status.fetch); + } else { + serverLogger.warn('Status of the fetch is missing'); + } + if (!data) { // Looked up in all layers, and the token is not found return null; @@ -252,8 +262,6 @@ export async function createStorage( return; } - cacheInvalidations.inc(); - await redis.del( hashedTokens.map(generateRedisKey).concat(hashedTokens.map(generateStaleRedisKey)), ); From be2fe540ea462d900af8123f6a64e9ca9d49e1b7 Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Thu, 19 Dec 2024 12:38:33 +0100 Subject: [PATCH 8/9] move trasnform token --- packages/services/api/src/shared/entities.ts | 2 +- packages/services/storage/src/tokens.ts | 28 +++++++++++++++---- .../services/tokens/src/multi-tier-storage.ts | 23 +++------------ 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/packages/services/api/src/shared/entities.ts b/packages/services/api/src/shared/entities.ts index dbf6e8700a..fc6f61cb7c 100644 --- a/packages/services/api/src/shared/entities.ts +++ b/packages/services/api/src/shared/entities.ts @@ -318,7 +318,7 @@ export interface Token { project: string; organization: string; date: string; - lastUsedAt: string; + lastUsedAt: string | null; scopes: readonly string[]; } diff --git a/packages/services/storage/src/tokens.ts b/packages/services/storage/src/tokens.ts index dfb521db6e..8f11c546c9 100644 --- a/packages/services/storage/src/tokens.ts +++ b/packages/services/storage/src/tokens.ts @@ -2,6 +2,20 @@ import { Interceptor, sql } from 'slonik'; import { getPool, toDate, tokens } from './db'; import type { Slonik } from './shared'; +function transformToken(row: tokens) { + return { + token: row.token, + tokenAlias: row.token_alias, + name: row.name, + date: row.created_at as unknown as string, + lastUsedAt: row.last_used_at as unknown as string | null, + organization: row.organization_id, + project: row.project_id, + target: row.target_id, + scopes: row.scopes || [], + }; +} + export async function createTokenStorage( connection: string, maximumPoolSize: number, @@ -33,10 +47,10 @@ export async function createTokenStorage( `, ); - return result.rows; + return result.rows.map(transformToken); }, async getToken({ token }: { token: string }) { - return pool.maybeOne>( + const row = await pool.maybeOne>( sql` SELECT * FROM tokens @@ -44,8 +58,10 @@ export async function createTokenStorage( LIMIT 1 `, ); + + return row ? transformToken(row) : null; }, - createToken({ + async createToken({ token, tokenAlias, target, @@ -62,7 +78,7 @@ export async function createTokenStorage( organization: string; scopes: readonly string[]; }) { - return pool.one>( + const row = await pool.one>( sql` INSERT INTO tokens (name, token, token_alias, target_id, project_id, organization_id, scopes) @@ -73,7 +89,9 @@ export async function createTokenStorage( )}) RETURNING * `, - ); + ) + + return transformToken(row); }, async deleteToken(params: { token: string; postDeletionTransaction: () => Promise }) { await pool.transaction(async t => { diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts index afe84e7f95..6895d38d38 100644 --- a/packages/services/tokens/src/multi-tier-storage.ts +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -14,7 +14,7 @@ interface StorageItem { name: string; tokenAlias: string; date: string; - lastUsedAt: string; + lastUsedAt: string | null; organization: string; project: string; target: string; @@ -114,7 +114,7 @@ export async function createStorage( try { // Nothing in Redis, let's check the DB const dbResult = await db.getToken({ token: hashedToken }); - const cacheEntry = dbResult ? transformToken(dbResult) : 'not-found'; + const cacheEntry = dbResult ?? 'not-found'; recordCacheFill('db'); // Write to Redis, so the next time we can read it from there @@ -188,8 +188,7 @@ export async function createStorage( return false; }), async readTarget(target) { - const tokens = await db.getTokens({ target }); - return tokens.map(transformToken); + return db.getTokens({ target }); }, async readToken(hashedToken, maskedToken) { const status: LRUCache.Status = {}; @@ -225,7 +224,7 @@ export async function createStorage( // We write to Redis, so in case the token is used, // we reuse it for the in-memory cache, without hitting the DB. const hashedToken = result.token; - const cacheEntry = transformToken(result); + const cacheEntry = result; // Write to Redis gracefully. If it fails, we log the error, but we don't throw. // The token won't be in Redis cache, but it will be possible to retrieve it from the DB. @@ -272,20 +271,6 @@ export async function createStorage( }; } -function transformToken(item: tokens): StorageItem { - return { - token: item.token, - tokenAlias: item.token_alias, - name: item.name, - date: item.created_at as any, - lastUsedAt: item.last_used_at as any, - organization: item.organization_id, - project: item.project_id, - target: item.target_id, - scopes: item.scopes || [], - }; -} - function generateRedisKey(hashedToken: string) { // bump the version when the cache format changes return `tokens:cache:v2:${hashedToken}`; From 87596dbe9d6f1ea21a84e7779a5fda2dd07f9b8e Mon Sep 17 00:00:00 2001 From: Kamil Kisiela Date: Thu, 19 Dec 2024 13:14:29 +0100 Subject: [PATCH 9/9] E S Lint - it's in the game --- packages/services/tokens/src/multi-tier-storage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/services/tokens/src/multi-tier-storage.ts b/packages/services/tokens/src/multi-tier-storage.ts index 6895d38d38..01f230c2f1 100644 --- a/packages/services/tokens/src/multi-tier-storage.ts +++ b/packages/services/tokens/src/multi-tier-storage.ts @@ -2,7 +2,7 @@ import type { FastifyBaseLogger } from 'fastify'; import type { Redis } from 'ioredis'; import { LRUCache } from 'lru-cache'; import ms from 'ms'; -import { createConnectionString, createTokenStorage, Interceptor, tokens } from '@hive/storage'; +import { createConnectionString, createTokenStorage, Interceptor } from '@hive/storage'; import { captureException, captureMessage } from '@sentry/node'; import { atomic, until, useActionTracker } from './helpers'; import { recordCacheFill, recordCacheRead } from './metrics';