feat(core, event-bus): Compensate emit event step utility (#13281)

* feat(core, event-bus): Compensate emit event step utility

* tests

* Update changeset to remove integration-tests-modules

Removed integration-tests-modules from changeset.

* revert test script
This commit is contained in:
Adrien de Peretti
2025-08-25 15:08:09 +02:00
committed by GitHub
parent 73a25abecb
commit fc49253273
11 changed files with 547 additions and 22 deletions

View File

@@ -0,0 +1,9 @@
---
"@medusajs/event-bus-local": patch
"@medusajs/event-bus-redis": patch
"@medusajs/core-flows": patch
"@medusajs/types": patch
"@medusajs/utils": patch
---
feat(core, event-bus): Compensate emit event step utility

View File

@@ -12,6 +12,7 @@ import { medusaIntegrationTestRunner } from "@medusajs/test-utils"
import {
ICartModuleService,
ICustomerModuleService,
IEventBusModuleService,
IFulfillmentModuleService,
IInventoryService,
IPaymentModuleService,
@@ -20,6 +21,7 @@ import {
IRegionModuleService,
ISalesChannelModuleService,
IStockLocationService,
Message,
} from "@medusajs/types"
import {
ContainerRegistrationKeys,
@@ -60,6 +62,7 @@ medusaIntegrationTestRunner({
let defaultRegion
let customer, storeHeadersWithCustomer
let setPricingContextHook: any
let eventBus: IEventBusModuleService
beforeAll(async () => {
appContainer = getContainer()
@@ -70,6 +73,7 @@ medusaIntegrationTestRunner({
productModule = appContainer.resolve(Modules.PRODUCT)
pricingModule = appContainer.resolve(Modules.PRICING)
paymentModule = appContainer.resolve(Modules.PAYMENT)
eventBus = appContainer.resolve(Modules.EVENT_BUS)
fulfillmentModule = appContainer.resolve(Modules.FULFILLMENT)
inventoryModule = appContainer.resolve(Modules.INVENTORY)
stockLocationModule = appContainer.resolve(Modules.STOCK_LOCATION)
@@ -747,6 +751,7 @@ medusaIntegrationTestRunner({
paymentSession.id
)
})
it("should complete cart when payment webhook and storefront are called in simultaneously", async () => {
const salesChannel = await scModuleService.createSalesChannels({
name: "Webshop",
@@ -876,6 +881,142 @@ medusaIntegrationTestRunner({
expect(fullOrder.payment_collections[0].captured_amount).toBe(3000)
expect(fullOrder.payment_collections[0].status).toBe("completed")
})
it("should clear events when complete cart fails after emitting events", async () => {
const salesChannel = await scModuleService.createSalesChannels({
name: "Webshop",
})
const location = await stockLocationModule.createStockLocations({
name: "Warehouse",
})
const region = await regionModuleService.createRegions({
name: "US",
currency_code: "usd",
})
let cart = await cartModuleService.createCarts({
currency_code: "usd",
sales_channel_id: salesChannel.id,
region_id: region.id,
})
await remoteLink.create([
{
[Modules.SALES_CHANNEL]: {
sales_channel_id: salesChannel.id,
},
[Modules.STOCK_LOCATION]: {
stock_location_id: location.id,
},
},
])
cart = await cartModuleService.retrieveCart(cart.id, {
select: ["id", "region_id", "currency_code", "sales_channel_id"],
})
await addToCartWorkflow(appContainer).run({
input: {
items: [
{
title: "Test item",
subtitle: "Test subtitle",
thumbnail: "some-url",
requires_shipping: false,
is_discountable: false,
is_tax_inclusive: false,
unit_price: 3000,
metadata: {
foo: "bar",
},
quantity: 1,
},
{
title: "zero price item",
subtitle: "zero price item",
thumbnail: "some-url",
requires_shipping: false,
is_discountable: false,
is_tax_inclusive: false,
unit_price: 0,
quantity: 1,
},
],
cart_id: cart.id,
},
})
cart = await cartModuleService.retrieveCart(cart.id, {
relations: ["items"],
})
await createPaymentCollectionForCartWorkflow(appContainer).run({
input: {
cart_id: cart.id,
},
})
const [paymentCollection] =
await paymentModule.listPaymentCollections({})
await createPaymentSessionsWorkflow(appContainer).run({
input: {
payment_collection_id: paymentCollection.id,
provider_id: "pp_system_default",
context: {},
data: {},
},
})
let grouppedEventBefore: Message[] = []
let eventGroupId!: string
/**
* Register order.placed listener to trigger the event group
* registration and be able to check the event group during
* the workflow execution against it after compensation
*/
eventBus.subscribe("order.placed", async () => {
// noop
})
const workflow = completeCartWorkflow(appContainer)
workflow.addAction("throw", {
invoke: async function failStep({ context }) {
eventGroupId = context!.eventGroupId!
grouppedEventBefore = (
(eventBus as any).groupedEventsMap_ as Map<string, any>
).get(context!.eventGroupId!)
throw new Error(
`Failed to do something before ending complete cart workflow`
)
},
})
const { errors } = await workflow.run({
input: {
id: cart.id,
},
throwOnError: false,
})
const grouppedEventAfter =
((eventBus as any).groupedEventsMap_ as Map<string, any>).get(
eventGroupId
) ?? []
expect(grouppedEventBefore).toHaveLength(1)
expect(grouppedEventAfter).toHaveLength(0) // events have been compensated
expect(errors[0].error.message).toBe(
"Failed to do something before ending complete cart workflow"
)
})
})
})
},

View File

@@ -5,6 +5,7 @@ import {
import { Modules } from "@medusajs/framework/utils"
import {
StepExecutionContext,
StepResponse,
createStep,
} from "@medusajs/framework/workflows-sdk"
@@ -85,6 +86,25 @@ export const emitEventStep = createStep(
}
await eventBus.emit(message)
return new StepResponse({
eventGroupId: context.eventGroupId,
eventName: input.eventName,
})
},
async (data: void) => {}
async (data, context) => {
if (!data || !data?.eventGroupId) {
return
}
const { container } = context
const eventBus: IEventBusModuleService = container.resolve(
Modules.EVENT_BUS
)
await eventBus.clearGroupedEvents(data!.eventGroupId, {
eventNames: [data!.eventName],
})
}
)

View File

@@ -3,14 +3,14 @@ import { Message, Subscriber, SubscriberContext } from "./common"
export interface IEventBusModuleService {
/**
* This method emits one or more events. Subscribers listening to the event(s) are executed asynchronously.
*
*
* @param data - The details of the events to emit.
* @param options - Additional options for the event.
*
*
* @example
* await eventModuleService.emit({
* name: "user.created",
* data: {
* await eventModuleService.emit({
* name: "user.created",
* data: {
* user_id: "user_123"
* }
* })
@@ -22,12 +22,12 @@ export interface IEventBusModuleService {
/**
* This method adds a subscriber to an event. It's mainly used internally to register subscribers.
*
*
* @param eventName - The name of the event to subscribe to.
* @param subscriber - The subscriber function to execute when the event is emitted.
* @param context - The context of the subscriber.
* @returns The instance of the Event Module
*
*
* @example
* eventModuleService.subscribe("user.created", async (data) => {
* console.log("User created", data)
@@ -41,12 +41,12 @@ export interface IEventBusModuleService {
/**
* This method removes a subscriber from an event. It's mainly used internally to unregister subscribers.
*
*
* @param eventName - The name of the event to unsubscribe from.
* @param subscriber - The subscriber function to remove.
* @param context - The context of the subscriber.
* @returns The instance of the Event Module
*
*
* @example
* eventModuleService.unsubscribe("user.created", async (data) => {
* console.log("User created", data)
@@ -61,9 +61,9 @@ export interface IEventBusModuleService {
/**
* This method emits all events in the specified group. Grouped events are useful when you have distributed transactions
* where you need to explicitly group, release and clear events upon lifecycle events of a transaction.
*
*
* @param eventGroupId - The ID of the event group.
*
*
* @example
* await eventModuleService.releaseGroupedEvents("group_123")
*/
@@ -71,11 +71,19 @@ export interface IEventBusModuleService {
/**
* This method removes all events in the specified group. Grouped events are useful when you have distributed transactions
* where you need to explicitly group, release and clear events upon lifecycle events of a transaction.
*
*
* @param eventGroupId - The ID of the event group.
*
* @param options - Additional options for the event.
* @param options.eventNames - The names of the events to clear. If not provided, The group will
* be entirely cleared.
*
* @example
* await eventModuleService.clearGroupedEvents("group_123")
*/
clearGroupedEvents(eventGroupId: string): Promise<void>
clearGroupedEvents(
eventGroupId: string,
options?: {
eventNames?: string[]
}
): Promise<void>
}

View File

@@ -38,8 +38,16 @@ export abstract class AbstractEventBusModuleService
*/
// Given a eventGroupId, all the grouped events will be released
abstract releaseGroupedEvents(eventGroupId: string): Promise<void>
// Given a eventGroupId, all the grouped events will be cleared
abstract clearGroupedEvents(eventGroupId: string): Promise<void>
// Given a eventGroupId, all the grouped events will be cleared unless eventNames are provided
// If eventNames are provided, only the events that match the eventNames will be cleared from the
// group
abstract clearGroupedEvents(
eventGroupId: string,
options?: {
eventNames?: string[]
}
): Promise<void>
protected storeSubscribers({
event,

View File

@@ -0,0 +1,141 @@
import {
CommonEvents,
composeMessage,
Modules,
} from "@medusajs/framework/utils"
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import { IEventBusModuleService } from "@medusajs/types"
moduleIntegrationTestRunner<IEventBusModuleService>({
moduleName: Modules.EVENT_BUS,
testSuite: ({ service: eventBus }) => {
describe("Event Bus Local Service", () => {
it("should emit an event", async () => {
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
await eventBus.emit(
composeMessage("test", {
data: {
test: "test",
},
action: CommonEvents.CREATED,
source: "test",
object: "test",
})
)
expect(subscriber).toHaveBeenCalledWith({
data: {
test: "test",
},
metadata: {
source: "test",
object: "test",
action: "created",
},
name: "test",
})
eventBus.unsubscribe("test", subscriber)
})
it("should release grouped events", async () => {
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
await eventBus.emit(
composeMessage("test", {
data: {
test: "test",
},
context: {
eventGroupId: "123",
},
action: CommonEvents.CREATED,
source: "test",
object: "test",
})
)
expect(subscriber).toHaveBeenCalledTimes(0)
await eventBus.releaseGroupedEvents("123")
expect(subscriber).toHaveBeenCalledTimes(1)
expect(subscriber).toHaveBeenCalledWith({
data: {
test: "test",
},
metadata: {
source: "test",
eventGroupId: "123",
object: "test",
action: "created",
},
name: "test",
})
eventBus.unsubscribe("test", subscriber)
})
it("should clear grouped events", async () => {
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
await eventBus.emit(
composeMessage("test", {
data: {
test: "test",
},
context: {
eventGroupId: "123",
},
action: CommonEvents.CREATED,
source: "test",
object: "test",
})
)
expect(subscriber).toHaveBeenCalledTimes(0)
await eventBus.clearGroupedEvents("123")
await eventBus.releaseGroupedEvents("123")
expect(subscriber).toHaveBeenCalledTimes(0)
eventBus.unsubscribe("test", subscriber)
})
it("should clear grouped events with event names", async () => {
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
await eventBus.emit(
composeMessage("test", {
data: {
test: "test",
},
context: {
eventGroupId: "123",
},
action: CommonEvents.CREATED,
source: "test",
object: "test",
})
)
await eventBus.clearGroupedEvents("123", {
eventNames: ["test"],
})
await eventBus.releaseGroupedEvents("123")
expect(subscriber).toHaveBeenCalledTimes(0)
eventBus.unsubscribe("test", subscriber)
})
})
},
})

View File

@@ -33,7 +33,8 @@
"scripts": {
"watch": "tsc --build --watch",
"build": "rimraf dist && tsc --build",
"test": "jest --passWithNoTests"
"test": "jest --passWithNoTests",
"test:integration": "jest --no-cache --maxWorkers=50% --bail --detectOpenHandles --forceExit --logHeapUsage -- integration-tests/__tests__/**/*.spec.ts"
},
"dependencies": {
"ulid": "^2.3.0"

View File

@@ -130,8 +130,19 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
await this.clearGroupedEvents(eventGroupId)
}
async clearGroupedEvents(eventGroupId: string) {
this.groupedEventsMap_.delete(eventGroupId)
async clearGroupedEvents(
eventGroupId: string,
{ eventNames }: { eventNames?: string[] } = {}
) {
if (eventNames?.length) {
const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
const eventsToKeep = groupedEvents.filter(
(event) => !eventNames!.includes(event.name)
)
this.groupedEventsMap_.set(eventGroupId, eventsToKeep)
} else {
this.groupedEventsMap_.delete(eventGroupId)
}
}
subscribe(

View File

@@ -0,0 +1,147 @@
import {
CommonEvents,
composeMessage,
Modules,
} from "@medusajs/framework/utils"
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import { IEventBusModuleService } from "@medusajs/types"
moduleIntegrationTestRunner<IEventBusModuleService>({
moduleName: Modules.EVENT_BUS,
moduleOptions: {
redis: {
host: "localhost",
port: 6379,
},
},
testSuite: ({ service: eventBus }) => {
describe("Event Bus Redis Service", () => {
it("should emit an event", async () => {
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
await eventBus.emit(
composeMessage("test", {
data: {
test: "test",
},
action: CommonEvents.CREATED,
source: "test",
object: "test",
})
)
expect(subscriber).toHaveBeenCalledWith({
data: {
test: "test",
},
metadata: {
source: "test",
object: "test",
action: "created",
},
name: "test",
})
eventBus.unsubscribe("test", subscriber)
})
it("should release grouped events", async () => {
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
await eventBus.emit(
composeMessage("test", {
data: {
test: "test",
},
context: {
eventGroupId: "123",
},
action: CommonEvents.CREATED,
source: "test",
object: "test",
})
)
expect(subscriber).toHaveBeenCalledTimes(0)
await eventBus.releaseGroupedEvents("123")
expect(subscriber).toHaveBeenCalledTimes(1)
expect(subscriber).toHaveBeenCalledWith({
data: {
test: "test",
},
metadata: {
source: "test",
eventGroupId: "123",
object: "test",
action: "created",
},
name: "test",
})
eventBus.unsubscribe("test", subscriber)
})
it("should clear grouped events", async () => {
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
await eventBus.emit(
composeMessage("test", {
data: {
test: "test",
},
context: {
eventGroupId: "123",
},
action: CommonEvents.CREATED,
source: "test",
object: "test",
})
)
expect(subscriber).toHaveBeenCalledTimes(0)
await eventBus.clearGroupedEvents("123")
await eventBus.releaseGroupedEvents("123")
expect(subscriber).toHaveBeenCalledTimes(0)
eventBus.unsubscribe("test", subscriber)
})
it("should clear grouped events with event names", async () => {
const subscriber = jest.fn()
eventBus.subscribe("test", subscriber)
await eventBus.emit(
composeMessage("test", {
data: {
test: "test",
},
context: {
eventGroupId: "123",
},
action: CommonEvents.CREATED,
source: "test",
object: "test",
})
)
await eventBus.clearGroupedEvents("123", {
eventNames: ["test"],
})
await eventBus.releaseGroupedEvents("123")
expect(subscriber).toHaveBeenCalledTimes(0)
eventBus.unsubscribe("test", subscriber)
})
})
},
})

View File

@@ -33,7 +33,8 @@
"scripts": {
"watch": "tsc --build --watch",
"build": "rimraf dist && tsc --build",
"test": "jest --silent --bail --maxWorkers=50% --forceExit"
"test": "jest --silent --bail --maxWorkers=50% --forceExit",
"test:integration": "jest --no-cache --maxWorkers=50% --bail --detectOpenHandles --forceExit --logHeapUsage -- integration-tests/__tests__/**/*.spec.ts"
},
"dependencies": {
"bullmq": "5.13.0",

View File

@@ -218,11 +218,49 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
await this.clearGroupedEvents(eventGroupId)
}
async clearGroupedEvents(eventGroupId: string) {
async clearGroupedEvents(
eventGroupId: string,
{
eventNames,
}: {
eventNames?: string[]
} = {}
) {
if (!eventGroupId) {
return
}
if (eventNames?.length) {
/**
* If any event names are provided, we keep all events except the ones that match the event
* names. which allow to partially clear an event group.
*/
const eventsToKeep = await this.eventBusRedisConnection_
.lrange(`staging:${eventGroupId}`, 0, -1)
.then((result) => {
return result
.map((jsonString) => JSON.parse(jsonString))
.filter((event) => !eventNames.includes(event.name))
})
// Create a pipeline
const pipeline = this.eventBusRedisConnection_.pipeline()
// Empty the current list
pipeline.del(`staging:${eventGroupId}`)
// Add the remaining events to the list
pipeline.rpush(
`staging:${eventGroupId}`,
...eventsToKeep.map((event) => JSON.stringify(event))
)
await pipeline.exec()
return
}
await this.eventBusRedisConnection_.unlink(`staging:${eventGroupId}`)
}