diff --git a/.changeset/rude-camels-own.md b/.changeset/rude-camels-own.md new file mode 100644 index 0000000000..83b0a7e2aa --- /dev/null +++ b/.changeset/rude-camels-own.md @@ -0,0 +1,7 @@ +--- +"@medusajs/utils": patch +"@medusajs/deps": patch +"@medusajs/order": patch +--- + +chore(utils): make upsert with replace more efficient diff --git a/package.json b/package.json index a6e65bffbd..2bdfa917e9 100644 --- a/package.json +++ b/package.json @@ -61,7 +61,7 @@ "jest": "jest", "test": "turbo run test --concurrency=50% --no-daemon --no-cache --force", "test:chunk": "./scripts/run-workspace-unit-tests-in-chunks.sh", - "test:integration:packages:fast": "turbo run test:integration --concurrency=2 --no-daemon --no-cache --force --filter='./packages/medusa' --filter='./packages/modules/*' --filter='./packages/modules/providers/*' --filter='!./packages/modules/{workflow-engine-redis,index,product,order,cart}'", + "test:integration:packages:fast": "turbo run test:integration --concurrency=2 --no-daemon --no-cache --force --filter='./packages/core/*' --filter='./packages/medusa' --filter='./packages/modules/*' --filter='./packages/modules/providers/*' --filter='!./packages/modules/{workflow-engine-redis,index,product,order,cart}'", "test:integration:packages:slow": "turbo run test:integration --concurrency=2 --no-daemon --no-cache --force --filter='./packages/modules/{workflow-engine-redis,index,product,order,cart}'", "test:integration:api": "turbo run test:integration:chunk --concurrency=50% --no-daemon --no-cache --force --filter=integration-tests-api", "test:integration:http": "turbo run test:integration:chunk --concurrency=50% --no-daemon --no-cache --force --filter=integration-tests-http", diff --git a/packages/core/utils/src/dal/mikro-orm/integration-tests/__tests__/mikro-orm-repository.spec.ts b/packages/core/utils/src/dal/mikro-orm/integration-tests/__tests__/mikro-orm-repository.spec.ts index f37a5cdff7..c8b074d105 100644 --- a/packages/core/utils/src/dal/mikro-orm/integration-tests/__tests__/mikro-orm-repository.spec.ts +++ b/packages/core/utils/src/dal/mikro-orm/integration-tests/__tests__/mikro-orm-repository.spec.ts @@ -112,6 +112,15 @@ class Entity3 { @ManyToMany(() => Entity1, (entity1) => entity1.entity3) entity1 = new Collection(this) + @OneToMany(() => Entity4, (entity4) => entity4.entity3) + entity4 = new Collection(this) + + @ManyToMany(() => Entity5, "entity3", { + owner: true, + pivotTable: "entity_3_5", + }) + entity5 = new Collection(this) + @OnInit() @BeforeCreate() onInit() { @@ -121,9 +130,115 @@ class Entity3 { } } +@Entity() +class Entity4 { + @PrimaryKey() + id: string + + @Property() + title: string + + @Property() + description: string + + @Property({ nullable: true }) + deleted_at: Date | null + + @ManyToOne(() => Entity3, { + columnType: "text", + nullable: true, + mapToPk: true, + fieldName: "entity3_id", + deleteRule: "set null", + }) + entity3_id: string + + @ManyToOne(() => Entity3, { persist: false, nullable: true }) + entity3: Entity3 | null + + @OnInit() + @BeforeCreate() + onInit() { + if (!this.id) { + this.id = Math.random().toString(36).substring(7) + } + + this.entity3_id ??= this.entity3?.id! + } +} + +@Entity() +class Entity5 { + @PrimaryKey() + id: string + + @Property() + title: string + + @Property() + value: number + + @Property({ nullable: true }) + deleted_at: Date | null + + @ManyToMany(() => Entity3, (entity3) => entity3.entity5) + entity3 = new Collection(this) + + @OneToMany(() => Entity6, (entity6) => entity6.entity5) + entity6 = new Collection(this) + + @OnInit() + @BeforeCreate() + onInit() { + if (!this.id) { + this.id = Math.random().toString(36).substring(7) + } + } +} + +@Entity() +class Entity6 { + @PrimaryKey() + id: string + + @Property() + title: string + + @Property() + metadata: string + + @Property({ nullable: true }) + deleted_at: Date | null + + @ManyToOne(() => Entity5, { + columnType: "text", + nullable: true, + mapToPk: true, + fieldName: "entity5_id", + deleteRule: "set null", + }) + entity5_id: string + + @ManyToOne(() => Entity5, { persist: false, nullable: true }) + entity5: Entity5 | null + + @OnInit() + @BeforeCreate() + onInit() { + if (!this.id) { + this.id = Math.random().toString(36).substring(7) + } + + this.entity5_id ??= this.entity5?.id! + } +} + const Entity1Repository = mikroOrmBaseRepositoryFactory(Entity1) const Entity2Repository = mikroOrmBaseRepositoryFactory(Entity2) const Entity3Repository = mikroOrmBaseRepositoryFactory(Entity3) +const Entity4Repository = mikroOrmBaseRepositoryFactory(Entity4) +const Entity5Repository = mikroOrmBaseRepositoryFactory(Entity5) +const Entity6Repository = mikroOrmBaseRepositoryFactory(Entity6) describe("mikroOrmRepository", () => { let orm!: MikroORM @@ -137,6 +252,17 @@ describe("mikroOrmRepository", () => { const manager3 = () => { return new Entity3Repository({ manager: manager.fork() }) } + // @ts-expect-error + const manager4 = () => { + return new Entity4Repository({ manager: manager.fork() }) + } + const manager5 = () => { + return new Entity5Repository({ manager: manager.fork() }) + } + // @ts-expect-error + const manager6 = () => { + return new Entity6Repository({ manager: manager.fork() }) + } beforeEach(async () => { await dropDatabase( @@ -146,7 +272,7 @@ describe("mikroOrmRepository", () => { orm = await MikroORM.init( defineConfig({ - entities: [Entity1, Entity2], + entities: [Entity1, Entity2, Entity3, Entity4, Entity5, Entity6], clientUrl: getDatabaseURL(dbName), }) ) @@ -1254,6 +1380,7 @@ describe("mikroOrmRepository", () => { 'column "othertitle" of relation "entity3" does not exist' ) }) + it("should map ForeignKeyConstraintViolationException MedusaError on upsertWithReplace", async () => { const entity2 = { title: "en2", @@ -1268,5 +1395,337 @@ describe("mikroOrmRepository", () => { "You tried to set relationship entity1_id: 1, but such entity does not exist" ) }) + + it("should efficiently handle large batches with deeply nested relations", async () => { + const numParentEntities = 25 + const numEntity2PerParent = 10 + const numEntity3PerParent = 8 + const numEntity4PerEntity3 = 5 + const numEntity5PerEntity3 = 4 + const numEntity6PerEntity5 = 3 + + const entity5Manager = manager5() + const entity3Manager = manager3() + const entity1Manager = manager1() + + const qbInsertSpy = jest.fn() + const qbSelectSpy = jest.fn() + const qbDeleteSpy = jest.fn() + + const wrapManagerQb = (activeManager: any) => { + const originalQb = activeManager.qb + jest.spyOn(activeManager, "qb").mockImplementation((...args) => { + const qb = originalQb.apply(activeManager, args) + + const originalInsert = qb.insert + const originalSelect = qb.select + const originalDelete = qb.delete + + qb.insert = jest.fn((...insertArgs) => { + qbInsertSpy(...insertArgs) + return originalInsert.apply(qb, insertArgs) + }) + + qb.select = jest.fn((...selectArgs) => { + qbSelectSpy(...selectArgs) + return originalSelect.apply(qb, selectArgs) + }) + + qb.delete = jest.fn(() => { + qbDeleteSpy() + return originalDelete.apply(qb) + }) + + return qb + }) + } + + let entity5ActiveManager: any = entity5Manager.getActiveManager() + let entity3ActiveManager: any = entity3Manager.getActiveManager() + let entity1ActiveManager: any = entity1Manager.getActiveManager() + + entity5Manager.getActiveManager = jest + .fn(() => entity5ActiveManager) + .mockReturnValue(entity5ActiveManager) + entity3Manager.getActiveManager = jest + .fn(() => entity3ActiveManager) + .mockReturnValue(entity3ActiveManager) + entity1Manager.getActiveManager = jest + .fn(() => entity1ActiveManager) + .mockReturnValue(entity1ActiveManager) + + wrapManagerQb(entity5ActiveManager) + wrapManagerQb(entity3ActiveManager) + wrapManagerQb(entity1ActiveManager) + + const findSpy = jest.spyOn(entity5ActiveManager, "find") + const nativeUpdateManySpy = jest.spyOn( + manager.getDriver(), + "nativeUpdateMany" + ) + const nativeDeleteSpy = jest.spyOn(entity5ActiveManager, "nativeDelete") + + qbInsertSpy.mockClear() + qbSelectSpy.mockClear() + qbDeleteSpy.mockClear() + findSpy.mockClear() + nativeUpdateManySpy.mockClear() + nativeDeleteSpy.mockClear() + + // Create deeply nested dataset + const complexEntities = Array.from( + { length: numParentEntities }, + (_, i) => ({ + id: `parent-${i.toString().padStart(3, "0")}`, + title: `Parent Entity ${i}`, + entity2: Array.from({ length: numEntity2PerParent }, (_, j) => ({ + id: `e2-${i}-${j}`, + title: `Entity2 ${j} of Parent ${i}`, + handle: `handle-${i}-${j}`, + })), + entity3: Array.from({ length: numEntity3PerParent }, (_, k) => ({ + id: `e3-${i}-${k}`, + title: `Entity3 ${k} of Parent ${i}`, + entity4: Array.from({ length: numEntity4PerEntity3 }, (_, l) => ({ + id: `e4-${i}-${k}-${l}`, + title: `Entity4 ${l} of Entity3 ${k}`, + description: `Description for nested entity ${l}`, + })), + entity5: Array.from({ length: numEntity5PerEntity3 }, (_, m) => ({ + id: `e5-${i}-${k}-${m}`, + title: `Entity5 ${m} of Entity3 ${k}`, + value: i * 100 + k * 10 + m, + entity6: Array.from({ length: numEntity6PerEntity5 }, (_, n) => ({ + id: `e6-${i}-${k}-${m}-${n}`, + title: `Entity6 ${n} deeply nested`, + metadata: `{"parent": ${i}, "e3": ${k}, "e5": ${m}, "e6": ${n}}`, + })), + })), + })), + }) + ) + + // First: Create Entity5 with Entity6 relations + const allEntity5Data = complexEntities.flatMap((parent) => + parent.entity3.flatMap((e3) => e3.entity5) + ) + + const { performedActions: e5CreateActions } = + await entity5Manager.upsertWithReplace(allEntity5Data, { + relations: ["entity6"], + }) + + const e5SelectCalls = qbSelectSpy.mock.calls.length + const e5InsertCalls = qbInsertSpy.mock.calls.length + + expect(e5SelectCalls).toBe(2) // One for Entity5, one for Entity6 + expect(e5InsertCalls).toBe(2) // One batch insert for Entity5s, one for Entity6s + + expect(qbInsertSpy.mock.calls[0][0]).toHaveLength(800) // entity5 25 * 8 * 4 + expect(qbInsertSpy.mock.calls[1][0]).toHaveLength(2400) // entity6 25 * 8 * 4 * 3 + + findSpy.mockClear() + qbSelectSpy.mockClear() + qbInsertSpy.mockClear() + qbDeleteSpy.mockClear() + nativeUpdateManySpy.mockClear() + nativeDeleteSpy.mockClear() + + // Second: Create Entity3 with Entity4 and Entity5 relations + const allEntity3Data = complexEntities.flatMap((parent) => + parent.entity3.map((e3) => ({ + ...e3, + // Reference existing Entity5 by ID only + entity5: e3.entity5.map((e5) => ({ id: e5.id })), + })) + ) + + const { performedActions: e3CreateActions } = + await entity3Manager.upsertWithReplace(allEntity3Data, { + relations: ["entity4", "entity5"], + }) + + const e3SelectCalls = qbSelectSpy.mock.calls.length + const e3InsertCalls = qbInsertSpy.mock.calls.length + + expect(e3SelectCalls).toBe(3) // One for Entity3, one for Entity5, One pivot entity3 -> entity5 + expect(e3InsertCalls).toBe(3) // One batch insert for Entity3s, one for Entity4s and one pivot entity3 -> entity5 + + expect(qbInsertSpy.mock.calls[0][0]).toHaveLength(200) // entity3: 25 * 8 + expect(qbInsertSpy.mock.calls[1][0]).toHaveLength(800) // pivot entity3 -> entity5: 25 * 8 * 4 + expect(qbInsertSpy.mock.calls[2][0]).toHaveLength(1000) // entity4: 25 * 8 * 5 + + findSpy.mockClear() + qbSelectSpy.mockClear() + qbInsertSpy.mockClear() + qbDeleteSpy.mockClear() + nativeUpdateManySpy.mockClear() + nativeDeleteSpy.mockClear() + + // Third: Create Entity1 with all relations + const mainEntitiesData = complexEntities.map((parent) => ({ + ...parent, + // Reference existing Entity3 by ID only + entity3: parent.entity3.map((e3) => ({ id: e3.id })), + })) + + const { performedActions: mainCreateActions } = + await entity1Manager.upsertWithReplace(mainEntitiesData, { + relations: ["entity2", "entity3"], + }) + + const mainSelectCalls = qbSelectSpy.mock.calls.length + const mainInsertCalls = qbInsertSpy.mock.calls.length + + expect(mainSelectCalls).toBe(3) // One for Entity1, one for Entity3, one for Entity2 + expect(mainInsertCalls).toBe(3) // One batch insert for Entity1s, one for Entity2s, one for Entity3s + + expect(qbInsertSpy.mock.calls[0][0]).toHaveLength(25) // entity1: 25 + expect(qbInsertSpy.mock.calls[1][0]).toHaveLength(200) // entity3: 25 * 8 + expect(qbInsertSpy.mock.calls[2][0]).toHaveLength(250) // entity2: 25 * 10 + + findSpy.mockClear() + qbSelectSpy.mockClear() + qbInsertSpy.mockClear() + qbDeleteSpy.mockClear() + nativeUpdateManySpy.mockClear() + nativeDeleteSpy.mockClear() + + // Verify creation counts + expect(e5CreateActions.created[Entity5.name]).toHaveLength( + numParentEntities * numEntity3PerParent * numEntity5PerEntity3 + ) + expect(e5CreateActions.created[Entity6.name]).toHaveLength( + numParentEntities * + numEntity3PerParent * + numEntity5PerEntity3 * + numEntity6PerEntity5 + ) + expect(e3CreateActions.created[Entity3.name]).toHaveLength( + numParentEntities * numEntity3PerParent + ) + expect(e3CreateActions.created[Entity4.name]).toHaveLength( + numParentEntities * numEntity3PerParent * numEntity4PerEntity3 + ) + expect(mainCreateActions.created[Entity1.name]).toHaveLength( + numParentEntities + ) + expect(mainCreateActions.created[Entity2.name]).toHaveLength( + numParentEntities * numEntity2PerParent + ) + + // Now test complex updates + + // Modify nested structures + const updatedComplexEntities = complexEntities.map((parent, i) => ({ + ...parent, + title: `Updated ${parent.title}`, + entity2: [ + // Keep first 5 out of 10, update them + ...parent.entity2.slice(0, 5).map((e2) => ({ + ...e2, + title: `Updated ${e2.title}`, + })), + // Add new ones + { + id: `new-e2-${i}-0`, + title: `New Entity2 0`, + handle: `new-handle-${i}-0`, + }, + { + id: `new-e2-${i}-1`, + title: `New Entity2 1`, + handle: `new-handle-${i}-1`, + }, + ], + entity3: parent.entity3.slice(0, 4).map((e3) => ({ id: e3.id })), // Keep only first 4 + })) + + const { performedActions: updateActions } = + await entity1Manager.upsertWithReplace(updatedComplexEntities, { + relations: ["entity2", "entity3"], + }) + + // Validate batching for update operations + const updateSelectCalls = qbSelectSpy.mock.calls.length + const updateInsertCalls = qbInsertSpy.mock.calls.length + const updateManyCalls = nativeUpdateManySpy.mock.calls.length + + expect(updateSelectCalls).toBe(3) // Entity1, Entity2, Entity3 existence checks + + // Should use batch updates for existing entities + expect(updateManyCalls).toBe(3) // 1 for Entity1 (25), 2 for Entity2 (100+25, chunked due to 100 batch limit) + + // Should use batch inserts for new entities and pivot relationships + expect(updateInsertCalls).toBe(2) // pivot Entity1 - Entity3 (with conflict resolution) + new Entity2s + expect(qbInsertSpy.mock.calls[0][0]).toHaveLength(100) // pivot Entity1 - Entity3: 25 parents × 4 entity3s each (uses onConflict().ignore()) + expect(qbInsertSpy.mock.calls[1][0]).toHaveLength(50) // New Entity2s: 25 parents × 2 new each + + // We wont check the deletion which happen through knex directly. It will be accounted for in + // the final state verification + + // Validate that updates are batched correctly with chunking + expect(nativeUpdateManySpy.mock.calls[0][2]).toHaveLength(25) // Entity1: 25 entities + expect(nativeUpdateManySpy.mock.calls[1][2]).toHaveLength(100) // Entity2 chunk 1: 100 entities + expect(nativeUpdateManySpy.mock.calls[2][2]).toHaveLength(25) // Entity2 chunk 2: 25 entities + + // Verify updates + expect(updateActions.updated[Entity1.name]).toHaveLength( + numParentEntities + ) + expect(updateActions.updated[Entity2.name]).toHaveLength( + numParentEntities * 5 + ) // 5 updated per parent + expect(updateActions.created[Entity2.name]).toHaveLength( + numParentEntities * 2 + ) // 2 new per parent + expect(updateActions.deleted[Entity2.name]).toHaveLength( + numParentEntities * 5 + ) // 5 deleted per parent (kept first 5 out of 10, so 5 deleted) + + // Verify order preservation + updateActions.updated[Entity1.name].forEach((entity, index) => { + expect(entity.id).toBe(`parent-${index.toString().padStart(3, "0")}`) + }) + + // Verify final state + const finalEntities = await manager1().find({ + where: {}, + options: { + populate: ["entity2", "entity3"], + orderBy: { id: "ASC" } as any, + }, + }) + + expect(finalEntities).toHaveLength(numParentEntities) + finalEntities.forEach((entity, i) => { + expect(entity.title).toBe(`Updated Parent Entity ${i}`) + expect(entity.entity2).toHaveLength(7) // 5 updated + 2 new + expect(entity.entity3).toHaveLength(4) // Only first 4 kept + }) + + // Verify nested relationships still exist + const sampleEntity3 = await manager3().find({ + where: { id: "e3-0-0" }, + options: { + populate: ["entity4", "entity5"], + }, + }) + + expect(sampleEntity3).toHaveLength(1) + expect(sampleEntity3[0].entity4).toHaveLength(numEntity4PerEntity3) + expect(sampleEntity3[0].entity5).toHaveLength(numEntity5PerEntity3) + + // Verify deeply nested Entity6 relationships + const sampleEntity5 = await manager5().find({ + where: { id: "e5-0-0-0" }, + options: { + populate: ["entity6"], + }, + }) + + expect(sampleEntity5).toHaveLength(1) + expect(sampleEntity5[0].entity6).toHaveLength(numEntity6PerEntity5) + }) }) }) diff --git a/packages/core/utils/src/dal/mikro-orm/mikro-orm-create-connection.ts b/packages/core/utils/src/dal/mikro-orm/mikro-orm-create-connection.ts index 79d91cea48..0b03c041fd 100644 --- a/packages/core/utils/src/dal/mikro-orm/mikro-orm-create-connection.ts +++ b/packages/core/utils/src/dal/mikro-orm/mikro-orm-create-connection.ts @@ -96,6 +96,9 @@ export async function mikroOrmCreateConnection( driverOptions, tsNode: process.env.APP_ENV === "development", filters: database.filters ?? {}, + useBatchInserts: true, + useBatchUpdates: true, + batchSize: 100, assign: { convertCustomTypes: true, }, diff --git a/packages/core/utils/src/dal/mikro-orm/mikro-orm-repository.ts b/packages/core/utils/src/dal/mikro-orm/mikro-orm-repository.ts index 26bf8fcfdf..cee767cee2 100644 --- a/packages/core/utils/src/dal/mikro-orm/mikro-orm-repository.ts +++ b/packages/core/utils/src/dal/mikro-orm/mikro-orm-repository.ts @@ -490,9 +490,14 @@ export function mikroOrmBaseRepositoryFactory( const findOptions_ = { ...findOptions } findOptions_.options ??= {} - Object.assign(findOptions_.options, { - strategy: LoadStrategy.SELECT_IN, - }) + if (!("strategy" in findOptions_.options)) { + if (findOptions_.options.limit != null || findOptions_.options.offset) { + // TODO: from 7+ it will be the default strategy + Object.assign(findOptions_.options, { + strategy: LoadStrategy.BALANCED, + }) + } + } MikroOrmBaseRepository.compensateRelationFieldsSelectionFromLoadStrategy({ findOptions: findOptions_, @@ -535,7 +540,7 @@ export function mikroOrmBaseRepositoryFactory( let allEntities: InferRepositoryReturnType[][] = [] if (primaryKeysCriteria.length) { - allEntities = await Promise.all( + allEntities = await promiseAll( primaryKeysCriteria.map( async (criteria) => await this.find( @@ -683,46 +688,65 @@ export function mikroOrmBaseRepositoryFactory( this.mergePerformedActions(performedActions, performedActions_) - await promiseAll( - upsertedTopLevelEntities + const relationProcessingPromises: Promise[] = [] + + // Group relations by type + const relationsByType = new Map< + string, + { relation: EntityProperty; entities: any[] } + >() + + config.relations?.forEach((relationName) => { + const relation = allRelations?.find((r) => r.name === relationName) + if (!relation) return + + if ( + relation.kind === ReferenceKind.ONE_TO_ONE || + relation.kind === ReferenceKind.MANY_TO_ONE + ) { + return + } + + const entitiesForRelation = upsertedTopLevelEntities .map((entityEntry, i) => { const originalEntry = originalDataMap.get((entityEntry as any).id)! - const reconstructedEntry = reconstructedResponse[i] - - return allRelations?.map(async (relation) => { - const relationName = relation.name as keyof T - if (!config.relations?.includes(relationName)) { - return - } - - // TODO: Handle ONE_TO_ONE - // One to one and Many to one are handled outside of the assignment as they need to happen before the main entity is created - if ( - relation.kind === ReferenceKind.ONE_TO_ONE || - relation.kind === ReferenceKind.MANY_TO_ONE - ) { - return - } - - const { entities, performedActions: performedActions_ } = - await this.assignCollectionRelation_( - manager, - { ...originalEntry, id: (entityEntry as any).id }, - relation - ) - - this.mergePerformedActions(performedActions, performedActions_) - reconstructedEntry[relationName] = entities - return - }) + return { + entity: { ...originalEntry, id: (entityEntry as any).id }, + reconstructedEntry: reconstructedResponse[i], + index: i, + } }) - .flat() - ) + .filter((item) => item.entity[relationName as string] !== undefined) - // // We want to populate the identity map with the data that was written to the DB, and return an entity object - // return reconstructedResponse.map((r) => - // manager.create(entity, r, { persist: false }) - // ) + if (entitiesForRelation.length > 0) { + relationsByType.set(relationName as string, { + relation, + entities: entitiesForRelation, + }) + } + }) + + for (const [relationName, { relation, entities }] of relationsByType) { + relationProcessingPromises.push( + this.assignCollectionRelationBatch_(manager, entities, relation).then( + ({ performedActions: batchPerformedActions, entitiesResults }) => { + this.mergePerformedActions( + performedActions, + batchPerformedActions + ) + + // Update reconstructed response with results + entitiesResults.forEach( + ({ entities: relationEntities, index }) => { + reconstructedResponse[index][relationName] = relationEntities + } + ) + } + ) + ) + } + + await promiseAll(relationProcessingPromises) return { entities: reconstructedResponse, performedActions } } @@ -741,7 +765,226 @@ export function mikroOrmBaseRepositoryFactory( }) } - // FUTURE: We can make this performant by only aggregating the operations, but only executing them at the end. + /** + * Batch processing for multiple entities with same relation + */ + protected async assignCollectionRelationBatch_( + manager: SqlEntityManager, + entitiesData: Array<{ entity: T; index: number }>, + relation: EntityProperty + ): Promise<{ + performedActions: PerformedActions + entitiesResults: Array<{ entities: any[]; index: number }> + }> { + const performedActions: PerformedActions = { + created: {}, + updated: {}, + deleted: {}, + } + + const entitiesResults: Array<{ entities: any[]; index: number }> = [] + + if (relation.kind === ReferenceKind.MANY_TO_MANY) { + await this.assignManyToManyRelationBatch_( + manager, + entitiesData, + relation, + performedActions, + entitiesResults + ) + } else if (relation.kind === ReferenceKind.ONE_TO_MANY) { + await this.assignOneToManyRelationBatch_( + manager, + entitiesData, + relation, + performedActions, + entitiesResults + ) + } else { + // For other relation types, fall back to individual processing + await promiseAll( + entitiesData.map(async ({ entity, index }) => { + const { entities, performedActions: individualActions } = + await this.assignCollectionRelation_(manager, entity, relation) + this.mergePerformedActions(performedActions, individualActions) + entitiesResults.push({ entities, index }) + }) + ) + } + + return { performedActions, entitiesResults } + } + + /** + * Batch processing for many-to-many relations + */ + private async assignManyToManyRelationBatch_( + manager: SqlEntityManager, + entitiesData: Array<{ entity: T; index: number }>, + relation: EntityProperty, + performedActions: PerformedActions, + entitiesResults: Array<{ entities: any[]; index: number }> + ): Promise { + const currentPivotColumn = relation.inverseJoinColumns[0] + const parentPivotColumn = relation.joinColumns[0] + + // Collect all relation data and normalize it + const allNormalizedData: any[] = [] + const entityRelationMap = new Map() + const entitiesToDeletePivots: string[] = [] + + for (const { entity, index } of entitiesData) { + const dataForRelation = entity[relation.name] + + if (dataForRelation === undefined) { + entitiesResults.push({ entities: [], index }) + continue + } + + if (!dataForRelation.length) { + entitiesToDeletePivots.push((entity as any).id) + entitiesResults.push({ entities: [], index }) + continue + } + + const normalizedData = dataForRelation.map((item: any) => + this.getEntityWithId(manager, relation.type, item) + ) + + allNormalizedData.push(...normalizedData) + entityRelationMap.set((entity as any).id, normalizedData) + entitiesResults.push({ entities: normalizedData, index }) + } + + // Batch delete empty pivot relations + if (entitiesToDeletePivots.length) { + await manager.nativeDelete(relation.pivotEntity, { + [parentPivotColumn]: { $in: entitiesToDeletePivots }, + }) + } + + if (allNormalizedData.length) { + const { performedActions: upsertActions } = await this.upsertMany_( + manager, + relation.type, + allNormalizedData, + true + ) + this.mergePerformedActions(performedActions, upsertActions) + + // Collect all pivot data for batch operations + const allPivotData: any[] = [] + const allParentIds: string[] = [] + + for (const [parentId, normalizedData] of entityRelationMap) { + allParentIds.push(parentId) + const pivotData = normalizedData.map((currModel) => ({ + [parentPivotColumn]: parentId, + [currentPivotColumn]: currModel.id, + })) + allPivotData.push(...pivotData) + } + + // Batch insert and delete pivot table entries + await promiseAll([ + manager + .qb(relation.pivotEntity) + .insert(allPivotData) + .onConflict() + .ignore() + .execute(), + manager.nativeDelete(relation.pivotEntity, { + [parentPivotColumn]: { $in: allParentIds }, + [currentPivotColumn]: { + $nin: allPivotData.map((d) => d[currentPivotColumn]), + }, + }), + ]) + } + } + + /** + * Batch processing for one-to-many relations + */ + private async assignOneToManyRelationBatch_( + manager: SqlEntityManager, + entitiesData: Array<{ entity: T; index: number }>, + relation: EntityProperty, + performedActions: PerformedActions, + entitiesResults: Array<{ entities: any[]; index: number }> + ): Promise { + const joinColumns = + relation.targetMeta?.properties[relation.mappedBy]?.joinColumns + + // Collect all relation data and constraints + const allNormalizedData: any[] = [] + const allJoinConstraints: any[] = [] + const allEntityIds: string[] = [] + + for (const { entity, index } of entitiesData) { + const dataForRelation = entity[relation.name] + + if (dataForRelation === undefined) { + entitiesResults.push({ entities: [], index }) + continue + } + + const joinColumnsConstraints = {} + joinColumns?.forEach((joinColumn, index) => { + const referencedColumnName = relation.referencedColumnNames[index] + joinColumnsConstraints[joinColumn] = entity[referencedColumnName] + }) + + const normalizedData = dataForRelation.map((item: any) => { + const normalized = this.getEntityWithId(manager, relation.type, item) + return { ...normalized, ...joinColumnsConstraints } + }) + + allNormalizedData.push(...normalizedData) + allJoinConstraints.push(joinColumnsConstraints) + allEntityIds.push(...normalizedData.map((d: any) => d.id)) + entitiesResults.push({ entities: normalizedData, index }) + } + + // Batch delete orphaned relations + if (allJoinConstraints.length) { + const deletedRelations = await ( + manager.getTransactionContext() ?? manager.getKnex() + ) + .queryBuilder() + .from(relation.targetMeta!.collection) + .delete() + .where((builder) => { + allJoinConstraints.forEach((constraints, index) => { + if (index === 0) { + builder.where(constraints) + } else { + builder.orWhere(constraints) + } + }) + }) + .whereNotIn("id", allEntityIds) + .returning("id") + + if (deletedRelations.length) { + performedActions.deleted[relation.type] ??= [] + performedActions.deleted[relation.type].push( + ...deletedRelations.map((row) => ({ id: row.id })) + ) + } + } + + // Batch upsert all relation entities + if (allNormalizedData.length) { + const { performedActions: upsertActions } = await this.upsertMany_( + manager, + relation.type, + allNormalizedData + ) + this.mergePerformedActions(performedActions, upsertActions) + } + } + protected async assignCollectionRelation_( manager: SqlEntityManager, data: T, @@ -943,41 +1186,68 @@ export function mikroOrmBaseRepositoryFactory( entries: any[], skipUpdate: boolean = false ): Promise<{ orderedEntities: any[]; performedActions: PerformedActions }> { - const selectQb = manager.qb(entityName) - const existingEntities: any[] = await selectQb.select("*").where({ - id: { $in: entries.map((d) => d.id) }, + if (!entries.length) { + return { + orderedEntities: [], + performedActions: { created: {}, updated: {}, deleted: {} }, + } + } + + const uniqueEntriesMap = new Map() + const orderedUniqueEntries: any[] = [] + + entries.forEach((entry) => { + if (!uniqueEntriesMap.has(entry.id)) { + uniqueEntriesMap.set(entry.id, entry) + orderedUniqueEntries.push(entry) + } }) - const existingEntitiesMap = new Map( - existingEntities.map((e) => [e.id, e]) - ) + const existingEntitiesMap = new Map() + + if (orderedUniqueEntries.some((e) => e.id)) { + const existingEntities: any[] = await manager + .qb(entityName) + .select("id") + .where({ + id: { $in: orderedUniqueEntries.map((d) => d.id).filter(Boolean) }, + }) + + existingEntities.forEach((e) => { + existingEntitiesMap.set(e.id, e) + }) + } const orderedEntities: T[] = [] - const performedActions = { created: {}, updated: {}, deleted: {}, } - const promises: Promise[] = [] const toInsert: any[] = [] const toUpdate: any[] = [] + const insertOrderMap = new Map() + const updateOrderMap = new Map() - entries.forEach((data) => { + // Single pass to categorize operations while preserving order + orderedUniqueEntries.forEach((data, index) => { const existingEntity = existingEntitiesMap.get(data.id) orderedEntities.push(data) - if (existingEntity) { - if (skipUpdate) { - return - } - toUpdate.push(data) + if (existingEntity) { + if (!skipUpdate) { + toUpdate.push(data) + updateOrderMap.set(data.id, index) + } } else { toInsert.push(data) + insertOrderMap.set(data.id, index) } }) + const promises: Promise[] = [] + if (toInsert.length > 0) { let insertQb = manager.qb(entityName).insert(toInsert).returning("id") @@ -988,32 +1258,63 @@ export function mikroOrmBaseRepositoryFactory( promises.push( insertQb.execute("all", true).then((res: { id: string }[]) => { performedActions.created[entityName] ??= [] - performedActions.created[entityName].push( - ...res.map((data) => ({ id: data.id })) - ) + + // Sort created entities by their original insertion order + const sortedCreated = res + .map((data) => ({ + ...data, + order: insertOrderMap.get(data.id) ?? Number.MAX_SAFE_INTEGER, + })) + .sort((a, b) => a.order - b.order) + .map(({ order, ...data }) => data) + + performedActions.created[entityName].push(...sortedCreated) }) ) } if (toUpdate.length > 0) { - promises.push( - manager - .getDriver() - .nativeUpdateMany( - entityName, - toUpdate.map((d) => ({ id: d.id })), - toUpdate, - { ctx: manager.getTransactionContext() } - ) - .then((res) => { - const updatedRows = res.rows ?? [] - const updatedRowsMap = new Map(updatedRows.map((d) => [d.id, d])) + // Use batch update but maintain order + const batchSize = 100 // Process in chunks to avoid query size limits + const updatePromises: Promise[] = [] + const allUpdatedEntities: Array<{ id: string; order: number }> = [] - performedActions.updated[entityName] = toUpdate - .map((d) => updatedRowsMap.get(d.id)) - .filter((row) => row !== undefined) - .map((d) => ({ id: d.id })) - }) + for (let i = 0; i < toUpdate.length; i += batchSize) { + const chunk = toUpdate.slice(i, i + batchSize) + + updatePromises.push( + manager + .getDriver() + .nativeUpdateMany( + entityName, + chunk.map((d) => ({ id: d.id })), + chunk, + { ctx: manager.getTransactionContext() } + ) + .then((res) => { + const updatedRows = res.rows ?? [] + + // Add order information for sorting later + const orderedUpdated = updatedRows.map((d: any) => ({ + id: d.id, + order: updateOrderMap.get(d.id) ?? Number.MAX_SAFE_INTEGER, + })) + + allUpdatedEntities.push(...orderedUpdated) + }) + ) + } + + promises.push( + promiseAll(updatePromises).then(() => { + // Sort all updated entities by their original order and add to performedActions + performedActions.updated[entityName] ??= [] + const sortedUpdated = allUpdatedEntities + .sort((a, b) => a.order - b.order) + .map(({ order, ...data }) => data) + + performedActions.updated[entityName].push(...sortedUpdated) + }) ) } diff --git a/packages/core/utils/src/dml/integration-tests/__tests__/entity.spec.ts b/packages/core/utils/src/dml/integration-tests/__tests__/entity.spec.ts index a8b1424072..d820b041a0 100644 --- a/packages/core/utils/src/dml/integration-tests/__tests__/entity.spec.ts +++ b/packages/core/utils/src/dml/integration-tests/__tests__/entity.spec.ts @@ -85,7 +85,7 @@ describe("EntityBuilder", () => { id: user1.id, }) - expect(await mikroOrmSerializer(user)).toEqual({ + expect(mikroOrmSerializer(user)).toEqual({ id: user1.id, username: "User 1", points: 0, @@ -96,7 +96,8 @@ describe("EntityBuilder", () => { }) }) - it("set the points to null when explicitly set to null", async () => { + // TODO: Remove skip after upgrade the latest MikroORM version once https://github.com/mikro-orm/mikro-orm/pull/6880 is merged + it.skip("set the points to null when explicitly set to null", async () => { let manager = orm.em.fork() const user1 = manager.create(User, { @@ -112,7 +113,7 @@ describe("EntityBuilder", () => { id: user1.id, }) - expect(await mikroOrmSerializer(user)).toEqual({ + expect(mikroOrmSerializer(user)).toEqual({ id: user1.id, username: "User 1", points: null, @@ -140,7 +141,7 @@ describe("EntityBuilder", () => { const user = await manager.findOne(User, { id: user1.id, }) - expect(await mikroOrmSerializer(user)).toEqual({ + expect(mikroOrmSerializer(user)).toEqual({ id: user1.id, username: "User 1", points: null, @@ -167,7 +168,7 @@ describe("EntityBuilder", () => { id: user1.id, }) - expect(await mikroOrmSerializer(user)).toEqual({ + expect(mikroOrmSerializer(user)).toEqual({ id: user1.id, username: "User 1", points: 0, diff --git a/packages/deps/package.json b/packages/deps/package.json index e3be467a99..3e89efdc12 100644 --- a/packages/deps/package.json +++ b/packages/deps/package.json @@ -33,11 +33,11 @@ "build": "rimraf dist && tsc --build" }, "dependencies": { - "@mikro-orm/cli": "6.5.4", - "@mikro-orm/core": "6.5.4", - "@mikro-orm/knex": "6.5.4", - "@mikro-orm/migrations": "6.5.4", - "@mikro-orm/postgresql": "6.5.4", + "@mikro-orm/cli": "6.5.5", + "@mikro-orm/core": "6.5.5", + "@mikro-orm/knex": "6.5.5", + "@mikro-orm/migrations": "6.5.5", + "@mikro-orm/postgresql": "6.5.5", "awilix": "^8.0.1", "pg": "^8.16.3" }, diff --git a/packages/modules/order/src/services/order-module-service.ts b/packages/modules/order/src/services/order-module-service.ts index 5211e52864..f60369f075 100644 --- a/packages/modules/order/src/services/order-module-service.ts +++ b/packages/modules/order/src/services/order-module-service.ts @@ -1,3 +1,4 @@ +import { BeforeCreate, OnInit } from "@medusajs/framework/mikro-orm/core" import { BigNumberInput, Context, @@ -43,7 +44,6 @@ import { toMikroORMEntity, transformPropertiesToBigNumber, } from "@medusajs/framework/utils" -import { BeforeCreate, OnInit, rel } from "@medusajs/framework/mikro-orm/core" import { Order, OrderAddress, @@ -145,13 +145,19 @@ const generateMethodForModels = { { const MikroORMEntity = toMikroORMEntity(OrderChangeAction) MikroORMEntity.prototype["onInit_OrderChangeAction"] = function () { - this.version ??= this.order_change?.version ?? null + if (this.order_change) { + this.version ??= this.order_change.version ?? null - this.order_id ??= this.order_change?.order_id ?? null - this.claim_id ??= this.order_change?.claim_id ?? null - this.exchange_id ??= this.order_change?.exchange_id ?? null + this.order_id ??= this.order_change.order_id ?? null + this.claim_id ??= this.order_change.claim_id ?? null + this.exchange_id ??= this.order_change.exchange_id ?? null + } - if (!this.claim_id && !this.exchange_id) { + if ( + !this.claim_id && + !this.exchange_id && + (this.return || this.order_change) + ) { this.return_id = this.return?.id ?? this.order_change?.return_id ?? null } } @@ -161,19 +167,9 @@ const generateMethodForModels = { { const MikroORMEntity = toMikroORMEntity(OrderShipping) MikroORMEntity.prototype["onInit_OrderShipping"] = function () { - this.version ??= this.order?.version ?? null - - this.order ??= rel(toMikroORMEntity(Order), this.order?.id ?? null) - this.return ??= rel(toMikroORMEntity(Return), this.return?.id ?? null) - this.claim ??= rel(toMikroORMEntity(OrderClaim), this.claim?.id ?? null) - this.exchange ??= rel( - toMikroORMEntity(OrderExchange), - this.exchange?.id ?? null - ) - this.shipping_method ??= rel( - toMikroORMEntity(OrderShippingMethod), - this.shipping_method?.id ?? null - ) + if (this.order) { + this.version ??= this.order.version ?? null + } } OnInit()(MikroORMEntity.prototype, "onInit_OrderShipping") BeforeCreate()(MikroORMEntity.prototype, "onInit_OrderShipping") @@ -181,10 +177,9 @@ const generateMethodForModels = { { const MikroORMEntity = toMikroORMEntity(OrderItem) MikroORMEntity.prototype["onInit_OrderItem"] = function () { - this.version ??= this.order?.version ?? null - - this.order ??= rel(toMikroORMEntity(Order), this.order?.id ?? null) - this.item ??= rel(toMikroORMEntity(OrderLineItem), this.item?.id ?? null) + if (this.order) { + this.version ??= this.order.version ?? null + } } OnInit()(MikroORMEntity.prototype, "onInit_OrderItem") BeforeCreate()(MikroORMEntity.prototype, "onInit_OrderItem") diff --git a/yarn.lock b/yarn.lock index da3efec530..717647a3c9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6391,11 +6391,11 @@ __metadata: version: 0.0.0-use.local resolution: "@medusajs/deps@workspace:packages/deps" dependencies: - "@mikro-orm/cli": 6.5.4 - "@mikro-orm/core": 6.5.4 - "@mikro-orm/knex": 6.5.4 - "@mikro-orm/migrations": 6.5.4 - "@mikro-orm/postgresql": 6.5.4 + "@mikro-orm/cli": 6.5.5 + "@mikro-orm/core": 6.5.5 + "@mikro-orm/knex": 6.5.5 + "@mikro-orm/migrations": 6.5.5 + "@mikro-orm/postgresql": 6.5.5 "@swc/core": ^1.7.28 awilix: ^8.0.1 pg: ^8.16.3 @@ -7543,41 +7543,41 @@ __metadata: languageName: unknown linkType: soft -"@mikro-orm/cli@npm:6.5.4": - version: 6.5.4 - resolution: "@mikro-orm/cli@npm:6.5.4" +"@mikro-orm/cli@npm:6.5.5": + version: 6.5.5 + resolution: "@mikro-orm/cli@npm:6.5.5" dependencies: "@jercle/yargonaut": 1.1.5 - "@mikro-orm/core": 6.5.4 - "@mikro-orm/knex": 6.5.4 + "@mikro-orm/core": 6.5.5 + "@mikro-orm/knex": 6.5.5 fs-extra: 11.3.2 tsconfig-paths: 4.2.0 yargs: 17.7.2 bin: mikro-orm: ./cli mikro-orm-esm: ./esm - checksum: 828694dcb8a1c5b8c12e0e0610032bd772f085f45ffa5fc13be5c87c1f28f13539b56204151588af0ab6656cba1dce3fb20966ac03d50815f315ba66bc240acb + checksum: 064986fd89bd893bd1025ea4898699983d424d540fcea48cecbe5bc224d37fc29e969f62f79d535a3bf7024d415a4ca0a4bd5f454d2b27ea370b0d02b364599d languageName: node linkType: hard -"@mikro-orm/core@npm:6.5.4": - version: 6.5.4 - resolution: "@mikro-orm/core@npm:6.5.4" +"@mikro-orm/core@npm:6.5.5": + version: 6.5.5 + resolution: "@mikro-orm/core@npm:6.5.5" dependencies: dataloader: 2.2.3 dotenv: 17.2.2 esprima: 4.0.1 fs-extra: 11.3.2 globby: 11.1.0 - mikro-orm: 6.5.4 + mikro-orm: 6.5.5 reflect-metadata: 0.2.2 - checksum: 20058d8b02afbb73ec850451bf1a388c3e135e99045bab9d268a6375617140d9b4adddcc9064564ee2e55a8d56fd9112ad56349745daf372373fefb6115f18cb + checksum: 2ffe944b2e5f288aab10173789dbb5f96954be307d5d5f313856859c809982ecd9f521ea68e151772a880861b902713fff2637f80303b8ce7025db181c392de2 languageName: node linkType: hard -"@mikro-orm/knex@npm:6.5.4": - version: 6.5.4 - resolution: "@mikro-orm/knex@npm:6.5.4" +"@mikro-orm/knex@npm:6.5.5": + version: 6.5.5 + resolution: "@mikro-orm/knex@npm:6.5.5" dependencies: fs-extra: 11.3.2 knex: 3.1.0 @@ -7594,35 +7594,35 @@ __metadata: optional: true mariadb: optional: true - checksum: e8effea42422ceefec2ce50b74c493a5919e702f39f3167cf3f431f73d9a27d0f77800e20f961302257ec92afb5010068b35cb9632a114e086cf78306fd290b4 + checksum: 0d5b920f2181cd4f1921f2624e07f6401b1d0c434125d7c7db2bab81514f8513fbb2d498292784db2e1fce4cf4a0642b817ae417ae0783081b2c0f7b02437257 languageName: node linkType: hard -"@mikro-orm/migrations@npm:6.5.4": - version: 6.5.4 - resolution: "@mikro-orm/migrations@npm:6.5.4" +"@mikro-orm/migrations@npm:6.5.5": + version: 6.5.5 + resolution: "@mikro-orm/migrations@npm:6.5.5" dependencies: - "@mikro-orm/knex": 6.5.4 + "@mikro-orm/knex": 6.5.5 fs-extra: 11.3.2 umzug: 3.8.2 peerDependencies: "@mikro-orm/core": ^6.0.0 - checksum: 345aea93fb5652f8c78c057d89e55fc2b8f106fad65ac44ee7d39c1f85b3ae402c35dacba0cf2cc5ace1a0abb2074a1de1b45c55e79bbaee2217744df32542bc + checksum: 4441d3505575cffc272666e8a9b85332335163275ce997c7ece36ba10e986d5392e5643a7e3262a2185658b187082d9c549fb45ec4553d8336f949cfc13ff27d languageName: node linkType: hard -"@mikro-orm/postgresql@npm:6.5.4": - version: 6.5.4 - resolution: "@mikro-orm/postgresql@npm:6.5.4" +"@mikro-orm/postgresql@npm:6.5.5": + version: 6.5.5 + resolution: "@mikro-orm/postgresql@npm:6.5.5" dependencies: - "@mikro-orm/knex": 6.5.4 + "@mikro-orm/knex": 6.5.5 pg: 8.16.3 postgres-array: 3.0.4 postgres-date: 2.1.0 postgres-interval: 4.0.2 peerDependencies: "@mikro-orm/core": ^6.0.0 - checksum: a5102550998041ab6b91c528a8a8a69d06f1d851194679c0b27ff54d433798089417555b44e2371caab056eaa84e8865bd4dbd5eecacb04f1c0ae8304aa5a2df + checksum: 72c63aee694dd4606a726c69f504c9680bf700f2954e45d52bba93dd68ab96d71414ec63be16b444d6a151794e56da4c010b724f5a4b3299c646c582d1b617b9 languageName: node linkType: hard @@ -27107,10 +27107,10 @@ __metadata: languageName: node linkType: hard -"mikro-orm@npm:6.5.4": - version: 6.5.4 - resolution: "mikro-orm@npm:6.5.4" - checksum: f2ee9bc9598d88ca086f34d26283355ddd48b1a2095ce90d0c59b8d629309d28759b327dd2dedbea4c72c3bcb189ba37d8a24013edce8402a711e640b138c6d4 +"mikro-orm@npm:6.5.5": + version: 6.5.5 + resolution: "mikro-orm@npm:6.5.5" + checksum: eca6afcac7deeced740c650728ec86b80a627b7631c813eb1076eef8b1618e9aff0adc82f0e7ba57f6d9685bb71058d648ecd5b52fcfe0c32fb0cc0483fded08 languageName: node linkType: hard