chore(): Module Internal Events (#13296)

* chore(): Ensure the product module emits all necessary events

* chore(): Ensure the product module emits all necessary events

* Update events tests

* more events and fixes

* more tests and category fixes

* more tests and category fixes

* Add todo

* update updateProduct_ event emitting and adjust test

* Adjust update products implementation to rely on already computed events

* rm unnecessary update variants events

* Fix formatting in changeset for product events

* refactor: Manage event emitting automatically (WIP)

* refactor: Manage event emitting automatically (WIP)

* chore(api-key): Add missing emit events and refactoring

* chore(cart): Add missing emit events and refactoring

* chore(customer): Add missing emit events and refactoring

* chore(fufillment, utils): Add missing emit events and refactoring

* chore(fufillment, utils): Add missing emit events and refactoring

* chore(inventory): Add missing emit events and refactoring

* chore(notification): Add missing emit events and refactoring

* chore(utils): Remove medusa service event handling legacy

* chore(product): Add missing emit events and refactoring

* chore(order): Add missing emit events and refactoring

* chore(payment): Add missing emit events and refactoring

* chore(pricing, util): Add missing emit events and refactoring, fix internal service upsertWithReplace event dispatching

* chore(promotions): Add missing emit events and refactoring

* chore(region): Add missing emit events and refactoring

* chore(sales-channel): Add missing emit events and refactoring

* chore(settings): Add missing emit events and refactoring

* chore(stock-location): Add missing emit events and refactoring

* chore(store): Add missing emit events and refactoring

* chore(taxes): Add missing emit events and refactoring

* chore(user): Add missing emit events and refactoring

* fix unit tests

* rm changeset for regeneration

* Create changeset for Medusa.js patch updates

Add a changeset for patch updates to multiple Medusa.js modules.

* rm unused product event builders

* address feedback

* remove old changeset

* fix event action for token generated

* fix user module events

* fix import

* fix promotion events

* add new module integration tests shard

* fix medusa service

* revert shard

* fix event action

* fix pipeline

* fix pipeline

---------

Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2025-09-10 14:37:38 +02:00
committed by GitHub
parent afe21741c4
commit e8822f3e69
55 changed files with 3614 additions and 2353 deletions

View File

@@ -9,12 +9,13 @@ import {
import { composeMessage } from "./build-event-messages"
export class MessageAggregator implements IMessageAggregator {
private messages: Message[] = []
#messagesHash: Set<string> = new Set()
#messages: Message[] = []
constructor() {}
count(): number {
return this.messages.length
return this.#messages.length
}
save(msg: Message | Message[]): void {
@@ -23,7 +24,19 @@ export class MessageAggregator implements IMessageAggregator {
return
}
this.messages.push(...messages)
for (const message of messages) {
try {
const hash = JSON.stringify(message)
if (!this.#messagesHash.has(hash)) {
this.#messagesHash.add(hash)
this.#messages.push(message)
}
} catch (e) {
// noop: if the message is not serializable, we don't want to deduplicate it
// It should not happen, but we don't want to fail the whole process
this.#messages.push(message)
}
}
}
saveRawMessageData<T>(
@@ -55,15 +68,15 @@ export class MessageAggregator implements IMessageAggregator {
const { groupBy, sortBy } = format ?? {}
if (sortBy) {
this.messages.sort((a, b) => this.compareMessages(a, b, sortBy))
this.#messages.sort((a, b) => this.compareMessages(a, b, sortBy))
}
let messages: { [group: string]: Message[] } = {
default: [...this.messages],
default: [...this.#messages],
}
if (groupBy) {
messages = this.messages.reduce<{
messages = this.#messages.reduce<{
[key: string]: Message[]
}>((acc, msg) => {
const key = groupBy
@@ -91,7 +104,8 @@ export class MessageAggregator implements IMessageAggregator {
clearMessages(): void {
// Ensure no references are left over in case something rely on messages
this.messages.length = 0
this.#messages.length = 0
this.#messagesHash.clear()
}
private getValueFromPath(obj: any, path: string): any {

View File

@@ -20,7 +20,7 @@ describe("Internal Module Service Factory", () => {
findAndCount: jest.fn(),
create: jest.fn(),
update: jest.fn(),
delete: jest.fn(),
delete: jest.fn().mockImplementation((ids) => Promise.resolve(ids)),
softDelete: jest.fn(),
restore: jest.fn(),
upsert: jest.fn(),
@@ -32,7 +32,7 @@ describe("Internal Module Service Factory", () => {
findAndCount: jest.fn(),
create: jest.fn(),
update: jest.fn(),
delete: jest.fn(),
delete: jest.fn().mockImplementation((ids) => Promise.resolve(ids)),
softDelete: jest.fn(),
restore: jest.fn(),
upsert: jest.fn(),

View File

@@ -0,0 +1,71 @@
import { Context } from "@medusajs/types"
import { EventArgs, EventSubscriber } from "@mikro-orm/core"
type Service = {
interceptEntityMutationEvents: (
event: "afterCreate" | "afterUpdate" | "afterUpsert" | "afterDelete",
args: EventArgs<any>,
context: Context
) => void
}
export type MedusaMikroOrmEventSubscriber = {
new (context: Context): EventSubscriber
}
/**
* Build a new mikro orm event subscriber for the given models
* @param models
* @returns
*/
export function createMedusaMikroOrmEventSubscriber(
keys: string[],
service: Service
): MedusaMikroOrmEventSubscriber {
const klass = class MikroOrmEventSubscriber implements EventSubscriber {
#context: Context
#service: Service = service
constructor(context: Context) {
this.#context = context
}
async afterCreate<T>(args: EventArgs<T>): Promise<void> {
this.#service.interceptEntityMutationEvents(
"afterCreate",
args,
this.#context
)
}
async afterUpdate<T>(args: EventArgs<T>): Promise<void> {
this.#service.interceptEntityMutationEvents(
"afterUpdate",
args,
this.#context
)
}
async afterUpsert<T>(args: EventArgs<T>): Promise<void> {
this.#service.interceptEntityMutationEvents(
"afterUpsert",
args,
this.#context
)
}
async afterDelete<T>(args: EventArgs<T>): Promise<void> {
this.#service.interceptEntityMutationEvents(
"afterDelete",
args,
this.#context
)
}
}
Object.defineProperty(klass, "name", {
value: keys.join(","),
writable: false,
})
return klass
}

View File

@@ -21,3 +21,4 @@ export * from "./types/links-config"
export * from "./types/medusa-service"
export * from "./module-provider-registration-key"
export * from "./modules-to-container-types"
export * from "./create-medusa-mikro-orm-event-subscriber"

View File

@@ -9,7 +9,12 @@ import {
PerformedActions,
UpsertWithReplaceConfig,
} from "@medusajs/types"
import type { EntityClass, EntitySchema } from "@mikro-orm/core"
import {
EventType,
type EntityClass,
type EntityManager,
type EntitySchema,
} from "@mikro-orm/core"
import {
isDefined,
isObject,
@@ -27,23 +32,60 @@ import {
InjectTransactionManager,
MedusaContext,
} from "./decorators"
import { MedusaMikroOrmEventSubscriber } from "./create-medusa-mikro-orm-event-subscriber"
type InternalService = {
new <TContainer extends object = object, TEntity extends object = any>(
container: TContainer
): ModulesSdkTypes.IMedusaInternalService<TEntity, TContainer>
setEventSubscriber(subscriber: MedusaMikroOrmEventSubscriber): void
}
type SelectorAndData = {
selector: FilterQuery<any> | BaseFilterable<FilterQuery<any>>
data: any
}
export function registerInternalServiceEventSubscriber(
context: Context,
subscriber?: MedusaMikroOrmEventSubscriber
) {
const manager = (context.transactionManager ??
context.manager) as EntityManager
if (manager && subscriber) {
const subscriberInstance = new subscriber(context)
// There is no public API to unregister subscribers or check if a subscriber is already
// registered. This means that we need to manually check if the subscriber is already
// registered, otherwise we will register the same subscriber twice.
const hasListeners = (manager.getEventManager() as any).subscribers.some(
(s) => s.constructor.name === subscriberInstance.constructor.name
)
if (!hasListeners) {
manager.getEventManager().registerSubscriber(subscriberInstance)
}
}
}
export const MedusaInternalServiceSymbol = Symbol.for(
"MedusaInternalServiceSymbol"
)
/**
* Check if a value is a Medusa internal service
* @param value
*/
export function isMedusaInternalService(value: any): value is InternalService {
return (
!!value?.[MedusaInternalServiceSymbol] ||
!!value?.prototype?.[MedusaInternalServiceSymbol]
)
}
export function MedusaInternalService<
TContainer extends object = object,
TEntity extends object = any
>(
rawModel: any
): {
new (container: TContainer): ModulesSdkTypes.IMedusaInternalService<
TEntity,
TContainer
>
} {
>(rawModel: any): InternalService {
const model = DmlEntity.isDmlEntity(rawModel)
? toMikroORMEntity(rawModel)
: rawModel
@@ -54,6 +96,10 @@ export function MedusaInternalService<
class AbstractService_
implements ModulesSdkTypes.IMedusaInternalService<TEntity, TContainer>
{
[MedusaInternalServiceSymbol] = true
#eventSubscriber?: MedusaMikroOrmEventSubscriber
readonly __container__: TContainer;
[key: string]: any
@@ -62,6 +108,10 @@ export function MedusaInternalService<
this[propertyRepositoryName] = container[injectedRepositoryName]
}
setEventSubscriber(subscriber: MedusaMikroOrmEventSubscriber) {
this.#eventSubscriber = subscriber
}
static applyFreeTextSearchFilter(
filters: FilterQuery & { q?: string },
config: FindConfig<any>
@@ -223,6 +273,11 @@ export function MedusaInternalService<
| InferEntityType<TEntity>[]
}
registerInternalServiceEventSubscriber(
sharedContext,
this.#eventSubscriber
)
const data_ = Array.isArray(data) ? data : [data]
const entities = await this[propertyRepositoryName].create(
data_,
@@ -260,6 +315,11 @@ export function MedusaInternalService<
| InferEntityType<TEntity>[]
}
registerInternalServiceEventSubscriber(
sharedContext,
this.#eventSubscriber
)
let shouldReturnArray = false
if (Array.isArray(input)) {
shouldReturnArray = true
@@ -419,6 +479,11 @@ export function MedusaInternalService<
return []
}
registerInternalServiceEventSubscriber(
sharedContext,
this.#eventSubscriber
)
const primaryKeys = AbstractService_.retrievePrimaryKeys(model)
if (
@@ -467,10 +532,29 @@ export function MedusaInternalService<
return []
}
return await this[propertyRepositoryName].delete(
const deletedIds = await this[propertyRepositoryName].delete(
deleteCriteria,
sharedContext
)
// Delete are handled a bit differently since we are going to the DB directly, therefore
// just like upsert with replace, we need to dispatch the events manually.
if (deletedIds.length) {
const manager = (sharedContext.transactionManager ??
sharedContext.manager) as EntityManager
const eventManager = manager.getEventManager()
deletedIds.forEach((id) => {
eventManager.dispatchEvent(EventType.afterDelete, {
entity: { id },
meta: {
className: model.name,
} as Parameters<typeof eventManager.dispatchEvent>[2],
})
})
}
return deletedIds
}
@InjectTransactionManager(propertyRepositoryName)
@@ -489,6 +573,11 @@ export function MedusaInternalService<
return [[], {}]
}
registerInternalServiceEventSubscriber(
sharedContext,
this.#eventSubscriber
)
return await this[propertyRepositoryName].softDelete(
idsOrFilter,
sharedContext
@@ -520,6 +609,11 @@ export function MedusaInternalService<
data: any | any[],
@MedusaContext() sharedContext: Context = {}
): Promise<InferEntityType<TEntity> | InferEntityType<TEntity>[]> {
registerInternalServiceEventSubscriber(
sharedContext,
this.#eventSubscriber
)
const data_ = Array.isArray(data) ? data : [data]
const entities = await this[propertyRepositoryName].upsert(
data_,
@@ -556,10 +650,74 @@ export function MedusaInternalService<
entities: InferEntityType<TEntity> | InferEntityType<TEntity>[]
performedActions: PerformedActions
}> {
registerInternalServiceEventSubscriber(
sharedContext,
this.#eventSubscriber
)
const data_ = Array.isArray(data) ? data : [data]
const { entities, performedActions } = await this[
propertyRepositoryName
].upsertWithReplace(data_, config, sharedContext)
const manager = (sharedContext.transactionManager ??
sharedContext.manager) as EntityManager
const eventManager = manager.getEventManager()
/**
* Since the upsertWithReplace method is not emitting events, we need to do it manually
* by dispatching the events manually.
*/
const createdEntities = !!Object.keys(performedActions.created).length
const updatedEntities = !!Object.keys(performedActions.updated).length
const deletedEntities = !!Object.keys(performedActions.deleted).length
if (createdEntities) {
Object.entries(
performedActions.created as Record<string, any[]>
).forEach(([modelName, entities]) => {
entities.forEach((entity) => {
eventManager.dispatchEvent(EventType.afterCreate, {
entity,
meta: {
className: modelName,
} as Parameters<typeof eventManager.dispatchEvent>[2],
})
})
})
}
if (updatedEntities) {
Object.entries(
performedActions.updated as Record<string, any[]>
).forEach(([modelName, entities]) => {
entities.forEach((entity) => {
eventManager.dispatchEvent(EventType.afterUpdate, {
entity,
meta: {
className: modelName,
} as Parameters<typeof eventManager.dispatchEvent>[2],
})
})
})
}
if (deletedEntities) {
Object.entries(
performedActions.deleted as Record<string, any[]>
).forEach(([modelName, entities]) => {
entities.forEach((entity) => {
eventManager.dispatchEvent(EventType.afterDelete, {
entity,
meta: {
className: modelName,
} as Parameters<typeof eventManager.dispatchEvent>[2],
})
})
})
}
return {
entities: Array.isArray(data) ? entities : entities[0],
performedActions,
@@ -567,5 +725,7 @@ export function MedusaInternalService<
}
}
return AbstractService_ as any
// We hide away the setEventSubscriber method from the public interface
// as it is not meant to be used by the user.
return AbstractService_ as unknown as InternalService
}

View File

@@ -10,9 +10,9 @@ import {
RestoreReturn,
SoftDeleteReturn,
} from "@medusajs/types"
import { EventArgs } from "@mikro-orm/core"
import {
camelToSnakeCase,
isString,
lowerCaseFirst,
mapObjectTo,
MapToConfig,
@@ -21,10 +21,12 @@ import {
} from "../common"
import { DmlEntity } from "../dml"
import { CommonEvents } from "../event-bus"
import { createMedusaMikroOrmEventSubscriber } from "./create-medusa-mikro-orm-event-subscriber"
import { EmitEvents, InjectManager, MedusaContext } from "./decorators"
import { Modules } from "./definition"
import { moduleEventBuilderFactory } from "./event-builder-factory"
import { buildModelsNameToLinkableKeysMap } from "./joiner-config-builder"
import { isMedusaInternalService } from "./medusa-internal-service"
import {
BaseMethods,
ExtractKeysFromConfig,
@@ -137,37 +139,6 @@ export function MedusaService<
? ModelConfigurationsToConfigTemplate<TModels>
: ModelsConfig
> {
function emitSoftDeleteRestoreEvents(
this: AbstractModuleService_,
klassPrototype: any,
cascadedModelsMap: Record<string, string[]>,
action: string,
sharedContext: Context
) {
const joinerConfig = (
typeof this.__joinerConfig === "function"
? this.__joinerConfig()
: this.__joinerConfig
) as ModuleJoinerConfig
const emittedEntities = new Set<string>()
Object.entries(cascadedModelsMap).forEach(([linkableKey, ids]) => {
const entity = joinerConfig.linkableKeys?.[linkableKey]!
if (entity && !emittedEntities.has(entity)) {
emittedEntities.add(entity)
const linkableKeyEntity = camelToSnakeCase(entity).toLowerCase()
klassPrototype.aggregatedEvents.bind(this)({
action: action,
object: linkableKeyEntity,
data: { id: ids },
context: sharedContext,
})
}
})
}
const buildAndAssignMethodImpl = function (
klassPrototype: any,
method: string,
@@ -220,16 +191,9 @@ export function MedusaService<
sharedContext: Context = {}
): Promise<T | T[]> {
const service = this.__container__[serviceRegistrationName]
const models = await service.create(data, sharedContext)
const models_ = await service.create(data, sharedContext)
klassPrototype.aggregatedEvents.bind(this)({
action: CommonEvents.CREATED,
object: camelToSnakeCase(modelName).toLowerCase(),
data: models,
context: sharedContext,
})
return await this.baseRepository_.serialize<T | T[]>(models)
return await this.baseRepository_.serialize<T | T[]>(models_)
}
applyMethod(methodImplementation, 1)
@@ -244,13 +208,6 @@ export function MedusaService<
const service = this.__container__[serviceRegistrationName]
const response = await service.update(data, sharedContext)
klassPrototype.aggregatedEvents.bind(this)({
action: CommonEvents.UPDATED,
object: camelToSnakeCase(modelName).toLowerCase(),
data: response,
context: sharedContext,
})
return await this.baseRepository_.serialize<T | T[]>(response)
}
@@ -300,19 +257,10 @@ export function MedusaService<
? primaryKeyValues
: [primaryKeyValues]
const ids = await this.__container__[serviceRegistrationName].delete(
await this.__container__[serviceRegistrationName].delete(
primaryKeyValues_,
sharedContext
)
ids.map((id) =>
klassPrototype.aggregatedEvents.bind(this)({
action: CommonEvents.DELETED,
object: camelToSnakeCase(modelName).toLowerCase(),
data: isString(id) ? { id: id } : id,
context: sharedContext,
})
)
}
applyMethod(methodImplementation, 1)
@@ -343,15 +291,6 @@ export function MedusaService<
}
)
if (mappedCascadedModelsMap) {
emitSoftDeleteRestoreEvents.bind(this)(
klassPrototype,
mappedCascadedModelsMap,
CommonEvents.DELETED,
sharedContext
)
}
return mappedCascadedModelsMap ? mappedCascadedModelsMap : void 0
}
@@ -384,15 +323,6 @@ export function MedusaService<
}
)
if (mappedCascadedModelsMap) {
emitSoftDeleteRestoreEvents.bind(this)(
klassPrototype,
mappedCascadedModelsMap,
CommonEvents.CREATED,
sharedContext
)
}
return mappedCascadedModelsMap ? mappedCascadedModelsMap : void 0
}
@@ -424,6 +354,41 @@ export function MedusaService<
this.__container__ = container
this.baseRepository_ = container.baseRepository
const joinerConfig = this.__joinerConfig?.()
/**
* Create a global subscriber to listen to all the entities mutations
* and forward them to the module service interceptEntityMutationEvents
* method.
*
* Assign the global subscriber to all internal services or class that extends it
* such that it can attach it accordingly and forward the events to the module service.
*/
if (joinerConfig?.serviceName !== "index") {
let globalSubscriber!: ReturnType<
typeof createMedusaMikroOrmEventSubscriber
>
Object.keys(container)
.filter((key) => {
return key.endsWith("Service")
})
.forEach((key: string) => {
globalSubscriber ??= createMedusaMikroOrmEventSubscriber(
["__medusa_entities_subscriber__"],
this
)
try {
const service = container[key]
if (isMedusaInternalService(service)) {
service.setEventSubscriber(globalSubscriber)
}
} catch (error) {
// Prevent circular issue which in that case would represent ourselves so we can skip
}
})
}
const hasEventBusModuleService = Object.keys(this.__container__).find(
(key) => key === Modules.EVENT_BUS
)
@@ -433,9 +398,72 @@ export function MedusaService<
: undefined
this[MedusaServiceModelNameToLinkableKeysMapSymbol] =
buildModelsNameToLinkableKeysMap(
this.__joinerConfig?.()?.linkableKeys ?? {}
)
buildModelsNameToLinkableKeysMap(joinerConfig?.linkableKeys ?? {})
}
/**
* @internal this method is meant to react to any event the orm might emit
* when an entity is being mutated (created, updated, deleted).
* The default implementation will handle all event to be emitted as part
* of the message aggregator from the context.
*
* If you want to handle the event differently, you can override this method.
*
* @example
*
* class MyService extends ModulesSdkUtils.MedusaService(models) {
* interceptEntityMutationEvents(event: "afterCreate" | "afterUpdate" | "afterUpsert" | "afterDelete", args: EventArgs<any>, context: Context) {
* console.log("interceptEntityMutationEvents", event, args.entity)
* }
* }
*
* @param event - The event type
* @param args - The event arguments
* @param context - The context
*/
interceptEntityMutationEvents(
event: "afterCreate" | "afterUpdate" | "afterUpsert" | "afterDelete",
args: EventArgs<any>,
context: Context
) {
let action = ""
switch (event) {
case "afterCreate":
action = CommonEvents.CREATED
break
case "afterUpdate":
const isSoftDeleted =
!!args.changeSet?.entity.deleted_at &&
!args.changeSet?.originalEntity?.deleted_at
const isRestored =
!!args.changeSet?.originalEntity?.deleted_at &&
!args.changeSet?.entity.deleted_at
action = CommonEvents.UPDATED
if (isSoftDeleted) {
action = CommonEvents.DELETED
}
if (isRestored) {
action = CommonEvents.RESTORED
}
break
case "afterDelete":
action = CommonEvents.DELETED
break
}
const object = camelToSnakeCase(args.meta.className).toLowerCase()
this.aggregatedEvents({
action,
object,
data: { id: args.entity.id },
context,
})
}
/**
@@ -463,6 +491,10 @@ export function MedusaService<
data: { id: any } | { id: any }[]
context: Context
}) {
if (!context.messageAggregator) {
return
}
const __joinerConfig = (
typeof this.__joinerConfig === "function"
? this.__joinerConfig()

View File

@@ -3,13 +3,14 @@ import {
Context,
FindConfig,
IDmlEntity,
InferEntityForModuleService,
InferEntityType,
Pluralize,
Prettify,
RestoreReturn,
SoftDeleteReturn,
InferEntityType,
InferEntityForModuleService,
} from "@medusajs/types"
import { EventArgs } from "@mikro-orm/core"
import { DmlEntity } from "../../dml"
export type BaseMethods =
@@ -318,4 +319,22 @@ export type MedusaServiceReturnType<ModelsConfig extends Record<string, any>> =
data: { id: any } | { id: any }[]
context: Context
}): void
/**
* this method is meant to react to any event the orm might emit
* when an entity is being mutated (created, updated, deleted).
* The default implementation will handle all event to be emitted as part
* of the message aggregator from the context.
*
* If you want to handle the event differently, you can override this method.
*
* @param event - The event type
* @param args - The event arguments
* @param context - The context
*/
interceptEntityMutationEvents(
event: "afterCreate" | "afterUpdate" | "afterUpsert" | "afterDelete",
args: EventArgs<any>,
context: Context
): void
}