diff --git a/.changeset/rude-queens-deny.md b/.changeset/rude-queens-deny.md new file mode 100644 index 0000000000..015b780450 --- /dev/null +++ b/.changeset/rude-queens-deny.md @@ -0,0 +1,6 @@ +--- +"@medusajs/medusa": patch +"@medusajs/utils": patch +--- + +fix(): event emitting diff --git a/integration-tests/http/__fixtures__/worker-mode-server/medusa-config.js b/integration-tests/http/__fixtures__/worker-mode-server/medusa-config.js new file mode 100644 index 0000000000..0c58af7f4e --- /dev/null +++ b/integration-tests/http/__fixtures__/worker-mode-server/medusa-config.js @@ -0,0 +1,47 @@ +const { defineConfig, Modules } = require("@medusajs/utils") +const os = require("os") +const path = require("path") + +const DB_HOST = process.env.DB_HOST +const DB_USERNAME = process.env.DB_USERNAME +const DB_PASSWORD = process.env.DB_PASSWORD +const DB_NAME = process.env.DB_TEMP_NAME +const DB_URL = `postgres://${DB_USERNAME}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}` +process.env.DATABASE_URL = DB_URL +process.env.LOG_LEVEL = "error" + +module.exports = defineConfig({ + admin: { + disable: true, + }, + projectConfig: { + http: { + jwtSecret: "test", + }, + workerMode: "server", + redisUrl: process.env.REDIS_URL ?? "redis://localhost:6379", + }, + modules: { + [Modules.EVENT_BUS]: { + resolve: "@medusajs/event-bus-redis", + options: { + redisUrl: process.env.REDIS_URL ?? "redis://localhost:6379", + }, + }, + [Modules.FILE]: { + resolve: "@medusajs/file", + options: { + providers: [ + { + resolve: "@medusajs/file-local", + id: "local", + options: { + upload_dir: path.join(os.tmpdir(), "uploads"), + private_upload_dir: path.join(os.tmpdir(), "static"), + }, + }, + ], + }, + }, + }, +}) diff --git a/integration-tests/http/__tests__/event-bus/subscriber-registration.spec.ts b/integration-tests/http/__tests__/event-bus/subscriber-registration.spec.ts new file mode 100644 index 0000000000..e5852e9a3f --- /dev/null +++ b/integration-tests/http/__tests__/event-bus/subscriber-registration.spec.ts @@ -0,0 +1,92 @@ +import { medusaIntegrationTestRunner } from "@medusajs/test-utils" +import { IEventBusModuleService } from "@medusajs/types" +import { composeMessage, Modules, PaymentWebhookEvents } from "@medusajs/utils" +import path from "path" + +jest.setTimeout(100000) + +medusaIntegrationTestRunner({ + medusaConfigFile: path.join( + __dirname, + "../../__fixtures__/worker-mode-server" + ), + testSuite: ({ getContainer }) => { + describe("Event Bus - Server Worker Mode", () => { + let eventBus: IEventBusModuleService + + beforeAll(() => { + eventBus = getContainer().resolve(Modules.EVENT_BUS) + }) + + it("should register subscribers, queue events with subscribers, and skip events without subscribers", async () => { + const subscribersMap = (eventBus as any).eventToSubscribersMap + expect(subscribersMap).toBeDefined() + expect(subscribersMap.size).toBeGreaterThan(0) + + const paymentWebhookSubscribers = subscribersMap.get( + PaymentWebhookEvents.WebhookReceived + ) + expect(paymentWebhookSubscribers).toBeDefined() + expect(paymentWebhookSubscribers.length).toBeGreaterThan(0) + + const bullWorker = (eventBus as any).bullWorker_ + expect(bullWorker).toBeUndefined() + + const testEventName = "test.server-mode-event" + const subscriberMock = jest.fn() + + eventBus.subscribe(testEventName, subscriberMock, { + subscriberId: "test-server-mode-subscriber", + }) + expect(subscribersMap.get(testEventName)).toBeDefined() + + const queue = (eventBus as any).queue_ + const jobCountsBefore = await queue.getJobCounts() + const totalJobsBefore = + jobCountsBefore.waiting + jobCountsBefore.delayed + + await eventBus.emit( + composeMessage(testEventName, { + data: { test: "data" }, + object: "test", + source: "integration-test", + action: "created", + }) + ) + + const jobCountsAfterWithSubscriber = await queue.getJobCounts() + const totalJobsAfterWithSubscriber = + jobCountsAfterWithSubscriber.waiting + + jobCountsAfterWithSubscriber.delayed + + expect(totalJobsAfterWithSubscriber).toBeGreaterThan(totalJobsBefore) + + await new Promise((resolve) => setTimeout(resolve, 500)) + + expect(subscriberMock).not.toHaveBeenCalled() + + const eventWithNoSubscribers = "test.event-without-subscribers" + expect(subscribersMap.get(eventWithNoSubscribers)).toBeUndefined() + + const jobCountsBeforeNoSub = await queue.getJobCounts() + const totalJobsBeforeNoSub = + jobCountsBeforeNoSub.waiting + jobCountsBeforeNoSub.delayed + + await eventBus.emit( + composeMessage(eventWithNoSubscribers, { + data: { test: "should-not-be-queued" }, + object: "test", + source: "integration-test", + action: "created", + }) + ) + + const jobCountsAfterNoSub = await queue.getJobCounts() + const totalJobsAfterNoSub = + jobCountsAfterNoSub.waiting + jobCountsAfterNoSub.delayed + + expect(totalJobsAfterNoSub).toBe(totalJobsBeforeNoSub) + }) + }) + }, +}) diff --git a/integration-tests/http/package.json b/integration-tests/http/package.json index 674fe8be61..e8ff4a35fd 100644 --- a/integration-tests/http/package.json +++ b/integration-tests/http/package.json @@ -16,6 +16,7 @@ "@medusajs/core-flows": "workspace:*", "@medusajs/customer": "workspace:^", "@medusajs/event-bus-local": "workspace:*", + "@medusajs/event-bus-redis": "workspace:*", "@medusajs/framework": "workspace:*", "@medusajs/fulfillment": "workspace:^", "@medusajs/fulfillment-manual": "workspace:^", diff --git a/packages/core/utils/src/event-bus/index.ts b/packages/core/utils/src/event-bus/index.ts index 5cdb889bd4..07cda07d84 100644 --- a/packages/core/utils/src/event-bus/index.ts +++ b/packages/core/utils/src/event-bus/index.ts @@ -87,10 +87,6 @@ export abstract class AbstractEventBusModuleService subscriber: EventBusTypes.Subscriber, context?: EventBusTypes.SubscriberContext ): this { - if (!this.isWorkerMode) { - return this - } - if (typeof subscriber !== `function`) { throw new Error("Subscriber must be a function") } diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index c5d17304e6..7cf8f07076 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -91,11 +91,12 @@ async function loadEntrypoints( ContainerRegistrationKeys.CONFIG_MODULE ) + // Subscribers should be loaded no matter the worker mode, simply they will never handle anything + // since worker/shared instances only will have a running worker to process events. + await subscribersLoader(plugins, container) + if (shouldLoadBackgroundProcessors(configModule)) { - await promiseAll([ - subscribersLoader(plugins, container), - jobsLoader(plugins, container), - ]) + await jobsLoader(plugins, container) } if (isWorkerMode(configModule)) {