chore(utils): make upsert with replace more efficient (#13580)

PARTIALLY RESOLVES CORE-1156

**What**
Improve upsertWithReplace to batch as much as possible what can be batched. Performance of this method will be much greater specially for cases with maybe entities and batch (e.g we seen many cases where they bulk product with hundreds variants and options etc)
for example let take the following object:
- entity 1
  - entity 2 []
    - entity 3 []
  - entity 2 []
    - entity 3 []

here all entity 3 will be batched and all entity 2 will be batched

I ve also added a pretty detail test that check all the stage and what is batched or not with many comments so that it is less harder to consume and remember in the future


Also includes:
- mikro orm upgade (issues found and fixes)
- order module hooks fixes

**NOTE**
It was easier for now to do this instead of rewriting the different areas where it is being used, also, maybe it means that we will have closer performance to what we would expect to have natively

**NOTE 2**
Also fix the fact that integration tests of the core packages never ran 😂
This commit is contained in:
Adrien de Peretti
2025-09-26 10:06:43 +02:00
committed by GitHub
parent 5ea32aaa44
commit fc67fd0b36
9 changed files with 910 additions and 144 deletions

View File

@@ -0,0 +1,7 @@
---
"@medusajs/utils": patch
"@medusajs/deps": patch
"@medusajs/order": patch
---
chore(utils): make upsert with replace more efficient

View File

@@ -61,7 +61,7 @@
"jest": "jest",
"test": "turbo run test --concurrency=50% --no-daemon --no-cache --force",
"test:chunk": "./scripts/run-workspace-unit-tests-in-chunks.sh",
"test:integration:packages:fast": "turbo run test:integration --concurrency=2 --no-daemon --no-cache --force --filter='./packages/medusa' --filter='./packages/modules/*' --filter='./packages/modules/providers/*' --filter='!./packages/modules/{workflow-engine-redis,index,product,order,cart}'",
"test:integration:packages:fast": "turbo run test:integration --concurrency=2 --no-daemon --no-cache --force --filter='./packages/core/*' --filter='./packages/medusa' --filter='./packages/modules/*' --filter='./packages/modules/providers/*' --filter='!./packages/modules/{workflow-engine-redis,index,product,order,cart}'",
"test:integration:packages:slow": "turbo run test:integration --concurrency=2 --no-daemon --no-cache --force --filter='./packages/modules/{workflow-engine-redis,index,product,order,cart}'",
"test:integration:api": "turbo run test:integration:chunk --concurrency=50% --no-daemon --no-cache --force --filter=integration-tests-api",
"test:integration:http": "turbo run test:integration:chunk --concurrency=50% --no-daemon --no-cache --force --filter=integration-tests-http",

View File

@@ -112,6 +112,15 @@ class Entity3 {
@ManyToMany(() => Entity1, (entity1) => entity1.entity3)
entity1 = new Collection<Entity1>(this)
@OneToMany(() => Entity4, (entity4) => entity4.entity3)
entity4 = new Collection<Entity4>(this)
@ManyToMany(() => Entity5, "entity3", {
owner: true,
pivotTable: "entity_3_5",
})
entity5 = new Collection<Entity5>(this)
@OnInit()
@BeforeCreate()
onInit() {
@@ -121,9 +130,115 @@ class Entity3 {
}
}
@Entity()
class Entity4 {
@PrimaryKey()
id: string
@Property()
title: string
@Property()
description: string
@Property({ nullable: true })
deleted_at: Date | null
@ManyToOne(() => Entity3, {
columnType: "text",
nullable: true,
mapToPk: true,
fieldName: "entity3_id",
deleteRule: "set null",
})
entity3_id: string
@ManyToOne(() => Entity3, { persist: false, nullable: true })
entity3: Entity3 | null
@OnInit()
@BeforeCreate()
onInit() {
if (!this.id) {
this.id = Math.random().toString(36).substring(7)
}
this.entity3_id ??= this.entity3?.id!
}
}
@Entity()
class Entity5 {
@PrimaryKey()
id: string
@Property()
title: string
@Property()
value: number
@Property({ nullable: true })
deleted_at: Date | null
@ManyToMany(() => Entity3, (entity3) => entity3.entity5)
entity3 = new Collection<Entity3>(this)
@OneToMany(() => Entity6, (entity6) => entity6.entity5)
entity6 = new Collection<Entity6>(this)
@OnInit()
@BeforeCreate()
onInit() {
if (!this.id) {
this.id = Math.random().toString(36).substring(7)
}
}
}
@Entity()
class Entity6 {
@PrimaryKey()
id: string
@Property()
title: string
@Property()
metadata: string
@Property({ nullable: true })
deleted_at: Date | null
@ManyToOne(() => Entity5, {
columnType: "text",
nullable: true,
mapToPk: true,
fieldName: "entity5_id",
deleteRule: "set null",
})
entity5_id: string
@ManyToOne(() => Entity5, { persist: false, nullable: true })
entity5: Entity5 | null
@OnInit()
@BeforeCreate()
onInit() {
if (!this.id) {
this.id = Math.random().toString(36).substring(7)
}
this.entity5_id ??= this.entity5?.id!
}
}
const Entity1Repository = mikroOrmBaseRepositoryFactory(Entity1)
const Entity2Repository = mikroOrmBaseRepositoryFactory(Entity2)
const Entity3Repository = mikroOrmBaseRepositoryFactory(Entity3)
const Entity4Repository = mikroOrmBaseRepositoryFactory(Entity4)
const Entity5Repository = mikroOrmBaseRepositoryFactory(Entity5)
const Entity6Repository = mikroOrmBaseRepositoryFactory(Entity6)
describe("mikroOrmRepository", () => {
let orm!: MikroORM
@@ -137,6 +252,17 @@ describe("mikroOrmRepository", () => {
const manager3 = () => {
return new Entity3Repository({ manager: manager.fork() })
}
// @ts-expect-error
const manager4 = () => {
return new Entity4Repository({ manager: manager.fork() })
}
const manager5 = () => {
return new Entity5Repository({ manager: manager.fork() })
}
// @ts-expect-error
const manager6 = () => {
return new Entity6Repository({ manager: manager.fork() })
}
beforeEach(async () => {
await dropDatabase(
@@ -146,7 +272,7 @@ describe("mikroOrmRepository", () => {
orm = await MikroORM.init(
defineConfig({
entities: [Entity1, Entity2],
entities: [Entity1, Entity2, Entity3, Entity4, Entity5, Entity6],
clientUrl: getDatabaseURL(dbName),
})
)
@@ -1254,6 +1380,7 @@ describe("mikroOrmRepository", () => {
'column "othertitle" of relation "entity3" does not exist'
)
})
it("should map ForeignKeyConstraintViolationException MedusaError on upsertWithReplace", async () => {
const entity2 = {
title: "en2",
@@ -1268,5 +1395,337 @@ describe("mikroOrmRepository", () => {
"You tried to set relationship entity1_id: 1, but such entity does not exist"
)
})
it("should efficiently handle large batches with deeply nested relations", async () => {
const numParentEntities = 25
const numEntity2PerParent = 10
const numEntity3PerParent = 8
const numEntity4PerEntity3 = 5
const numEntity5PerEntity3 = 4
const numEntity6PerEntity5 = 3
const entity5Manager = manager5()
const entity3Manager = manager3()
const entity1Manager = manager1()
const qbInsertSpy = jest.fn()
const qbSelectSpy = jest.fn()
const qbDeleteSpy = jest.fn()
const wrapManagerQb = (activeManager: any) => {
const originalQb = activeManager.qb
jest.spyOn(activeManager, "qb").mockImplementation((...args) => {
const qb = originalQb.apply(activeManager, args)
const originalInsert = qb.insert
const originalSelect = qb.select
const originalDelete = qb.delete
qb.insert = jest.fn((...insertArgs) => {
qbInsertSpy(...insertArgs)
return originalInsert.apply(qb, insertArgs)
})
qb.select = jest.fn((...selectArgs) => {
qbSelectSpy(...selectArgs)
return originalSelect.apply(qb, selectArgs)
})
qb.delete = jest.fn(() => {
qbDeleteSpy()
return originalDelete.apply(qb)
})
return qb
})
}
let entity5ActiveManager: any = entity5Manager.getActiveManager()
let entity3ActiveManager: any = entity3Manager.getActiveManager()
let entity1ActiveManager: any = entity1Manager.getActiveManager()
entity5Manager.getActiveManager = jest
.fn(() => entity5ActiveManager)
.mockReturnValue(entity5ActiveManager)
entity3Manager.getActiveManager = jest
.fn(() => entity3ActiveManager)
.mockReturnValue(entity3ActiveManager)
entity1Manager.getActiveManager = jest
.fn(() => entity1ActiveManager)
.mockReturnValue(entity1ActiveManager)
wrapManagerQb(entity5ActiveManager)
wrapManagerQb(entity3ActiveManager)
wrapManagerQb(entity1ActiveManager)
const findSpy = jest.spyOn(entity5ActiveManager, "find")
const nativeUpdateManySpy = jest.spyOn(
manager.getDriver(),
"nativeUpdateMany"
)
const nativeDeleteSpy = jest.spyOn(entity5ActiveManager, "nativeDelete")
qbInsertSpy.mockClear()
qbSelectSpy.mockClear()
qbDeleteSpy.mockClear()
findSpy.mockClear()
nativeUpdateManySpy.mockClear()
nativeDeleteSpy.mockClear()
// Create deeply nested dataset
const complexEntities = Array.from(
{ length: numParentEntities },
(_, i) => ({
id: `parent-${i.toString().padStart(3, "0")}`,
title: `Parent Entity ${i}`,
entity2: Array.from({ length: numEntity2PerParent }, (_, j) => ({
id: `e2-${i}-${j}`,
title: `Entity2 ${j} of Parent ${i}`,
handle: `handle-${i}-${j}`,
})),
entity3: Array.from({ length: numEntity3PerParent }, (_, k) => ({
id: `e3-${i}-${k}`,
title: `Entity3 ${k} of Parent ${i}`,
entity4: Array.from({ length: numEntity4PerEntity3 }, (_, l) => ({
id: `e4-${i}-${k}-${l}`,
title: `Entity4 ${l} of Entity3 ${k}`,
description: `Description for nested entity ${l}`,
})),
entity5: Array.from({ length: numEntity5PerEntity3 }, (_, m) => ({
id: `e5-${i}-${k}-${m}`,
title: `Entity5 ${m} of Entity3 ${k}`,
value: i * 100 + k * 10 + m,
entity6: Array.from({ length: numEntity6PerEntity5 }, (_, n) => ({
id: `e6-${i}-${k}-${m}-${n}`,
title: `Entity6 ${n} deeply nested`,
metadata: `{"parent": ${i}, "e3": ${k}, "e5": ${m}, "e6": ${n}}`,
})),
})),
})),
})
)
// First: Create Entity5 with Entity6 relations
const allEntity5Data = complexEntities.flatMap((parent) =>
parent.entity3.flatMap((e3) => e3.entity5)
)
const { performedActions: e5CreateActions } =
await entity5Manager.upsertWithReplace(allEntity5Data, {
relations: ["entity6"],
})
const e5SelectCalls = qbSelectSpy.mock.calls.length
const e5InsertCalls = qbInsertSpy.mock.calls.length
expect(e5SelectCalls).toBe(2) // One for Entity5, one for Entity6
expect(e5InsertCalls).toBe(2) // One batch insert for Entity5s, one for Entity6s
expect(qbInsertSpy.mock.calls[0][0]).toHaveLength(800) // entity5 25 * 8 * 4
expect(qbInsertSpy.mock.calls[1][0]).toHaveLength(2400) // entity6 25 * 8 * 4 * 3
findSpy.mockClear()
qbSelectSpy.mockClear()
qbInsertSpy.mockClear()
qbDeleteSpy.mockClear()
nativeUpdateManySpy.mockClear()
nativeDeleteSpy.mockClear()
// Second: Create Entity3 with Entity4 and Entity5 relations
const allEntity3Data = complexEntities.flatMap((parent) =>
parent.entity3.map((e3) => ({
...e3,
// Reference existing Entity5 by ID only
entity5: e3.entity5.map((e5) => ({ id: e5.id })),
}))
)
const { performedActions: e3CreateActions } =
await entity3Manager.upsertWithReplace(allEntity3Data, {
relations: ["entity4", "entity5"],
})
const e3SelectCalls = qbSelectSpy.mock.calls.length
const e3InsertCalls = qbInsertSpy.mock.calls.length
expect(e3SelectCalls).toBe(3) // One for Entity3, one for Entity5, One pivot entity3 -> entity5
expect(e3InsertCalls).toBe(3) // One batch insert for Entity3s, one for Entity4s and one pivot entity3 -> entity5
expect(qbInsertSpy.mock.calls[0][0]).toHaveLength(200) // entity3: 25 * 8
expect(qbInsertSpy.mock.calls[1][0]).toHaveLength(800) // pivot entity3 -> entity5: 25 * 8 * 4
expect(qbInsertSpy.mock.calls[2][0]).toHaveLength(1000) // entity4: 25 * 8 * 5
findSpy.mockClear()
qbSelectSpy.mockClear()
qbInsertSpy.mockClear()
qbDeleteSpy.mockClear()
nativeUpdateManySpy.mockClear()
nativeDeleteSpy.mockClear()
// Third: Create Entity1 with all relations
const mainEntitiesData = complexEntities.map((parent) => ({
...parent,
// Reference existing Entity3 by ID only
entity3: parent.entity3.map((e3) => ({ id: e3.id })),
}))
const { performedActions: mainCreateActions } =
await entity1Manager.upsertWithReplace(mainEntitiesData, {
relations: ["entity2", "entity3"],
})
const mainSelectCalls = qbSelectSpy.mock.calls.length
const mainInsertCalls = qbInsertSpy.mock.calls.length
expect(mainSelectCalls).toBe(3) // One for Entity1, one for Entity3, one for Entity2
expect(mainInsertCalls).toBe(3) // One batch insert for Entity1s, one for Entity2s, one for Entity3s
expect(qbInsertSpy.mock.calls[0][0]).toHaveLength(25) // entity1: 25
expect(qbInsertSpy.mock.calls[1][0]).toHaveLength(200) // entity3: 25 * 8
expect(qbInsertSpy.mock.calls[2][0]).toHaveLength(250) // entity2: 25 * 10
findSpy.mockClear()
qbSelectSpy.mockClear()
qbInsertSpy.mockClear()
qbDeleteSpy.mockClear()
nativeUpdateManySpy.mockClear()
nativeDeleteSpy.mockClear()
// Verify creation counts
expect(e5CreateActions.created[Entity5.name]).toHaveLength(
numParentEntities * numEntity3PerParent * numEntity5PerEntity3
)
expect(e5CreateActions.created[Entity6.name]).toHaveLength(
numParentEntities *
numEntity3PerParent *
numEntity5PerEntity3 *
numEntity6PerEntity5
)
expect(e3CreateActions.created[Entity3.name]).toHaveLength(
numParentEntities * numEntity3PerParent
)
expect(e3CreateActions.created[Entity4.name]).toHaveLength(
numParentEntities * numEntity3PerParent * numEntity4PerEntity3
)
expect(mainCreateActions.created[Entity1.name]).toHaveLength(
numParentEntities
)
expect(mainCreateActions.created[Entity2.name]).toHaveLength(
numParentEntities * numEntity2PerParent
)
// Now test complex updates
// Modify nested structures
const updatedComplexEntities = complexEntities.map((parent, i) => ({
...parent,
title: `Updated ${parent.title}`,
entity2: [
// Keep first 5 out of 10, update them
...parent.entity2.slice(0, 5).map((e2) => ({
...e2,
title: `Updated ${e2.title}`,
})),
// Add new ones
{
id: `new-e2-${i}-0`,
title: `New Entity2 0`,
handle: `new-handle-${i}-0`,
},
{
id: `new-e2-${i}-1`,
title: `New Entity2 1`,
handle: `new-handle-${i}-1`,
},
],
entity3: parent.entity3.slice(0, 4).map((e3) => ({ id: e3.id })), // Keep only first 4
}))
const { performedActions: updateActions } =
await entity1Manager.upsertWithReplace(updatedComplexEntities, {
relations: ["entity2", "entity3"],
})
// Validate batching for update operations
const updateSelectCalls = qbSelectSpy.mock.calls.length
const updateInsertCalls = qbInsertSpy.mock.calls.length
const updateManyCalls = nativeUpdateManySpy.mock.calls.length
expect(updateSelectCalls).toBe(3) // Entity1, Entity2, Entity3 existence checks
// Should use batch updates for existing entities
expect(updateManyCalls).toBe(3) // 1 for Entity1 (25), 2 for Entity2 (100+25, chunked due to 100 batch limit)
// Should use batch inserts for new entities and pivot relationships
expect(updateInsertCalls).toBe(2) // pivot Entity1 - Entity3 (with conflict resolution) + new Entity2s
expect(qbInsertSpy.mock.calls[0][0]).toHaveLength(100) // pivot Entity1 - Entity3: 25 parents × 4 entity3s each (uses onConflict().ignore())
expect(qbInsertSpy.mock.calls[1][0]).toHaveLength(50) // New Entity2s: 25 parents × 2 new each
// We wont check the deletion which happen through knex directly. It will be accounted for in
// the final state verification
// Validate that updates are batched correctly with chunking
expect(nativeUpdateManySpy.mock.calls[0][2]).toHaveLength(25) // Entity1: 25 entities
expect(nativeUpdateManySpy.mock.calls[1][2]).toHaveLength(100) // Entity2 chunk 1: 100 entities
expect(nativeUpdateManySpy.mock.calls[2][2]).toHaveLength(25) // Entity2 chunk 2: 25 entities
// Verify updates
expect(updateActions.updated[Entity1.name]).toHaveLength(
numParentEntities
)
expect(updateActions.updated[Entity2.name]).toHaveLength(
numParentEntities * 5
) // 5 updated per parent
expect(updateActions.created[Entity2.name]).toHaveLength(
numParentEntities * 2
) // 2 new per parent
expect(updateActions.deleted[Entity2.name]).toHaveLength(
numParentEntities * 5
) // 5 deleted per parent (kept first 5 out of 10, so 5 deleted)
// Verify order preservation
updateActions.updated[Entity1.name].forEach((entity, index) => {
expect(entity.id).toBe(`parent-${index.toString().padStart(3, "0")}`)
})
// Verify final state
const finalEntities = await manager1().find({
where: {},
options: {
populate: ["entity2", "entity3"],
orderBy: { id: "ASC" } as any,
},
})
expect(finalEntities).toHaveLength(numParentEntities)
finalEntities.forEach((entity, i) => {
expect(entity.title).toBe(`Updated Parent Entity ${i}`)
expect(entity.entity2).toHaveLength(7) // 5 updated + 2 new
expect(entity.entity3).toHaveLength(4) // Only first 4 kept
})
// Verify nested relationships still exist
const sampleEntity3 = await manager3().find({
where: { id: "e3-0-0" },
options: {
populate: ["entity4", "entity5"],
},
})
expect(sampleEntity3).toHaveLength(1)
expect(sampleEntity3[0].entity4).toHaveLength(numEntity4PerEntity3)
expect(sampleEntity3[0].entity5).toHaveLength(numEntity5PerEntity3)
// Verify deeply nested Entity6 relationships
const sampleEntity5 = await manager5().find({
where: { id: "e5-0-0-0" },
options: {
populate: ["entity6"],
},
})
expect(sampleEntity5).toHaveLength(1)
expect(sampleEntity5[0].entity6).toHaveLength(numEntity6PerEntity5)
})
})
})

View File

@@ -96,6 +96,9 @@ export async function mikroOrmCreateConnection(
driverOptions,
tsNode: process.env.APP_ENV === "development",
filters: database.filters ?? {},
useBatchInserts: true,
useBatchUpdates: true,
batchSize: 100,
assign: {
convertCustomTypes: true,
},

View File

@@ -490,9 +490,14 @@ export function mikroOrmBaseRepositoryFactory<const T extends object>(
const findOptions_ = { ...findOptions }
findOptions_.options ??= {}
Object.assign(findOptions_.options, {
strategy: LoadStrategy.SELECT_IN,
})
if (!("strategy" in findOptions_.options)) {
if (findOptions_.options.limit != null || findOptions_.options.offset) {
// TODO: from 7+ it will be the default strategy
Object.assign(findOptions_.options, {
strategy: LoadStrategy.BALANCED,
})
}
}
MikroOrmBaseRepository.compensateRelationFieldsSelectionFromLoadStrategy({
findOptions: findOptions_,
@@ -535,7 +540,7 @@ export function mikroOrmBaseRepositoryFactory<const T extends object>(
let allEntities: InferRepositoryReturnType<T>[][] = []
if (primaryKeysCriteria.length) {
allEntities = await Promise.all(
allEntities = await promiseAll(
primaryKeysCriteria.map(
async (criteria) =>
await this.find(
@@ -683,46 +688,65 @@ export function mikroOrmBaseRepositoryFactory<const T extends object>(
this.mergePerformedActions(performedActions, performedActions_)
await promiseAll(
upsertedTopLevelEntities
const relationProcessingPromises: Promise<void>[] = []
// Group relations by type
const relationsByType = new Map<
string,
{ relation: EntityProperty; entities: any[] }
>()
config.relations?.forEach((relationName) => {
const relation = allRelations?.find((r) => r.name === relationName)
if (!relation) return
if (
relation.kind === ReferenceKind.ONE_TO_ONE ||
relation.kind === ReferenceKind.MANY_TO_ONE
) {
return
}
const entitiesForRelation = upsertedTopLevelEntities
.map((entityEntry, i) => {
const originalEntry = originalDataMap.get((entityEntry as any).id)!
const reconstructedEntry = reconstructedResponse[i]
return allRelations?.map(async (relation) => {
const relationName = relation.name as keyof T
if (!config.relations?.includes(relationName)) {
return
}
// TODO: Handle ONE_TO_ONE
// One to one and Many to one are handled outside of the assignment as they need to happen before the main entity is created
if (
relation.kind === ReferenceKind.ONE_TO_ONE ||
relation.kind === ReferenceKind.MANY_TO_ONE
) {
return
}
const { entities, performedActions: performedActions_ } =
await this.assignCollectionRelation_(
manager,
{ ...originalEntry, id: (entityEntry as any).id },
relation
)
this.mergePerformedActions(performedActions, performedActions_)
reconstructedEntry[relationName] = entities
return
})
return {
entity: { ...originalEntry, id: (entityEntry as any).id },
reconstructedEntry: reconstructedResponse[i],
index: i,
}
})
.flat()
)
.filter((item) => item.entity[relationName as string] !== undefined)
// // We want to populate the identity map with the data that was written to the DB, and return an entity object
// return reconstructedResponse.map((r) =>
// manager.create(entity, r, { persist: false })
// )
if (entitiesForRelation.length > 0) {
relationsByType.set(relationName as string, {
relation,
entities: entitiesForRelation,
})
}
})
for (const [relationName, { relation, entities }] of relationsByType) {
relationProcessingPromises.push(
this.assignCollectionRelationBatch_(manager, entities, relation).then(
({ performedActions: batchPerformedActions, entitiesResults }) => {
this.mergePerformedActions(
performedActions,
batchPerformedActions
)
// Update reconstructed response with results
entitiesResults.forEach(
({ entities: relationEntities, index }) => {
reconstructedResponse[index][relationName] = relationEntities
}
)
}
)
)
}
await promiseAll(relationProcessingPromises)
return { entities: reconstructedResponse, performedActions }
}
@@ -741,7 +765,226 @@ export function mikroOrmBaseRepositoryFactory<const T extends object>(
})
}
// FUTURE: We can make this performant by only aggregating the operations, but only executing them at the end.
/**
* Batch processing for multiple entities with same relation
*/
protected async assignCollectionRelationBatch_(
manager: SqlEntityManager,
entitiesData: Array<{ entity: T; index: number }>,
relation: EntityProperty
): Promise<{
performedActions: PerformedActions
entitiesResults: Array<{ entities: any[]; index: number }>
}> {
const performedActions: PerformedActions = {
created: {},
updated: {},
deleted: {},
}
const entitiesResults: Array<{ entities: any[]; index: number }> = []
if (relation.kind === ReferenceKind.MANY_TO_MANY) {
await this.assignManyToManyRelationBatch_(
manager,
entitiesData,
relation,
performedActions,
entitiesResults
)
} else if (relation.kind === ReferenceKind.ONE_TO_MANY) {
await this.assignOneToManyRelationBatch_(
manager,
entitiesData,
relation,
performedActions,
entitiesResults
)
} else {
// For other relation types, fall back to individual processing
await promiseAll(
entitiesData.map(async ({ entity, index }) => {
const { entities, performedActions: individualActions } =
await this.assignCollectionRelation_(manager, entity, relation)
this.mergePerformedActions(performedActions, individualActions)
entitiesResults.push({ entities, index })
})
)
}
return { performedActions, entitiesResults }
}
/**
* Batch processing for many-to-many relations
*/
private async assignManyToManyRelationBatch_(
manager: SqlEntityManager,
entitiesData: Array<{ entity: T; index: number }>,
relation: EntityProperty,
performedActions: PerformedActions,
entitiesResults: Array<{ entities: any[]; index: number }>
): Promise<void> {
const currentPivotColumn = relation.inverseJoinColumns[0]
const parentPivotColumn = relation.joinColumns[0]
// Collect all relation data and normalize it
const allNormalizedData: any[] = []
const entityRelationMap = new Map<string, any[]>()
const entitiesToDeletePivots: string[] = []
for (const { entity, index } of entitiesData) {
const dataForRelation = entity[relation.name]
if (dataForRelation === undefined) {
entitiesResults.push({ entities: [], index })
continue
}
if (!dataForRelation.length) {
entitiesToDeletePivots.push((entity as any).id)
entitiesResults.push({ entities: [], index })
continue
}
const normalizedData = dataForRelation.map((item: any) =>
this.getEntityWithId(manager, relation.type, item)
)
allNormalizedData.push(...normalizedData)
entityRelationMap.set((entity as any).id, normalizedData)
entitiesResults.push({ entities: normalizedData, index })
}
// Batch delete empty pivot relations
if (entitiesToDeletePivots.length) {
await manager.nativeDelete(relation.pivotEntity, {
[parentPivotColumn]: { $in: entitiesToDeletePivots },
})
}
if (allNormalizedData.length) {
const { performedActions: upsertActions } = await this.upsertMany_(
manager,
relation.type,
allNormalizedData,
true
)
this.mergePerformedActions(performedActions, upsertActions)
// Collect all pivot data for batch operations
const allPivotData: any[] = []
const allParentIds: string[] = []
for (const [parentId, normalizedData] of entityRelationMap) {
allParentIds.push(parentId)
const pivotData = normalizedData.map((currModel) => ({
[parentPivotColumn]: parentId,
[currentPivotColumn]: currModel.id,
}))
allPivotData.push(...pivotData)
}
// Batch insert and delete pivot table entries
await promiseAll([
manager
.qb(relation.pivotEntity)
.insert(allPivotData)
.onConflict()
.ignore()
.execute(),
manager.nativeDelete(relation.pivotEntity, {
[parentPivotColumn]: { $in: allParentIds },
[currentPivotColumn]: {
$nin: allPivotData.map((d) => d[currentPivotColumn]),
},
}),
])
}
}
/**
* Batch processing for one-to-many relations
*/
private async assignOneToManyRelationBatch_(
manager: SqlEntityManager,
entitiesData: Array<{ entity: T; index: number }>,
relation: EntityProperty,
performedActions: PerformedActions,
entitiesResults: Array<{ entities: any[]; index: number }>
): Promise<void> {
const joinColumns =
relation.targetMeta?.properties[relation.mappedBy]?.joinColumns
// Collect all relation data and constraints
const allNormalizedData: any[] = []
const allJoinConstraints: any[] = []
const allEntityIds: string[] = []
for (const { entity, index } of entitiesData) {
const dataForRelation = entity[relation.name]
if (dataForRelation === undefined) {
entitiesResults.push({ entities: [], index })
continue
}
const joinColumnsConstraints = {}
joinColumns?.forEach((joinColumn, index) => {
const referencedColumnName = relation.referencedColumnNames[index]
joinColumnsConstraints[joinColumn] = entity[referencedColumnName]
})
const normalizedData = dataForRelation.map((item: any) => {
const normalized = this.getEntityWithId(manager, relation.type, item)
return { ...normalized, ...joinColumnsConstraints }
})
allNormalizedData.push(...normalizedData)
allJoinConstraints.push(joinColumnsConstraints)
allEntityIds.push(...normalizedData.map((d: any) => d.id))
entitiesResults.push({ entities: normalizedData, index })
}
// Batch delete orphaned relations
if (allJoinConstraints.length) {
const deletedRelations = await (
manager.getTransactionContext() ?? manager.getKnex()
)
.queryBuilder()
.from(relation.targetMeta!.collection)
.delete()
.where((builder) => {
allJoinConstraints.forEach((constraints, index) => {
if (index === 0) {
builder.where(constraints)
} else {
builder.orWhere(constraints)
}
})
})
.whereNotIn("id", allEntityIds)
.returning("id")
if (deletedRelations.length) {
performedActions.deleted[relation.type] ??= []
performedActions.deleted[relation.type].push(
...deletedRelations.map((row) => ({ id: row.id }))
)
}
}
// Batch upsert all relation entities
if (allNormalizedData.length) {
const { performedActions: upsertActions } = await this.upsertMany_(
manager,
relation.type,
allNormalizedData
)
this.mergePerformedActions(performedActions, upsertActions)
}
}
protected async assignCollectionRelation_(
manager: SqlEntityManager,
data: T,
@@ -943,41 +1186,68 @@ export function mikroOrmBaseRepositoryFactory<const T extends object>(
entries: any[],
skipUpdate: boolean = false
): Promise<{ orderedEntities: any[]; performedActions: PerformedActions }> {
const selectQb = manager.qb(entityName)
const existingEntities: any[] = await selectQb.select("*").where({
id: { $in: entries.map((d) => d.id) },
if (!entries.length) {
return {
orderedEntities: [],
performedActions: { created: {}, updated: {}, deleted: {} },
}
}
const uniqueEntriesMap = new Map<string, any>()
const orderedUniqueEntries: any[] = []
entries.forEach((entry) => {
if (!uniqueEntriesMap.has(entry.id)) {
uniqueEntriesMap.set(entry.id, entry)
orderedUniqueEntries.push(entry)
}
})
const existingEntitiesMap = new Map(
existingEntities.map((e) => [e.id, e])
)
const existingEntitiesMap = new Map()
if (orderedUniqueEntries.some((e) => e.id)) {
const existingEntities: any[] = await manager
.qb(entityName)
.select("id")
.where({
id: { $in: orderedUniqueEntries.map((d) => d.id).filter(Boolean) },
})
existingEntities.forEach((e) => {
existingEntitiesMap.set(e.id, e)
})
}
const orderedEntities: T[] = []
const performedActions = {
created: {},
updated: {},
deleted: {},
}
const promises: Promise<any>[] = []
const toInsert: any[] = []
const toUpdate: any[] = []
const insertOrderMap = new Map<string, number>()
const updateOrderMap = new Map<string, number>()
entries.forEach((data) => {
// Single pass to categorize operations while preserving order
orderedUniqueEntries.forEach((data, index) => {
const existingEntity = existingEntitiesMap.get(data.id)
orderedEntities.push(data)
if (existingEntity) {
if (skipUpdate) {
return
}
toUpdate.push(data)
if (existingEntity) {
if (!skipUpdate) {
toUpdate.push(data)
updateOrderMap.set(data.id, index)
}
} else {
toInsert.push(data)
insertOrderMap.set(data.id, index)
}
})
const promises: Promise<any>[] = []
if (toInsert.length > 0) {
let insertQb = manager.qb(entityName).insert(toInsert).returning("id")
@@ -988,32 +1258,63 @@ export function mikroOrmBaseRepositoryFactory<const T extends object>(
promises.push(
insertQb.execute("all", true).then((res: { id: string }[]) => {
performedActions.created[entityName] ??= []
performedActions.created[entityName].push(
...res.map((data) => ({ id: data.id }))
)
// Sort created entities by their original insertion order
const sortedCreated = res
.map((data) => ({
...data,
order: insertOrderMap.get(data.id) ?? Number.MAX_SAFE_INTEGER,
}))
.sort((a, b) => a.order - b.order)
.map(({ order, ...data }) => data)
performedActions.created[entityName].push(...sortedCreated)
})
)
}
if (toUpdate.length > 0) {
promises.push(
manager
.getDriver()
.nativeUpdateMany(
entityName,
toUpdate.map((d) => ({ id: d.id })),
toUpdate,
{ ctx: manager.getTransactionContext() }
)
.then((res) => {
const updatedRows = res.rows ?? []
const updatedRowsMap = new Map(updatedRows.map((d) => [d.id, d]))
// Use batch update but maintain order
const batchSize = 100 // Process in chunks to avoid query size limits
const updatePromises: Promise<any>[] = []
const allUpdatedEntities: Array<{ id: string; order: number }> = []
performedActions.updated[entityName] = toUpdate
.map((d) => updatedRowsMap.get(d.id))
.filter((row) => row !== undefined)
.map((d) => ({ id: d.id }))
})
for (let i = 0; i < toUpdate.length; i += batchSize) {
const chunk = toUpdate.slice(i, i + batchSize)
updatePromises.push(
manager
.getDriver()
.nativeUpdateMany(
entityName,
chunk.map((d) => ({ id: d.id })),
chunk,
{ ctx: manager.getTransactionContext() }
)
.then((res) => {
const updatedRows = res.rows ?? []
// Add order information for sorting later
const orderedUpdated = updatedRows.map((d: any) => ({
id: d.id,
order: updateOrderMap.get(d.id) ?? Number.MAX_SAFE_INTEGER,
}))
allUpdatedEntities.push(...orderedUpdated)
})
)
}
promises.push(
promiseAll(updatePromises).then(() => {
// Sort all updated entities by their original order and add to performedActions
performedActions.updated[entityName] ??= []
const sortedUpdated = allUpdatedEntities
.sort((a, b) => a.order - b.order)
.map(({ order, ...data }) => data)
performedActions.updated[entityName].push(...sortedUpdated)
})
)
}

View File

@@ -85,7 +85,7 @@ describe("EntityBuilder", () => {
id: user1.id,
})
expect(await mikroOrmSerializer(user)).toEqual({
expect(mikroOrmSerializer(user)).toEqual({
id: user1.id,
username: "User 1",
points: 0,
@@ -96,7 +96,8 @@ describe("EntityBuilder", () => {
})
})
it("set the points to null when explicitly set to null", async () => {
// TODO: Remove skip after upgrade the latest MikroORM version once https://github.com/mikro-orm/mikro-orm/pull/6880 is merged
it.skip("set the points to null when explicitly set to null", async () => {
let manager = orm.em.fork()
const user1 = manager.create(User, {
@@ -112,7 +113,7 @@ describe("EntityBuilder", () => {
id: user1.id,
})
expect(await mikroOrmSerializer(user)).toEqual({
expect(mikroOrmSerializer(user)).toEqual({
id: user1.id,
username: "User 1",
points: null,
@@ -140,7 +141,7 @@ describe("EntityBuilder", () => {
const user = await manager.findOne(User, {
id: user1.id,
})
expect(await mikroOrmSerializer(user)).toEqual({
expect(mikroOrmSerializer(user)).toEqual({
id: user1.id,
username: "User 1",
points: null,
@@ -167,7 +168,7 @@ describe("EntityBuilder", () => {
id: user1.id,
})
expect(await mikroOrmSerializer(user)).toEqual({
expect(mikroOrmSerializer(user)).toEqual({
id: user1.id,
username: "User 1",
points: 0,

View File

@@ -33,11 +33,11 @@
"build": "rimraf dist && tsc --build"
},
"dependencies": {
"@mikro-orm/cli": "6.5.4",
"@mikro-orm/core": "6.5.4",
"@mikro-orm/knex": "6.5.4",
"@mikro-orm/migrations": "6.5.4",
"@mikro-orm/postgresql": "6.5.4",
"@mikro-orm/cli": "6.5.5",
"@mikro-orm/core": "6.5.5",
"@mikro-orm/knex": "6.5.5",
"@mikro-orm/migrations": "6.5.5",
"@mikro-orm/postgresql": "6.5.5",
"awilix": "^8.0.1",
"pg": "^8.16.3"
},

View File

@@ -1,3 +1,4 @@
import { BeforeCreate, OnInit } from "@medusajs/framework/mikro-orm/core"
import {
BigNumberInput,
Context,
@@ -43,7 +44,6 @@ import {
toMikroORMEntity,
transformPropertiesToBigNumber,
} from "@medusajs/framework/utils"
import { BeforeCreate, OnInit, rel } from "@medusajs/framework/mikro-orm/core"
import {
Order,
OrderAddress,
@@ -145,13 +145,19 @@ const generateMethodForModels = {
{
const MikroORMEntity = toMikroORMEntity(OrderChangeAction)
MikroORMEntity.prototype["onInit_OrderChangeAction"] = function () {
this.version ??= this.order_change?.version ?? null
if (this.order_change) {
this.version ??= this.order_change.version ?? null
this.order_id ??= this.order_change?.order_id ?? null
this.claim_id ??= this.order_change?.claim_id ?? null
this.exchange_id ??= this.order_change?.exchange_id ?? null
this.order_id ??= this.order_change.order_id ?? null
this.claim_id ??= this.order_change.claim_id ?? null
this.exchange_id ??= this.order_change.exchange_id ?? null
}
if (!this.claim_id && !this.exchange_id) {
if (
!this.claim_id &&
!this.exchange_id &&
(this.return || this.order_change)
) {
this.return_id = this.return?.id ?? this.order_change?.return_id ?? null
}
}
@@ -161,19 +167,9 @@ const generateMethodForModels = {
{
const MikroORMEntity = toMikroORMEntity(OrderShipping)
MikroORMEntity.prototype["onInit_OrderShipping"] = function () {
this.version ??= this.order?.version ?? null
this.order ??= rel(toMikroORMEntity(Order), this.order?.id ?? null)
this.return ??= rel(toMikroORMEntity(Return), this.return?.id ?? null)
this.claim ??= rel(toMikroORMEntity(OrderClaim), this.claim?.id ?? null)
this.exchange ??= rel(
toMikroORMEntity(OrderExchange),
this.exchange?.id ?? null
)
this.shipping_method ??= rel(
toMikroORMEntity(OrderShippingMethod),
this.shipping_method?.id ?? null
)
if (this.order) {
this.version ??= this.order.version ?? null
}
}
OnInit()(MikroORMEntity.prototype, "onInit_OrderShipping")
BeforeCreate()(MikroORMEntity.prototype, "onInit_OrderShipping")
@@ -181,10 +177,9 @@ const generateMethodForModels = {
{
const MikroORMEntity = toMikroORMEntity(OrderItem)
MikroORMEntity.prototype["onInit_OrderItem"] = function () {
this.version ??= this.order?.version ?? null
this.order ??= rel(toMikroORMEntity(Order), this.order?.id ?? null)
this.item ??= rel(toMikroORMEntity(OrderLineItem), this.item?.id ?? null)
if (this.order) {
this.version ??= this.order.version ?? null
}
}
OnInit()(MikroORMEntity.prototype, "onInit_OrderItem")
BeforeCreate()(MikroORMEntity.prototype, "onInit_OrderItem")

View File

@@ -6391,11 +6391,11 @@ __metadata:
version: 0.0.0-use.local
resolution: "@medusajs/deps@workspace:packages/deps"
dependencies:
"@mikro-orm/cli": 6.5.4
"@mikro-orm/core": 6.5.4
"@mikro-orm/knex": 6.5.4
"@mikro-orm/migrations": 6.5.4
"@mikro-orm/postgresql": 6.5.4
"@mikro-orm/cli": 6.5.5
"@mikro-orm/core": 6.5.5
"@mikro-orm/knex": 6.5.5
"@mikro-orm/migrations": 6.5.5
"@mikro-orm/postgresql": 6.5.5
"@swc/core": ^1.7.28
awilix: ^8.0.1
pg: ^8.16.3
@@ -7543,41 +7543,41 @@ __metadata:
languageName: unknown
linkType: soft
"@mikro-orm/cli@npm:6.5.4":
version: 6.5.4
resolution: "@mikro-orm/cli@npm:6.5.4"
"@mikro-orm/cli@npm:6.5.5":
version: 6.5.5
resolution: "@mikro-orm/cli@npm:6.5.5"
dependencies:
"@jercle/yargonaut": 1.1.5
"@mikro-orm/core": 6.5.4
"@mikro-orm/knex": 6.5.4
"@mikro-orm/core": 6.5.5
"@mikro-orm/knex": 6.5.5
fs-extra: 11.3.2
tsconfig-paths: 4.2.0
yargs: 17.7.2
bin:
mikro-orm: ./cli
mikro-orm-esm: ./esm
checksum: 828694dcb8a1c5b8c12e0e0610032bd772f085f45ffa5fc13be5c87c1f28f13539b56204151588af0ab6656cba1dce3fb20966ac03d50815f315ba66bc240acb
checksum: 064986fd89bd893bd1025ea4898699983d424d540fcea48cecbe5bc224d37fc29e969f62f79d535a3bf7024d415a4ca0a4bd5f454d2b27ea370b0d02b364599d
languageName: node
linkType: hard
"@mikro-orm/core@npm:6.5.4":
version: 6.5.4
resolution: "@mikro-orm/core@npm:6.5.4"
"@mikro-orm/core@npm:6.5.5":
version: 6.5.5
resolution: "@mikro-orm/core@npm:6.5.5"
dependencies:
dataloader: 2.2.3
dotenv: 17.2.2
esprima: 4.0.1
fs-extra: 11.3.2
globby: 11.1.0
mikro-orm: 6.5.4
mikro-orm: 6.5.5
reflect-metadata: 0.2.2
checksum: 20058d8b02afbb73ec850451bf1a388c3e135e99045bab9d268a6375617140d9b4adddcc9064564ee2e55a8d56fd9112ad56349745daf372373fefb6115f18cb
checksum: 2ffe944b2e5f288aab10173789dbb5f96954be307d5d5f313856859c809982ecd9f521ea68e151772a880861b902713fff2637f80303b8ce7025db181c392de2
languageName: node
linkType: hard
"@mikro-orm/knex@npm:6.5.4":
version: 6.5.4
resolution: "@mikro-orm/knex@npm:6.5.4"
"@mikro-orm/knex@npm:6.5.5":
version: 6.5.5
resolution: "@mikro-orm/knex@npm:6.5.5"
dependencies:
fs-extra: 11.3.2
knex: 3.1.0
@@ -7594,35 +7594,35 @@ __metadata:
optional: true
mariadb:
optional: true
checksum: e8effea42422ceefec2ce50b74c493a5919e702f39f3167cf3f431f73d9a27d0f77800e20f961302257ec92afb5010068b35cb9632a114e086cf78306fd290b4
checksum: 0d5b920f2181cd4f1921f2624e07f6401b1d0c434125d7c7db2bab81514f8513fbb2d498292784db2e1fce4cf4a0642b817ae417ae0783081b2c0f7b02437257
languageName: node
linkType: hard
"@mikro-orm/migrations@npm:6.5.4":
version: 6.5.4
resolution: "@mikro-orm/migrations@npm:6.5.4"
"@mikro-orm/migrations@npm:6.5.5":
version: 6.5.5
resolution: "@mikro-orm/migrations@npm:6.5.5"
dependencies:
"@mikro-orm/knex": 6.5.4
"@mikro-orm/knex": 6.5.5
fs-extra: 11.3.2
umzug: 3.8.2
peerDependencies:
"@mikro-orm/core": ^6.0.0
checksum: 345aea93fb5652f8c78c057d89e55fc2b8f106fad65ac44ee7d39c1f85b3ae402c35dacba0cf2cc5ace1a0abb2074a1de1b45c55e79bbaee2217744df32542bc
checksum: 4441d3505575cffc272666e8a9b85332335163275ce997c7ece36ba10e986d5392e5643a7e3262a2185658b187082d9c549fb45ec4553d8336f949cfc13ff27d
languageName: node
linkType: hard
"@mikro-orm/postgresql@npm:6.5.4":
version: 6.5.4
resolution: "@mikro-orm/postgresql@npm:6.5.4"
"@mikro-orm/postgresql@npm:6.5.5":
version: 6.5.5
resolution: "@mikro-orm/postgresql@npm:6.5.5"
dependencies:
"@mikro-orm/knex": 6.5.4
"@mikro-orm/knex": 6.5.5
pg: 8.16.3
postgres-array: 3.0.4
postgres-date: 2.1.0
postgres-interval: 4.0.2
peerDependencies:
"@mikro-orm/core": ^6.0.0
checksum: a5102550998041ab6b91c528a8a8a69d06f1d851194679c0b27ff54d433798089417555b44e2371caab056eaa84e8865bd4dbd5eecacb04f1c0ae8304aa5a2df
checksum: 72c63aee694dd4606a726c69f504c9680bf700f2954e45d52bba93dd68ab96d71414ec63be16b444d6a151794e56da4c010b724f5a4b3299c646c582d1b617b9
languageName: node
linkType: hard
@@ -27107,10 +27107,10 @@ __metadata:
languageName: node
linkType: hard
"mikro-orm@npm:6.5.4":
version: 6.5.4
resolution: "mikro-orm@npm:6.5.4"
checksum: f2ee9bc9598d88ca086f34d26283355ddd48b1a2095ce90d0c59b8d629309d28759b327dd2dedbea4c72c3bcb189ba37d8a24013edce8402a711e640b138c6d4
"mikro-orm@npm:6.5.5":
version: 6.5.5
resolution: "mikro-orm@npm:6.5.5"
checksum: eca6afcac7deeced740c650728ec86b80a627b7631c813eb1076eef8b1618e9aff0adc82f0e7ba57f6d9685bb71058d648ecd5b52fcfe0c32fb0cc0483fded08
languageName: node
linkType: hard