feat(medusa): Add global job options for events (#3394)
This commit is contained in:
committed by
GitHub
parent
c7eee77bd9
commit
ce577f2696
5
.changeset/quick-ravens-lie.md
Normal file
5
.changeset/quick-ravens-lie.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@medusajs/medusa": patch
|
||||
---
|
||||
|
||||
feat(medusa): Add global job options for events
|
||||
@@ -141,6 +141,149 @@ describe("EventBusService", () => {
|
||||
expect(eventBus.queue_.add).toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe("successfully adds job to queue with local options", () => {
|
||||
beforeAll(() => {
|
||||
jest.resetAllMocks()
|
||||
const stagedJobRepository = MockRepository({
|
||||
find: () => Promise.resolve([]),
|
||||
})
|
||||
|
||||
eventBus = new EventBusService({
|
||||
logger: loggerMock,
|
||||
manager: MockManager,
|
||||
stagedJobRepository,
|
||||
})
|
||||
|
||||
eventBus.queue_.add.mockImplementationOnce(() => "hi")
|
||||
|
||||
job = eventBus.emit(
|
||||
"eventName",
|
||||
{ hi: "1234" },
|
||||
{ removeOnComplete: 100 }
|
||||
)
|
||||
})
|
||||
afterAll(async () => {
|
||||
await eventBus.stopEnqueuer()
|
||||
})
|
||||
|
||||
it("calls queue.add", () => {
|
||||
expect(eventBus.queue_.add).toHaveBeenCalled()
|
||||
expect(eventBus.queue_.add).toHaveBeenCalledWith(
|
||||
{ eventName: "eventName", data: { hi: "1234" } },
|
||||
{ removeOnComplete: 100 }
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe("successfully adds job to queue with global options", () => {
|
||||
beforeAll(() => {
|
||||
jest.resetAllMocks()
|
||||
const stagedJobRepository = MockRepository({
|
||||
find: () => Promise.resolve([]),
|
||||
})
|
||||
|
||||
eventBus = new EventBusService(
|
||||
{
|
||||
logger: loggerMock,
|
||||
manager: MockManager,
|
||||
stagedJobRepository,
|
||||
},
|
||||
{
|
||||
projectConfig: { event_options: { removeOnComplete: 10 } },
|
||||
}
|
||||
)
|
||||
|
||||
eventBus.queue_.add.mockImplementationOnce(() => "hi")
|
||||
|
||||
job = eventBus.emit("eventName", { hi: "1234" })
|
||||
})
|
||||
afterAll(async () => {
|
||||
await eventBus.stopEnqueuer()
|
||||
})
|
||||
|
||||
it("calls queue.add", () => {
|
||||
expect(eventBus.queue_.add).toHaveBeenCalled()
|
||||
expect(eventBus.queue_.add).toHaveBeenCalledWith(
|
||||
{ eventName: "eventName", data: { hi: "1234" } },
|
||||
{ removeOnComplete: 10, attempts: 1 }
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe("successfully adds job to queue with default options", () => {
|
||||
beforeAll(() => {
|
||||
jest.resetAllMocks()
|
||||
const stagedJobRepository = MockRepository({
|
||||
find: () => Promise.resolve([]),
|
||||
})
|
||||
|
||||
eventBus = new EventBusService({
|
||||
logger: loggerMock,
|
||||
manager: MockManager,
|
||||
stagedJobRepository,
|
||||
})
|
||||
|
||||
eventBus.queue_.add.mockImplementationOnce(() => "hi")
|
||||
|
||||
job = eventBus.emit("eventName", { hi: "1234" })
|
||||
})
|
||||
afterAll(async () => {
|
||||
await eventBus.stopEnqueuer()
|
||||
})
|
||||
|
||||
it("calls queue.add", () => {
|
||||
expect(eventBus.queue_.add).toHaveBeenCalled()
|
||||
expect(eventBus.queue_.add).toHaveBeenCalledWith(
|
||||
{ eventName: "eventName", data: { hi: "1234" } },
|
||||
{ removeOnComplete: true, attempts: 1 }
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe("successfully adds job to queue with local options and global options merged", () => {
|
||||
beforeAll(() => {
|
||||
jest.resetAllMocks()
|
||||
const stagedJobRepository = MockRepository({
|
||||
find: () => Promise.resolve([]),
|
||||
})
|
||||
|
||||
eventBus = new EventBusService(
|
||||
{
|
||||
logger: loggerMock,
|
||||
manager: MockManager,
|
||||
stagedJobRepository,
|
||||
},
|
||||
{
|
||||
projectConfig: { event_options: { removeOnComplete: 10 } },
|
||||
}
|
||||
)
|
||||
|
||||
eventBus.queue_.add.mockImplementationOnce(() => "hi")
|
||||
|
||||
job = eventBus.emit(
|
||||
"eventName",
|
||||
{ hi: "1234" },
|
||||
{ attempts: 10, delay: 1000, backoff: { type: "exponential" } }
|
||||
)
|
||||
})
|
||||
afterAll(async () => {
|
||||
await eventBus.stopEnqueuer()
|
||||
})
|
||||
|
||||
it("calls queue.add", () => {
|
||||
expect(eventBus.queue_.add).toHaveBeenCalled()
|
||||
expect(eventBus.queue_.add).toHaveBeenCalledWith(
|
||||
{ eventName: "eventName", data: { hi: "1234" } },
|
||||
{
|
||||
removeOnComplete: 10, // global option
|
||||
attempts: 10, // local option
|
||||
delay: 1000, // local option
|
||||
backoff: { type: "exponential" }, // local option
|
||||
}
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("worker", () => {
|
||||
|
||||
@@ -225,8 +225,15 @@ export default class EventBusService {
|
||||
data: T,
|
||||
options: Record<string, unknown> & EmitOptions = { attempts: 1 }
|
||||
): Promise<StagedJob | void> {
|
||||
const globalEventOptions = this.config_?.projectConfig?.event_options ?? {}
|
||||
|
||||
// The order of precedence for job options is:
|
||||
// 1. local options
|
||||
// 2. global options
|
||||
// 3. default options
|
||||
const opts: EmitOptions = {
|
||||
removeOnComplete: true,
|
||||
...globalEventOptions,
|
||||
...options,
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import { Request } from "express"
|
||||
import { LoggerOptions } from "typeorm"
|
||||
import { Logger as _Logger } from "winston"
|
||||
import { Customer, User } from "../models"
|
||||
import { EmitOptions } from "../services/event-bus"
|
||||
import { FindConfig, RequestQueryFields } from "./common"
|
||||
|
||||
declare global {
|
||||
@@ -114,6 +115,25 @@ export type ConfigModule = {
|
||||
projectConfig: {
|
||||
redis_url?: string
|
||||
|
||||
/**
|
||||
* Global options passed to all `EventBusService.emit` in the core as well as your own emitters. The options are forwarded to Bull's `Queue.add` method.
|
||||
*
|
||||
* The global options can be overridden by passing options to `EventBusService.emit` directly.
|
||||
*
|
||||
* Note: This will be deprecated as we move to Event Bus module in 1.8
|
||||
*
|
||||
*
|
||||
* Example
|
||||
* ```js
|
||||
* {
|
||||
* removeOnComplete: { age: 10 },
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* @see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueadd
|
||||
*/
|
||||
event_options?: Record<string, unknown> & EmitOptions
|
||||
|
||||
session_options?: SessionOptions
|
||||
|
||||
jwt_secret?: string
|
||||
|
||||
Reference in New Issue
Block a user