diff --git a/.changeset/pink-emus-drum.md b/.changeset/pink-emus-drum.md new file mode 100644 index 0000000000..afa8032684 --- /dev/null +++ b/.changeset/pink-emus-drum.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": patch +--- + +feat(medusa): Use Bull `jobId` option to identify duplicates diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index f7894a2f70..d7bbf197af 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -1,4 +1,4 @@ -import Bull from "bull" +import Bull, { JobOptions } from "bull" import Redis from "ioredis" import { isDefined } from "medusa-core-utils" import { EntityManager } from "typeorm" @@ -40,14 +40,16 @@ type SubscriberDescriptor = { subscriber: Subscriber } -type EmitOptions = { +export type EmitOptions = { delay?: number attempts: number backoff?: { type: "fixed" | "exponential" delay: number } -} +} & JobOptions + +const COMPLETED_JOB_TTL = 10000 /** * Can keep track of multiple subscribers to different events and run the @@ -225,10 +227,13 @@ export default class EventBusService { data: T, options: Record & EmitOptions = { attempts: 1 } ): Promise { - const opts: { removeOnComplete: boolean } & EmitOptions = { - removeOnComplete: true, - attempts: 1, + const opts: EmitOptions = { + removeOnComplete: { + age: COMPLETED_JOB_TTL, + }, + ...options, } + if (typeof options.attempts === "number") { opts.attempts = options.attempts if (isDefined(options.backoff)) { @@ -295,7 +300,7 @@ export default class EventBusService { this.queue_ .add( { eventName: job.event_name, data: job.data }, - job.options ?? { removeOnComplete: true } + { jobId: job.id, ...job.options } ) .then(async () => { await stagedJobRepo.remove(job)