Files
medusa-store/packages/modules/workflow-engine-redis/src/loaders/redis.ts
Adrien de Peretti cc8422d3a1 fix(workflow-engine-redis): Split the queues and respect worker mode for job executions (#11740)
**What**
Currently, the Redis workflow engine does not make any distinction between worker modes. When starting as a server, the engine listens to the queue, which contains everything, and tries to execute the corresponding workflow, which does not exist since job workflows are not loaded in server mode. Now, a dedicated queue is created for jobs, and the worker is only started if the instance is not in server mode. To clean up the old queue, if the old queue triggers a scheduled job, that job is removed from the queue, since it will be re-added to the new queue by the new worker instances.
2025-03-06 11:52:52 +00:00

93 lines
2.6 KiB
TypeScript

import {
InternalModuleDeclaration,
LoaderOptions,
} from "@medusajs/framework/types"
import { asValue } from "awilix"
import Redis from "ioredis"
import { RedisWorkflowsOptions } from "../types"
/**
 * Module loader for the Redis workflow engine.
 *
 * Opens the Redis connections used by the module (a general connection, a
 * worker connection, and a publisher/subscriber pair) and registers them in
 * the module container together with the queue names, the worker-mode flag
 * and a handler that disconnects every connection.
 *
 * Connection failures are logged and do not abort loading; in that case the
 * corresponding container entries are registered as `undefined`.
 *
 * @param loaderOptions - Loader context; `container`, `logger` and `options` are used.
 * @param moduleDeclaration - Module declaration; `worker_mode` decides `isWorkerMode`.
 * @throws Error when `options.redis.url` is missing.
 */
export default async (
  { container, logger, options }: LoaderOptions,
  moduleDeclaration: InternalModuleDeclaration
): Promise<void> => {
  // Fall back to an empty object so that a missing `redis` option reaches the
  // explicit error below instead of failing on the destructuring itself.
  const {
    url,
    options: redisOptions,
    pubsub,
  } = (options?.redis ?? {}) as RedisWorkflowsOptions

  // TODO: get default from ENV VAR
  if (!url) {
    throw new Error(
      "No `redis.url` provided in `workflowOrchestrator` module options. It is required for the Workflow Orchestrator Redis."
    )
  }

  // Pub/sub reuses the main connection settings unless configured separately.
  const cnnPubSub = pubsub ?? { url, options: redisOptions }

  const queueName = options?.queueName ?? "medusa-workflows"
  const jobQueueName = options?.jobQueueName ?? "medusa-workflows-jobs"

  let connection
  let redisPublisher
  let redisSubscriber
  let workerConnection

  try {
    connection = await getConnection(url, redisOptions)
    // NOTE(review): `maxRetriesPerRequest: null` is presumably required by the
    // queue worker consuming this connection — confirm against the worker setup.
    workerConnection = await getConnection(url, {
      ...(redisOptions ?? {}),
      maxRetriesPerRequest: null,
    })
    logger?.info(
      `Connection to Redis in module 'workflow-engine-redis' established`
    )
  } catch (err) {
    logger?.error(
      `An error occurred while connecting to Redis in module 'workflow-engine-redis': ${err}`
    )
  }

  try {
    redisPublisher = await getConnection(cnnPubSub.url, cnnPubSub.options)
    redisSubscriber = await getConnection(cnnPubSub.url, cnnPubSub.options)
    logger?.info(
      `Connection to Redis PubSub in module 'workflow-engine-redis' established`
    )
  } catch (err) {
    logger?.error(
      `An error occurred while connecting to Redis PubSub in module 'workflow-engine-redis': ${err}`
    )
  }

  container.register({
    // Every mode except "server" is treated as worker mode.
    isWorkerMode: asValue(moduleDeclaration.worker_mode !== "server"),
    partialLoading: asValue(true),
    redisConnection: asValue(connection),
    redisWorkerConnection: asValue(workerConnection),
    redisPublisher: asValue(redisPublisher),
    redisSubscriber: asValue(redisSubscriber),
    redisQueueName: asValue(queueName),
    redisJobQueueName: asValue(jobQueueName),
    redisDisconnectHandler: asValue(async () => {
      // Any of these may be undefined if its connection attempt failed above,
      // so guard each call to keep shutdown from throwing.
      connection?.disconnect()
      workerConnection?.disconnect()
      redisPublisher?.disconnect()
      redisSubscriber?.disconnect()
    }),
  })
}
/**
 * Creates a Redis client for `url` and waits until it is connected.
 *
 * @param url - Redis connection URL passed to the client constructor.
 * @param redisOptions - Optional client options merged into the constructor options.
 * @returns The connected client.
 * @throws Whatever `connect()` rejects with; the client is disconnected first
 *   so that a failed attempt does not leave an unreferenced client behind.
 */
async function getConnection(url, redisOptions) {
  const connection = new Redis(url, {
    ...(redisOptions ?? {}),
    // Always lazy: the explicit connect() below is the single connection
    // attempt. A caller-supplied `lazyConnect: false` would make the client
    // connect in the constructor and the explicit connect() call fail.
    lazyConnect: true,
  })

  try {
    // Await the promise directly so a connection failure rejects to the
    // caller instead of being swallowed by a callback-resolved promise.
    await connection.connect()
  } catch (err) {
    connection.disconnect()
    throw err
  }

  return connection
}