chore(): Emit events in batch and index process event ids in batch (#12097)

**What**
First iteration to prevent events from overwhelming the systems.
- Group emitted event ids when possible instead of creating a message per id, which massively reduces the number of events to process in cases such as imports
- Update the index engine to process event data in batches of 100
- Update event handling by the index engine to be able to upsert by batch as well
- Fix index engine build config for intermediate listeners inference
This commit is contained in:
Adrien de Peretti
2025-04-08 18:57:08 +02:00
committed by GitHub
parent b05807bfc1
commit 74381addc3
21 changed files with 548 additions and 463 deletions
+12
View File
@@ -0,0 +1,12 @@
---
"@medusajs/fulfillment": patch
"@medusajs/index": patch
"@medusajs/inventory": patch
"@medusajs/link-modules": patch
"@medusajs/pricing": patch
"@medusajs/user": patch
"@medusajs/types": patch
"@medusajs/utils": patch
---
chore(): Emit events in batch and index process event ids in batch
+3 -3
View File
@@ -18,7 +18,7 @@ export type EventMetadata = Record<string, unknown> & {
/**
* The ID of the event's group. Grouped events are useful when you have distributed transactions
* where you need to explicitly group, release and clear events upon lifecycle events of a transaction.
*
*
* When set, you must release the grouped events using the Event Module's `releaseGroupedEvents` method to emit the events.
*/
eventGroupId?: string
@@ -27,7 +27,7 @@ export type EventMetadata = Record<string, unknown> & {
export type Event<TData = unknown> = {
/**
* The event's name.
*
*
* @example
* user.created
*/
@@ -37,7 +37,7 @@ export type Event<TData = unknown> = {
*/
metadata?: EventMetadata
/**
* The data payload that subscribers receive. For example, the ID of the created user.
* The data payload that subscribers receive. For example, the ID or IDs of the created user. (e.g. { id: "123" } or { id: ["123", "456"] })
*/
data: TData
}
+2 -1
View File
@@ -44,7 +44,8 @@ export function buildModuleResourceEventName({
action: string
}): string {
const kebabCaseName = lowerCaseFirst(kebabCase(objectName))
return `${prefix ? `${prefix}.` : ""}${kebabCaseName}.${action}`
const conventionalePrefix = prefix && lowerCaseFirst(kebabCase(prefix))
return `${prefix ? `${conventionalePrefix}.` : ""}${kebabCaseName}.${action}`
}
/**
@@ -60,15 +60,15 @@ export function moduleEventBuilderFactory({
})
}
data.forEach((dataItem) => {
messages.push({
source,
action,
context: sharedContext,
data: { id: dataItem.id },
eventName: eventName!,
object,
})
messages.push({
source,
action,
context: sharedContext,
data: {
id: data.length === 1 ? data[0].id : data.map((item) => item.id),
},
eventName: eventName!,
object,
})
aggregator.saveRawMessageData(messages)
@@ -218,7 +218,9 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.FULFILLMENT_SET_CREATED,
action: "created",
object: "fulfillment_set",
data: { id: fulfillmentSets[i].id },
data: {
id: expect.arrayContaining([fulfillmentSets[i].id]),
},
}),
]),
{
@@ -335,13 +337,19 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.FULFILLMENT_SET_CREATED,
action: "created",
object: "fulfillment_set",
data: { id: fulfillmentSets[i].id },
data: {
id: expect.arrayContaining([fulfillmentSets[i].id]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.SERVICE_ZONE_CREATED,
action: "created",
object: "service_zone",
data: { id: fulfillmentSets[i].service_zones[0].id },
data: {
id: expect.arrayContaining([
fulfillmentSets[i].service_zones[0].id,
]),
},
}),
]),
{
@@ -505,20 +513,28 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.FULFILLMENT_SET_CREATED,
action: "created",
object: "fulfillment_set",
data: { id: fulfillmentSets[i].id },
data: {
id: expect.arrayContaining([fulfillmentSets[i].id]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.SERVICE_ZONE_CREATED,
action: "created",
object: "service_zone",
data: { id: fulfillmentSets[i].service_zones[0].id },
data: {
id: expect.arrayContaining([
fulfillmentSets[i].service_zones[0].id,
]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.GEO_ZONE_CREATED,
action: "created",
object: "geo_zone",
data: {
id: fulfillmentSets[i].service_zones[0].geo_zones[0].id,
id: expect.arrayContaining([
fulfillmentSets[i].service_zones[0].geo_zones[0].id,
]),
},
}),
]),
@@ -708,7 +724,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
})
expect(updatedFulfillmentSets).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(1)
for (const data_ of updateData) {
const currentFullfillmentSet = fullfillmentSets.find(
@@ -729,7 +745,9 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.FULFILLMENT_SET_UPDATED,
action: "updated",
object: "fulfillment_set",
data: { id: currentFullfillmentSet.id },
data: {
id: expect.arrayContaining([currentFullfillmentSet.id]),
},
}),
]),
{
@@ -1057,7 +1075,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
)
expect(updatedFulfillmentSets).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(10)
expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(5)
for (const data_ of updateData) {
const expectedFulfillmentSet = updatedFulfillmentSets.find(
@@ -1097,36 +1115,48 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.FULFILLMENT_SET_UPDATED,
action: "updated",
object: "fulfillment_set",
data: { id: expectedFulfillmentSet.id },
data: {
id: expect.arrayContaining([expectedFulfillmentSet.id]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.SERVICE_ZONE_CREATED,
action: "created",
object: "service_zone",
data: { id: expectedFulfillmentSet.service_zones[0].id },
data: {
id: expect.arrayContaining([
expectedFulfillmentSet.service_zones[0].id,
]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.GEO_ZONE_CREATED,
action: "created",
object: "geo_zone",
data: {
id: expectedFulfillmentSet.service_zones[0].geo_zones[0]
.id,
id: expect.arrayContaining([
expectedFulfillmentSet.service_zones[0].geo_zones[0].id,
]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.SERVICE_ZONE_DELETED,
action: "deleted",
object: "service_zone",
data: { id: originalFulfillmentSet.service_zones[0].id },
data: {
id: expect.arrayContaining([
originalFulfillmentSet.service_zones[0].id,
]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.GEO_ZONE_DELETED,
action: "deleted",
object: "geo_zone",
data: {
id: originalFulfillmentSet.service_zones[0].geo_zones[0]
.id,
id: expect.arrayContaining([
originalFulfillmentSet.service_zones[0].geo_zones[0].id,
]),
},
}),
]),
@@ -1215,7 +1245,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
)
expect(updatedFulfillmentSets).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(6)
expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(3)
for (const data_ of updateData) {
const expectedFulfillmentSet = updatedFulfillmentSets.find(
@@ -1259,20 +1289,26 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.FULFILLMENT_SET_UPDATED,
action: "updated",
object: "fulfillment_set",
data: { id: expectedFulfillmentSet.id },
data: {
id: expect.arrayContaining([expectedFulfillmentSet.id]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.SERVICE_ZONE_CREATED,
action: "created",
object: "service_zone",
data: { id: createdServiceZone.id },
data: {
id: expect.arrayContaining([createdServiceZone.id]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.GEO_ZONE_CREATED,
action: "created",
object: "geo_zone",
data: {
id: createdServiceZone.geo_zones[0].id,
id: expect.arrayContaining([
createdServiceZone.geo_zones[0].id,
]),
},
}),
]),
@@ -154,7 +154,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
const geoZones = await service.createGeoZones(data)
expect(geoZones).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(1)
let i = 0
for (const data_ of data) {
@@ -172,7 +172,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.GEO_ZONE_CREATED,
action: "created",
object: "geo_zone",
data: { id: geoZones[i].id },
data: { id: expect.arrayContaining([geoZones[i].id]) },
}),
]),
{
@@ -331,7 +331,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
const updatedGeoZones = await service.updateGeoZones(updateData)
expect(updatedGeoZones).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(1)
for (const data_ of updateData) {
const expectedGeoZone = updatedGeoZones.find(
@@ -352,7 +352,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.GEO_ZONE_UPDATED,
action: "updated",
object: "geo_zone",
data: { id: expectedGeoZone.id },
data: { id: expect.arrayContaining([expectedGeoZone.id]) },
}),
]),
{
@@ -510,7 +510,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
expect(updatedServiceZones).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(6) // Since the update only calls create and update which are already tested, only check the length
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(3) // Since the update only calls create and update which are already tested, only check the length
for (const data_ of updateData) {
const expectedServiceZone = updatedServiceZones.find(
@@ -582,7 +582,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
)
expect(createdShippingOptions).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(6)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(3)
let i = 0
for (const data_ of createData) {
@@ -619,19 +619,31 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.SHIPPING_OPTION_CREATED,
action: "created",
object: "shipping_option",
data: { id: createdShippingOptions[i].id },
data: {
id: expect.arrayContaining([
createdShippingOptions[i].id,
]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.SHIPPING_OPTION_TYPE_CREATED,
action: "created",
object: "shipping_option_type",
data: { id: createdShippingOptions[i].type.id },
data: {
id: expect.arrayContaining([
createdShippingOptions[i].type.id,
]),
},
}),
buildExpectedEventMessageShape({
eventName: FulfillmentEvents.SHIPPING_OPTION_RULE_CREATED,
action: "created",
object: "shipping_option_rule",
data: { id: createdShippingOptions[i].rules[0].id },
data: {
id: expect.arrayContaining([
createdShippingOptions[i].rules[0].id,
]),
},
}),
]),
{
@@ -100,7 +100,7 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
await service.createShippingProfiles(createData)
expect(createdShippingProfiles).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(2)
expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(1)
let i = 0
for (const data_ of createData) {
@@ -117,7 +117,11 @@ moduleIntegrationTestRunner<IFulfillmentModuleService>({
eventName: FulfillmentEvents.SHIPPING_PROFILE_CREATED,
action: "created",
object: "shipping_profile",
data: { id: createdShippingProfiles[i].id },
data: {
id: expect.arrayContaining([
createdShippingProfiles[i].id,
]),
},
}),
]),
{
@@ -19,7 +19,7 @@ import { asValue } from "awilix"
import * as path from "path"
import { setTimeout } from "timers/promises"
import { EventBusServiceMock } from "../__fixtures__"
import { dbName } from "../__fixtures__/medusa-config"
import config, { dbName } from "../__fixtures__/medusa-config"
const eventBusMock = new EventBusServiceMock()
const queryMock = {
@@ -147,14 +147,15 @@ describe("DataSynchronizer", () => {
// Mock query response for products
queryMock.graph.mockImplementation(async (config) => {
if (Array.isArray(config.filters.id)) {
let data: any[] = []
if (config.filters.id.includes(testProductId)) {
return {
data: [mockData[0]],
}
data.push(mockData[0])
} else if (config.filters.id.includes(testProductId2)) {
return {
data: [mockData[1]],
}
data.push(mockData[1])
}
return {
data,
}
}
@@ -141,6 +141,7 @@ const beforeAll_ = async () => {
;(index as any).eventBusModuleService_ = eventBusMock
await globalApp.onApplicationStart()
await setTimeout(3000)
;(index as any).storageProvider_.query_ = queryMock
return globalApp
@@ -196,6 +197,8 @@ describe("IndexModuleService", function () {
await dbUtils.shutdown(dbName)
})
afterEach(afterEach_)
describe("on created or attached events", function () {
let manager
@@ -222,7 +225,7 @@ describe("IndexModuleService", function () {
},
},
{
name: "PriceSet.created",
name: "pricing.price-set.created",
data: {
id: priceSetId,
},
@@ -247,7 +250,6 @@ describe("IndexModuleService", function () {
]
beforeEach(async () => {
await setTimeout(1000)
await beforeEach_(eventDataToEmit)
manager = (medusaApp.sharedContainer!.resolve(Modules.INDEX) as any)
@@ -392,7 +394,7 @@ describe("IndexModuleService", function () {
},
},
{
name: "PriceSet.created",
name: "pricing.price-set.created",
data: {
id: priceSetId,
},
@@ -689,7 +691,7 @@ describe("IndexModuleService", function () {
},
},
{
name: "PriceSet.created",
name: "pricing.price-set.created",
data: {
id: priceSetId,
},
@@ -761,8 +763,6 @@ describe("IndexModuleService", function () {
await eventBusMock.emit(deleteEventDataToEmit)
})
afterEach(afterEach_)
it("should consume all deleted events and delete the index entries", async () => {
const indexEntries = await manager.find(toMikroORMEntity(IndexData), {})
const indexRelationEntries = await manager.find(
@@ -290,7 +290,7 @@ export class DataSynchronizer {
filters["updated_at"] = { $gt: pagination.updated_at }
}
const { data } = await this.#query.graph({
const queryResult = await this.#query.graph({
entity: alias,
fields: [entityPrimaryKey],
filters,
@@ -302,10 +302,12 @@ export class DataSynchronizer {
},
})
if (!data.length) {
if (!queryResult?.data?.length) {
break
}
const data = queryResult.data
const envelop: Event = {
data,
name: !isLink
@@ -163,9 +163,11 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
}
}
protected static parseMessageData<T>(message?: Event): {
protected static parseMessageData<TData extends { id: string | string[] }>(
message?: Event
): {
action: string
data: { id: string }[]
data: TData[]
ids: string[]
} | void {
const isExpectedFormat =
@@ -177,7 +179,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
const result: {
action: string
data: { id: string }[]
data: TData[]
ids: string[]
} = {
action: "",
@@ -186,60 +188,76 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
}
result.action = message!.metadata!.action as string
result.data = message!.data as { id: string }[]
result.data = message!.data as TData[]
result.data = Array.isArray(result.data) ? result.data : [result.data]
result.ids = result.data.map((d) => d.id)
result.ids = result.data.flatMap((d) =>
Array.isArray(d.id) ? d.id : [d.id]
)
return result
}
consumeEvent(
schemaEntityObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation
): Subscriber<{ id: string }> {
return async (data: Event) => {
): Subscriber<{ id: string | string[] }> {
return async (event: Event) => {
await this.#isReady_
const data_: { id: string }[] = Array.isArray(data.data)
? data.data
: [data.data]
let ids: string[] = data_.map((d) => d.id)
let action = data.name.split(".").pop() || ""
const data_: { id: string }[] = Array.isArray(event.data)
? event.data
: [event.data]
let ids: string[] = data_.flatMap((d) =>
Array.isArray(d.id) ? d.id : [d.id]
)
let action = event.name.split(".").pop() || ""
const parsedMessage = PostgresProvider.parseMessageData(data)
const parsedMessage = PostgresProvider.parseMessageData(event)
if (parsedMessage) {
action = parsedMessage.action
ids = parsedMessage.ids
}
const { fields, alias } = schemaEntityObjectRepresentation
const graphConfig: Parameters<QueryGraphFunction>[0] = {
entity: alias,
filters: {
id: ids,
},
fields: [...new Set(["id", ...fields])],
}
if (action === CommonEvents.DELETED || action === CommonEvents.DETACHED) {
graphConfig.withDeleted = true
}
const { data: entityData } = await this.query_.graph(graphConfig)
const argument = {
entity: schemaEntityObjectRepresentation.entity,
data: entityData,
schemaEntityObjectRepresentation,
}
const targetMethod = this.eventActionToMethodMap_[action]
if (!targetMethod) {
return
}
await this[targetMethod](argument)
const { fields, alias } = schemaEntityObjectRepresentation
let withDeleted: boolean | undefined
if (action === CommonEvents.DELETED || action === CommonEvents.DETACHED) {
withDeleted = true
}
// Process ids in batches of 100
const batchSize = 100
const idsBatches: string[][] = []
for (let i = 0; i < ids.length; i += batchSize) {
idsBatches.push(ids.slice(i, i + batchSize))
}
for (const idsBatch of idsBatches) {
const graphConfig: Parameters<QueryGraphFunction>[0] = {
entity: alias,
filters: {
id: idsBatch,
},
fields: [...new Set(["id", ...fields])],
withDeleted,
}
const { data: entityData } = await this.query_.graph(graphConfig)
const argument = {
entity: schemaEntityObjectRepresentation.entity,
data: entityData,
schemaEntityObjectRepresentation,
}
await this[targetMethod](argument)
}
}
}
@@ -374,7 +392,9 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
@MedusaContext() sharedContext: Context<SqlEntityManager> = {}
) {
const { transactionManager: em } = sharedContext
const indexRepository = em!.getRepository(toMikroORMEntity(IndexData))
const indexRepository = em!.getRepository(
toMikroORMEntity(IndexData)
) as EntityRepository<any>
const indexRelationRepository: EntityRepository<any> = em!.getRepository(
toMikroORMEntity(IndexRelation)
)
@@ -385,33 +405,32 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
parentsProperties,
} = PostgresProvider.parseData(data, schemaEntityObjectRepresentation)
/**
* Clean the entity data to only keep the properties that are defined in the schema
*/
const cleanedData = data_.map((entityData) => {
return entityProperties.reduce((acc, property) => {
acc[property] = entityData[property]
return acc
}, {}) as TData
})
/**
* Loop through the data and create index entries for each entity as well as the
* index relation entries if the entity has parents
*/
for (const entityData of data_) {
/**
* Clean the entity data to only keep the properties that are defined in the schema
*/
const entitiesToUpsert: Set<string> = new Set()
const relationsToUpsert: Set<string> = new Set()
const cleanedEntityData = entityProperties.reduce((acc, property) => {
acc[property] = entityData[property]
return acc
}, {}) as TData
await indexRepository.upsert(
{
id: cleanedEntityData.id,
cleanedData.forEach((entityData, index) => {
entitiesToUpsert.add(
JSON.stringify({
id: entityData.id,
name: entity,
data: cleanedEntityData,
data: entityData,
staled_at: null,
},
{
onConflictAction: "merge",
onConflictFields: ["id", "name"],
onConflictMergeFields: ["data", "staled_at"],
}
})
)
/**
@@ -422,7 +441,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
parentsProperties
)) {
const parentAlias = parentProperties[0].split(".")[0]
const parentData = entityData[parentAlias] as TData[]
const parentData = data_[index][parentAlias] as TData[]
if (!parentData) {
continue
@@ -433,43 +452,46 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
: [parentData]
for (const parentData_ of parentDataCollection) {
await indexRepository.upsert(
{
id: (parentData_ as any).id,
name: parentEntity,
data: parentData_,
staled_at: null,
},
{
onConflictAction: "merge",
onConflictFields: ["id", "name"],
onConflictMergeFields: ["staled_at"],
}
)
await indexRelationRepository.upsert(
{
parent_id: (parentData_ as any).id,
relationsToUpsert.add(
JSON.stringify({
parent_id: parentData_.id,
parent_name: parentEntity,
child_id: cleanedEntityData.id,
child_id: entityData.id,
child_name: entity,
pivot: `${parentEntity}-${entity}`,
staled_at: null,
},
{
onConflictAction: "merge",
onConflictFields: [
"pivot",
"parent_id",
"child_id",
"parent_name",
"child_name",
],
onConflictMergeFields: ["staled_at"],
}
})
)
}
}
})
if (entitiesToUpsert.size) {
await indexRepository.upsertMany(
Array.from(entitiesToUpsert).map((entity) => JSON.parse(entity)),
{
onConflictAction: "merge",
onConflictFields: ["id", "name"],
onConflictMergeFields: ["data", "staled_at"],
}
)
}
if (relationsToUpsert.size) {
await indexRelationRepository.upsertMany(
Array.from(relationsToUpsert).map((relation) => JSON.parse(relation)),
{
onConflictAction: "merge",
onConflictFields: [
"pivot",
"parent_id",
"child_id",
"parent_name",
"child_name",
],
onConflictMergeFields: ["staled_at"],
}
)
}
}
@@ -495,7 +517,9 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
@MedusaContext() sharedContext: Context<SqlEntityManager> = {}
) {
const { transactionManager: em } = sharedContext
const indexRepository = em!.getRepository(toMikroORMEntity(IndexData))
const indexRepository = em!.getRepository(
toMikroORMEntity(IndexData)
) as EntityRepository<any>
const { data: data_, entityProperties } = PostgresProvider.parseData(
data,
@@ -503,24 +527,22 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
)
await indexRepository.upsertMany(
data_.map(
(entityData) => {
return {
id: entityData.id,
name: entity,
data: entityProperties.reduce((acc, property) => {
acc[property] = entityData[property]
return acc
}, {}),
staled_at: null,
}
},
{
onConflictAction: "merge",
onConflictFields: ["id", "name"],
onConflictMergeFields: ["data", "staled_at"],
data_.map((entityData) => {
return {
id: entityData.id,
name: entity,
data: entityProperties.reduce((acc, property) => {
acc[property] = entityData[property]
return acc
}, {}),
staled_at: null,
}
)
}),
{
onConflictAction: "merge",
onConflictFields: ["id", "name"],
onConflictMergeFields: ["data", "staled_at"],
}
)
}
@@ -601,7 +623,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
const indexRepository = em!.getRepository(toMikroORMEntity(IndexData))
const indexRelationRepository = em!.getRepository(
toMikroORMEntity(IndexRelation)
)
) as EntityRepository<any>
const { data: data_, entityProperties } = PostgresProvider.parseData(
data,
@@ -652,77 +674,66 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
)
}
for (const entityData of data_) {
/**
* Clean the link entity data to only keep the properties that are defined in the schema
*/
/**
* Clean the link entity data to only keep the properties that are defined in the schema
*/
const cleanedEntityData = entityProperties.reduce((acc, property) => {
const cleanedData = data_.map((entityData) => {
return entityProperties.reduce((acc, property) => {
acc[property] = entityData[property]
return acc
}, {}) as TData
})
await indexRepository.upsert(
{
id: cleanedEntityData.id,
name: entity,
data: cleanedEntityData,
staled_at: null,
},
{
onConflictAction: "merge",
onConflictFields: ["id", "name"],
onConflictMergeFields: ["data", "staled_at"],
}
)
/**
* Create the index relation entries for the parent entity and the child entity
*/
await indexRelationRepository.upsert(
let relationsToUpsert: any[] = []
const entitiesToUpdate = cleanedData.map((entityData) => {
relationsToUpsert.push(
{
parent_id: entityData[parentPropertyId] as string,
parent_name: parentEntityName,
child_id: cleanedEntityData.id,
child_id: entityData.id,
child_name: entity,
pivot: `${parentEntityName}-${entity}`,
staled_at: null,
},
{
onConflictAction: "merge",
onConflictFields: [
"pivot",
"parent_id",
"child_id",
"parent_name",
"child_name",
],
onConflictMergeFields: ["staled_at"],
}
)
await indexRelationRepository.upsert(
{
parent_id: cleanedEntityData.id,
parent_id: entityData.id,
parent_name: entity,
child_id: entityData[childPropertyId] as string,
child_name: childEntityName,
pivot: `${entity}-${childEntityName}`,
staled_at: null,
},
{
onConflictAction: "merge",
onConflictFields: [
"pivot",
"parent_id",
"child_id",
"parent_name",
"child_name",
],
onConflictMergeFields: ["staled_at"],
}
)
return {
id: entityData.id,
name: entity,
data: entityData,
staled_at: null,
}
})
if (entitiesToUpdate.length) {
await indexRepository.upsertMany(entitiesToUpdate, {
onConflictAction: "merge",
onConflictFields: ["id", "name"],
onConflictMergeFields: ["data", "staled_at"],
})
}
if (relationsToUpsert.length) {
await indexRelationRepository.upsertMany(relationsToUpsert, {
onConflictAction: "merge",
onConflictFields: [
"pivot",
"parent_id",
"child_id",
"parent_name",
"child_name",
],
onConflictMergeFields: ["staled_at"],
})
}
}
@@ -5,7 +5,11 @@ import {
ModuleJoinerConfig,
ModuleJoinerRelationship,
} from "@medusajs/framework/types"
import { CommonEvents, GraphQLUtils } from "@medusajs/framework/utils"
import {
buildModuleResourceEventName,
CommonEvents,
GraphQLUtils,
} from "@medusajs/framework/utils"
import { schemaObjectRepresentationPropertiesToOmit } from "@types"
export const CustomDirectives = {
@@ -561,9 +565,23 @@ function processEntity(
intermediateEntityObjectRepresentationRef.alias =
intermediateEntityAlias
intermediateEntityObjectRepresentationRef.listeners = [
intermediateEntityName + "." + CommonEvents.CREATED,
intermediateEntityName + "." + CommonEvents.UPDATED,
buildModuleResourceEventName({
action: CommonEvents.CREATED,
objectName: intermediateEntityName,
prefix: intermediateEntityModule.serviceName,
}),
buildModuleResourceEventName({
action: CommonEvents.UPDATED,
objectName: intermediateEntityName,
prefix: intermediateEntityModule.serviceName,
}),
buildModuleResourceEventName({
action: CommonEvents.DELETED,
objectName: intermediateEntityName,
prefix: intermediateEntityModule.serviceName,
}),
]
intermediateEntityObjectRepresentationRef.moduleConfig =
intermediateEntityModule
@@ -26,6 +26,7 @@ import {
MedusaContext,
MedusaError,
MedusaService,
moduleEventBuilderFactory,
Modules,
partitionArray,
} from "@medusajs/framework/utils"
@@ -248,16 +249,15 @@ export default class InventoryModuleService
const toCreate = Array.isArray(input) ? input : [input]
const created = await this.createReservationItems_(toCreate, context)
context.messageAggregator?.saveRawMessageData(
created.map((reservationItem) => ({
eventName: InventoryEvents.RESERVATION_ITEM_CREATED,
source: Modules.INVENTORY,
action: CommonEvents.CREATED,
object: "reservation-item",
context,
data: { id: reservationItem.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.CREATED,
object: "reservation-item",
source: Modules.INVENTORY,
eventName: InventoryEvents.RESERVATION_ITEM_CREATED,
})({
data: created,
sharedContext: context,
})
const serializedReservations = await this.baseRepository_.serialize<
InventoryTypes.ReservationItemDTO[] | InventoryTypes.ReservationItemDTO
@@ -350,16 +350,15 @@ export default class InventoryModuleService
)
const result = await this.createInventoryItems_(toCreate, context)
context.messageAggregator?.saveRawMessageData(
result.map((inventoryItem) => ({
eventName: InventoryEvents.INVENTORY_ITEM_CREATED,
source: Modules.INVENTORY,
action: CommonEvents.CREATED,
object: "inventory-item",
context,
data: { id: inventoryItem.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.CREATED,
object: "inventory-item",
source: Modules.INVENTORY,
eventName: InventoryEvents.INVENTORY_ITEM_CREATED,
})({
data: result,
sharedContext: context,
})
const serializedItems = await this.baseRepository_.serialize<
InventoryTypes.InventoryItemDTO | InventoryTypes.InventoryItemDTO[]
@@ -406,16 +405,15 @@ export default class InventoryModuleService
const created = await this.createInventoryLevels_(toCreate, context)
context.messageAggregator?.saveRawMessageData(
created.map((inventoryLevel) => ({
eventName: InventoryEvents.INVENTORY_LEVEL_CREATED,
source: Modules.INVENTORY,
action: CommonEvents.CREATED,
object: "inventory-level",
context,
data: { id: inventoryLevel.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.CREATED,
object: "inventory-level",
source: Modules.INVENTORY,
eventName: InventoryEvents.INVENTORY_LEVEL_CREATED,
})({
data: created,
sharedContext: context,
})
const serialized = await this.baseRepository_.serialize<
InventoryTypes.InventoryLevelDTO[] | InventoryTypes.InventoryLevelDTO
@@ -462,16 +460,15 @@ export default class InventoryModuleService
const result = await this.updateInventoryItems_(updates, context)
context.messageAggregator?.saveRawMessageData(
result.map((inventoryItem) => ({
eventName: InventoryEvents.INVENTORY_ITEM_UPDATED,
source: Modules.INVENTORY,
action: CommonEvents.UPDATED,
object: "inventory-item",
context,
data: { id: inventoryItem.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.UPDATED,
object: "inventory-item",
source: Modules.INVENTORY,
eventName: InventoryEvents.INVENTORY_ITEM_UPDATED,
})({
data: result,
sharedContext: context,
})
const serializedItems = await this.baseRepository_.serialize<
InventoryTypes.InventoryItemDTO | InventoryTypes.InventoryItemDTO[]
@@ -503,16 +500,15 @@ export default class InventoryModuleService
context
)
context.messageAggregator?.saveRawMessageData(
result[0].map((inventoryLevel) => ({
eventName: InventoryEvents.INVENTORY_LEVEL_DELETED,
source: Modules.INVENTORY,
action: CommonEvents.DELETED,
object: "inventory-level",
context,
data: { id: inventoryLevel.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.DELETED,
object: "inventory-level",
source: Modules.INVENTORY,
eventName: InventoryEvents.INVENTORY_LEVEL_DELETED,
})({
data: result[0],
sharedContext: context,
})
return result
}
@@ -524,6 +520,7 @@ export default class InventoryModuleService
* @param context
*/
@InjectTransactionManager()
@EmitEvents()
async deleteInventoryLevel(
inventoryItemId: string,
locationId: string,
@@ -535,13 +532,14 @@ export default class InventoryModuleService
context
)
context.messageAggregator?.saveRawMessageData({
eventName: InventoryEvents.INVENTORY_LEVEL_DELETED,
source: Modules.INVENTORY,
moduleEventBuilderFactory({
action: CommonEvents.DELETED,
object: "inventory-level",
context,
source: Modules.INVENTORY,
eventName: InventoryEvents.INVENTORY_LEVEL_DELETED,
})({
data: { id: inventoryLevel.id },
sharedContext: context,
})
if (!inventoryLevel) {
@@ -579,16 +577,15 @@ export default class InventoryModuleService
const levels = await this.updateInventoryLevels_(input, context)
context.messageAggregator?.saveRawMessageData(
levels.map((inventoryLevel) => ({
eventName: InventoryEvents.INVENTORY_LEVEL_UPDATED,
source: Modules.INVENTORY,
action: CommonEvents.UPDATED,
object: "inventory-level",
context,
data: { id: inventoryLevel.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.UPDATED,
object: "inventory-level",
source: Modules.INVENTORY,
eventName: InventoryEvents.INVENTORY_LEVEL_UPDATED,
})({
data: levels,
sharedContext: context,
})
const updatedLevels = await this.baseRepository_.serialize<
InventoryTypes.InventoryLevelDTO | InventoryTypes.InventoryLevelDTO[]
@@ -662,16 +659,15 @@ export default class InventoryModuleService
const update = Array.isArray(input) ? input : [input]
const result = await this.updateReservationItems_(update, context)
context.messageAggregator?.saveRawMessageData(
result.map((reservationItem) => ({
eventName: InventoryEvents.RESERVATION_ITEM_UPDATED,
source: Modules.INVENTORY,
action: CommonEvents.UPDATED,
object: "reservation-item",
context,
data: { id: reservationItem.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.UPDATED,
object: "reservation-item",
source: Modules.INVENTORY,
eventName: InventoryEvents.RESERVATION_ITEM_UPDATED,
})({
data: result,
sharedContext: context,
})
const serialized = await this.baseRepository_.serialize<
InventoryTypes.ReservationItemDTO | InventoryTypes.ReservationItemDTO[]
@@ -867,16 +863,15 @@ export default class InventoryModuleService
context
)
context.messageAggregator?.saveRawMessageData(
reservations.map((reservationItem) => ({
eventName: InventoryEvents.RESERVATION_ITEM_DELETED,
source: Modules.INVENTORY,
action: CommonEvents.DELETED,
object: "reservation-item",
context,
data: { id: reservationItem.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.DELETED,
object: "reservation-item",
source: Modules.INVENTORY,
eventName: InventoryEvents.RESERVATION_ITEM_DELETED,
})({
data: reservations,
sharedContext: context,
})
await this.adjustInventoryLevelsForReservationsDeletion(
reservations,
@@ -909,16 +904,15 @@ export default class InventoryModuleService
context
)
context.messageAggregator?.saveRawMessageData(
reservations.map((reservationItem) => ({
eventName: InventoryEvents.RESERVATION_ITEM_DELETED,
source: Modules.INVENTORY,
action: CommonEvents.DELETED,
object: "reservation-item",
context,
data: { id: reservationItem.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.DELETED,
object: "reservation-item",
source: Modules.INVENTORY,
eventName: InventoryEvents.RESERVATION_ITEM_DELETED,
})({
data: reservations,
sharedContext: context,
})
}
/**
@@ -946,16 +940,15 @@ export default class InventoryModuleService
context
)
context.messageAggregator?.saveRawMessageData(
reservations.map((reservationItem) => ({
eventName: InventoryEvents.RESERVATION_ITEM_CREATED,
source: Modules.INVENTORY,
action: CommonEvents.CREATED,
object: "reservation-item",
context,
data: { id: reservationItem.id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.CREATED,
object: "reservation-item",
source: Modules.INVENTORY,
eventName: InventoryEvents.RESERVATION_ITEM_CREATED,
})({
data: reservations,
sharedContext: context,
})
}
/**
@@ -1016,13 +1009,14 @@ export default class InventoryModuleService
)
results.push(result)
context.messageAggregator?.saveRawMessageData({
eventName: InventoryEvents.INVENTORY_LEVEL_UPDATED,
source: Modules.INVENTORY,
moduleEventBuilderFactory({
action: CommonEvents.UPDATED,
object: "inventory-level",
context,
source: Modules.INVENTORY,
eventName: InventoryEvents.INVENTORY_LEVEL_UPDATED,
})({
data: { id: result.id },
sharedContext: context,
})
}
@@ -11,6 +11,7 @@ import {
} from "@medusajs/framework/types"
import {
CommonEvents,
EmitEvents,
InjectManager,
InjectTransactionManager,
isDefined,
@@ -18,6 +19,7 @@ import {
MapToConfig,
MedusaContext,
MedusaError,
moduleEventBuilderFactory,
Modules,
ModulesSdkUtils,
} from "@medusajs/framework/utils"
@@ -175,6 +177,7 @@ export default class LinkModuleService implements ILinkModule {
}
@InjectTransactionManager()
@EmitEvents()
async create(
primaryKeyOrBulkData:
| string
@@ -207,18 +210,15 @@ export default class LinkModuleService implements ILinkModule {
const links = await this.linkService_.create(data, sharedContext)
await this.eventBusModuleService_?.emit<Record<string, unknown>>(
(data as { id: unknown }[]).map(({ id }) => ({
name: this.entityName_ + "." + CommonEvents.ATTACHED,
metadata: {
source: this.serviceName_,
action: CommonEvents.ATTACHED,
object: this.entityName_,
eventGroupId: sharedContext.eventGroupId,
},
data: { id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.ATTACHED,
object: this.entityName_,
source: this.serviceName_,
eventName: this.entityName_ + "." + CommonEvents.ATTACHED,
})({
data: data as { id: string }[],
sharedContext,
})
return (await this.baseRepository_.serialize(links)) as unknown[]
}
@@ -249,6 +249,7 @@ export default class LinkModuleService implements ILinkModule {
}
@InjectTransactionManager()
@EmitEvents()
async delete(
data: any,
@MedusaContext() sharedContext: Context = {}
@@ -258,20 +259,19 @@ export default class LinkModuleService implements ILinkModule {
await this.linkService_.delete(data, sharedContext)
const allData = Array.isArray(data) ? data : [data]
await this.eventBusModuleService_?.emit<Record<string, unknown>>(
allData.map(({ id }) => ({
name: this.entityName_ + "." + CommonEvents.DETACHED,
metadata: {
source: this.serviceName_,
action: CommonEvents.DETACHED,
object: this.entityName_,
eventGroupId: sharedContext.eventGroupId,
},
data: { id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.DETACHED,
object: this.entityName_,
source: this.serviceName_,
eventName: this.entityName_ + "." + CommonEvents.DETACHED,
})({
data: allData as { id: string }[],
sharedContext,
})
}
@InjectTransactionManager()
@EmitEvents()
async softDelete(
data: any,
{ returnLinkableKeys }: SoftDeleteReturn = {},
@@ -307,18 +307,15 @@ export default class LinkModuleService implements ILinkModule {
)
}
await this.eventBusModuleService_?.emit<Record<string, unknown>>(
(deletedEntities as { id: string }[]).map(({ id }) => ({
name: this.entityName_ + "." + CommonEvents.DETACHED,
metadata: {
source: this.serviceName_,
action: CommonEvents.DETACHED,
object: this.entityName_,
eventGroupId: sharedContext.eventGroupId,
},
data: { id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.DETACHED,
object: this.entityName_,
source: this.serviceName_,
eventName: this.entityName_ + "." + CommonEvents.DETACHED,
})({
data: deletedEntities as { id: string }[],
sharedContext,
})
return mappedCascadedEntitiesMap ? mappedCascadedEntitiesMap : void 0
}
@@ -331,6 +328,8 @@ export default class LinkModuleService implements ILinkModule {
return await this.linkService_.softDelete(data, sharedContext)
}
@InjectTransactionManager()
@EmitEvents()
async restore(
data: any,
{ returnLinkableKeys }: RestoreReturn = {},
@@ -365,18 +364,15 @@ export default class LinkModuleService implements ILinkModule {
)
}
await this.eventBusModuleService_?.emit<Record<string, unknown>>(
(restoredEntities as { id: string }[]).map(({ id }) => ({
name: this.entityName_ + "." + CommonEvents.ATTACHED,
metadata: {
source: this.serviceName_,
action: CommonEvents.ATTACHED,
object: this.entityName_,
eventGroupId: sharedContext.eventGroupId,
},
data: { id },
}))
)
moduleEventBuilderFactory({
action: CommonEvents.ATTACHED,
object: this.entityName_,
source: this.serviceName_,
eventName: this.entityName_ + "." + CommonEvents.ATTACHED,
})({
data: restoredEntities as { id: string }[],
sharedContext,
})
return mappedCascadedEntitiesMap ? mappedCascadedEntitiesMap : void 0
}
@@ -388,4 +384,21 @@ export default class LinkModuleService implements ILinkModule {
): Promise<[object[], Record<string, string[]>]> {
return await this.linkService_.restore(data, sharedContext)
}
/**
 * Emits the aggregated events collected during the current unit of work,
 * issuing one batched emit call per event group. Quietly returns when no
 * event bus is configured or there is nothing to emit.
 */
protected async emitEvents_(groupedEvents) {
  const eventBus = this.eventBusModuleService_
  if (!eventBus || !groupedEvents) {
    return
  }

  // Emit every group's batch in parallel; each group becomes a single
  // internal emit call instead of one message per event id.
  await Promise.all(
    Object.values(groupedEvents).map((events) =>
      eventBus.emit(events, {
        internal: true,
      })
    )
  )
}
}
@@ -461,7 +461,7 @@ moduleIntegrationTestRunner<IPricingModuleService>({
)
const events = eventBusEmitSpy.mock.calls[0][0]
expect(events).toHaveLength(4)
expect(events).toHaveLength(3)
expect(events[0]).toEqual(
composeMessage(PricingEvents.PRICE_LIST_CREATED, {
source: Modules.PRICING,
@@ -475,18 +475,15 @@ moduleIntegrationTestRunner<IPricingModuleService>({
source: Modules.PRICING,
action: CommonEvents.CREATED,
object: "price_list_rule",
data: { id: priceList.price_list_rules?.[0].id },
data: {
id: [
priceList.price_list_rules?.[0].id,
priceList.price_list_rules?.[1].id,
],
},
})
)
expect(events[2]).toEqual(
composeMessage(PricingEvents.PRICE_LIST_RULE_CREATED, {
source: Modules.PRICING,
action: CommonEvents.CREATED,
object: "price_list_rule",
data: { id: priceList.price_list_rules?.[1].id },
})
)
expect(events[3]).toEqual(
composeMessage(PricingEvents.PRICE_CREATED, {
source: Modules.PRICING,
action: CommonEvents.CREATED,
@@ -1003,12 +1003,12 @@ export default class PricingModuleService
})
// Bulk create price sets
const createdPriceSets = await this.priceSetService_.create(
const priceSets = await this.priceSetService_.create(
toCreate,
sharedContext
)
const eventsData = createdPriceSets.reduce(
const eventsData = priceSets.reduce(
(eventsData, priceSet) => {
eventsData.priceSets.push({
id: priceSet.id,
@@ -1051,7 +1051,7 @@ export default class PricingModuleService
sharedContext,
})
return createdPriceSets
return priceSets
}
@InjectTransactionManager()
@@ -255,19 +255,11 @@ moduleIntegrationTestRunner<IUserModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith(
[
expect.objectContaining({
data: { id: "1" },
data: { id: ["1", "2"] },
name: UserEvents.INVITE_CREATED,
}),
expect.objectContaining({
data: { id: "2" },
name: UserEvents.INVITE_CREATED,
}),
expect.objectContaining({
data: { id: "1" },
name: UserEvents.INVITE_TOKEN_GENERATED,
}),
expect.objectContaining({
data: { id: "2" },
data: { id: ["1", "2"] },
name: UserEvents.INVITE_TOKEN_GENERATED,
}),
],
@@ -257,11 +257,7 @@ moduleIntegrationTestRunner<IUserModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith(
[
expect.objectContaining({
data: { id: "1" },
name: UserEvents.USER_CREATED,
}),
expect.objectContaining({
data: { id: "2" },
data: { id: ["1", "2"] },
name: UserEvents.USER_CREATED,
}),
],
@@ -16,6 +16,8 @@ import {
MedusaContext,
MedusaError,
MedusaService,
moduleEventBuilderFactory,
Modules,
UserEvents,
} from "@medusajs/framework/utils"
import jwt, { JwtPayload } from "jsonwebtoken"
@@ -110,16 +112,15 @@ export default class UserModuleService
): Promise<UserTypes.InviteDTO[]> {
const invites = await this.refreshInviteTokens_(inviteIds, sharedContext)
sharedContext.messageAggregator?.saveRawMessageData(
invites.map((invite) => ({
eventName: UserEvents.INVITE_TOKEN_GENERATED,
source: this.constructor.name,
action: "token_generated",
object: "invite",
context: sharedContext,
data: { id: invite.id },
}))
)
moduleEventBuilderFactory({
eventName: UserEvents.INVITE_TOKEN_GENERATED,
source: Modules.USER,
action: "token_generated",
object: "invite",
})({
data: invites,
sharedContext,
})
return await this.baseRepository_.serialize<UserTypes.InviteDTO[]>(
invites,
@@ -193,16 +194,15 @@ export default class UserModuleService
populate: true,
})
sharedContext.messageAggregator?.saveRawMessageData(
users.map((user) => ({
eventName: UserEvents.USER_CREATED,
source: this.constructor.name,
action: CommonEvents.CREATED,
object: "user",
context: sharedContext,
data: { id: user.id },
}))
)
moduleEventBuilderFactory({
eventName: UserEvents.USER_CREATED,
source: Modules.USER,
action: CommonEvents.CREATED,
object: "user",
})({
data: serializedUsers,
sharedContext,
})
return Array.isArray(data) ? serializedUsers : serializedUsers[0]
}
@@ -235,16 +235,15 @@ export default class UserModuleService
populate: true,
})
sharedContext.messageAggregator?.saveRawMessageData(
updatedUsers.map((user) => ({
eventName: UserEvents.USER_UPDATED,
source: this.constructor.name,
action: CommonEvents.UPDATED,
object: "user",
context: sharedContext,
data: { id: user.id },
}))
)
moduleEventBuilderFactory({
eventName: UserEvents.USER_UPDATED,
source: Modules.USER,
action: CommonEvents.UPDATED,
object: "user",
})({
data: serializedUsers,
sharedContext,
})
return Array.isArray(data) ? serializedUsers : serializedUsers[0]
}
@@ -277,27 +276,25 @@ export default class UserModuleService
populate: true,
})
sharedContext.messageAggregator?.saveRawMessageData(
invites.map((invite) => ({
eventName: UserEvents.INVITE_CREATED,
source: this.constructor.name,
action: CommonEvents.CREATED,
object: "invite",
context: sharedContext,
data: { id: invite.id },
}))
)
moduleEventBuilderFactory({
eventName: UserEvents.INVITE_CREATED,
source: Modules.USER,
action: CommonEvents.CREATED,
object: "invite",
})({
data: serializedInvites,
sharedContext,
})
sharedContext.messageAggregator?.saveRawMessageData(
invites.map((invite) => ({
eventName: UserEvents.INVITE_TOKEN_GENERATED,
source: this.constructor.name,
action: "token_generated",
object: "invite",
context: sharedContext,
data: { id: invite.id },
}))
)
moduleEventBuilderFactory({
eventName: UserEvents.INVITE_TOKEN_GENERATED,
source: Modules.USER,
action: "token_generated",
object: "invite",
})({
data: serializedInvites,
sharedContext,
})
return Array.isArray(data) ? serializedInvites : serializedInvites[0]
}
@@ -364,16 +361,15 @@ export default class UserModuleService
populate: true,
})
sharedContext.messageAggregator?.saveRawMessageData(
serializedInvites.map((invite) => ({
eventName: UserEvents.INVITE_UPDATED,
source: this.constructor.name,
action: CommonEvents.UPDATED,
object: "invite",
context: sharedContext,
data: { id: invite.id },
}))
)
moduleEventBuilderFactory({
eventName: UserEvents.INVITE_UPDATED,
source: Modules.USER,
action: CommonEvents.UPDATED,
object: "invite",
})({
data: serializedInvites,
sharedContext,
})
return Array.isArray(data) ? serializedInvites : serializedInvites[0]
}