From ce577f2696aa2181bef8f3096b1a639feabe2714 Mon Sep 17 00:00:00 2001 From: Oliver Windall Juhl <59018053+olivermrbl@users.noreply.github.com> Date: Wed, 8 Mar 2023 11:09:43 +0100 Subject: [PATCH] feat(medusa): Add global job options for events (#3394) --- .changeset/quick-ravens-lie.md | 5 + .../src/services/__tests__/event-bus.js | 143 ++++++++++++++++++ packages/medusa/src/services/event-bus.ts | 7 + packages/medusa/src/types/global.ts | 20 +++ 4 files changed, 175 insertions(+) create mode 100644 .changeset/quick-ravens-lie.md diff --git a/.changeset/quick-ravens-lie.md b/.changeset/quick-ravens-lie.md new file mode 100644 index 0000000000..d2c75f8d0f --- /dev/null +++ b/.changeset/quick-ravens-lie.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": patch +--- + +feat(medusa): Add global job options for events diff --git a/packages/medusa/src/services/__tests__/event-bus.js b/packages/medusa/src/services/__tests__/event-bus.js index 9501351941..0532cc8cba 100644 --- a/packages/medusa/src/services/__tests__/event-bus.js +++ b/packages/medusa/src/services/__tests__/event-bus.js @@ -141,6 +141,149 @@ describe("EventBusService", () => { expect(eventBus.queue_.add).toHaveBeenCalled() }) }) + + describe("successfully adds job to queue with local options", () => { + beforeAll(() => { + jest.resetAllMocks() + const stagedJobRepository = MockRepository({ + find: () => Promise.resolve([]), + }) + + eventBus = new EventBusService({ + logger: loggerMock, + manager: MockManager, + stagedJobRepository, + }) + + eventBus.queue_.add.mockImplementationOnce(() => "hi") + + job = eventBus.emit( + "eventName", + { hi: "1234" }, + { removeOnComplete: 100 } + ) + }) + afterAll(async () => { + await eventBus.stopEnqueuer() + }) + + it("calls queue.add", () => { + expect(eventBus.queue_.add).toHaveBeenCalled() + expect(eventBus.queue_.add).toHaveBeenCalledWith( + { eventName: "eventName", data: { hi: "1234" } }, + { removeOnComplete: 100 } + ) + }) + }) + + describe("successfully adds job to queue with global options", () => { + beforeAll(() => { + jest.resetAllMocks() + const stagedJobRepository = MockRepository({ + find: () => Promise.resolve([]), + }) + + eventBus = new EventBusService( + { + logger: loggerMock, + manager: MockManager, + stagedJobRepository, + }, + { + projectConfig: { event_options: { removeOnComplete: 10 } }, + } + ) + + eventBus.queue_.add.mockImplementationOnce(() => "hi") + + job = eventBus.emit("eventName", { hi: "1234" }) + }) + afterAll(async () => { + await eventBus.stopEnqueuer() + }) + + it("calls queue.add", () => { + expect(eventBus.queue_.add).toHaveBeenCalled() + expect(eventBus.queue_.add).toHaveBeenCalledWith( + { eventName: "eventName", data: { hi: "1234" } }, + { removeOnComplete: 10, attempts: 1 } + ) + }) + }) + + describe("successfully adds job to queue with default options", () => { + beforeAll(() => { + jest.resetAllMocks() + const stagedJobRepository = MockRepository({ + find: () => Promise.resolve([]), + }) + + eventBus = new EventBusService({ + logger: loggerMock, + manager: MockManager, + stagedJobRepository, + }) + + eventBus.queue_.add.mockImplementationOnce(() => "hi") + + job = eventBus.emit("eventName", { hi: "1234" }) + }) + afterAll(async () => { + await eventBus.stopEnqueuer() + }) + + it("calls queue.add", () => { + expect(eventBus.queue_.add).toHaveBeenCalled() + expect(eventBus.queue_.add).toHaveBeenCalledWith( + { eventName: "eventName", data: { hi: "1234" } }, + { removeOnComplete: true, attempts: 1 } + ) + }) + }) + + describe("successfully adds job to queue with local options and global options merged", () => { + beforeAll(() => { + jest.resetAllMocks() + const stagedJobRepository = MockRepository({ + find: () => Promise.resolve([]), + }) + + eventBus = new EventBusService( + { + logger: loggerMock, + manager: MockManager, + stagedJobRepository, + }, + { + projectConfig: { event_options: { removeOnComplete: 10 } }, + } + ) + + eventBus.queue_.add.mockImplementationOnce(() => "hi") + + job = eventBus.emit( + "eventName", + { hi: "1234" }, + { attempts: 10, delay: 1000, backoff: { type: "exponential" } } + ) + }) + afterAll(async () => { + await eventBus.stopEnqueuer() + }) + + it("calls queue.add", () => { + expect(eventBus.queue_.add).toHaveBeenCalled() + expect(eventBus.queue_.add).toHaveBeenCalledWith( + { eventName: "eventName", data: { hi: "1234" } }, + { + removeOnComplete: 10, // global option + attempts: 10, // local option + delay: 1000, // local option + backoff: { type: "exponential" }, // local option + } + ) + }) + }) }) describe("worker", () => { diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index efd394e718..5549b1e36c 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -225,8 +225,15 @@ export default class EventBusService { data: T, options: Record & EmitOptions = { attempts: 1 } ): Promise { + const globalEventOptions = this.config_?.projectConfig?.event_options ?? {} + + // The order of precedence for job options is: + // 1. local options + // 2. global options + // 3. default options const opts: EmitOptions = { removeOnComplete: true, + ...globalEventOptions, ...options, } diff --git a/packages/medusa/src/types/global.ts b/packages/medusa/src/types/global.ts index eee037ac01..cf0d08bde5 100644 --- a/packages/medusa/src/types/global.ts +++ b/packages/medusa/src/types/global.ts @@ -3,6 +3,7 @@ import { Request } from "express" import { LoggerOptions } from "typeorm" import { Logger as _Logger } from "winston" import { Customer, User } from "../models" +import { EmitOptions } from "../services/event-bus" import { FindConfig, RequestQueryFields } from "./common" declare global { @@ -114,6 +115,25 @@ export type ConfigModule = { projectConfig: { redis_url?: string + /** + * Global options passed to all `EventBusService.emit` in the core as well as your own emitters. The options are forwarded to Bull's `Queue.add` method. + * + * The global options can be overridden by passing options to `EventBusService.emit` directly. + * + * Note: This will be deprecated as we move to Event Bus module in 1.8 + * + * + * Example + * ```js + * { + * removeOnComplete: { age: 10 }, + * } + * ``` + * + * @see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueadd + */ + event_options?: Record & EmitOptions + session_options?: SessionOptions jwt_secret?: string