diff --git a/.changeset/clever-pens-swim.md b/.changeset/clever-pens-swim.md new file mode 100644 index 0000000000..23959abb29 --- /dev/null +++ b/.changeset/clever-pens-swim.md @@ -0,0 +1,12 @@ +--- +"@medusajs/fulfillment": patch +"@medusajs/index": patch +"@medusajs/inventory": patch +"@medusajs/link-modules": patch +"@medusajs/pricing": patch +"@medusajs/user": patch +"@medusajs/types": patch +"@medusajs/utils": patch +--- + +chore(): Emit events in batch and index process event ids in batch diff --git a/packages/core/types/src/event-bus/common.ts b/packages/core/types/src/event-bus/common.ts index 744b13f909..ba8e2ec06b 100644 --- a/packages/core/types/src/event-bus/common.ts +++ b/packages/core/types/src/event-bus/common.ts @@ -18,7 +18,7 @@ export type EventMetadata = Record & { /** * The ID of the event's group. Grouped events are useful when you have distributed transactions * where you need to explicitly group, release and clear events upon lifecycle events of a transaction. - * + * * When set, you must release the grouped events using the Event Module's `releaseGroupedEvents` method to emit the events. */ eventGroupId?: string @@ -27,7 +27,7 @@ export type EventMetadata = Record & { export type Event = { /** * The event's name. - * + * * @example * user.created */ @@ -37,7 +37,7 @@ export type Event = { */ metadata?: EventMetadata /** - * The data payload that subscribers receive. For example, the ID of the created user. + * The data payload that subscribers receive. For example, the ID or IDs of the created user. (e.g. { id: "123" } or { ids: ["123", "456"] }) */ data: TData } diff --git a/packages/core/utils/src/event-bus/utils.ts b/packages/core/utils/src/event-bus/utils.ts index 92b40046dd..51617dcfb6 100644 --- a/packages/core/utils/src/event-bus/utils.ts +++ b/packages/core/utils/src/event-bus/utils.ts @@ -44,7 +44,8 @@ export function buildModuleResourceEventName({ action: string }): string { const kebabCaseName = lowerCaseFirst(kebabCase(objectName)) - return `${prefix ? `${prefix}.` : ""}${kebabCaseName}.${action}` + const conventionalePrefix = prefix && lowerCaseFirst(kebabCase(prefix)) + return `${prefix ? `${conventionalePrefix}.` : ""}${kebabCaseName}.${action}` } /** diff --git a/packages/core/utils/src/modules-sdk/event-builder-factory.ts b/packages/core/utils/src/modules-sdk/event-builder-factory.ts index 3154e6f44f..03a7e10b9a 100644 --- a/packages/core/utils/src/modules-sdk/event-builder-factory.ts +++ b/packages/core/utils/src/modules-sdk/event-builder-factory.ts @@ -60,15 +60,15 @@ export function moduleEventBuilderFactory({ }) } - data.forEach((dataItem) => { - messages.push({ - source, - action, - context: sharedContext, - data: { id: dataItem.id }, - eventName: eventName!, - object, - }) + messages.push({ + source, + action, + context: sharedContext, + data: { + id: data.length === 1 ? data[0].id : data.map((item) => item.id), + }, + eventName: eventName!, + object, }) aggregator.saveRawMessageData(messages) diff --git a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/fulfillment-set.spec.ts b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/fulfillment-set.spec.ts index c1cc4154d9..d3e93e6bfb 100644 --- a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/fulfillment-set.spec.ts +++ b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/fulfillment-set.spec.ts @@ -218,7 +218,9 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.FULFILLMENT_SET_CREATED, action: "created", object: "fulfillment_set", - data: { id: fulfillmentSets[i].id }, + data: { + id: expect.arrayContaining([fulfillmentSets[i].id]), + }, }), ]), { @@ -335,13 +337,19 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.FULFILLMENT_SET_CREATED, action: "created", object: "fulfillment_set", - data: { id: fulfillmentSets[i].id }, + data: { + id: expect.arrayContaining([fulfillmentSets[i].id]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.SERVICE_ZONE_CREATED, action: "created", object: "service_zone", - data: { id: fulfillmentSets[i].service_zones[0].id }, + data: { + id: expect.arrayContaining([ + fulfillmentSets[i].service_zones[0].id, + ]), + }, }), ]), { @@ -505,20 +513,28 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.FULFILLMENT_SET_CREATED, action: "created", object: "fulfillment_set", - data: { id: fulfillmentSets[i].id }, + data: { + id: expect.arrayContaining([fulfillmentSets[i].id]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.SERVICE_ZONE_CREATED, action: "created", object: "service_zone", - data: { id: fulfillmentSets[i].service_zones[0].id }, + data: { + id: expect.arrayContaining([ + fulfillmentSets[i].service_zones[0].id, + ]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.GEO_ZONE_CREATED, action: "created", object: "geo_zone", data: { - id: fulfillmentSets[i].service_zones[0].geo_zones[0].id, + id: expect.arrayContaining([ + fulfillmentSets[i].service_zones[0].geo_zones[0].id, + ]), }, }), ]), @@ -708,7 +724,7 @@ moduleIntegrationTestRunner({ }) expect(updatedFulfillmentSets).toHaveLength(2) - expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(2) + expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(1) for (const data_ of updateData) { const currentFullfillmentSet = fullfillmentSets.find( @@ -729,7 +745,9 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.FULFILLMENT_SET_UPDATED, action: "updated", object: "fulfillment_set", - data: { id: currentFullfillmentSet.id }, + data: { + id: expect.arrayContaining([currentFullfillmentSet.id]), + }, }), ]), { @@ -1057,7 +1075,7 @@ moduleIntegrationTestRunner({ ) expect(updatedFulfillmentSets).toHaveLength(2) - expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(10) + expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(5) for (const data_ of updateData) { const expectedFulfillmentSet = updatedFulfillmentSets.find( @@ -1097,36 +1115,48 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.FULFILLMENT_SET_UPDATED, action: "updated", object: "fulfillment_set", - data: { id: expectedFulfillmentSet.id }, + data: { + id: expect.arrayContaining([expectedFulfillmentSet.id]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.SERVICE_ZONE_CREATED, action: "created", object: "service_zone", - data: { id: expectedFulfillmentSet.service_zones[0].id }, + data: { + id: expect.arrayContaining([ + expectedFulfillmentSet.service_zones[0].id, + ]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.GEO_ZONE_CREATED, action: "created", object: "geo_zone", data: { - id: expectedFulfillmentSet.service_zones[0].geo_zones[0] - .id, + id: expect.arrayContaining([ + expectedFulfillmentSet.service_zones[0].geo_zones[0].id, + ]), }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.SERVICE_ZONE_DELETED, action: "deleted", object: "service_zone", - data: { id: originalFulfillmentSet.service_zones[0].id }, + data: { + id: expect.arrayContaining([ + originalFulfillmentSet.service_zones[0].id, + ]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.GEO_ZONE_DELETED, action: "deleted", object: "geo_zone", data: { - id: originalFulfillmentSet.service_zones[0].geo_zones[0] - .id, + id: expect.arrayContaining([ + originalFulfillmentSet.service_zones[0].geo_zones[0].id, + ]), }, }), ]), @@ -1215,7 +1245,7 @@ moduleIntegrationTestRunner({ ) expect(updatedFulfillmentSets).toHaveLength(2) - expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(6) + expect(eventBusEmitSpy.mock.calls[1][0]).toHaveLength(3) for (const data_ of updateData) { const expectedFulfillmentSet = updatedFulfillmentSets.find( @@ -1259,20 +1289,26 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.FULFILLMENT_SET_UPDATED, action: "updated", object: "fulfillment_set", - data: { id: expectedFulfillmentSet.id }, + data: { + id: expect.arrayContaining([expectedFulfillmentSet.id]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.SERVICE_ZONE_CREATED, action: "created", object: "service_zone", - data: { id: createdServiceZone.id }, + data: { + id: expect.arrayContaining([createdServiceZone.id]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.GEO_ZONE_CREATED, action: "created", object: "geo_zone", data: { - id: createdServiceZone.geo_zones[0].id, + id: expect.arrayContaining([ + createdServiceZone.geo_zones[0].id, + ]), }, }), ]), diff --git a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/geo-zone.spec.ts b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/geo-zone.spec.ts index 630a89e06c..5d24fb4c14 100644 --- a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/geo-zone.spec.ts +++ b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/geo-zone.spec.ts @@ -154,7 +154,7 @@ moduleIntegrationTestRunner({ const geoZones = await service.createGeoZones(data) expect(geoZones).toHaveLength(2) - expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(2) + expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(1) let i = 0 for (const data_ of data) { @@ -172,7 +172,7 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.GEO_ZONE_CREATED, action: "created", object: "geo_zone", - data: { id: geoZones[i].id }, + data: { id: expect.arrayContaining([geoZones[i].id]) }, }), ]), { @@ -331,7 +331,7 @@ moduleIntegrationTestRunner({ const updatedGeoZones = await service.updateGeoZones(updateData) expect(updatedGeoZones).toHaveLength(2) - expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(2) + expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(1) for (const data_ of updateData) { const expectedGeoZone = updatedGeoZones.find( @@ -352,7 +352,7 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.GEO_ZONE_UPDATED, action: "updated", object: "geo_zone", - data: { id: expectedGeoZone.id }, + data: { id: expect.arrayContaining([expectedGeoZone.id]) }, }), ]), { diff --git a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/service-zone.spec.ts b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/service-zone.spec.ts index 1ea7beafbf..845ecd9b19 100644 --- a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/service-zone.spec.ts +++ b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/service-zone.spec.ts @@ -510,7 +510,7 @@ moduleIntegrationTestRunner({ expect(updatedServiceZones).toHaveLength(2) - expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(6) // Since the update only calls create and update which are already tested, only check the length + expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(3) // Since the update only calls create and update which are already tested, only check the length for (const data_ of updateData) { const expectedServiceZone = updatedServiceZones.find( diff --git a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/shipping-option.spec.ts b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/shipping-option.spec.ts index 441c7e9e2b..d10c2415c3 100644 --- a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/shipping-option.spec.ts +++ b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/shipping-option.spec.ts @@ -582,7 +582,7 @@ moduleIntegrationTestRunner({ ) expect(createdShippingOptions).toHaveLength(2) - expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(6) + expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(3) let i = 0 for (const data_ of createData) { @@ -619,19 +619,31 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.SHIPPING_OPTION_CREATED, action: "created", object: "shipping_option", - data: { id: createdShippingOptions[i].id }, + data: { + id: expect.arrayContaining([ + createdShippingOptions[i].id, + ]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.SHIPPING_OPTION_TYPE_CREATED, action: "created", object: "shipping_option_type", - data: { id: createdShippingOptions[i].type.id }, + data: { + id: expect.arrayContaining([ + createdShippingOptions[i].type.id, + ]), + }, }), buildExpectedEventMessageShape({ eventName: FulfillmentEvents.SHIPPING_OPTION_RULE_CREATED, action: "created", object: "shipping_option_rule", - data: { id: createdShippingOptions[i].rules[0].id }, + data: { + id: expect.arrayContaining([ + createdShippingOptions[i].rules[0].id, + ]), + }, }), ]), { diff --git a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/shipping-profile.spec.ts b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/shipping-profile.spec.ts index e7d4a2eb2b..f36596f9a0 100644 --- a/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/shipping-profile.spec.ts +++ b/packages/modules/fulfillment/integration-tests/__tests__/fulfillment-module-service/shipping-profile.spec.ts @@ -100,7 +100,7 @@ moduleIntegrationTestRunner({ await service.createShippingProfiles(createData) expect(createdShippingProfiles).toHaveLength(2) - expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(2) + expect(eventBusEmitSpy.mock.calls[0][0]).toHaveLength(1) let i = 0 for (const data_ of createData) { @@ -117,7 +117,11 @@ moduleIntegrationTestRunner({ eventName: FulfillmentEvents.SHIPPING_PROFILE_CREATED, action: "created", object: "shipping_profile", - data: { id: createdShippingProfiles[i].id }, + data: { + id: expect.arrayContaining([ + createdShippingProfiles[i].id, + ]), + }, }), ]), { diff --git a/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts b/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts index 17f48c5b54..ec04804372 100644 --- a/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts @@ -19,7 +19,7 @@ import { asValue } from "awilix" import * as path from "path" import { setTimeout } from "timers/promises" import { EventBusServiceMock } from "../__fixtures__" -import { dbName } from "../__fixtures__/medusa-config" +import config, { dbName } from "../__fixtures__/medusa-config" const eventBusMock = new EventBusServiceMock() const queryMock = { @@ -147,14 +147,15 @@ describe("DataSynchronizer", () => { // Mock query response for products queryMock.graph.mockImplementation(async (config) => { if (Array.isArray(config.filters.id)) { + let data: any[] = [] if (config.filters.id.includes(testProductId)) { - return { - data: [mockData[0]], - } + data.push(mockData[0]) } else if (config.filters.id.includes(testProductId2)) { - return { - data: [mockData[1]], - } + data.push(mockData[1]) + } + + return { + data, } } diff --git a/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts b/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts index 7e244f5624..38f3b10a7d 100644 --- a/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts @@ -141,6 +141,7 @@ const beforeAll_ = async () => { ;(index as any).eventBusModuleService_ = eventBusMock await globalApp.onApplicationStart() + await setTimeout(3000) ;(index as any).storageProvider_.query_ = queryMock return globalApp @@ -196,6 +197,8 @@ describe("IndexModuleService", function () { await dbUtils.shutdown(dbName) }) + afterEach(afterEach_) + describe("on created or attached events", function () { let manager @@ -222,7 +225,7 @@ describe("IndexModuleService", function () { }, }, { - name: "PriceSet.created", + name: "pricing.price-set.created", data: { id: priceSetId, }, @@ -247,7 +250,6 @@ describe("IndexModuleService", function () { ] beforeEach(async () => { - await setTimeout(1000) await beforeEach_(eventDataToEmit) manager = (medusaApp.sharedContainer!.resolve(Modules.INDEX) as any) @@ -392,7 +394,7 @@ describe("IndexModuleService", function () { }, }, { - name: "PriceSet.created", + name: "pricing.price-set.created", data: { id: priceSetId, }, @@ -689,7 +691,7 @@ describe("IndexModuleService", function () { }, }, { - name: "PriceSet.created", + name: "pricing.price-set.created", data: { id: priceSetId, }, @@ -761,8 +763,6 @@ describe("IndexModuleService", function () { await eventBusMock.emit(deleteEventDataToEmit) }) - afterEach(afterEach_) - it("should consume all deleted events and delete the index entries", async () => { const indexEntries = await manager.find(toMikroORMEntity(IndexData), {}) const indexRelationEntries = await manager.find( diff --git a/packages/modules/index/src/services/data-synchronizer.ts b/packages/modules/index/src/services/data-synchronizer.ts index 11239e2275..07cb6b3ce4 100644 --- a/packages/modules/index/src/services/data-synchronizer.ts +++ b/packages/modules/index/src/services/data-synchronizer.ts @@ -290,7 +290,7 @@ export class DataSynchronizer { filters["updated_at"] = { $gt: pagination.updated_at } } - const { data } = await this.#query.graph({ + const queryResult = await this.#query.graph({ entity: alias, fields: [entityPrimaryKey], filters, @@ -302,10 +302,12 @@ export class DataSynchronizer { }, }) - if (!data.length) { + if (!queryResult?.data?.length) { break } + const data = queryResult.data + const envelop: Event = { data, name: !isLink diff --git a/packages/modules/index/src/services/postgres-provider.ts b/packages/modules/index/src/services/postgres-provider.ts index f8e40cea18..df3d422f7e 100644 --- a/packages/modules/index/src/services/postgres-provider.ts +++ b/packages/modules/index/src/services/postgres-provider.ts @@ -163,9 +163,11 @@ export class PostgresProvider implements IndexTypes.StorageProvider { } } - protected static parseMessageData(message?: Event): { + protected static parseMessageData( + message?: Event + ): { action: string - data: { id: string }[] + data: TData[] ids: string[] } | void { const isExpectedFormat = @@ -177,7 +179,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { const result: { action: string - data: { id: string }[] + data: TData[] ids: string[] } = { action: "", @@ -186,60 +188,76 @@ export class PostgresProvider implements IndexTypes.StorageProvider { } result.action = message!.metadata!.action as string - result.data = message!.data as { id: string }[] + result.data = message!.data as TData[] result.data = Array.isArray(result.data) ? result.data : [result.data] - result.ids = result.data.map((d) => d.id) + result.ids = result.data.flatMap((d) => + Array.isArray(d.id) ? d.id : [d.id] + ) return result } consumeEvent( schemaEntityObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation - ): Subscriber<{ id: string }> { - return async (data: Event) => { + ): Subscriber<{ id: string | string[] }> { + return async (event: Event) => { await this.#isReady_ - const data_: { id: string }[] = Array.isArray(data.data) - ? data.data - : [data.data] - let ids: string[] = data_.map((d) => d.id) - let action = data.name.split(".").pop() || "" + const data_: { id: string }[] = Array.isArray(event.data) + ? event.data + : [event.data] + let ids: string[] = data_.flatMap((d) => + Array.isArray(d.id) ? d.id : [d.id] + ) + let action = event.name.split(".").pop() || "" - const parsedMessage = PostgresProvider.parseMessageData(data) + const parsedMessage = PostgresProvider.parseMessageData(event) if (parsedMessage) { action = parsedMessage.action ids = parsedMessage.ids } - const { fields, alias } = schemaEntityObjectRepresentation - - const graphConfig: Parameters[0] = { - entity: alias, - filters: { - id: ids, - }, - fields: [...new Set(["id", ...fields])], - } - - if (action === CommonEvents.DELETED || action === CommonEvents.DETACHED) { - graphConfig.withDeleted = true - } - - const { data: entityData } = await this.query_.graph(graphConfig) - - const argument = { - entity: schemaEntityObjectRepresentation.entity, - data: entityData, - schemaEntityObjectRepresentation, - } - const targetMethod = this.eventActionToMethodMap_[action] if (!targetMethod) { return } - await this[targetMethod](argument) + const { fields, alias } = schemaEntityObjectRepresentation + + let withDeleted: boolean | undefined + if (action === CommonEvents.DELETED || action === CommonEvents.DETACHED) { + withDeleted = true + } + + // Process ids in batches of 100 + const batchSize = 100 + const idsBatches: string[][] = [] + + for (let i = 0; i < ids.length; i += batchSize) { + idsBatches.push(ids.slice(i, i + batchSize)) + } + + for (const idsBatch of idsBatches) { + const graphConfig: Parameters[0] = { + entity: alias, + filters: { + id: idsBatch, + }, + fields: [...new Set(["id", ...fields])], + withDeleted, + } + + const { data: entityData } = await this.query_.graph(graphConfig) + + const argument = { + entity: schemaEntityObjectRepresentation.entity, + data: entityData, + schemaEntityObjectRepresentation, + } + + await this[targetMethod](argument) + } } } @@ -374,7 +392,9 @@ export class PostgresProvider implements IndexTypes.StorageProvider { @MedusaContext() sharedContext: Context = {} ) { const { transactionManager: em } = sharedContext - const indexRepository = em!.getRepository(toMikroORMEntity(IndexData)) + const indexRepository = em!.getRepository( + toMikroORMEntity(IndexData) + ) as EntityRepository const indexRelationRepository: EntityRepository = em!.getRepository( toMikroORMEntity(IndexRelation) ) @@ -385,33 +405,32 @@ export class PostgresProvider implements IndexTypes.StorageProvider { parentsProperties, } = PostgresProvider.parseData(data, schemaEntityObjectRepresentation) + /** + * Clean the entity data to only keep the properties that are defined in the schema + */ + const cleanedData = data_.map((entityData) => { + return entityProperties.reduce((acc, property) => { + acc[property] = entityData[property] + return acc + }, {}) as TData + }) + /** * Loop through the data and create index entries for each entity as well as the * index relation entries if the entity has parents */ - for (const entityData of data_) { - /** - * Clean the entity data to only keep the properties that are defined in the schema - */ + const entitiesToUpsert: Set = new Set() + const relationsToUpsert: Set = new Set() - const cleanedEntityData = entityProperties.reduce((acc, property) => { - acc[property] = entityData[property] - return acc - }, {}) as TData - - await indexRepository.upsert( - { - id: cleanedEntityData.id, + cleanedData.forEach((entityData, index) => { + entitiesToUpsert.add( + JSON.stringify({ + id: entityData.id, name: entity, - data: cleanedEntityData, + data: entityData, staled_at: null, - }, - { - onConflictAction: "merge", - onConflictFields: ["id", "name"], - onConflictMergeFields: ["data", "staled_at"], - } + }) ) /** @@ -422,7 +441,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { parentsProperties )) { const parentAlias = parentProperties[0].split(".")[0] - const parentData = entityData[parentAlias] as TData[] + const parentData = data_[index][parentAlias] as TData[] if (!parentData) { continue @@ -433,43 +452,46 @@ export class PostgresProvider implements IndexTypes.StorageProvider { : [parentData] for (const parentData_ of parentDataCollection) { - await indexRepository.upsert( - { - id: (parentData_ as any).id, - name: parentEntity, - data: parentData_, - staled_at: null, - }, - { - onConflictAction: "merge", - onConflictFields: ["id", "name"], - onConflictMergeFields: ["staled_at"], - } - ) - - await indexRelationRepository.upsert( - { - parent_id: (parentData_ as any).id, + relationsToUpsert.add( + JSON.stringify({ + parent_id: parentData_.id, parent_name: parentEntity, - child_id: cleanedEntityData.id, + child_id: entityData.id, child_name: entity, pivot: `${parentEntity}-${entity}`, staled_at: null, - }, - { - onConflictAction: "merge", - onConflictFields: [ - "pivot", - "parent_id", - "child_id", - "parent_name", - "child_name", - ], - onConflictMergeFields: ["staled_at"], - } + }) ) } } + }) + + if (entitiesToUpsert.size) { + await indexRepository.upsertMany( + Array.from(entitiesToUpsert).map((entity) => JSON.parse(entity)), + { + onConflictAction: "merge", + onConflictFields: ["id", "name"], + onConflictMergeFields: ["data", "staled_at"], + } + ) + } + + if (relationsToUpsert.size) { + await indexRelationRepository.upsertMany( + Array.from(relationsToUpsert).map((relation) => JSON.parse(relation)), + { + onConflictAction: "merge", + onConflictFields: [ + "pivot", + "parent_id", + "child_id", + "parent_name", + "child_name", + ], + onConflictMergeFields: ["staled_at"], + } + ) } } @@ -495,7 +517,9 @@ export class PostgresProvider implements IndexTypes.StorageProvider { @MedusaContext() sharedContext: Context = {} ) { const { transactionManager: em } = sharedContext - const indexRepository = em!.getRepository(toMikroORMEntity(IndexData)) + const indexRepository = em!.getRepository( + toMikroORMEntity(IndexData) + ) as EntityRepository const { data: data_, entityProperties } = PostgresProvider.parseData( data, @@ -503,24 +527,22 @@ export class PostgresProvider implements IndexTypes.StorageProvider { ) await indexRepository.upsertMany( - data_.map( - (entityData) => { - return { - id: entityData.id, - name: entity, - data: entityProperties.reduce((acc, property) => { - acc[property] = entityData[property] - return acc - }, {}), - staled_at: null, - } - }, - { - onConflictAction: "merge", - onConflictFields: ["id", "name"], - onConflictMergeFields: ["data", "staled_at"], + data_.map((entityData) => { + return { + id: entityData.id, + name: entity, + data: entityProperties.reduce((acc, property) => { + acc[property] = entityData[property] + return acc + }, {}), + staled_at: null, } - ) + }), + { + onConflictAction: "merge", + onConflictFields: ["id", "name"], + onConflictMergeFields: ["data", "staled_at"], + } ) } @@ -601,7 +623,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { const indexRepository = em!.getRepository(toMikroORMEntity(IndexData)) const indexRelationRepository = em!.getRepository( toMikroORMEntity(IndexRelation) - ) + ) as EntityRepository const { data: data_, entityProperties } = PostgresProvider.parseData( data, @@ -652,77 +674,66 @@ export class PostgresProvider implements IndexTypes.StorageProvider { ) } - for (const entityData of data_) { - /** - * Clean the link entity data to only keep the properties that are defined in the schema - */ + /** + * Clean the link entity data to only keep the properties that are defined in the schema + */ - const cleanedEntityData = entityProperties.reduce((acc, property) => { + const cleanedData = data_.map((entityData) => { + return entityProperties.reduce((acc, property) => { acc[property] = entityData[property] return acc }, {}) as TData + }) - await indexRepository.upsert( - { - id: cleanedEntityData.id, - name: entity, - data: cleanedEntityData, - staled_at: null, - }, - { - onConflictAction: "merge", - onConflictFields: ["id", "name"], - onConflictMergeFields: ["data", "staled_at"], - } - ) - - /** - * Create the index relation entries for the parent entity and the child entity - */ - - await indexRelationRepository.upsert( + let relationsToUpsert: any[] = [] + const entitiesToUpdate = cleanedData.map((entityData) => { + relationsToUpsert.push( { parent_id: entityData[parentPropertyId] as string, parent_name: parentEntityName, - child_id: cleanedEntityData.id, + child_id: entityData.id, child_name: entity, pivot: `${parentEntityName}-${entity}`, staled_at: null, }, { - onConflictAction: "merge", - onConflictFields: [ - "pivot", - "parent_id", - "child_id", - "parent_name", - "child_name", - ], - onConflictMergeFields: ["staled_at"], - } - ) - - await indexRelationRepository.upsert( - { - parent_id: cleanedEntityData.id, + parent_id: entityData.id, parent_name: entity, child_id: entityData[childPropertyId] as string, child_name: childEntityName, pivot: `${entity}-${childEntityName}`, staled_at: null, - }, - { - onConflictAction: "merge", - onConflictFields: [ - "pivot", - "parent_id", - "child_id", - "parent_name", - "child_name", - ], - onConflictMergeFields: ["staled_at"], } ) + + return { + id: entityData.id, + name: entity, + data: entityData, + staled_at: null, + } + }) + + if (entitiesToUpdate.length) { + await indexRepository.upsertMany(entitiesToUpdate, { + onConflictAction: "merge", + onConflictFields: ["id", "name"], + onConflictMergeFields: ["data", "staled_at"], + }) + } + + if (relationsToUpsert.length) { + await indexRelationRepository.upsertMany(relationsToUpsert, { + onConflictAction: "merge", + onConflictFields: [ + "pivot", + "parent_id", + "child_id", + "parent_name", + "child_name", + ], + onConflictMergeFields: ["staled_at"], + }) } } diff --git a/packages/modules/index/src/utils/build-config.ts b/packages/modules/index/src/utils/build-config.ts index 6e72e51b07..d61b93ec9f 100644 --- a/packages/modules/index/src/utils/build-config.ts +++ b/packages/modules/index/src/utils/build-config.ts @@ -5,7 +5,11 @@ import { ModuleJoinerConfig, ModuleJoinerRelationship, } from "@medusajs/framework/types" -import { CommonEvents, GraphQLUtils } from "@medusajs/framework/utils" +import { + buildModuleResourceEventName, + CommonEvents, + GraphQLUtils, +} from "@medusajs/framework/utils" import { schemaObjectRepresentationPropertiesToOmit } from "@types" export const CustomDirectives = { @@ -561,9 +565,23 @@ function processEntity( intermediateEntityObjectRepresentationRef.alias = intermediateEntityAlias + intermediateEntityObjectRepresentationRef.listeners = [ - intermediateEntityName + "." + CommonEvents.CREATED, - intermediateEntityName + "." + CommonEvents.UPDATED, + buildModuleResourceEventName({ + action: CommonEvents.CREATED, + objectName: intermediateEntityName, + prefix: intermediateEntityModule.serviceName, + }), + buildModuleResourceEventName({ + action: CommonEvents.UPDATED, + objectName: intermediateEntityName, + prefix: intermediateEntityModule.serviceName, + }), + buildModuleResourceEventName({ + action: CommonEvents.DELETED, + objectName: intermediateEntityName, + prefix: intermediateEntityModule.serviceName, + }), ] intermediateEntityObjectRepresentationRef.moduleConfig = intermediateEntityModule diff --git a/packages/modules/inventory/src/services/inventory-module.ts b/packages/modules/inventory/src/services/inventory-module.ts index c470f005d4..26515a2df2 100644 --- a/packages/modules/inventory/src/services/inventory-module.ts +++ b/packages/modules/inventory/src/services/inventory-module.ts @@ -26,6 +26,7 @@ import { MedusaContext, MedusaError, MedusaService, + moduleEventBuilderFactory, Modules, partitionArray, } from "@medusajs/framework/utils" @@ -248,16 +249,15 @@ export default class InventoryModuleService const toCreate = Array.isArray(input) ? input : [input] const created = await this.createReservationItems_(toCreate, context) - context.messageAggregator?.saveRawMessageData( - created.map((reservationItem) => ({ - eventName: InventoryEvents.RESERVATION_ITEM_CREATED, - source: Modules.INVENTORY, - action: CommonEvents.CREATED, - object: "reservation-item", - context, - data: { id: reservationItem.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.CREATED, + object: "reservation-item", + source: Modules.INVENTORY, + eventName: InventoryEvents.RESERVATION_ITEM_CREATED, + })({ + data: created, + sharedContext: context, + }) const serializedReservations = await this.baseRepository_.serialize< InventoryTypes.ReservationItemDTO[] | InventoryTypes.ReservationItemDTO @@ -350,16 +350,15 @@ export default class InventoryModuleService ) const result = await this.createInventoryItems_(toCreate, context) - context.messageAggregator?.saveRawMessageData( - result.map((inventoryItem) => ({ - eventName: InventoryEvents.INVENTORY_ITEM_CREATED, - source: Modules.INVENTORY, - action: CommonEvents.CREATED, - object: "inventory-item", - context, - data: { id: inventoryItem.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.CREATED, + object: "inventory-item", + source: Modules.INVENTORY, + eventName: InventoryEvents.INVENTORY_ITEM_CREATED, + })({ + data: result, + sharedContext: context, + }) const serializedItems = await this.baseRepository_.serialize< InventoryTypes.InventoryItemDTO | InventoryTypes.InventoryItemDTO[] @@ -406,16 +405,15 @@ export default class InventoryModuleService const created = await this.createInventoryLevels_(toCreate, context) - context.messageAggregator?.saveRawMessageData( - created.map((inventoryLevel) => ({ - eventName: InventoryEvents.INVENTORY_LEVEL_CREATED, - source: Modules.INVENTORY, - action: CommonEvents.CREATED, - object: "inventory-level", - context, - data: { id: inventoryLevel.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.CREATED, + object: "inventory-level", + source: Modules.INVENTORY, + eventName: InventoryEvents.INVENTORY_LEVEL_CREATED, + })({ + data: created, + sharedContext: context, + }) const serialized = await this.baseRepository_.serialize< InventoryTypes.InventoryLevelDTO[] | InventoryTypes.InventoryLevelDTO @@ -462,16 +460,15 @@ export default class InventoryModuleService const result = await this.updateInventoryItems_(updates, context) - context.messageAggregator?.saveRawMessageData( - result.map((inventoryItem) => ({ - eventName: InventoryEvents.INVENTORY_ITEM_UPDATED, - source: Modules.INVENTORY, - action: CommonEvents.UPDATED, - object: "inventory-item", - context, - data: { id: inventoryItem.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.UPDATED, + object: "inventory-item", + source: Modules.INVENTORY, + eventName: InventoryEvents.INVENTORY_ITEM_UPDATED, + })({ + data: result, + sharedContext: context, + }) const serializedItems = await this.baseRepository_.serialize< InventoryTypes.InventoryItemDTO | InventoryTypes.InventoryItemDTO[] @@ -503,16 +500,15 @@ export default class InventoryModuleService context ) - context.messageAggregator?.saveRawMessageData( - result[0].map((inventoryLevel) => ({ - eventName: InventoryEvents.INVENTORY_LEVEL_DELETED, - source: Modules.INVENTORY, - action: CommonEvents.DELETED, - object: "inventory-level", - context, - data: { id: inventoryLevel.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.DELETED, + object: "inventory-level", + source: Modules.INVENTORY, + eventName: InventoryEvents.INVENTORY_LEVEL_DELETED, + })({ + data: result[0], + sharedContext: context, + }) return result } @@ -524,6 +520,7 @@ export default class InventoryModuleService * @param context */ @InjectTransactionManager() + @EmitEvents() async deleteInventoryLevel( inventoryItemId: string, locationId: string, @@ -535,13 +532,14 @@ export default class InventoryModuleService context ) - context.messageAggregator?.saveRawMessageData({ - eventName: InventoryEvents.INVENTORY_LEVEL_DELETED, - source: Modules.INVENTORY, + moduleEventBuilderFactory({ action: CommonEvents.DELETED, object: "inventory-level", - context, + source: Modules.INVENTORY, + eventName: InventoryEvents.INVENTORY_LEVEL_DELETED, + })({ data: { id: inventoryLevel.id }, + sharedContext: context, }) if (!inventoryLevel) { @@ -579,16 +577,15 @@ export default class InventoryModuleService const levels = await this.updateInventoryLevels_(input, context) - context.messageAggregator?.saveRawMessageData( - levels.map((inventoryLevel) => ({ - eventName: InventoryEvents.INVENTORY_LEVEL_UPDATED, - source: Modules.INVENTORY, - action: CommonEvents.UPDATED, - object: "inventory-level", - context, - data: { id: inventoryLevel.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.UPDATED, + object: "inventory-level", + source: Modules.INVENTORY, + eventName: InventoryEvents.INVENTORY_LEVEL_UPDATED, + })({ + data: levels, + sharedContext: context, + }) const updatedLevels = await this.baseRepository_.serialize< InventoryTypes.InventoryLevelDTO | InventoryTypes.InventoryLevelDTO[] @@ -662,16 +659,15 @@ export default class InventoryModuleService const update = Array.isArray(input) ? input : [input] const result = await this.updateReservationItems_(update, context) - context.messageAggregator?.saveRawMessageData( - result.map((reservationItem) => ({ - eventName: InventoryEvents.RESERVATION_ITEM_UPDATED, - source: Modules.INVENTORY, - action: CommonEvents.UPDATED, - object: "reservation-item", - context, - data: { id: reservationItem.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.UPDATED, + object: "reservation-item", + source: Modules.INVENTORY, + eventName: InventoryEvents.RESERVATION_ITEM_UPDATED, + })({ + data: result, + sharedContext: context, + }) const serialized = await this.baseRepository_.serialize< InventoryTypes.ReservationItemDTO | InventoryTypes.ReservationItemDTO[] @@ -867,16 +863,15 @@ export default class InventoryModuleService context ) - context.messageAggregator?.saveRawMessageData( - reservations.map((reservationItem) => ({ - eventName: InventoryEvents.RESERVATION_ITEM_DELETED, - source: Modules.INVENTORY, - action: CommonEvents.DELETED, - object: "reservation-item", - context, - data: { id: reservationItem.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.DELETED, + object: "reservation-item", + source: Modules.INVENTORY, + eventName: InventoryEvents.RESERVATION_ITEM_DELETED, + })({ + data: reservations, + sharedContext: context, + }) await this.adjustInventoryLevelsForReservationsDeletion( reservations, @@ -909,16 +904,15 @@ export default class InventoryModuleService context ) - context.messageAggregator?.saveRawMessageData( - reservations.map((reservationItem) => ({ - eventName: InventoryEvents.RESERVATION_ITEM_DELETED, - source: Modules.INVENTORY, - action: CommonEvents.DELETED, - object: "reservation-item", - context, - data: { id: reservationItem.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.DELETED, + object: "reservation-item", + source: Modules.INVENTORY, + eventName: InventoryEvents.RESERVATION_ITEM_DELETED, + })({ + data: reservations, + sharedContext: context, + }) } /** @@ -946,16 +940,15 @@ export default class InventoryModuleService context ) - context.messageAggregator?.saveRawMessageData( - reservations.map((reservationItem) => ({ - eventName: InventoryEvents.RESERVATION_ITEM_CREATED, - source: Modules.INVENTORY, - action: CommonEvents.CREATED, - object: "reservation-item", - context, - data: { id: reservationItem.id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.CREATED, + object: "reservation-item", + source: Modules.INVENTORY, + eventName: InventoryEvents.RESERVATION_ITEM_CREATED, + })({ + data: reservations, + sharedContext: context, + }) } /** @@ -1016,13 +1009,14 @@ export default class InventoryModuleService ) results.push(result) - context.messageAggregator?.saveRawMessageData({ - eventName: InventoryEvents.INVENTORY_LEVEL_UPDATED, - source: Modules.INVENTORY, + moduleEventBuilderFactory({ action: CommonEvents.UPDATED, object: "inventory-level", - context, + source: Modules.INVENTORY, + eventName: InventoryEvents.INVENTORY_LEVEL_UPDATED, + })({ data: { id: result.id }, + sharedContext: context, }) } diff --git a/packages/modules/link-modules/src/services/link-module-service.ts b/packages/modules/link-modules/src/services/link-module-service.ts index 7cf6e77e39..b61e56f58f 100644 --- a/packages/modules/link-modules/src/services/link-module-service.ts +++ b/packages/modules/link-modules/src/services/link-module-service.ts @@ -11,6 +11,7 @@ import { } from "@medusajs/framework/types" import { CommonEvents, + EmitEvents, InjectManager, InjectTransactionManager, isDefined, @@ -18,6 +19,7 @@ import { MapToConfig, MedusaContext, MedusaError, + moduleEventBuilderFactory, Modules, ModulesSdkUtils, } from "@medusajs/framework/utils" @@ -175,6 +177,7 @@ export default class LinkModuleService implements ILinkModule { } @InjectTransactionManager() + @EmitEvents() async create( primaryKeyOrBulkData: | string @@ -207,18 +210,15 @@ export default class LinkModuleService implements ILinkModule { const links = await this.linkService_.create(data, sharedContext) - await this.eventBusModuleService_?.emit>( - (data as { id: unknown }[]).map(({ id }) => ({ - name: this.entityName_ + "." + CommonEvents.ATTACHED, - metadata: { - source: this.serviceName_, - action: CommonEvents.ATTACHED, - object: this.entityName_, - eventGroupId: sharedContext.eventGroupId, - }, - data: { id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.ATTACHED, + object: this.entityName_, + source: this.serviceName_, + eventName: this.entityName_ + "." + CommonEvents.ATTACHED, + })({ + data: data as { id: string }[], + sharedContext, + }) return (await this.baseRepository_.serialize(links)) as unknown[] } @@ -249,6 +249,7 @@ export default class LinkModuleService implements ILinkModule { } @InjectTransactionManager() + @EmitEvents() async delete( data: any, @MedusaContext() sharedContext: Context = {} @@ -258,20 +259,19 @@ export default class LinkModuleService implements ILinkModule { await this.linkService_.delete(data, sharedContext) const allData = Array.isArray(data) ? data : [data] - await this.eventBusModuleService_?.emit>( - allData.map(({ id }) => ({ - name: this.entityName_ + "." + CommonEvents.DETACHED, - metadata: { - source: this.serviceName_, - action: CommonEvents.DETACHED, - object: this.entityName_, - eventGroupId: sharedContext.eventGroupId, - }, - data: { id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.DETACHED, + object: this.entityName_, + source: this.serviceName_, + eventName: this.entityName_ + "." + CommonEvents.DETACHED, + })({ + data: allData as { id: string }[], + sharedContext, + }) } + @InjectTransactionManager() + @EmitEvents() async softDelete( data: any, { returnLinkableKeys }: SoftDeleteReturn = {}, @@ -307,18 +307,15 @@ export default class LinkModuleService implements ILinkModule { ) } - await this.eventBusModuleService_?.emit>( - (deletedEntities as { id: string }[]).map(({ id }) => ({ - name: this.entityName_ + "." + CommonEvents.DETACHED, - metadata: { - source: this.serviceName_, - action: CommonEvents.DETACHED, - object: this.entityName_, - eventGroupId: sharedContext.eventGroupId, - }, - data: { id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.DETACHED, + object: this.entityName_, + source: this.serviceName_, + eventName: this.entityName_ + "." + CommonEvents.DETACHED, + })({ + data: deletedEntities as { id: string }[], + sharedContext, + }) return mappedCascadedEntitiesMap ? mappedCascadedEntitiesMap : void 0 } @@ -331,6 +328,8 @@ export default class LinkModuleService implements ILinkModule { return await this.linkService_.softDelete(data, sharedContext) } + @InjectTransactionManager() + @EmitEvents() async restore( data: any, { returnLinkableKeys }: RestoreReturn = {}, @@ -365,18 +364,15 @@ export default class LinkModuleService implements ILinkModule { ) } - await this.eventBusModuleService_?.emit>( - (restoredEntities as { id: string }[]).map(({ id }) => ({ - name: this.entityName_ + "." + CommonEvents.ATTACHED, - metadata: { - source: this.serviceName_, - action: CommonEvents.ATTACHED, - object: this.entityName_, - eventGroupId: sharedContext.eventGroupId, - }, - data: { id }, - })) - ) + moduleEventBuilderFactory({ + action: CommonEvents.ATTACHED, + object: this.entityName_, + source: this.serviceName_, + eventName: this.entityName_ + "." + CommonEvents.ATTACHED, + })({ + data: restoredEntities as { id: string }[], + sharedContext, + }) return mappedCascadedEntitiesMap ? mappedCascadedEntitiesMap : void 0 } @@ -388,4 +384,21 @@ export default class LinkModuleService implements ILinkModule { ): Promise<[object[], Record]> { return await this.linkService_.restore(data, sharedContext) } + + protected async emitEvents_(groupedEvents) { + if (!this.eventBusModuleService_ || !groupedEvents) { + return + } + + const promises: Promise[] = [] + for (const group of Object.keys(groupedEvents)) { + promises.push( + this.eventBusModuleService_.emit(groupedEvents[group], { + internal: true, + }) + ) + } + + await Promise.all(promises) + } } diff --git a/packages/modules/pricing/integration-tests/__tests__/services/pricing-module/price-list.spec.ts b/packages/modules/pricing/integration-tests/__tests__/services/pricing-module/price-list.spec.ts index 0ccd07305c..9b4c70efc4 100644 --- a/packages/modules/pricing/integration-tests/__tests__/services/pricing-module/price-list.spec.ts +++ b/packages/modules/pricing/integration-tests/__tests__/services/pricing-module/price-list.spec.ts @@ -461,7 +461,7 @@ moduleIntegrationTestRunner({ ) const events = eventBusEmitSpy.mock.calls[0][0] - expect(events).toHaveLength(4) + expect(events).toHaveLength(3) expect(events[0]).toEqual( composeMessage(PricingEvents.PRICE_LIST_CREATED, { source: Modules.PRICING, @@ -475,18 +475,15 @@ moduleIntegrationTestRunner({ source: Modules.PRICING, action: CommonEvents.CREATED, object: "price_list_rule", - data: { id: priceList.price_list_rules?.[0].id }, + data: { + id: [ + priceList.price_list_rules?.[0].id, + priceList.price_list_rules?.[1].id, + ], + }, }) ) expect(events[2]).toEqual( - composeMessage(PricingEvents.PRICE_LIST_RULE_CREATED, { - source: Modules.PRICING, - action: CommonEvents.CREATED, - object: "price_list_rule", - data: { id: priceList.price_list_rules?.[1].id }, - }) - ) - expect(events[3]).toEqual( composeMessage(PricingEvents.PRICE_CREATED, { source: Modules.PRICING, action: CommonEvents.CREATED, diff --git a/packages/modules/pricing/src/services/pricing-module.ts b/packages/modules/pricing/src/services/pricing-module.ts index b1d6896360..aa19b9d305 100644 --- a/packages/modules/pricing/src/services/pricing-module.ts +++ b/packages/modules/pricing/src/services/pricing-module.ts @@ -1003,12 +1003,12 @@ export default class PricingModuleService }) // Bulk create price sets - const createdPriceSets = await this.priceSetService_.create( + const priceSets = await this.priceSetService_.create( toCreate, sharedContext ) - const eventsData = createdPriceSets.reduce( + const eventsData = priceSets.reduce( (eventsData, priceSet) => { eventsData.priceSets.push({ id: priceSet.id, @@ -1051,7 +1051,7 @@ export default class PricingModuleService sharedContext, }) - return createdPriceSets + return priceSets } @InjectTransactionManager() diff --git a/packages/modules/user/integration-tests/__tests__/invite.spec.ts b/packages/modules/user/integration-tests/__tests__/invite.spec.ts index 29d5d9e8a6..5e238035ad 100644 --- a/packages/modules/user/integration-tests/__tests__/invite.spec.ts +++ b/packages/modules/user/integration-tests/__tests__/invite.spec.ts @@ -255,19 +255,11 @@ moduleIntegrationTestRunner({ expect(eventBusSpy).toHaveBeenCalledWith( [ expect.objectContaining({ - data: { id: "1" }, + data: { id: ["1", "2"] }, name: UserEvents.INVITE_CREATED, }), expect.objectContaining({ - data: { id: "2" }, - name: UserEvents.INVITE_CREATED, - }), - expect.objectContaining({ - data: { id: "1" }, - name: UserEvents.INVITE_TOKEN_GENERATED, - }), - expect.objectContaining({ - data: { id: "2" }, + data: { id: ["1", "2"] }, name: UserEvents.INVITE_TOKEN_GENERATED, }), ], diff --git a/packages/modules/user/integration-tests/__tests__/user.spec.ts b/packages/modules/user/integration-tests/__tests__/user.spec.ts index 8acb225e5d..00da40fadb 100644 --- a/packages/modules/user/integration-tests/__tests__/user.spec.ts +++ b/packages/modules/user/integration-tests/__tests__/user.spec.ts @@ -257,11 +257,7 @@ moduleIntegrationTestRunner({ expect(eventBusSpy).toHaveBeenCalledWith( [ expect.objectContaining({ - data: { id: "1" }, - name: UserEvents.USER_CREATED, - }), - expect.objectContaining({ - data: { id: "2" }, + data: { id: ["1", "2"] }, name: UserEvents.USER_CREATED, }), ], diff --git a/packages/modules/user/src/services/user-module.ts b/packages/modules/user/src/services/user-module.ts index 31e880e549..61d6b91836 100644 --- a/packages/modules/user/src/services/user-module.ts +++ b/packages/modules/user/src/services/user-module.ts @@ -16,6 +16,8 @@ import { MedusaContext, MedusaError, MedusaService, + moduleEventBuilderFactory, + Modules, UserEvents, } from "@medusajs/framework/utils" import jwt, { JwtPayload } from "jsonwebtoken" @@ -110,16 +112,15 @@ export default class UserModuleService ): Promise { const invites = await this.refreshInviteTokens_(inviteIds, sharedContext) - sharedContext.messageAggregator?.saveRawMessageData( - invites.map((invite) => ({ - eventName: UserEvents.INVITE_TOKEN_GENERATED, - source: this.constructor.name, - action: "token_generated", - object: "invite", - context: sharedContext, - data: { id: invite.id }, - })) - ) + moduleEventBuilderFactory({ + eventName: UserEvents.INVITE_TOKEN_GENERATED, + source: Modules.USER, + action: "token_generated", + object: "invite", + })({ + data: invites, + sharedContext, + }) return await this.baseRepository_.serialize( invites, @@ -193,16 +194,15 @@ export default class UserModuleService populate: true, }) - sharedContext.messageAggregator?.saveRawMessageData( - users.map((user) => ({ - eventName: UserEvents.USER_CREATED, - source: this.constructor.name, - action: CommonEvents.CREATED, - object: "user", - context: sharedContext, - data: { id: user.id }, - })) - ) + moduleEventBuilderFactory({ + eventName: UserEvents.USER_CREATED, + source: Modules.USER, + action: CommonEvents.CREATED, + object: "user", + })({ + data: serializedUsers, + sharedContext, + }) return Array.isArray(data) ? serializedUsers : serializedUsers[0] } @@ -235,16 +235,15 @@ export default class UserModuleService populate: true, }) - sharedContext.messageAggregator?.saveRawMessageData( - updatedUsers.map((user) => ({ - eventName: UserEvents.USER_UPDATED, - source: this.constructor.name, - action: CommonEvents.UPDATED, - object: "user", - context: sharedContext, - data: { id: user.id }, - })) - ) + moduleEventBuilderFactory({ + eventName: UserEvents.USER_UPDATED, + source: Modules.USER, + action: CommonEvents.UPDATED, + object: "user", + })({ + data: serializedUsers, + sharedContext, + }) return Array.isArray(data) ? serializedUsers : serializedUsers[0] } @@ -277,27 +276,25 @@ export default class UserModuleService populate: true, }) - sharedContext.messageAggregator?.saveRawMessageData( - invites.map((invite) => ({ - eventName: UserEvents.INVITE_CREATED, - source: this.constructor.name, - action: CommonEvents.CREATED, - object: "invite", - context: sharedContext, - data: { id: invite.id }, - })) - ) + moduleEventBuilderFactory({ + eventName: UserEvents.INVITE_CREATED, + source: Modules.USER, + action: CommonEvents.CREATED, + object: "invite", + })({ + data: serializedInvites, + sharedContext, + }) - sharedContext.messageAggregator?.saveRawMessageData( - invites.map((invite) => ({ - eventName: UserEvents.INVITE_TOKEN_GENERATED, - source: this.constructor.name, - action: "token_generated", - object: "invite", - context: sharedContext, - data: { id: invite.id }, - })) - ) + moduleEventBuilderFactory({ + eventName: UserEvents.INVITE_TOKEN_GENERATED, + source: Modules.USER, + action: "token_generated", + object: "invite", + })({ + data: serializedInvites, + sharedContext, + }) return Array.isArray(data) ? serializedInvites : serializedInvites[0] } @@ -364,16 +361,15 @@ export default class UserModuleService populate: true, }) - sharedContext.messageAggregator?.saveRawMessageData( - serializedInvites.map((invite) => ({ - eventName: UserEvents.INVITE_UPDATED, - source: this.constructor.name, - action: CommonEvents.UPDATED, - object: "invite", - context: sharedContext, - data: { id: invite.id }, - })) - ) + moduleEventBuilderFactory({ + eventName: UserEvents.INVITE_UPDATED, + source: Modules.USER, + action: CommonEvents.UPDATED, + object: "invite", + })({ + data: serializedInvites, + sharedContext, + }) return Array.isArray(data) ? serializedInvites : serializedInvites[0] }