fix(workflow-engine-redis): Split the queues and respect worker mode for job executions (#11740)

**What**
Currently, the workflow engine redis does not make any distinction between worker modes, when starting as server, the engine listen to the queue which contains everything and try to execute the corresponding workflow which does not exists since job workflows are not loaded in server mode. Now, a dedicated queue is created for jobs and the worker is only started if the instance is not in server mode. In order to clean up the old queue, if the old queue trigger a scheduled job then it gets removed from the queue since it will get re added to the new queue by the new worker instances
This commit is contained in:
Adrien de Peretti
2025-03-06 12:52:52 +01:00
committed by GitHub
parent 4f23901fcf
commit cc8422d3a1
4 changed files with 74 additions and 22 deletions

View File

@@ -36,37 +36,55 @@ export class RedisDistributedTransactionStorage
private redisClient: Redis
private redisWorkerConnection: Redis
private queueName: string
private jobQueueName: string
private queue: Queue
private jobQueue?: Queue
private worker: Worker
private jobWorker?: Worker
#isWorkerMode: boolean = false
constructor({
workflowExecutionService,
redisConnection,
redisWorkerConnection,
redisQueueName,
redisJobQueueName,
logger,
isWorkerMode,
}: {
workflowExecutionService: ModulesSdkTypes.IMedusaInternalService<any>
redisConnection: Redis
redisWorkerConnection: Redis
redisQueueName: string
redisJobQueueName: string
logger: Logger
isWorkerMode: boolean
}) {
this.workflowExecutionService_ = workflowExecutionService
this.logger_ = logger
this.redisClient = redisConnection
this.redisWorkerConnection = redisWorkerConnection
this.queueName = redisQueueName
this.jobQueueName = redisJobQueueName
this.queue = new Queue(redisQueueName, { connection: this.redisClient })
this.jobQueue = isWorkerMode
? new Queue(redisJobQueueName, {
connection: this.redisClient,
})
: undefined
this.#isWorkerMode = isWorkerMode
}
async onApplicationPrepareShutdown() {
// Close worker gracefully, i.e. wait for the current jobs to finish
await this.worker?.close()
await this.jobWorker?.close()
}
async onApplicationShutdown() {
await this.queue?.close()
await this.jobQueue?.close()
}
async onApplicationStart() {
@@ -76,6 +94,11 @@ export class RedisDistributedTransactionStorage
JobType.TRANSACTION_TIMEOUT,
]
const workerOptions = {
connection:
this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */,
}
this.worker = new Worker(
this.queueName,
async (job) => {
@@ -86,19 +109,26 @@ export class RedisDistributedTransactionStorage
)
}
// Note: We might even want a separate worker with different concurrency settings in the future, but for now we keep it simple
if (job.name === JobType.SCHEDULE) {
// Remove repeatable job from the old queue since now we have a queue dedicated to scheduled jobs
await this.remove(job.data.jobId)
}
},
workerOptions
)
if (this.#isWorkerMode) {
this.jobWorker = new Worker(
this.jobQueueName,
async (job) => {
await this.executeScheduledJob(
job.data.jobId,
job.data.schedulerOptions
)
}
},
{
connection:
this.redisWorkerConnection /*, runRetryDelay: 100000 for tests */,
}
)
},
workerOptions
)
}
}
setWorkflowOrchestratorService(workflowOrchestratorService) {
@@ -369,10 +399,18 @@ export class RedisDistributedTransactionStorage
step?: TransactionStep
) {
const jobId = this.getJobId(type, transaction, step)
const job = await this.queue.getJob(jobId)
if (job && job.attemptsStarted === 0) {
await job.remove()
if (type === JobType.SCHEDULE) {
const job = await this.jobQueue?.getJob(jobId)
if (job) {
await job.remove()
}
} else {
const job = await this.queue.getJob(jobId)
if (job && job.attemptsStarted === 0) {
await job.remove()
}
}
}
@@ -385,7 +423,7 @@ export class RedisDistributedTransactionStorage
typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId
// If it is the same key (eg. the same workflow name), the old one will get overridden.
await this.queue.add(
await this.jobQueue?.add(
JobType.SCHEDULE,
{
jobId,
@@ -410,13 +448,13 @@ export class RedisDistributedTransactionStorage
}
async remove(jobId: string): Promise<void> {
await this.queue.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`)
await this.jobQueue?.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`)
}
async removeAll(): Promise<void> {
const repeatableJobs = await this.queue.getRepeatableJobs()
const repeatableJobs = (await this.jobQueue?.getRepeatableJobs()) ?? []
await promiseAll(
repeatableJobs.map((job) => this.queue.removeRepeatableByKey(job.key))
repeatableJobs.map((job) => this.jobQueue?.removeRepeatableByKey(job.key))
)
}
}