From 4b114cc4191ba20832b66072ec82386b22a3533c Mon Sep 17 00:00:00 2001 From: Oliver Windall Juhl <59018053+olivermrbl@users.noreply.github.com> Date: Wed, 1 Mar 2023 18:11:30 +0100 Subject: [PATCH] feat(medusa): Use Bull `jobId` option to identify duplicates (#3351) * feat(medusa): Use Bull jobId option to identify duplicates * Create pink-emus-drum.md --- .changeset/pink-emus-drum.md | 5 +++++ packages/medusa/src/services/event-bus.ts | 19 ++++++++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) create mode 100644 .changeset/pink-emus-drum.md diff --git a/.changeset/pink-emus-drum.md b/.changeset/pink-emus-drum.md new file mode 100644 index 0000000000..afa8032684 --- /dev/null +++ b/.changeset/pink-emus-drum.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": patch +--- + +feat(medusa): Use Bull `jobId` option to identify duplicates diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index f7894a2f70..d7bbf197af 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -1,4 +1,4 @@ -import Bull from "bull" +import Bull, { JobOptions } from "bull" import Redis from "ioredis" import { isDefined } from "medusa-core-utils" import { EntityManager } from "typeorm" @@ -40,14 +40,16 @@ type SubscriberDescriptor = { subscriber: Subscriber } -type EmitOptions = { +export type EmitOptions = { delay?: number attempts: number backoff?: { type: "fixed" | "exponential" delay: number } -} +} & JobOptions + +const COMPLETED_JOB_TTL = 10000 /** * Can keep track of multiple subscribers to different events and run the @@ -225,10 +227,13 @@ export default class EventBusService { data: T, options: Record & EmitOptions = { attempts: 1 } ): Promise { - const opts: { removeOnComplete: boolean } & EmitOptions = { - removeOnComplete: true, - attempts: 1, + const opts: EmitOptions = { + removeOnComplete: { + age: COMPLETED_JOB_TTL, + }, + ...options, } + if (typeof options.attempts === "number") { opts.attempts = options.attempts if (isDefined(options.backoff)) { @@ -295,7 +300,7 @@ export default class EventBusService { this.queue_ .add( { eventName: job.event_name, data: job.data }, - job.options ?? { removeOnComplete: true } + { jobId: job.id, ...job.options } ) .then(async () => { await stagedJobRepo.remove(job)