diff --git a/.changeset/empty-dragons-invite.md b/.changeset/empty-dragons-invite.md new file mode 100644 index 0000000000..5649c7e779 --- /dev/null +++ b/.changeset/empty-dragons-invite.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": patch +--- + +Fix eventBus.emit using redis mock diff --git a/.gitignore b/.gitignore index 789159e5e9..eb87687bd3 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,5 @@ www/**/.yarn/* .idea .turbo -build/** \ No newline at end of file +build/** +**/dist \ No newline at end of file diff --git a/packages/medusa/src/services/__tests__/event-bus.js b/packages/medusa/src/services/__tests__/event-bus.js index 97ac67ea98..ad7a0a4295 100644 --- a/packages/medusa/src/services/__tests__/event-bus.js +++ b/packages/medusa/src/services/__tests__/event-bus.js @@ -24,11 +24,18 @@ describe("EventBusService", () => { find: () => Promise.resolve([]), }) - eventBus = new EventBusService({ - manager: MockManager, - stagedJobRepository, - logger: loggerMock, - }) + eventBus = new EventBusService( + { + manager: MockManager, + stagedJobRepository, + logger: loggerMock, + }, + { + projectConfig: { + redis_url: "localhost", + }, + } + ) }) afterAll(async () => { @@ -49,10 +56,17 @@ describe("EventBusService", () => { beforeEach(() => { jest.resetAllMocks() - eventBus = new EventBusService({ - manager: MockManager, - logger: loggerMock, - }) + eventBus = new EventBusService( + { + manager: MockManager, + logger: loggerMock, + }, + { + projectConfig: { + redis_url: "localhost", + }, + } + ) }) afterAll(async () => { @@ -135,11 +149,18 @@ describe("EventBusService", () => { create: (data) => data, }) - eventBus = new EventBusService({ - logger: loggerMock, - manager: mockManager, - stagedJobRepository, - }) + eventBus = new EventBusService( + { + logger: loggerMock, + manager: mockManager, + stagedJobRepository, + }, + { + projectConfig: { + redis_url: "localhost", + }, + } + ) eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") }) @@ -195,11 +216,18 @@ describe("EventBusService", () => { create: (data) => data, }) - eventBus = new EventBusService({ - logger: loggerMock, - manager: mockManager, - stagedJobRepository, - }) + eventBus = new EventBusService( + { + logger: loggerMock, + manager: mockManager, + stagedJobRepository, + }, + { + projectConfig: { + redis_url: "localhost", + }, + } + ) eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") }) @@ -285,7 +313,10 @@ describe("EventBusService", () => { stagedJobRepository, }, { - projectConfig: { event_options: { removeOnComplete: 10 } }, + projectConfig: { + event_options: { removeOnComplete: 10 }, + redis_url: "localhost", + }, } ) @@ -323,11 +354,18 @@ describe("EventBusService", () => { create: (data) => data, }) - eventBus = new EventBusService({ - logger: loggerMock, - manager: mockManager, - stagedJobRepository, - }) + eventBus = new EventBusService( + { + logger: loggerMock, + manager: mockManager, + stagedJobRepository, + }, + { + projectConfig: { + redis_url: "localhost", + }, + } + ) eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") @@ -370,7 +408,10 @@ describe("EventBusService", () => { stagedJobRepository, }, { - projectConfig: { event_options: { removeOnComplete: 10 } }, + projectConfig: { + event_options: { removeOnComplete: 10 }, + redis_url: "localhost", + }, } ) @@ -418,14 +459,11 @@ describe("EventBusService", () => { find: () => Promise.resolve([]), }) - eventBus = new EventBusService( - { - manager: MockManager, - stagedJobRepository, - logger: loggerMock, - }, - {} - ) + eventBus = new EventBusService({ + manager: MockManager, + stagedJobRepository, + logger: loggerMock, + }) eventBus.subscribe("eventName", () => Promise.resolve("hi")) result = await eventBus.worker_({ data: { eventName: "eventName", data: {} }, diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index 404c0e5a03..28c919da6e 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -308,7 +308,9 @@ export default class EventBusService { return (!isBulkEmit ? stagedJobs[0] : stagedJobs) as unknown as TResult } - await this.queue_.addBulk(events) + if (this.config_?.projectConfig?.redis_url) { + await this.queue_.addBulk(events) + } } startEnqueuer(): void { @@ -346,9 +348,11 @@ export default class EventBusService { } }) - await this.queue_.addBulk(eventsData).then(async () => { - return await stagedJobRepo.delete({ id: In(jobs.map((j) => j.id)) }) - }) + if (this.config_?.projectConfig?.redis_url) { + await this.queue_.addBulk(eventsData).then(async () => { + return await stagedJobRepo.delete({ id: In(jobs.map((j) => j.id)) }) + }) + } await sleep(3000) }