From 51859c38a728c5f05cf30f09f4c6a55657895de7 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Mon, 6 Oct 2025 17:57:11 +0200 Subject: [PATCH] chore(): Default caching configuration and graceful redis error handling (#13663) * chore(): Default caching configuration and graceful redis error handling * Create odd-moons-crash.md * chore(): Default caching configuration and graceful redis error handling * fixes * address feedback * revert(): Test utils init module fix * reconnect * reconnect * reconnect --- .changeset/odd-moons-crash.md | 6 + .../common/__tests__/define-config.spec.ts | 48 ++ .../core/utils/src/common/define-config.ts | 18 + .../medusa-test-utils/src/init-modules.ts | 4 +- .../caching-redis/src/loaders/connection.ts | 67 +- .../caching-redis/src/services/redis-cache.ts | 664 ++++++++++-------- .../caching-redis/src/types/index.ts | 19 +- .../src/loaders/redis.ts | 8 +- .../utils/workflow-orchestrator-storage.ts | 59 ++ 9 files changed, 565 insertions(+), 328 deletions(-) create mode 100644 .changeset/odd-moons-crash.md diff --git a/.changeset/odd-moons-crash.md b/.changeset/odd-moons-crash.md new file mode 100644 index 0000000000..e728d84ca9 --- /dev/null +++ b/.changeset/odd-moons-crash.md @@ -0,0 +1,6 @@ +--- +"@medusajs/caching-redis": patch +"@medusajs/utils": patch +--- + +chore(): Default caching configuration and graceful redis error handling diff --git a/packages/core/utils/src/common/__tests__/define-config.spec.ts b/packages/core/utils/src/common/__tests__/define-config.spec.ts index 2276383d02..9e7bca9892 100644 --- a/packages/core/utils/src/common/__tests__/define-config.spec.ts +++ b/packages/core/utils/src/common/__tests__/define-config.spec.ts @@ -1048,6 +1048,7 @@ describe("defineConfig", function () { process.env.EXECUTION_CONTEXT = "medusa-cloud" process.env.REDIS_URL = "redis://localhost:6379" + process.env.CACHE_REDIS_URL = "redis://localhost:6379" process.env.S3_FILE_URL = "https://s3.amazonaws.com/medusa-cloud-test" 
process.env.S3_PREFIX = "test" process.env.S3_REGION = "us-east-1" @@ -1086,6 +1087,21 @@ describe("defineConfig", function () { }, "resolve": "@medusajs/medusa/cache-redis", }, + "caching": { + "options": { + "providers": [ + { + "id": "caching-redis", + "is_default": true, + "options": { + "redisUrl": "redis://localhost:6379", + }, + "resolve": "@medusajs/medusa/caching-redis", + }, + ], + }, + "resolve": "@medusajs/caching", + }, "cart": { "resolve": "@medusajs/medusa/cart", }, @@ -1255,6 +1271,7 @@ describe("defineConfig", function () { process.env.EXECUTION_CONTEXT = "medusa-cloud" process.env.REDIS_URL = "redis://localhost:6379" + process.env.CACHE_REDIS_URL = "redis://localhost:6379" process.env.S3_FILE_URL = "https://s3.amazonaws.com/medusa-cloud-test" process.env.S3_PREFIX = "test" process.env.S3_REGION = "us-east-1" @@ -1294,6 +1311,21 @@ describe("defineConfig", function () { }, "resolve": "@medusajs/medusa/cache-redis", }, + "caching": { + "options": { + "providers": [ + { + "id": "caching-redis", + "is_default": true, + "options": { + "redisUrl": "redis://localhost:6379", + }, + "resolve": "@medusajs/medusa/caching-redis", + }, + ], + }, + "resolve": "@medusajs/caching", + }, "cart": { "resolve": "@medusajs/medusa/cart", }, @@ -1473,6 +1505,7 @@ describe("defineConfig", function () { process.env.EXECUTION_CONTEXT = "medusa-cloud" process.env.REDIS_URL = "redis://localhost:6379" + process.env.CACHE_REDIS_URL = "redis://localhost:6379" process.env.S3_FILE_URL = "https://s3.amazonaws.com/medusa-cloud-test" process.env.S3_PREFIX = "test" process.env.S3_REGION = "us-east-1" @@ -1518,6 +1551,21 @@ describe("defineConfig", function () { }, "resolve": "@medusajs/medusa/cache-redis", }, + "caching": { + "options": { + "providers": [ + { + "id": "caching-redis", + "is_default": true, + "options": { + "redisUrl": "redis://localhost:6379", + }, + "resolve": "@medusajs/medusa/caching-redis", + }, + ], + }, + "resolve": "@medusajs/caching", + }, "cart": { "resolve": 
"@medusajs/medusa/cart", }, diff --git a/packages/core/utils/src/common/define-config.ts b/packages/core/utils/src/common/define-config.ts index 4e77f79101..d5db7e8822 100644 --- a/packages/core/utils/src/common/define-config.ts +++ b/packages/core/utils/src/common/define-config.ts @@ -303,6 +303,24 @@ function resolveModules( }, ] + if (process.env.CACHE_REDIS_URL) { + cloudModules.push({ + resolve: MODULE_PACKAGE_NAMES[Modules.CACHING], + options: { + providers: [ + { + id: "caching-redis", + resolve: "@medusajs/medusa/caching-redis", + is_default: true, + options: { + redisUrl: process.env.CACHE_REDIS_URL, + }, + }, + ], + }, + }) + } + /** * The default set of modules to always use. The end user can swap * the modules by providing an alternate implementation via their diff --git a/packages/medusa-test-utils/src/init-modules.ts b/packages/medusa-test-utils/src/init-modules.ts index 70951c1f7a..c7eceb028f 100644 --- a/packages/medusa-test-utils/src/init-modules.ts +++ b/packages/medusa-test-utils/src/init-modules.ts @@ -64,12 +64,12 @@ export async function initModules({ async function shutdown() { const promises: Promise[] = [] - promises.push(medusaApp.onApplicationPrepareShutdown()) - promises.push(medusaApp.onApplicationShutdown()) if (shouldDestroyConnectionAutomatically) { promises.push((sharedPgConnection as any).context?.destroy()) promises.push((sharedPgConnection as any).destroy()) + promises.push(medusaApp.onApplicationPrepareShutdown()) + promises.push(medusaApp.onApplicationShutdown()) } else { if (!preventConnectionDestroyWarning) { logger.info( diff --git a/packages/modules/providers/caching-redis/src/loaders/connection.ts b/packages/modules/providers/caching-redis/src/loaders/connection.ts index 5e1590a4db..95f19bde43 100644 --- a/packages/modules/providers/caching-redis/src/loaders/connection.ts +++ b/packages/modules/providers/caching-redis/src/loaders/connection.ts @@ -4,7 +4,7 @@ import type { ModulesSdkTypes, } from 
"@medusajs/framework/types" import { RedisCacheModuleOptions } from "@types" -import Redis from "ioredis" +import Redis, { type RedisOptions } from "ioredis" export default async ( { @@ -23,36 +23,56 @@ export default async ( const moduleOptions = (options ?? moduleDeclaration?.options ?? - {}) as RedisCacheModuleOptions & { - redisUrl?: string - } + {}) as RedisCacheModuleOptions - if (!moduleOptions.redisUrl) { + const { redisUrl, ...redisOptions_ } = moduleOptions + if (!redisUrl) { throw new Error("[caching-redis] redisUrl is required") } let redisClient: Redis - try { - redisClient = new Redis(moduleOptions.redisUrl!, { - connectTimeout: 10000, - lazyConnect: true, - retryDelayOnFailover: 100, - connectionName: "medusa-cache-redis", - ...moduleOptions, - }) + const redisOptions: RedisOptions = { + connectTimeout: 10000, + commandTimeout: 5000, + lazyConnect: true, + maxRetriesPerRequest: 3, + enableOfflineQueue: true, + connectionName: "medusa-cache-redis", + ...redisOptions_, + } - // Test connection - await redisClient.ping() + redisClient = new Redis(redisUrl!, redisOptions) + + // Handle connection errors gracefully + redisClient.on("error", (error) => { + logger_.warn(`Redis cache connection error: ${error.message}`) + }) + + redisClient.on("connect", () => { logger_.info("Redis cache connection established successfully") + }) + + redisClient.on("ready", () => { + logger_.info("Redis cache is ready to accept commands") + }) + + redisClient.on("close", () => { + logger_.warn("Redis cache connection closed") + }) + + redisClient.on("reconnecting", () => { + logger_.info("Redis cache attempting to reconnect...") + }) + + try { + // Test connection with timeout + await redisClient.ping() + logger_.info("Redis cache connection test successful") } catch (error) { - logger_.error(`Failed to connect to Redis cache: ${error.message}`) - redisClient = new Redis(moduleOptions.redisUrl!, { - connectTimeout: 10000, - lazyConnect: true, - retryDelayOnFailover: 100, - 
...moduleOptions, - }) + logger_.warn( + `Redis cache connection test failed: ${error.message}, but continuing with lazy connection` + ) } container.register({ @@ -62,5 +82,8 @@ export default async ( prefix: { resolve: () => moduleOptions.prefix ?? "mc:", }, + logger: { + resolve: () => logger_, + }, }) } diff --git a/packages/modules/providers/caching-redis/src/services/redis-cache.ts b/packages/modules/providers/caching-redis/src/services/redis-cache.ts index 3c0959dbf0..5c67ee7c6c 100644 --- a/packages/modules/providers/caching-redis/src/services/redis-cache.ts +++ b/packages/modules/providers/caching-redis/src/services/redis-cache.ts @@ -1,3 +1,4 @@ +import { Logger } from "@medusajs/framework/types" import { RedisCacheModuleOptions } from "@types" import { Redis } from "ioredis" import { createGunzip, createGzip } from "zlib" @@ -10,13 +11,20 @@ export class RedisCachingProvider { protected defaultTTL: number protected compressionThreshold: number protected hasher: (key: string) => string + protected logger: Logger constructor( { redisClient, + logger, prefix, hasher, - }: { redisClient: Redis; prefix: string; hasher: (key: string) => string }, + }: { + redisClient: Redis + prefix: string + hasher: (key: string) => string + logger: Logger + }, options?: RedisCacheModuleOptions ) { this.redisClient = redisClient @@ -24,6 +32,29 @@ export class RedisCachingProvider { this.defaultTTL = options?.ttl ?? 3600 // 1 hour default this.compressionThreshold = options?.compressionThreshold ?? 
2048 // 2KB default this.hasher = hasher + this.logger = logger + } + + private isConnectionError(error: any): boolean { + return ( + error.code === "ECONNREFUSED" || + error.code === "ENOTFOUND" || + error.code === "ETIMEDOUT" || + error.code === "ECONNRESET" || + error.code === "EPIPE" || + error.message?.includes("Connection is closed") || + error.message?.includes("connect ECONNREFUSED") || + error.message?.includes("connect ETIMEDOUT") || + error.message?.includes("Command timed out") || + error.message?.includes("Maximum number of retries exceeded") || + ["connecting", "reconnecting", "disconnecting", "wait", "end"].includes( + this.redisClient.status + ) + ) + } + + private isConnectionHealthy(): boolean { + return this.redisClient.status === "ready" } #getKeyName(key: string): string { @@ -252,66 +283,92 @@ export class RedisCachingProvider { } async get({ key, tags }: { key?: string; tags?: string[] }): Promise { - if (key) { - const keyName = this.#getKeyName(key) - const buffer = await this.redisClient.hgetBuffer(keyName, "data") - if (!buffer) { - return null - } + if (!this.isConnectionHealthy()) { + return null + } - const finalData = await this.#decompressData(buffer) - return JSON.parse(finalData) + if (key) { + try { + const keyName = this.#getKeyName(key) + const buffer = await this.redisClient.hgetBuffer(keyName, "data") + if (!buffer) { + return null + } + + const finalData = await this.#decompressData(buffer) + return JSON.parse(finalData) + } catch (error) { + if (this.isConnectionError(error)) { + this.logger.warn( + "Redis connection error during get operation, returning null to trigger fallback to original data source" + ) + return null + } + throw error + } } if (tags?.length) { - // Get all keys associated with the tags - const pipeline = this.redisClient.pipeline() - tags.forEach((tag) => { - const tagKey = this.#getTagKey(tag) - pipeline.smembers(tagKey) - }) + try { + // Get all keys associated with the tags + const pipeline = 
this.redisClient.pipeline() + tags.forEach((tag) => { + const tagKey = this.#getTagKey(tag) + pipeline.smembers(tagKey) + }) - const tagResults = await pipeline.exec() - const allKeys = new Set() + const tagResults = await pipeline.exec() + const allKeys = new Set() - tagResults?.forEach((result, index) => { - if (result && result[1]) { - ;(result[1] as string[]).forEach((key) => allKeys.add(key)) + tagResults?.forEach((result, index) => { + if (result && result[1]) { + ;(result[1] as string[]).forEach((key) => allKeys.add(key)) + } + }) + + if (allKeys.size === 0) { + return [] } - }) - if (allKeys.size === 0) { - return [] - } + // Get all hash data for the keys + const valuePipeline = this.redisClient.pipeline() + Array.from(allKeys).forEach((key) => { + valuePipeline.hgetBuffer(key, "data") + }) - // Get all hash data for the keys - const valuePipeline = this.redisClient.pipeline() - Array.from(allKeys).forEach((key) => { - valuePipeline.hgetBuffer(key, "data") - }) + const valueResults = await valuePipeline.exec() + const results: any[] = [] - const valueResults = await valuePipeline.exec() - const results: any[] = [] - - const decompressionPromises = (valueResults || []).map(async (result) => { - if (result && result[1]) { - const buffer = result[1] as Buffer - try { - const finalData = await this.#decompressData(buffer) - return JSON.parse(finalData) - } catch (e) { - // If JSON parsing fails, skip this entry (corrupted data) - console.warn(`Skipping corrupted cache entry: ${e.message}`) + const decompressionPromises = (valueResults || []).map( + async (result) => { + if (result && result[1]) { + const buffer = result[1] as Buffer + try { + const finalData = await this.#decompressData(buffer) + return JSON.parse(finalData) + } catch (e) { + // If JSON parsing fails, skip this entry (corrupted data) + this.logger.warn(`Skipping corrupted cache entry: ${e.message}`) + return null + } + } return null } + ) + + const decompressionResults = await 
Promise.all(decompressionPromises) + results.push(...decompressionResults.filter(Boolean)) + + return results + } catch (error) { + if (this.isConnectionError(error)) { + this.logger.warn( + "Redis connection error during get operation, returning empty array to trigger fallback to original data source" + ) + return null } - return null - }) - - const decompressionResults = await Promise.all(decompressionPromises) - results.push(...decompressionResults.filter(Boolean)) - - return results + throw error + } } return null @@ -332,53 +389,63 @@ export class RedisCachingProvider { autoInvalidate?: boolean } }): Promise { - const keyName = this.#getKeyName(key) - const serializedData = JSON.stringify(data) - const effectiveTTL = ttl ?? this.defaultTTL + try { + const keyName = this.#getKeyName(key) + const serializedData = JSON.stringify(data) + const effectiveTTL = ttl ?? this.defaultTTL - const finalData = await this.#compressData(serializedData) + const finalData = await this.#compressData(serializedData) - let tagIds: number[] = [] - if (tags?.length) { - tagIds = await this.#internTags(tags) - } - - const setPipeline = this.redisClient.pipeline() - - // Main data with conditional operations - setPipeline.hsetnx(keyName, "data", finalData) - if (options && Object.keys(options).length) { - setPipeline.hset(keyName, "options", JSON.stringify(options)) - } - if (effectiveTTL) { - setPipeline.expire(keyName, effectiveTTL) - } - - // Store tag IDs if present - if (tags?.length && tagIds.length) { - const tagsKey = this.#getTagsKey(key) - const buffer = Buffer.alloc(tagIds.length * 4) - tagIds.forEach((id, index) => { - buffer.writeUInt32LE(id, index * 4) - }) - - if (effectiveTTL) { - setPipeline.set(tagsKey, buffer, "EX", effectiveTTL + 60, "NX") - } else { - setPipeline.setnx(tagsKey, buffer) + let tagIds: number[] = [] + if (tags?.length) { + tagIds = await this.#internTags(tags) } - // Add tag operations to the same pipeline - tags.forEach((tag) => { - const tagKey = 
this.#getTagKey(tag) - setPipeline.sadd(tagKey, keyName) - if (effectiveTTL) { - setPipeline.expire(tagKey, effectiveTTL + 60) - } - }) - } + const setPipeline = this.redisClient.pipeline() - await setPipeline.exec() + // Main data with conditional operations + setPipeline.hsetnx(keyName, "data", finalData) + if (options && Object.keys(options).length) { + setPipeline.hset(keyName, "options", JSON.stringify(options)) + } + if (effectiveTTL) { + setPipeline.expire(keyName, effectiveTTL) + } + + // Store tag IDs if present + if (tags?.length && tagIds.length) { + const tagsKey = this.#getTagsKey(key) + const buffer = Buffer.alloc(tagIds.length * 4) + tagIds.forEach((id, index) => { + buffer.writeUInt32LE(id, index * 4) + }) + + if (effectiveTTL) { + setPipeline.set(tagsKey, buffer, "EX", effectiveTTL + 60, "NX") + } else { + setPipeline.setnx(tagsKey, buffer) + } + + // Add tag operations to the same pipeline + tags.forEach((tag) => { + const tagKey = this.#getTagKey(tag) + setPipeline.sadd(tagKey, keyName) + if (effectiveTTL) { + setPipeline.expire(tagKey, effectiveTTL + 60) + } + }) + } + + await setPipeline.exec() + } catch (error) { + if (this.isConnectionError(error)) { + this.logger.warn( + "Redis connection error during set operation, relying on IORedis retry mechanism" + ) + return + } + throw error + } } async clear({ @@ -392,208 +459,96 @@ export class RedisCachingProvider { autoInvalidate?: boolean } }): Promise { - if (key) { - const keyName = this.#getKeyName(key) - const tagsKey = this.#getTagsKey(key) + try { + if (key) { + const keyName = this.#getKeyName(key) + const tagsKey = this.#getTagsKey(key) - const clearPipeline = this.redisClient.pipeline() + const clearPipeline = this.redisClient.pipeline() - // Get tags for cleanup and delete main key in same pipeline - clearPipeline.getBuffer(tagsKey) - clearPipeline.unlink(keyName) + // Get tags for cleanup and delete main key in same pipeline + clearPipeline.getBuffer(tagsKey) + 
clearPipeline.unlink(keyName) - const results = await clearPipeline.exec() - const tagsBuffer = results?.[0]?.[1] as Buffer + const results = await clearPipeline.exec() + const tagsBuffer = results?.[0]?.[1] as Buffer - if (tagsBuffer?.length) { - try { - // Binary format: array of 32-bit integers - const tagIds: number[] = [] - for (let i = 0; i < tagsBuffer.length; i += 4) { - tagIds.push(tagsBuffer.readUInt32LE(i)) + if (tagsBuffer?.length) { + try { + // Binary format: array of 32-bit integers + const tagIds: number[] = [] + for (let i = 0; i < tagsBuffer.length; i += 4) { + tagIds.push(tagsBuffer.readUInt32LE(i)) + } + + if (tagIds.length) { + const entryTags = await this.#resolveTagIds(tagIds) + + const tagCleanupPipeline = this.redisClient.pipeline() + entryTags.forEach((tag) => { + const tagKey = this.#getTagKey(tag, { isHashed: true }) + tagCleanupPipeline.srem(tagKey, keyName) + }) + tagCleanupPipeline.unlink(tagsKey) + await tagCleanupPipeline.exec() + + // Decrement reference counts and cleanup unused tags + await this.#decrementTagRefs(tagIds) + } + } catch (e) { + // noop - corrupted tag data, skip cleanup } - - if (tagIds.length) { - const entryTags = await this.#resolveTagIds(tagIds) - - const tagCleanupPipeline = this.redisClient.pipeline() - entryTags.forEach((tag) => { - const tagKey = this.#getTagKey(tag, { isHashed: true }) - tagCleanupPipeline.srem(tagKey, keyName) - }) - tagCleanupPipeline.unlink(tagsKey) - await tagCleanupPipeline.exec() - - // Decrement reference counts and cleanup unused tags - await this.#decrementTagRefs(tagIds) - } - } catch (e) { - // noop - corrupted tag data, skip cleanup } - } - return - } - - if (tags?.length) { - // Handle wildcard tag to clear all cache data - if (tags.includes("*")) { - await this.flush() return } - // Get all keys associated with the tags - const pipeline = this.redisClient.pipeline() - tags.forEach((tag) => { - const tagKey = this.#getTagKey(tag) - pipeline.smembers(tagKey) - }) - - const 
tagResults = await pipeline.exec() - - const allKeys = new Set() - - tagResults?.forEach((result) => { - if (result && result[1]) { - ;(result[1] as string[]).forEach((key) => allKeys.add(key)) - } - }) - - if (allKeys.size) { - // If no options provided (user explicit call), clear everything - if (!options) { - const deletePipeline = this.redisClient.pipeline() - - // Delete main keys and options - Array.from(allKeys).forEach((key) => { - deletePipeline.unlink(key) - }) - - // Clean up tag references for each key - const tagDataPromises = Array.from(allKeys).map(async (key) => { - const keyWithoutPrefix = key.replace(this.keyNamePrefix, "") - const tagsKey = this.#getTagsKey(keyWithoutPrefix) - const tagsData = await this.redisClient.getBuffer(tagsKey) - return { key, tagsKey, tagsData } - }) - - const tagResults = await Promise.all(tagDataPromises) - - // Build single pipeline for all tag cleanup operations - const tagCleanupPipeline = this.redisClient.pipeline() - const cleanupPromises = tagResults.map( - async ({ key, tagsKey, tagsData }) => { - if (tagsData) { - try { - // Binary format: array of 32-bit integers - const tagIds: number[] = [] - for (let i = 0; i < tagsData.length; i += 4) { - tagIds.push(tagsData.readUInt32LE(i)) - } - - if (tagIds.length) { - const entryTags = await this.#resolveTagIds(tagIds) - entryTags.forEach((tag) => { - const tagKey = this.#getTagKey(tag, { isHashed: true }) - tagCleanupPipeline.srem(tagKey, key) - }) - tagCleanupPipeline.unlink(tagsKey) - - // Decrement reference counts and cleanup unused tags - await this.#decrementTagRefs(tagIds) - } - } catch (e) { - // noop - } - } - } - ) - - await Promise.all(cleanupPromises) - await tagCleanupPipeline.exec() - await deletePipeline.exec() - - // Clean up empty tag sets - const allTagKeys = await this.redisClient.keys( - `${this.keyNamePrefix}tag:*` - ) - if (allTagKeys.length) { - const cardinalityPipeline = this.redisClient.pipeline() - allTagKeys.forEach((tagKey) => { - 
cardinalityPipeline.scard(tagKey) - }) - - const cardinalityResults = await cardinalityPipeline.exec() - - // Delete empty tag keys - const emptyTagPipeline = this.redisClient.pipeline() - cardinalityResults?.forEach((result, index) => { - if (result && result[1] === 0) { - emptyTagPipeline.unlink(allTagKeys[index]) - } - }) - - await emptyTagPipeline.exec() - } - + if (tags?.length) { + // Handle wildcard tag to clear all cache data + if (tags.includes("*")) { + await this.flush() return } - // If autoInvalidate is true (strategy call), only clear entries with autoInvalidate=true (default) - if (options.autoInvalidate === true) { - const optionsPipeline = this.redisClient.pipeline() + // Get all keys associated with the tags + const pipeline = this.redisClient.pipeline() + tags.forEach((tag) => { + const tagKey = this.#getTagKey(tag) + pipeline.smembers(tagKey) + }) - Array.from(allKeys).forEach((key) => { - optionsPipeline.hget(key, "options") - }) + const tagResults = await pipeline.exec() - const optionsResults = await optionsPipeline.exec() - const keysToDelete: string[] = [] + const allKeys = new Set() - Array.from(allKeys).forEach((key, index) => { - const optionsResult = optionsResults?.[index] + tagResults?.forEach((result) => { + if (result && result[1]) { + ;(result[1] as string[]).forEach((key) => allKeys.add(key)) + } + }) - if (optionsResult && optionsResult[1]) { - try { - const entryOptions = JSON.parse(optionsResult[1] as string) - - // Delete if entry has autoInvalidate=true or no setting (default true) - const shouldAutoInvalidate = entryOptions.autoInvalidate ?? 
true - - if (shouldAutoInvalidate) { - keysToDelete.push(key) - } - } catch (e) { - // If can't parse options, assume it's safe to delete (default true) - keysToDelete.push(key) - } - } else { - // No options stored, default to true - keysToDelete.push(key) - } - }) - - if (keysToDelete.length) { + if (allKeys.size) { + // If no options provided (user explicit call), clear everything + if (!options) { const deletePipeline = this.redisClient.pipeline() - keysToDelete.forEach((key) => { + // Delete main keys and options + Array.from(allKeys).forEach((key) => { deletePipeline.unlink(key) }) - // Clean up tag references for each key to delete - const tagDataPromises = keysToDelete.map(async (key) => { + // Clean up tag references for each key + const tagDataPromises = Array.from(allKeys).map(async (key) => { const keyWithoutPrefix = key.replace(this.keyNamePrefix, "") const tagsKey = this.#getTagsKey(keyWithoutPrefix) const tagsData = await this.redisClient.getBuffer(tagsKey) return { key, tagsKey, tagsData } }) - // Wait for all tag data fetches const tagResults = await Promise.all(tagDataPromises) // Build single pipeline for all tag cleanup operations const tagCleanupPipeline = this.redisClient.pipeline() - const cleanupPromises = tagResults.map( async ({ key, tagsKey, tagsData }) => { if (tagsData) { @@ -610,7 +565,7 @@ export class RedisCachingProvider { const tagKey = this.#getTagKey(tag, { isHashed: true }) tagCleanupPipeline.srem(tagKey, key) }) - tagCleanupPipeline.unlink(tagsKey) // Delete the tags key + tagCleanupPipeline.unlink(tagsKey) // Decrement reference counts and cleanup unused tags await this.#decrementTagRefs(tagIds) @@ -624,7 +579,6 @@ export class RedisCachingProvider { await Promise.all(cleanupPromises) await tagCleanupPipeline.exec() - await deletePipeline.exec() // Clean up empty tag sets @@ -632,52 +586,188 @@ export class RedisCachingProvider { `${this.keyNamePrefix}tag:*` ) if (allTagKeys.length) { - const cleanupPipeline = 
this.redisClient.pipeline() - + const cardinalityPipeline = this.redisClient.pipeline() allTagKeys.forEach((tagKey) => { - cleanupPipeline.scard(tagKey) + cardinalityPipeline.scard(tagKey) }) - const cardinalityResults = await cleanupPipeline.exec() + const cardinalityResults = await cardinalityPipeline.exec() - // Delete tag keys that are now empty - const emptyTagDeletePipeline = this.redisClient.pipeline() + // Delete empty tag keys + const emptyTagPipeline = this.redisClient.pipeline() cardinalityResults?.forEach((result, index) => { if (result && result[1] === 0) { - emptyTagDeletePipeline.unlink(allTagKeys[index]) + emptyTagPipeline.unlink(allTagKeys[index]) } }) - await emptyTagDeletePipeline.exec() + await emptyTagPipeline.exec() } return } + + // If autoInvalidate is true (strategy call), only clear entries with autoInvalidate=true (default) + if (options.autoInvalidate === true) { + const optionsPipeline = this.redisClient.pipeline() + + Array.from(allKeys).forEach((key) => { + optionsPipeline.hget(key, "options") + }) + + const optionsResults = await optionsPipeline.exec() + const keysToDelete: string[] = [] + + Array.from(allKeys).forEach((key, index) => { + const optionsResult = optionsResults?.[index] + + if (optionsResult && optionsResult[1]) { + try { + const entryOptions = JSON.parse(optionsResult[1] as string) + + // Delete if entry has autoInvalidate=true or no setting (default true) + const shouldAutoInvalidate = + entryOptions.autoInvalidate ?? 
true + + if (shouldAutoInvalidate) { + keysToDelete.push(key) + } + } catch (e) { + // If can't parse options, assume it's safe to delete (default true) + keysToDelete.push(key) + } + } else { + // No options stored, default to true + keysToDelete.push(key) + } + }) + + if (keysToDelete.length) { + const deletePipeline = this.redisClient.pipeline() + + keysToDelete.forEach((key) => { + deletePipeline.unlink(key) + }) + + // Clean up tag references for each key to delete + const tagDataPromises = keysToDelete.map(async (key) => { + const keyWithoutPrefix = key.replace(this.keyNamePrefix, "") + const tagsKey = this.#getTagsKey(keyWithoutPrefix) + const tagsData = await this.redisClient.getBuffer(tagsKey) + return { key, tagsKey, tagsData } + }) + + // Wait for all tag data fetches + const tagResults = await Promise.all(tagDataPromises) + + // Build single pipeline for all tag cleanup operations + const tagCleanupPipeline = this.redisClient.pipeline() + + const cleanupPromises = tagResults.map( + async ({ key, tagsKey, tagsData }) => { + if (tagsData) { + try { + // Binary format: array of 32-bit integers + const tagIds: number[] = [] + for (let i = 0; i < tagsData.length; i += 4) { + tagIds.push(tagsData.readUInt32LE(i)) + } + + if (tagIds.length) { + const entryTags = await this.#resolveTagIds(tagIds) + entryTags.forEach((tag) => { + const tagKey = this.#getTagKey(tag, { + isHashed: true, + }) + tagCleanupPipeline.srem(tagKey, key) + }) + tagCleanupPipeline.unlink(tagsKey) // Delete the tags key + + // Decrement reference counts and cleanup unused tags + await this.#decrementTagRefs(tagIds) + } + } catch (e) { + // noop + } + } + } + ) + + await Promise.all(cleanupPromises) + await tagCleanupPipeline.exec() + + await deletePipeline.exec() + + // Clean up empty tag sets + const allTagKeys = await this.redisClient.keys( + `${this.keyNamePrefix}tag:*` + ) + if (allTagKeys.length) { + const cleanupPipeline = this.redisClient.pipeline() + + allTagKeys.forEach((tagKey) => 
{ + cleanupPipeline.scard(tagKey) + }) + + const cardinalityResults = await cleanupPipeline.exec() + + // Delete tag keys that are now empty + const emptyTagDeletePipeline = this.redisClient.pipeline() + cardinalityResults?.forEach((result, index) => { + if (result && result[1] === 0) { + emptyTagDeletePipeline.unlink(allTagKeys[index]) + } + }) + + await emptyTagDeletePipeline.exec() + } + + return + } + } } } + } catch (error) { + if (this.isConnectionError(error)) { + this.logger.warn( + "Redis connection error during clear operation, relying on IORedis retry mechanism" + ) + return + } + throw error } } async flush(): Promise { - // Use SCAN to find ALL keys with our prefix and delete them - // This includes main cache keys, tag keys (tag:*), and tags keys (tags:*) - const pattern = `${this.keyNamePrefix}*` - let cursor = "0" + try { + // Use SCAN to find ALL keys with our prefix and delete them + // This includes main cache keys, tag keys (tag:*), and tags keys (tags:*) + const pattern = `${this.keyNamePrefix}*` + let cursor = "0" - do { - const result = await this.redisClient.scan( - cursor, - "MATCH", - pattern, - "COUNT", - 1000 - ) - cursor = result[0] - const keys = result[1] + do { + const result = await this.redisClient.scan( + cursor, + "MATCH", + pattern, + "COUNT", + 1000 + ) + cursor = result[0] + const keys = result[1] - if (keys.length) { - await this.redisClient.unlink(...keys) + if (keys.length) { + await this.redisClient.unlink(...keys) + } + } while (cursor !== "0") + } catch (error) { + if (this.isConnectionError(error)) { + this.logger.warn( + "Redis connection error during flush operation, relying on IORedis retry mechanism" + ) + return } - } while (cursor !== "0") + throw error + } } } diff --git a/packages/modules/providers/caching-redis/src/types/index.ts b/packages/modules/providers/caching-redis/src/types/index.ts index c22fcf493a..c74635cb0c 100644 --- a/packages/modules/providers/caching-redis/src/types/index.ts +++ 
b/packages/modules/providers/caching-redis/src/types/index.ts @@ -1,20 +1,13 @@ -export interface RedisCacheModuleOptions { +import { RedisOptions } from "ioredis" +export interface RedisCacheModuleOptions extends RedisOptions { + /** + * Redis connection string + */ + redisUrl?: string /** * TTL in milliseconds */ ttl?: number - /** - * Connection timeout in milliseconds - */ - connectTimeout?: number - /** - * Lazyload connections - */ - lazyConnect?: boolean - /** - * Connection retries - */ - retryDelayOnFailover?: number /** * Key prefix for all cache keys */ diff --git a/packages/modules/workflow-engine-redis/src/loaders/redis.ts b/packages/modules/workflow-engine-redis/src/loaders/redis.ts index 43a841b252..0a6b38ddd6 100644 --- a/packages/modules/workflow-engine-redis/src/loaders/redis.ts +++ b/packages/modules/workflow-engine-redis/src/loaders/redis.ts @@ -42,11 +42,11 @@ export default async ( maxRetriesPerRequest: null, }) logger?.info( - `Connection to Redis in module 'workflow-engine-redis' established` + `[Workflow-engine-redis] Connection to Redis in module 'workflow-engine-redis' established` ) } catch (err) { logger?.error( - `An error occurred while connecting to Redis in module 'workflow-engine-redis': ${err}` + `[Workflow-engine-redis] An error occurred while connecting to Redis in module 'workflow-engine-redis': ${err}` ) } @@ -54,11 +54,11 @@ export default async ( redisPublisher = await getConnection(cnnPubSub.url, cnnPubSub.options) redisSubscriber = await getConnection(cnnPubSub.url, cnnPubSub.options) logger?.info( - `Connection to Redis PubSub in module 'workflow-engine-redis' established` + `[Workflow-engine-redis] Connection to Redis PubSub in module 'workflow-engine-redis' established` ) } catch (err) { logger?.error( - `An error occurred while connecting to Redis PubSub in module 'workflow-engine-redis': ${err}` + `[Workflow-engine-redis] An error occurred while connecting to Redis PubSub in module 'workflow-engine-redis': ${err}` ) } 
diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 950bb48c97..19ca2df899 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -128,6 +128,7 @@ export class RedisDistributedTransactionStorage } async onApplicationStart() { + await this.ensureRedisConnection() const allowedJobs = [ JobType.RETRY, JobType.STEP_TIMEOUT, @@ -212,6 +213,64 @@ export class RedisDistributedTransactionStorage this.workflowOrchestratorService_ = workflowOrchestratorService } + private async ensureRedisConnection(): Promise { + const reconnectTasks: Promise[] = [] + + if (this.redisClient.status !== "ready") { + this.logger_.warn( + `[Workflow-engine-redis] Redis connection is not ready (status: ${this.redisClient.status}). Attempting to reconnect...` + ) + reconnectTasks.push( + this.redisClient + .connect() + .then(() => { + this.logger_.info( + "[Workflow-engine-redis] Redis connection reestablished successfully" + ) + }) + .catch((error) => { + this.logger_.error( + "[Workflow-engine-redis] Failed to reconnect to Redis", + error + ) + throw new MedusaError( + MedusaError.Types.DB_ERROR, + `Redis connection failed: ${error.message}` + ) + }) + ) + } + + if (this.redisWorkerConnection.status !== "ready") { + this.logger_.warn( + `[Workflow-engine-redis] Redis worker connection is not ready (status: ${this.redisWorkerConnection.status}). 
Attempting to reconnect...` + ) + reconnectTasks.push( + this.redisWorkerConnection + .connect() + .then(() => { + this.logger_.info( + "[Workflow-engine-redis] Redis worker connection reestablished successfully" + ) + }) + .catch((error) => { + this.logger_.error( + "[Workflow-engine-redis] Failed to reconnect to Redis worker connection", + error + ) + throw new MedusaError( + MedusaError.Types.DB_ERROR, + `Redis worker connection failed: ${error.message}` + ) + }) + ) + } + + if (reconnectTasks.length > 0) { + await promiseAll(reconnectTasks) + } + } + private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) { const isNotStarted = data.flow.state === TransactionState.NOT_STARTED const isFinished = [