fix(): event emitting (#14196)

* fix(): event emitting

* Create rude-queens-deny.md

* fix(): store subscriber should not be constraint

* Update rude-queens-deny.md

* Add tests to prevent regression
This commit is contained in:
Adrien de Peretti
2025-12-03 08:53:09 +01:00
committed by GitHub
parent fe3c28488c
commit 391d8dc6cd
6 changed files with 151 additions and 8 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/medusa": patch
"@medusajs/utils": patch
---
fix(): event emitting

View File

@@ -0,0 +1,47 @@
const { defineConfig, Modules } = require("@medusajs/utils")
const os = require("os")
const path = require("path")
const DB_HOST = process.env.DB_HOST
const DB_USERNAME = process.env.DB_USERNAME
const DB_PASSWORD = process.env.DB_PASSWORD
const DB_NAME = process.env.DB_TEMP_NAME
const DB_URL = `postgres://${DB_USERNAME}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}`
process.env.DATABASE_URL = DB_URL
process.env.LOG_LEVEL = "error"
module.exports = defineConfig({
admin: {
disable: true,
},
projectConfig: {
http: {
jwtSecret: "test",
},
workerMode: "server",
redisUrl: process.env.REDIS_URL ?? "redis://localhost:6379",
},
modules: {
[Modules.EVENT_BUS]: {
resolve: "@medusajs/event-bus-redis",
options: {
redisUrl: process.env.REDIS_URL ?? "redis://localhost:6379",
},
},
[Modules.FILE]: {
resolve: "@medusajs/file",
options: {
providers: [
{
resolve: "@medusajs/file-local",
id: "local",
options: {
upload_dir: path.join(os.tmpdir(), "uploads"),
private_upload_dir: path.join(os.tmpdir(), "static"),
},
},
],
},
},
},
})

View File

@@ -0,0 +1,92 @@
import { medusaIntegrationTestRunner } from "@medusajs/test-utils"
import { IEventBusModuleService } from "@medusajs/types"
import { composeMessage, Modules, PaymentWebhookEvents } from "@medusajs/utils"
import path from "path"
jest.setTimeout(100000)
medusaIntegrationTestRunner({
medusaConfigFile: path.join(
__dirname,
"../../__fixtures__/worker-mode-server"
),
testSuite: ({ getContainer }) => {
describe("Event Bus - Server Worker Mode", () => {
let eventBus: IEventBusModuleService
beforeAll(() => {
eventBus = getContainer().resolve(Modules.EVENT_BUS)
})
it("should register subscribers, queue events with subscribers, and skip events without subscribers", async () => {
const subscribersMap = (eventBus as any).eventToSubscribersMap
expect(subscribersMap).toBeDefined()
expect(subscribersMap.size).toBeGreaterThan(0)
const paymentWebhookSubscribers = subscribersMap.get(
PaymentWebhookEvents.WebhookReceived
)
expect(paymentWebhookSubscribers).toBeDefined()
expect(paymentWebhookSubscribers.length).toBeGreaterThan(0)
const bullWorker = (eventBus as any).bullWorker_
expect(bullWorker).toBeUndefined()
const testEventName = "test.server-mode-event"
const subscriberMock = jest.fn()
eventBus.subscribe(testEventName, subscriberMock, {
subscriberId: "test-server-mode-subscriber",
})
expect(subscribersMap.get(testEventName)).toBeDefined()
const queue = (eventBus as any).queue_
const jobCountsBefore = await queue.getJobCounts()
const totalJobsBefore =
jobCountsBefore.waiting + jobCountsBefore.delayed
await eventBus.emit(
composeMessage(testEventName, {
data: { test: "data" },
object: "test",
source: "integration-test",
action: "created",
})
)
const jobCountsAfterWithSubscriber = await queue.getJobCounts()
const totalJobsAfterWithSubscriber =
jobCountsAfterWithSubscriber.waiting +
jobCountsAfterWithSubscriber.delayed
expect(totalJobsAfterWithSubscriber).toBeGreaterThan(totalJobsBefore)
await new Promise((resolve) => setTimeout(resolve, 500))
expect(subscriberMock).not.toHaveBeenCalled()
const eventWithNoSubscribers = "test.event-without-subscribers"
expect(subscribersMap.get(eventWithNoSubscribers)).toBeUndefined()
const jobCountsBeforeNoSub = await queue.getJobCounts()
const totalJobsBeforeNoSub =
jobCountsBeforeNoSub.waiting + jobCountsBeforeNoSub.delayed
await eventBus.emit(
composeMessage(eventWithNoSubscribers, {
data: { test: "should-not-be-queued" },
object: "test",
source: "integration-test",
action: "created",
})
)
const jobCountsAfterNoSub = await queue.getJobCounts()
const totalJobsAfterNoSub =
jobCountsAfterNoSub.waiting + jobCountsAfterNoSub.delayed
expect(totalJobsAfterNoSub).toBe(totalJobsBeforeNoSub)
})
})
},
})

View File

@@ -16,6 +16,7 @@
"@medusajs/core-flows": "workspace:*",
"@medusajs/customer": "workspace:^",
"@medusajs/event-bus-local": "workspace:*",
"@medusajs/event-bus-redis": "workspace:*",
"@medusajs/framework": "workspace:*",
"@medusajs/fulfillment": "workspace:^",
"@medusajs/fulfillment-manual": "workspace:^",

View File

@@ -87,10 +87,6 @@ export abstract class AbstractEventBusModuleService
subscriber: EventBusTypes.Subscriber,
context?: EventBusTypes.SubscriberContext
): this {
if (!this.isWorkerMode) {
return this
}
if (typeof subscriber !== `function`) {
throw new Error("Subscriber must be a function")
}

View File

@@ -91,11 +91,12 @@ async function loadEntrypoints(
ContainerRegistrationKeys.CONFIG_MODULE
)
// Subscribers should be loaded no matter the worker mode, simply they will never handle anything
// since worker/shared instances only will have a running worker to process events.
await subscribersLoader(plugins, container)
if (shouldLoadBackgroundProcessors(configModule)) {
await promiseAll([
subscribersLoader(plugins, container),
jobsLoader(plugins, container),
])
await jobsLoader(plugins, container)
}
if (isWorkerMode(configModule)) {