chore(): Default caching configuration and graceful redis error handling (#13663)

* chore(): Default caching configuration and graceful redis error handling

* Create odd-moons-crash.md

* chore(): Default caching configuration and graceful redis error handling

* fixes

* address feedback

* revert(): Test utils init module fix

* reconnect

* reconnect

* reconnect
Adrien de Peretti
2025-10-06 17:57:11 +02:00
committed by GitHub
parent 28d57b7bf8
commit 51859c38a7
9 changed files with 565 additions and 328 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/caching-redis": patch
"@medusajs/utils": patch
---
chore(): Default caching configuration and graceful redis error handling

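For orientation, here is a minimal sketch of the module entry this change registers automatically when CACHE_REDIS_URL is set. The shape mirrors the test snapshots and the resolveModules change below; it is shown as a plain object for illustration, not as a verbatim config API.

// Sketch only: the default caching module entry added when CACHE_REDIS_URL is set.
// The object shape follows the snapshots below; it is not taken verbatim from the code.
const defaultCachingModule = {
  resolve: "@medusajs/caching",
  options: {
    providers: [
      {
        id: "caching-redis",
        resolve: "@medusajs/medusa/caching-redis",
        is_default: true,
        options: {
          redisUrl: process.env.CACHE_REDIS_URL,
        },
      },
    ],
  },
}
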
View File

@@ -1048,6 +1048,7 @@ describe("defineConfig", function () {
process.env.EXECUTION_CONTEXT = "medusa-cloud"
process.env.REDIS_URL = "redis://localhost:6379"
process.env.CACHE_REDIS_URL = "redis://localhost:6379"
process.env.S3_FILE_URL = "https://s3.amazonaws.com/medusa-cloud-test"
process.env.S3_PREFIX = "test"
process.env.S3_REGION = "us-east-1"
@@ -1086,6 +1087,21 @@ describe("defineConfig", function () {
},
"resolve": "@medusajs/medusa/cache-redis",
},
"caching": {
"options": {
"providers": [
{
"id": "caching-redis",
"is_default": true,
"options": {
"redisUrl": "redis://localhost:6379",
},
"resolve": "@medusajs/medusa/caching-redis",
},
],
},
"resolve": "@medusajs/caching",
},
"cart": {
"resolve": "@medusajs/medusa/cart",
},
@@ -1255,6 +1271,7 @@ describe("defineConfig", function () {
process.env.EXECUTION_CONTEXT = "medusa-cloud"
process.env.REDIS_URL = "redis://localhost:6379"
process.env.CACHE_REDIS_URL = "redis://localhost:6379"
process.env.S3_FILE_URL = "https://s3.amazonaws.com/medusa-cloud-test"
process.env.S3_PREFIX = "test"
process.env.S3_REGION = "us-east-1"
@@ -1294,6 +1311,21 @@ describe("defineConfig", function () {
},
"resolve": "@medusajs/medusa/cache-redis",
},
"caching": {
"options": {
"providers": [
{
"id": "caching-redis",
"is_default": true,
"options": {
"redisUrl": "redis://localhost:6379",
},
"resolve": "@medusajs/medusa/caching-redis",
},
],
},
"resolve": "@medusajs/caching",
},
"cart": {
"resolve": "@medusajs/medusa/cart",
},
@@ -1473,6 +1505,7 @@ describe("defineConfig", function () {
process.env.EXECUTION_CONTEXT = "medusa-cloud"
process.env.REDIS_URL = "redis://localhost:6379"
process.env.CACHE_REDIS_URL = "redis://localhost:6379"
process.env.S3_FILE_URL = "https://s3.amazonaws.com/medusa-cloud-test"
process.env.S3_PREFIX = "test"
process.env.S3_REGION = "us-east-1"
@@ -1518,6 +1551,21 @@ describe("defineConfig", function () {
},
"resolve": "@medusajs/medusa/cache-redis",
},
"caching": {
"options": {
"providers": [
{
"id": "caching-redis",
"is_default": true,
"options": {
"redisUrl": "redis://localhost:6379",
},
"resolve": "@medusajs/medusa/caching-redis",
},
],
},
"resolve": "@medusajs/caching",
},
"cart": {
"resolve": "@medusajs/medusa/cart",
},

View File

@@ -303,6 +303,24 @@ function resolveModules(
},
]
if (process.env.CACHE_REDIS_URL) {
cloudModules.push({
resolve: MODULE_PACKAGE_NAMES[Modules.CACHING],
options: {
providers: [
{
id: "caching-redis",
resolve: "@medusajs/medusa/caching-redis",
is_default: true,
options: {
redisUrl: process.env.CACHE_REDIS_URL,
},
},
],
},
})
}
/**
* The default set of modules to always use. The end user can swap
* the modules by providing an alternate implementation via their

View File

@@ -64,12 +64,12 @@ export async function initModules({
async function shutdown() {
const promises: Promise<void>[] = []
promises.push(medusaApp.onApplicationPrepareShutdown())
promises.push(medusaApp.onApplicationShutdown())
if (shouldDestroyConnectionAutomatically) {
promises.push((sharedPgConnection as any).context?.destroy())
promises.push((sharedPgConnection as any).destroy())
promises.push(medusaApp.onApplicationPrepareShutdown())
promises.push(medusaApp.onApplicationShutdown())
} else {
if (!preventConnectionDestroyWarning) {
logger.info(

View File

@@ -4,7 +4,7 @@ import type {
ModulesSdkTypes,
} from "@medusajs/framework/types"
import { RedisCacheModuleOptions } from "@types"
import Redis from "ioredis"
import Redis, { type RedisOptions } from "ioredis"
export default async (
{
@@ -23,36 +23,56 @@ export default async (
const moduleOptions = (options ??
moduleDeclaration?.options ??
{}) as RedisCacheModuleOptions & {
redisUrl?: string
}
{}) as RedisCacheModuleOptions
if (!moduleOptions.redisUrl) {
const { redisUrl, ...redisOptions_ } = moduleOptions
if (!redisUrl) {
throw new Error("[caching-redis] redisUrl is required")
}
let redisClient: Redis
try {
redisClient = new Redis(moduleOptions.redisUrl!, {
connectTimeout: 10000,
lazyConnect: true,
retryDelayOnFailover: 100,
connectionName: "medusa-cache-redis",
...moduleOptions,
})
const redisOptions: RedisOptions = {
connectTimeout: 10000,
commandTimeout: 5000,
lazyConnect: true,
maxRetriesPerRequest: 3,
enableOfflineQueue: true,
connectionName: "medusa-cache-redis",
...redisOptions_,
}
// Test connection
await redisClient.ping()
redisClient = new Redis(redisUrl!, redisOptions)
// Handle connection errors gracefully
redisClient.on("error", (error) => {
logger_.warn(`Redis cache connection error: ${error.message}`)
})
redisClient.on("connect", () => {
logger_.info("Redis cache connection established successfully")
})
redisClient.on("ready", () => {
logger_.info("Redis cache is ready to accept commands")
})
redisClient.on("close", () => {
logger_.warn("Redis cache connection closed")
})
redisClient.on("reconnecting", () => {
logger_.info("Redis cache attempting to reconnect...")
})
try {
// Test connection with timeout
await redisClient.ping()
logger_.info("Redis cache connection test successful")
} catch (error) {
logger_.error(`Failed to connect to Redis cache: ${error.message}`)
redisClient = new Redis(moduleOptions.redisUrl!, {
connectTimeout: 10000,
lazyConnect: true,
retryDelayOnFailover: 100,
...moduleOptions,
})
logger_.warn(
`Redis cache connection test failed: ${error.message}, but continuing with lazy connection`
)
}
container.register({
@@ -62,5 +82,8 @@ export default async (
prefix: {
resolve: () => moduleOptions.prefix ?? "mc:",
},
logger: {
resolve: () => logger_,
},
})
}

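As a standalone illustration of the graceful-connection pattern used in the loader above, the sketch below creates an ioredis client with lazyConnect and defensive defaults, attaches the same lifecycle listeners, and treats a failed ping as a warning rather than a fatal error. The helper name, the logger shape, and the exact option values are assumptions for the example.

import Redis, { type RedisOptions } from "ioredis"

// Assumed defaults for the sketch; the loader above uses comparable values.
const cacheRedisOptions: RedisOptions = {
  connectTimeout: 10000,
  commandTimeout: 5000,
  lazyConnect: true, // do not connect until the first command is issued
  maxRetriesPerRequest: 3,
  enableOfflineQueue: true,
  connectionName: "medusa-cache-redis",
}

export async function createCacheClient(
  redisUrl: string,
  logger: { info: (msg: string) => void; warn: (msg: string) => void } = console
): Promise<Redis> {
  const client = new Redis(redisUrl, cacheRedisOptions)

  // Surface connection lifecycle events as log lines instead of unhandled errors.
  client.on("error", (error) =>
    logger.warn(`Redis cache connection error: ${error.message}`)
  )
  client.on("ready", () => logger.info("Redis cache is ready to accept commands"))
  client.on("reconnecting", () =>
    logger.info("Redis cache attempting to reconnect...")
  )

  try {
    // Connection test; with lazyConnect a failure here is non-fatal.
    await client.ping()
  } catch (error: any) {
    logger.warn(
      `Redis cache connection test failed: ${error.message}, continuing with lazy connection`
    )
  }

  return client
}
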
View File

@@ -1,3 +1,4 @@
import { Logger } from "@medusajs/framework/types"
import { RedisCacheModuleOptions } from "@types"
import { Redis } from "ioredis"
import { createGunzip, createGzip } from "zlib"
@@ -10,13 +11,20 @@ export class RedisCachingProvider {
protected defaultTTL: number
protected compressionThreshold: number
protected hasher: (key: string) => string
protected logger: Logger
constructor(
{
redisClient,
logger,
prefix,
hasher,
}: { redisClient: Redis; prefix: string; hasher: (key: string) => string },
}: {
redisClient: Redis
prefix: string
hasher: (key: string) => string
logger: Logger
},
options?: RedisCacheModuleOptions
) {
this.redisClient = redisClient
@@ -24,6 +32,29 @@ export class RedisCachingProvider {
this.defaultTTL = options?.ttl ?? 3600 // 1 hour default
this.compressionThreshold = options?.compressionThreshold ?? 2048 // 2KB default
this.hasher = hasher
this.logger = logger
}
private isConnectionError(error: any): boolean {
return (
error.code === "ECONNREFUSED" ||
error.code === "ENOTFOUND" ||
error.code === "ETIMEDOUT" ||
error.code === "ECONNRESET" ||
error.code === "EPIPE" ||
error.message?.includes("Connection is closed") ||
error.message?.includes("connect ECONNREFUSED") ||
error.message?.includes("connect ETIMEDOUT") ||
error.message?.includes("Command timed out") ||
error.message?.includes("Maximum number of retries exceeded") ||
["connecting", "reconnecting", "disconnecting", "wait", "end"].includes(
this.redisClient.status
)
)
}
private isConnectionHealthy(): boolean {
return this.redisClient.status === "ready"
}
#getKeyName(key: string): string {
@@ -252,66 +283,92 @@ export class RedisCachingProvider {
}
async get({ key, tags }: { key?: string; tags?: string[] }): Promise<any> {
if (key) {
const keyName = this.#getKeyName(key)
const buffer = await this.redisClient.hgetBuffer(keyName, "data")
if (!buffer) {
return null
}
if (!this.isConnectionHealthy()) {
return null
}
const finalData = await this.#decompressData(buffer)
return JSON.parse(finalData)
if (key) {
try {
const keyName = this.#getKeyName(key)
const buffer = await this.redisClient.hgetBuffer(keyName, "data")
if (!buffer) {
return null
}
const finalData = await this.#decompressData(buffer)
return JSON.parse(finalData)
} catch (error) {
if (this.isConnectionError(error)) {
this.logger.warn(
"Redis connection error during get operation, returning null to trigger fallback to original data source"
)
return null
}
throw error
}
}
if (tags?.length) {
// Get all keys associated with the tags
const pipeline = this.redisClient.pipeline()
tags.forEach((tag) => {
const tagKey = this.#getTagKey(tag)
pipeline.smembers(tagKey)
})
try {
// Get all keys associated with the tags
const pipeline = this.redisClient.pipeline()
tags.forEach((tag) => {
const tagKey = this.#getTagKey(tag)
pipeline.smembers(tagKey)
})
const tagResults = await pipeline.exec()
const allKeys = new Set<string>()
const tagResults = await pipeline.exec()
const allKeys = new Set<string>()
tagResults?.forEach((result, index) => {
if (result && result[1]) {
;(result[1] as string[]).forEach((key) => allKeys.add(key))
tagResults?.forEach((result, index) => {
if (result && result[1]) {
;(result[1] as string[]).forEach((key) => allKeys.add(key))
}
})
if (allKeys.size === 0) {
return []
}
})
if (allKeys.size === 0) {
return []
}
// Get all hash data for the keys
const valuePipeline = this.redisClient.pipeline()
Array.from(allKeys).forEach((key) => {
valuePipeline.hgetBuffer(key, "data")
})
// Get all hash data for the keys
const valuePipeline = this.redisClient.pipeline()
Array.from(allKeys).forEach((key) => {
valuePipeline.hgetBuffer(key, "data")
})
const valueResults = await valuePipeline.exec()
const results: any[] = []
const valueResults = await valuePipeline.exec()
const results: any[] = []
const decompressionPromises = (valueResults || []).map(async (result) => {
if (result && result[1]) {
const buffer = result[1] as Buffer
try {
const finalData = await this.#decompressData(buffer)
return JSON.parse(finalData)
} catch (e) {
// If JSON parsing fails, skip this entry (corrupted data)
console.warn(`Skipping corrupted cache entry: ${e.message}`)
const decompressionPromises = (valueResults || []).map(
async (result) => {
if (result && result[1]) {
const buffer = result[1] as Buffer
try {
const finalData = await this.#decompressData(buffer)
return JSON.parse(finalData)
} catch (e) {
// If JSON parsing fails, skip this entry (corrupted data)
this.logger.warn(`Skipping corrupted cache entry: ${e.message}`)
return null
}
}
return null
}
)
const decompressionResults = await Promise.all(decompressionPromises)
results.push(...decompressionResults.filter(Boolean))
return results
} catch (error) {
if (this.isConnectionError(error)) {
this.logger.warn(
"Redis connection error during get operation, returning empty array to trigger fallback to original data source"
)
return null
}
return null
})
const decompressionResults = await Promise.all(decompressionPromises)
results.push(...decompressionResults.filter(Boolean))
return results
throw error
}
}
return null
@@ -332,53 +389,63 @@ export class RedisCachingProvider {
autoInvalidate?: boolean
}
}): Promise<void> {
const keyName = this.#getKeyName(key)
const serializedData = JSON.stringify(data)
const effectiveTTL = ttl ?? this.defaultTTL
try {
const keyName = this.#getKeyName(key)
const serializedData = JSON.stringify(data)
const effectiveTTL = ttl ?? this.defaultTTL
const finalData = await this.#compressData(serializedData)
const finalData = await this.#compressData(serializedData)
let tagIds: number[] = []
if (tags?.length) {
tagIds = await this.#internTags(tags)
}
const setPipeline = this.redisClient.pipeline()
// Main data with conditional operations
setPipeline.hsetnx(keyName, "data", finalData)
if (options && Object.keys(options).length) {
setPipeline.hset(keyName, "options", JSON.stringify(options))
}
if (effectiveTTL) {
setPipeline.expire(keyName, effectiveTTL)
}
// Store tag IDs if present
if (tags?.length && tagIds.length) {
const tagsKey = this.#getTagsKey(key)
const buffer = Buffer.alloc(tagIds.length * 4)
tagIds.forEach((id, index) => {
buffer.writeUInt32LE(id, index * 4)
})
if (effectiveTTL) {
setPipeline.set(tagsKey, buffer, "EX", effectiveTTL + 60, "NX")
} else {
setPipeline.setnx(tagsKey, buffer)
let tagIds: number[] = []
if (tags?.length) {
tagIds = await this.#internTags(tags)
}
// Add tag operations to the same pipeline
tags.forEach((tag) => {
const tagKey = this.#getTagKey(tag)
setPipeline.sadd(tagKey, keyName)
if (effectiveTTL) {
setPipeline.expire(tagKey, effectiveTTL + 60)
}
})
}
const setPipeline = this.redisClient.pipeline()
await setPipeline.exec()
// Main data with conditional operations
setPipeline.hsetnx(keyName, "data", finalData)
if (options && Object.keys(options).length) {
setPipeline.hset(keyName, "options", JSON.stringify(options))
}
if (effectiveTTL) {
setPipeline.expire(keyName, effectiveTTL)
}
// Store tag IDs if present
if (tags?.length && tagIds.length) {
const tagsKey = this.#getTagsKey(key)
const buffer = Buffer.alloc(tagIds.length * 4)
tagIds.forEach((id, index) => {
buffer.writeUInt32LE(id, index * 4)
})
if (effectiveTTL) {
setPipeline.set(tagsKey, buffer, "EX", effectiveTTL + 60, "NX")
} else {
setPipeline.setnx(tagsKey, buffer)
}
// Add tag operations to the same pipeline
tags.forEach((tag) => {
const tagKey = this.#getTagKey(tag)
setPipeline.sadd(tagKey, keyName)
if (effectiveTTL) {
setPipeline.expire(tagKey, effectiveTTL + 60)
}
})
}
await setPipeline.exec()
} catch (error) {
if (this.isConnectionError(error)) {
this.logger.warn(
"Redis connection error during set operation, relying on IORedis retry mechanism"
)
return
}
throw error
}
}
async clear({
@@ -392,208 +459,96 @@ export class RedisCachingProvider {
autoInvalidate?: boolean
}
}): Promise<void> {
if (key) {
const keyName = this.#getKeyName(key)
const tagsKey = this.#getTagsKey(key)
try {
if (key) {
const keyName = this.#getKeyName(key)
const tagsKey = this.#getTagsKey(key)
const clearPipeline = this.redisClient.pipeline()
const clearPipeline = this.redisClient.pipeline()
// Get tags for cleanup and delete main key in same pipeline
clearPipeline.getBuffer(tagsKey)
clearPipeline.unlink(keyName)
// Get tags for cleanup and delete main key in same pipeline
clearPipeline.getBuffer(tagsKey)
clearPipeline.unlink(keyName)
const results = await clearPipeline.exec()
const tagsBuffer = results?.[0]?.[1] as Buffer
const results = await clearPipeline.exec()
const tagsBuffer = results?.[0]?.[1] as Buffer
if (tagsBuffer?.length) {
try {
// Binary format: array of 32-bit integers
const tagIds: number[] = []
for (let i = 0; i < tagsBuffer.length; i += 4) {
tagIds.push(tagsBuffer.readUInt32LE(i))
if (tagsBuffer?.length) {
try {
// Binary format: array of 32-bit integers
const tagIds: number[] = []
for (let i = 0; i < tagsBuffer.length; i += 4) {
tagIds.push(tagsBuffer.readUInt32LE(i))
}
if (tagIds.length) {
const entryTags = await this.#resolveTagIds(tagIds)
const tagCleanupPipeline = this.redisClient.pipeline()
entryTags.forEach((tag) => {
const tagKey = this.#getTagKey(tag, { isHashed: true })
tagCleanupPipeline.srem(tagKey, keyName)
})
tagCleanupPipeline.unlink(tagsKey)
await tagCleanupPipeline.exec()
// Decrement reference counts and cleanup unused tags
await this.#decrementTagRefs(tagIds)
}
} catch (e) {
// noop - corrupted tag data, skip cleanup
}
if (tagIds.length) {
const entryTags = await this.#resolveTagIds(tagIds)
const tagCleanupPipeline = this.redisClient.pipeline()
entryTags.forEach((tag) => {
const tagKey = this.#getTagKey(tag, { isHashed: true })
tagCleanupPipeline.srem(tagKey, keyName)
})
tagCleanupPipeline.unlink(tagsKey)
await tagCleanupPipeline.exec()
// Decrement reference counts and cleanup unused tags
await this.#decrementTagRefs(tagIds)
}
} catch (e) {
// noop - corrupted tag data, skip cleanup
}
}
return
}
if (tags?.length) {
// Handle wildcard tag to clear all cache data
if (tags.includes("*")) {
await this.flush()
return
}
// Get all keys associated with the tags
const pipeline = this.redisClient.pipeline()
tags.forEach((tag) => {
const tagKey = this.#getTagKey(tag)
pipeline.smembers(tagKey)
})
const tagResults = await pipeline.exec()
const allKeys = new Set<string>()
tagResults?.forEach((result) => {
if (result && result[1]) {
;(result[1] as string[]).forEach((key) => allKeys.add(key))
}
})
if (allKeys.size) {
// If no options provided (user explicit call), clear everything
if (!options) {
const deletePipeline = this.redisClient.pipeline()
// Delete main keys and options
Array.from(allKeys).forEach((key) => {
deletePipeline.unlink(key)
})
// Clean up tag references for each key
const tagDataPromises = Array.from(allKeys).map(async (key) => {
const keyWithoutPrefix = key.replace(this.keyNamePrefix, "")
const tagsKey = this.#getTagsKey(keyWithoutPrefix)
const tagsData = await this.redisClient.getBuffer(tagsKey)
return { key, tagsKey, tagsData }
})
const tagResults = await Promise.all(tagDataPromises)
// Build single pipeline for all tag cleanup operations
const tagCleanupPipeline = this.redisClient.pipeline()
const cleanupPromises = tagResults.map(
async ({ key, tagsKey, tagsData }) => {
if (tagsData) {
try {
// Binary format: array of 32-bit integers
const tagIds: number[] = []
for (let i = 0; i < tagsData.length; i += 4) {
tagIds.push(tagsData.readUInt32LE(i))
}
if (tagIds.length) {
const entryTags = await this.#resolveTagIds(tagIds)
entryTags.forEach((tag) => {
const tagKey = this.#getTagKey(tag, { isHashed: true })
tagCleanupPipeline.srem(tagKey, key)
})
tagCleanupPipeline.unlink(tagsKey)
// Decrement reference counts and cleanup unused tags
await this.#decrementTagRefs(tagIds)
}
} catch (e) {
// noop
}
}
}
)
await Promise.all(cleanupPromises)
await tagCleanupPipeline.exec()
await deletePipeline.exec()
// Clean up empty tag sets
const allTagKeys = await this.redisClient.keys(
`${this.keyNamePrefix}tag:*`
)
if (allTagKeys.length) {
const cardinalityPipeline = this.redisClient.pipeline()
allTagKeys.forEach((tagKey) => {
cardinalityPipeline.scard(tagKey)
})
const cardinalityResults = await cardinalityPipeline.exec()
// Delete empty tag keys
const emptyTagPipeline = this.redisClient.pipeline()
cardinalityResults?.forEach((result, index) => {
if (result && result[1] === 0) {
emptyTagPipeline.unlink(allTagKeys[index])
}
})
await emptyTagPipeline.exec()
}
if (tags?.length) {
// Handle wildcard tag to clear all cache data
if (tags.includes("*")) {
await this.flush()
return
}
// If autoInvalidate is true (strategy call), only clear entries with autoInvalidate=true (default)
if (options.autoInvalidate === true) {
const optionsPipeline = this.redisClient.pipeline()
// Get all keys associated with the tags
const pipeline = this.redisClient.pipeline()
tags.forEach((tag) => {
const tagKey = this.#getTagKey(tag)
pipeline.smembers(tagKey)
})
Array.from(allKeys).forEach((key) => {
optionsPipeline.hget(key, "options")
})
const tagResults = await pipeline.exec()
const optionsResults = await optionsPipeline.exec()
const keysToDelete: string[] = []
const allKeys = new Set<string>()
Array.from(allKeys).forEach((key, index) => {
const optionsResult = optionsResults?.[index]
tagResults?.forEach((result) => {
if (result && result[1]) {
;(result[1] as string[]).forEach((key) => allKeys.add(key))
}
})
if (optionsResult && optionsResult[1]) {
try {
const entryOptions = JSON.parse(optionsResult[1] as string)
// Delete if entry has autoInvalidate=true or no setting (default true)
const shouldAutoInvalidate = entryOptions.autoInvalidate ?? true
if (shouldAutoInvalidate) {
keysToDelete.push(key)
}
} catch (e) {
// If can't parse options, assume it's safe to delete (default true)
keysToDelete.push(key)
}
} else {
// No options stored, default to true
keysToDelete.push(key)
}
})
if (keysToDelete.length) {
if (allKeys.size) {
// If no options provided (user explicit call), clear everything
if (!options) {
const deletePipeline = this.redisClient.pipeline()
keysToDelete.forEach((key) => {
// Delete main keys and options
Array.from(allKeys).forEach((key) => {
deletePipeline.unlink(key)
})
// Clean up tag references for each key to delete
const tagDataPromises = keysToDelete.map(async (key) => {
// Clean up tag references for each key
const tagDataPromises = Array.from(allKeys).map(async (key) => {
const keyWithoutPrefix = key.replace(this.keyNamePrefix, "")
const tagsKey = this.#getTagsKey(keyWithoutPrefix)
const tagsData = await this.redisClient.getBuffer(tagsKey)
return { key, tagsKey, tagsData }
})
// Wait for all tag data fetches
const tagResults = await Promise.all(tagDataPromises)
// Build single pipeline for all tag cleanup operations
const tagCleanupPipeline = this.redisClient.pipeline()
const cleanupPromises = tagResults.map(
async ({ key, tagsKey, tagsData }) => {
if (tagsData) {
@@ -610,7 +565,7 @@ export class RedisCachingProvider {
const tagKey = this.#getTagKey(tag, { isHashed: true })
tagCleanupPipeline.srem(tagKey, key)
})
tagCleanupPipeline.unlink(tagsKey) // Delete the tags key
tagCleanupPipeline.unlink(tagsKey)
// Decrement reference counts and cleanup unused tags
await this.#decrementTagRefs(tagIds)
@@ -624,7 +579,6 @@ export class RedisCachingProvider {
await Promise.all(cleanupPromises)
await tagCleanupPipeline.exec()
await deletePipeline.exec()
// Clean up empty tag sets
@@ -632,52 +586,188 @@ export class RedisCachingProvider {
`${this.keyNamePrefix}tag:*`
)
if (allTagKeys.length) {
const cleanupPipeline = this.redisClient.pipeline()
const cardinalityPipeline = this.redisClient.pipeline()
allTagKeys.forEach((tagKey) => {
cleanupPipeline.scard(tagKey)
cardinalityPipeline.scard(tagKey)
})
const cardinalityResults = await cleanupPipeline.exec()
const cardinalityResults = await cardinalityPipeline.exec()
// Delete tag keys that are now empty
const emptyTagDeletePipeline = this.redisClient.pipeline()
// Delete empty tag keys
const emptyTagPipeline = this.redisClient.pipeline()
cardinalityResults?.forEach((result, index) => {
if (result && result[1] === 0) {
emptyTagDeletePipeline.unlink(allTagKeys[index])
emptyTagPipeline.unlink(allTagKeys[index])
}
})
await emptyTagDeletePipeline.exec()
await emptyTagPipeline.exec()
}
return
}
// If autoInvalidate is true (strategy call), only clear entries with autoInvalidate=true (default)
if (options.autoInvalidate === true) {
const optionsPipeline = this.redisClient.pipeline()
Array.from(allKeys).forEach((key) => {
optionsPipeline.hget(key, "options")
})
const optionsResults = await optionsPipeline.exec()
const keysToDelete: string[] = []
Array.from(allKeys).forEach((key, index) => {
const optionsResult = optionsResults?.[index]
if (optionsResult && optionsResult[1]) {
try {
const entryOptions = JSON.parse(optionsResult[1] as string)
// Delete if entry has autoInvalidate=true or no setting (default true)
const shouldAutoInvalidate =
entryOptions.autoInvalidate ?? true
if (shouldAutoInvalidate) {
keysToDelete.push(key)
}
} catch (e) {
// If can't parse options, assume it's safe to delete (default true)
keysToDelete.push(key)
}
} else {
// No options stored, default to true
keysToDelete.push(key)
}
})
if (keysToDelete.length) {
const deletePipeline = this.redisClient.pipeline()
keysToDelete.forEach((key) => {
deletePipeline.unlink(key)
})
// Clean up tag references for each key to delete
const tagDataPromises = keysToDelete.map(async (key) => {
const keyWithoutPrefix = key.replace(this.keyNamePrefix, "")
const tagsKey = this.#getTagsKey(keyWithoutPrefix)
const tagsData = await this.redisClient.getBuffer(tagsKey)
return { key, tagsKey, tagsData }
})
// Wait for all tag data fetches
const tagResults = await Promise.all(tagDataPromises)
// Build single pipeline for all tag cleanup operations
const tagCleanupPipeline = this.redisClient.pipeline()
const cleanupPromises = tagResults.map(
async ({ key, tagsKey, tagsData }) => {
if (tagsData) {
try {
// Binary format: array of 32-bit integers
const tagIds: number[] = []
for (let i = 0; i < tagsData.length; i += 4) {
tagIds.push(tagsData.readUInt32LE(i))
}
if (tagIds.length) {
const entryTags = await this.#resolveTagIds(tagIds)
entryTags.forEach((tag) => {
const tagKey = this.#getTagKey(tag, {
isHashed: true,
})
tagCleanupPipeline.srem(tagKey, key)
})
tagCleanupPipeline.unlink(tagsKey) // Delete the tags key
// Decrement reference counts and cleanup unused tags
await this.#decrementTagRefs(tagIds)
}
} catch (e) {
// noop
}
}
}
)
await Promise.all(cleanupPromises)
await tagCleanupPipeline.exec()
await deletePipeline.exec()
// Clean up empty tag sets
const allTagKeys = await this.redisClient.keys(
`${this.keyNamePrefix}tag:*`
)
if (allTagKeys.length) {
const cleanupPipeline = this.redisClient.pipeline()
allTagKeys.forEach((tagKey) => {
cleanupPipeline.scard(tagKey)
})
const cardinalityResults = await cleanupPipeline.exec()
// Delete tag keys that are now empty
const emptyTagDeletePipeline = this.redisClient.pipeline()
cardinalityResults?.forEach((result, index) => {
if (result && result[1] === 0) {
emptyTagDeletePipeline.unlink(allTagKeys[index])
}
})
await emptyTagDeletePipeline.exec()
}
return
}
}
}
}
} catch (error) {
if (this.isConnectionError(error)) {
this.logger.warn(
"Redis connection error during clear operation, relying on IORedis retry mechanism"
)
return
}
throw error
}
}
async flush(): Promise<void> {
// Use SCAN to find ALL keys with our prefix and delete them
// This includes main cache keys, tag keys (tag:*), and tags keys (tags:*)
const pattern = `${this.keyNamePrefix}*`
let cursor = "0"
try {
// Use SCAN to find ALL keys with our prefix and delete them
// This includes main cache keys, tag keys (tag:*), and tags keys (tags:*)
const pattern = `${this.keyNamePrefix}*`
let cursor = "0"
do {
const result = await this.redisClient.scan(
cursor,
"MATCH",
pattern,
"COUNT",
1000
)
cursor = result[0]
const keys = result[1]
do {
const result = await this.redisClient.scan(
cursor,
"MATCH",
pattern,
"COUNT",
1000
)
cursor = result[0]
const keys = result[1]
if (keys.length) {
await this.redisClient.unlink(...keys)
if (keys.length) {
await this.redisClient.unlink(...keys)
}
} while (cursor !== "0")
} catch (error) {
if (this.isConnectionError(error)) {
this.logger.warn(
"Redis connection error during flush operation, relying on IORedis retry mechanism"
)
return
}
} while (cursor !== "0")
throw error
}
}
}

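To make the provider's fallback behaviour concrete, a hypothetical caller could look like the sketch below: a cache miss and a Redis connection error both surface as an empty result, so the caller simply recomputes from the original data source. The constructor arguments follow the signature shown above; the import path, hasher, and data-source function are assumptions for the example.

import Redis from "ioredis"
import { createHash } from "crypto"
// Import path assumed for the sketch; RedisCachingProvider is the class in the diff above.
import { RedisCachingProvider } from "@medusajs/caching-redis"

const redisClient = new Redis("redis://localhost:6379", { lazyConnect: true })
const provider = new RedisCachingProvider(
  {
    redisClient,
    prefix: "mc:",
    hasher: (key) => createHash("sha1").update(key).digest("hex"),
    logger: console as any, // stand-in for the framework Logger
  },
  { ttl: 3600 }
)

async function getProductList(fetchFromDb: () => Promise<unknown[]>) {
  // On a hit this returns the cached payload; on a miss or a Redis
  // connection error it returns null and we fall back to the database.
  const cached = await provider.get({ key: "product-list" })
  if (cached) {
    return cached
  }

  const fresh = await fetchFromDb()
  await provider.set({
    key: "product-list",
    data: fresh,
    tags: ["products"],
    options: { autoInvalidate: true },
  })
  return fresh
}

// Invalidate every entry tagged "products", e.g. after a product update:
// await provider.clear({ tags: ["products"], options: { autoInvalidate: true } })
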
View File

@@ -1,20 +1,13 @@
export interface RedisCacheModuleOptions {
import { RedisOptions } from "ioredis"
export interface RedisCacheModuleOptions extends RedisOptions {
/**
* Redis connection string
*/
redisUrl?: string
/**
* TTL in milliseconds
*/
ttl?: number
/**
* Connection timeout in milliseconds
*/
connectTimeout?: number
/**
* Lazyload connections
*/
lazyConnect?: boolean
/**
* Connection retries
*/
retryDelayOnFailover?: number
/**
* Key prefix for all cache keys
*/

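Because RedisCacheModuleOptions now extends ioredis's RedisOptions, any ioredis connection option can be passed through alongside the module-specific fields instead of being redeclared one by one. A hypothetical options object (values illustrative, import path assumed):

// Import path assumed for the sketch; internally the type is imported from "@types".
import type { RedisCacheModuleOptions } from "@medusajs/caching-redis"

const cacheOptions: RedisCacheModuleOptions = {
  redisUrl: process.env.CACHE_REDIS_URL,
  ttl: 3600, // module-specific: default TTL
  prefix: "mc:", // module-specific: key prefix for all cache keys
  connectTimeout: 10000, // plain ioredis option, accepted via RedisOptions
  maxRetriesPerRequest: 3, // plain ioredis option, accepted via RedisOptions
}
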
View File

@@ -42,11 +42,11 @@ export default async (
maxRetriesPerRequest: null,
})
logger?.info(
`Connection to Redis in module 'workflow-engine-redis' established`
`[Workflow-engine-redis] Connection to Redis in module 'workflow-engine-redis' established`
)
} catch (err) {
logger?.error(
`An error occurred while connecting to Redis in module 'workflow-engine-redis': ${err}`
`[Workflow-engine-redis] An error occurred while connecting to Redis in module 'workflow-engine-redis': ${err}`
)
}
@@ -54,11 +54,11 @@ export default async (
redisPublisher = await getConnection(cnnPubSub.url, cnnPubSub.options)
redisSubscriber = await getConnection(cnnPubSub.url, cnnPubSub.options)
logger?.info(
`Connection to Redis PubSub in module 'workflow-engine-redis' established`
`[Workflow-engine-redis] Connection to Redis PubSub in module 'workflow-engine-redis' established`
)
} catch (err) {
logger?.error(
`An error occurred while connecting to Redis PubSub in module 'workflow-engine-redis': ${err}`
`[Workflow-engine-redis] An error occurred while connecting to Redis PubSub in module 'workflow-engine-redis': ${err}`
)
}

View File

@@ -128,6 +128,7 @@ export class RedisDistributedTransactionStorage
}
async onApplicationStart() {
await this.ensureRedisConnection()
const allowedJobs = [
JobType.RETRY,
JobType.STEP_TIMEOUT,
@@ -212,6 +213,64 @@ export class RedisDistributedTransactionStorage
this.workflowOrchestratorService_ = workflowOrchestratorService
}
private async ensureRedisConnection(): Promise<void> {
const reconnectTasks: Promise<void>[] = []
if (this.redisClient.status !== "ready") {
this.logger_.warn(
`[Workflow-engine-redis] Redis connection is not ready (status: ${this.redisClient.status}). Attempting to reconnect...`
)
reconnectTasks.push(
this.redisClient
.connect()
.then(() => {
this.logger_.info(
"[Workflow-engine-redis] Redis connection reestablished successfully"
)
})
.catch((error) => {
this.logger_.error(
"[Workflow-engine-redis] Failed to reconnect to Redis",
error
)
throw new MedusaError(
MedusaError.Types.DB_ERROR,
`Redis connection failed: ${error.message}`
)
})
)
}
if (this.redisWorkerConnection.status !== "ready") {
this.logger_.warn(
`[Workflow-engine-redis] Redis worker connection is not ready (status: ${this.redisWorkerConnection.status}). Attempting to reconnect...`
)
reconnectTasks.push(
this.redisWorkerConnection
.connect()
.then(() => {
this.logger_.info(
"[Workflow-engine-redis] Redis worker connection reestablished successfully"
)
})
.catch((error) => {
this.logger_.error(
"[Workflow-engine-redis] Failed to reconnect to Redis worker connection",
error
)
throw new MedusaError(
MedusaError.Types.DB_ERROR,
`Redis worker connection failed: ${error.message}`
)
})
)
}
if (reconnectTasks.length > 0) {
await promiseAll(reconnectTasks)
}
}
private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) {
const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
const isFinished = [
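
The ensureRedisConnection guard above boils down to a reusable pattern: check ioredis's status field and only call connect() when the client is not ready, failing loudly if the reconnect attempt itself fails. A minimal standalone sketch, with the Medusa-specific logger and MedusaError replaced by plain console/Error for brevity:

import type { Redis } from "ioredis"

// Sketch of the reconnect guard shown above, simplified for illustration.
async function ensureConnection(client: Redis, name: string): Promise<void> {
  if (client.status === "ready") {
    return
  }

  console.warn(
    `${name} connection is not ready (status: ${client.status}). Attempting to reconnect...`
  )

  try {
    await client.connect()
    console.info(`${name} connection reestablished successfully`)
  } catch (error: any) {
    throw new Error(`${name} connection failed: ${error.message}`)
  }
}

// Usage: run the guards in parallel before registering workers.
// await Promise.all([
//   ensureConnection(redisClient, "Redis"),
//   ensureConnection(redisWorkerConnection, "Redis worker"),
// ])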