From ea402875a574bc0feca7b1a3a15037f79fca90b9 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Mon, 27 Jan 2025 14:56:12 +0100 Subject: [PATCH] Feat/index sync data (#11169) **what** Synchronisation process implementation for configured entity to be indexed --- .../remote-query-object-from-string.ts | 2 +- .../__tests__/data-synchronizer.spec.ts | 486 ++++++++++++++++++ .../migrations/.snapshot-medusa-index.json | 24 +- .../src/migrations/Migration20250127105159.ts | 21 + .../index/src/models/index-relation.ts | 1 + .../index/src/services/postgres-provider.ts | 47 +- .../index/src/utils/sync/data-synchronizer.ts | 137 +++++ 7 files changed, 685 insertions(+), 33 deletions(-) create mode 100644 packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts create mode 100644 packages/modules/index/src/migrations/Migration20250127105159.ts create mode 100644 packages/modules/index/src/utils/sync/data-synchronizer.ts diff --git a/packages/core/types/src/modules-sdk/remote-query-object-from-string.ts b/packages/core/types/src/modules-sdk/remote-query-object-from-string.ts index d91279133a..186dbe009d 100644 --- a/packages/core/types/src/modules-sdk/remote-query-object-from-string.ts +++ b/packages/core/types/src/modules-sdk/remote-query-object-from-string.ts @@ -50,7 +50,7 @@ export type RemoteQueryInput = { /** * The number of items to skip before retrieving the returned items. */ - skip: number + skip?: number /** * The maximum number of items to return. 
*/ diff --git a/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts b/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts new file mode 100644 index 0000000000..c7669bdec1 --- /dev/null +++ b/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts @@ -0,0 +1,486 @@ +import { + configLoader, + container, + logger, + MedusaAppLoader, +} from "@medusajs/framework" +import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk" +import { IndexTypes, InferEntityType } from "@medusajs/framework/types" +import { + ContainerRegistrationKeys, + ModuleRegistrationName, + Modules, + toMikroORMEntity, +} from "@medusajs/framework/utils" +import { initDb, TestDatabaseUtils } from "@medusajs/test-utils" +import { asValue } from "awilix" +import * as path from "path" +import { DataSynchronizer } from "../../src/utils/sync/data-synchronizer" +import { EventBusServiceMock } from "../__fixtures__" +import { dbName } from "../__fixtures__/medusa-config" +import { EntityManager } from "@mikro-orm/postgresql" +import { IndexData, IndexRelation } from "@models" + +const eventBusMock = new EventBusServiceMock() +const queryMock = { + graph: jest.fn(), +} + +const dbUtils = TestDatabaseUtils.dbTestUtilFactory() + +jest.setTimeout(30000) + +const testProductId = "test_prod_1" +const testProductId2 = "test_prod_2" +const testVariantId = "test_var_1" +const testVariantId2 = "test_var_2" + +const mockData = [ + { + id: testProductId, + title: "Test Product", + updated_at: new Date(), + }, + { + id: testProductId2, + title: "Test Product", + updated_at: new Date(), + }, + { + id: testVariantId, + title: "Test Variant", + product: { + id: testProductId, + }, + updated_at: new Date(), + }, + { + id: testVariantId2, + title: "Test Variant 2", + product: { + id: testProductId2, + }, + updated_at: new Date(), + }, +] + +let medusaAppLoader!: MedusaAppLoader +let index!: IndexTypes.IIndexService + +const 
beforeAll_ = async () => { + try { + await configLoader( + path.join(__dirname, "./../__fixtures__"), + "medusa-config" + ) + + console.log(`Creating database ${dbName}`) + await dbUtils.create(dbName) + dbUtils.pgConnection_ = await initDb() + + container.register({ + [ContainerRegistrationKeys.LOGGER]: asValue(logger), + [ContainerRegistrationKeys.QUERY]: asValue(null), + [ContainerRegistrationKeys.PG_CONNECTION]: asValue(dbUtils.pgConnection_), + }) + + medusaAppLoader = new MedusaAppLoader(container as any) + + // Migrations + await medusaAppLoader.runModulesMigrations() + const linkPlanner = await medusaAppLoader.getLinksExecutionPlanner() + const plan = await linkPlanner.createPlan() + await linkPlanner.executePlan(plan) + + // Clear partially loaded instances + MedusaModule.clearInstances() + + // Bootstrap modules + const globalApp = await medusaAppLoader.load() + + index = container.resolve(Modules.INDEX) + + // Mock event bus the index module + ;(index as any).eventBusModuleService_ = eventBusMock + + await globalApp.onApplicationStart() + ;(index as any).storageProvider_.query_ = queryMock + + return globalApp + } catch (error) { + console.error("Error initializing", error?.message) + throw error + } +} + +describe("DataSynchronizer", () => { + let index: IndexTypes.IIndexService + let dataSynchronizer: DataSynchronizer + let medusaApp: MedusaAppOutput + let onApplicationPrepareShutdown!: () => Promise + let onApplicationShutdown!: () => Promise + let manager: EntityManager + + beforeAll(async () => { + medusaApp = await beforeAll_() + onApplicationPrepareShutdown = medusaApp.onApplicationPrepareShutdown + onApplicationShutdown = medusaApp.onApplicationShutdown + manager = ( + medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any + ).container_.manager as EntityManager + }) + + afterAll(async () => { + await onApplicationPrepareShutdown() + await onApplicationShutdown() + await dbUtils.shutdown(dbName) + }) + + beforeEach(async () => { 
+ jest.clearAllMocks() + index = container.resolve(Modules.INDEX) + + const productSchemaObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation = + { + fields: ["id", "title", "updated_at"], + alias: "product", + moduleConfig: { + linkableKeys: { + id: "Product", + product_id: "Product", + product_variant_id: "ProductVariant", + }, + }, + entity: "Product", + parents: [], + listeners: ["product.created"], + } + + const productVariantSchemaObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation = + { + fields: ["id", "title", "product.id", "updated_at"], + alias: "product_variant", + moduleConfig: { + linkableKeys: { + id: "ProductVariant", + product_id: "Product", + product_variant_id: "ProductVariant", + }, + }, + entity: "ProductVariant", + parents: [ + { + ref: productSchemaObjectRepresentation, + inSchemaRef: productSchemaObjectRepresentation, + targetProp: "id", + }, + ], + listeners: ["product-variant.created"], + } + + const mockSchemaRepresentation = { + product: productSchemaObjectRepresentation, + product_variant: productVariantSchemaObjectRepresentation, + } + + dataSynchronizer = new DataSynchronizer({ + storageProvider: (index as any).storageProvider_, + schemaObjectRepresentation: mockSchemaRepresentation, + query: queryMock as any, + }) + }) + + describe("sync", () => { + it("should sync products data correctly", async () => { + // Mock query response for products + queryMock.graph.mockImplementation(async (config) => { + if (Array.isArray(config.filters.id)) { + if (config.filters.id.includes(testProductId)) { + return { + data: [mockData[0]], + } + } else if (config.filters.id.includes(testProductId2)) { + return { + data: [mockData[1]], + } + } + } + + if (Object.keys(config.filters).length === 0) { + return { + data: [mockData[0]], + } + } else if (config.filters.id["$gt"] === mockData[0].id) { + return { + data: [mockData[1]], + } + } + + return { + data: [], + } + }) + + const ackMock = jest.fn() + + const result = await 
dataSynchronizer.sync({ + entityName: "product", + ack: ackMock, + }) + + // First loop fetching products + expect(queryMock.graph).toHaveBeenNthCalledWith(1, { + entity: "product", + fields: ["id"], + filters: {}, + pagination: { + order: { + id: "asc", + }, + take: 1000, + }, + }) + + // First time fetching product data for creation from the storage provider + expect(queryMock.graph).toHaveBeenNthCalledWith(2, { + entity: "product", + filters: { + id: [testProductId], + }, + fields: ["id", "title", "updated_at"], + }) + + // Second loop fetching products + expect(queryMock.graph).toHaveBeenNthCalledWith(3, { + entity: "product", + fields: ["id"], + filters: { + id: { + $gt: testProductId, + }, + }, + pagination: { + order: { + id: "asc", + }, + take: 1000, + }, + }) + + // Second time fetching product data for creation from the storage provider + expect(queryMock.graph).toHaveBeenNthCalledWith(4, { + entity: "product", + filters: { + id: [testProductId2], + }, + fields: ["id", "title", "updated_at"], + }) + + expect(ackMock).toHaveBeenNthCalledWith(1, { + lastCursor: testProductId, + }) + + expect(ackMock).toHaveBeenNthCalledWith(2, { + lastCursor: testProductId2, + }) + + expect(ackMock).toHaveBeenNthCalledWith(3, { + lastCursor: testProductId2, + done: true, + }) + + expect(result).toEqual({ + lastCursor: testProductId2, + done: true, + }) + + const indexData = await manager.find>( + toMikroORMEntity(IndexData), + {} + ) + const indexRelationData = await manager.find( + toMikroORMEntity(IndexRelation), + {} + ) + + expect(indexData).toHaveLength(2) + expect(indexData[0].id).toEqual(testProductId) + expect(indexData[1].id).toEqual(testProductId2) + + expect(indexRelationData).toHaveLength(0) + }) + }) + + it("should sync products and product variants data correctly", async () => { + // Mock query response for products + queryMock.graph.mockImplementation(async (config) => { + if (config.entity === "product") { + if (Array.isArray(config.filters.id)) { + if 
(config.filters.id.includes(testProductId)) { + return { + data: [mockData[0]], + } + } else if (config.filters.id.includes(testProductId2)) { + return { + data: [mockData[1]], + } + } + } + + if (Object.keys(config.filters).length === 0) { + return { + data: [mockData[0]], + } + } else if (config.filters.id["$gt"] === mockData[0].id) { + return { + data: [mockData[1]], + } + } + } + + if (config.entity === "product_variant") { + if (Array.isArray(config.filters.id)) { + if (config.filters.id.includes(testVariantId)) { + return { + data: [mockData[2]], + } + } else if (config.filters.id.includes(testVariantId2)) { + return { + data: [mockData[3]], + } + } + } + + if (Object.keys(config.filters).length === 0) { + return { + data: [mockData[2]], + } + } else if (config.filters.id["$gt"] === mockData[2].id) { + return { + data: [mockData[3]], + } + } + } + + return { + data: [], + } + }) + + const ackMock = jest.fn() + + await dataSynchronizer.sync({ + entityName: "product", + ack: ackMock, + }) + + jest.clearAllMocks() + + const result = await dataSynchronizer.sync({ + entityName: "product_variant", + ack: ackMock, + }) + + // First loop fetching product variants + expect(queryMock.graph).toHaveBeenNthCalledWith(1, { + entity: "product_variant", + fields: ["id"], + filters: {}, + pagination: { + order: { + id: "asc", + }, + take: 1000, + }, + }) + + // First time fetching product variant data for creation from the storage provider + expect(queryMock.graph).toHaveBeenNthCalledWith(2, { + entity: "product_variant", + filters: { + id: [testVariantId], + }, + fields: ["id", "title", "product.id", "updated_at"], + }) + + // Second loop fetching product variants + expect(queryMock.graph).toHaveBeenNthCalledWith(3, { + entity: "product_variant", + fields: ["id"], + filters: { + id: { + $gt: testVariantId, + }, + }, + pagination: { + order: { + id: "asc", + }, + take: 1000, + }, + }) + + // Second time fetching product variant data for creation from the storage provider + 
expect(queryMock.graph).toHaveBeenNthCalledWith(4, { + entity: "product_variant", + filters: { + id: [testVariantId2], + }, + fields: ["id", "title", "product.id", "updated_at"], + }) + + expect(ackMock).toHaveBeenNthCalledWith(1, { + lastCursor: testVariantId, + }) + + expect(ackMock).toHaveBeenNthCalledWith(2, { + lastCursor: testVariantId2, + }) + + expect(ackMock).toHaveBeenNthCalledWith(3, { + lastCursor: testVariantId2, + done: true, + }) + + expect(result).toEqual({ + lastCursor: testVariantId2, + done: true, + }) + + const indexData = await manager.find>( + toMikroORMEntity(IndexData), + {} + ) + const indexRelationData = await manager.find< + InferEntityType + >(toMikroORMEntity(IndexRelation), {}) + + expect(indexData).toHaveLength(4) + expect(indexData[0].id).toEqual(testProductId) + expect(indexData[1].id).toEqual(testProductId2) + expect(indexData[2].id).toEqual(testVariantId) + expect(indexData[3].id).toEqual(testVariantId2) + + expect(indexRelationData).toHaveLength(2) + expect(indexRelationData[0]).toEqual( + expect.objectContaining({ + parent_id: testProductId, + child_id: testVariantId, + parent_name: "Product", + child_name: "ProductVariant", + pivot: "Product-ProductVariant", + }) + ) + expect(indexRelationData[1]).toEqual( + expect.objectContaining({ + parent_id: testProductId2, + child_id: testVariantId2, + parent_name: "Product", + child_name: "ProductVariant", + pivot: "Product-ProductVariant", + }) + ) + }) + + // TODO: Add tests for errors handling and failure handling +}) diff --git a/packages/modules/index/src/migrations/.snapshot-medusa-index.json b/packages/modules/index/src/migrations/.snapshot-medusa-index.json index ec888d2c3c..41169e8932 100644 --- a/packages/modules/index/src/migrations/.snapshot-medusa-index.json +++ b/packages/modules/index/src/migrations/.snapshot-medusa-index.json @@ -1,7 +1,5 @@ { - "namespaces": [ - "public" - ], + "namespaces": ["public"], "name": "public", "tables": [ { @@ -108,10 +106,7 @@ }, { "keyName": 
"index_data_pkey", - "columnNames": [ - "id", - "name" - ], + "columnNames": ["id", "name"], "composite": true, "constraint": true, "primary": true, @@ -168,12 +163,7 @@ "primary": false, "nullable": false, "default": "'pending'", - "enumItems": [ - "pending", - "processing", - "done", - "error" - ], + "enumItems": ["pending", "processing", "done", "error"], "mappedType": "enum" }, "created_at": { @@ -232,9 +222,7 @@ }, { "keyName": "index_metadata_pkey", - "columnNames": [ - "id" - ], + "columnNames": ["id"], "composite": false, "constraint": true, "primary": true, @@ -366,9 +354,7 @@ }, { "keyName": "index_relation_pkey", - "columnNames": [ - "id" - ], + "columnNames": ["id"], "composite": false, "constraint": true, "primary": true, diff --git a/packages/modules/index/src/migrations/Migration20250127105159.ts b/packages/modules/index/src/migrations/Migration20250127105159.ts new file mode 100644 index 0000000000..6dc8b95e07 --- /dev/null +++ b/packages/modules/index/src/migrations/Migration20250127105159.ts @@ -0,0 +1,21 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20250127105159 extends Migration { + override async up(): Promise { + this.addSql( + `alter table if exists "index_relation" alter column "id" set not null;` + ) + this.addSql( + `alter table if exists "index_relation" add constraint "IDX_index_relation_id_pivot_parent_name_child_name_parent_id_child_id_unique" unique ("parent_id", "child_id", "child_name", "parent_name", "pivot");` + ) + } + + override async down(): Promise { + this.addSql( + `alter table if exists "index_relation" drop constraint "IDX_index_relation_id_pivot_parent_name_child_name_parent_id_child_id_unique";` + ) + this.addSql( + `alter table if exists "index_relation" alter column "id" drop not null;` + ) + } +} diff --git a/packages/modules/index/src/models/index-relation.ts b/packages/modules/index/src/models/index-relation.ts index bf983bcf41..f163c01b7b 100644 --- 
a/packages/modules/index/src/models/index-relation.ts +++ b/packages/modules/index/src/models/index-relation.ts @@ -8,4 +8,5 @@ const IndexRelation = model.define("IndexRelation", { child_name: model.text(), child_id: model.text().index("IDX_index_relation_child_id"), }) + export default IndexRelation diff --git a/packages/modules/index/src/services/postgres-provider.ts b/packages/modules/index/src/services/postgres-provider.ts index c0dbe177c2..684db9928c 100644 --- a/packages/modules/index/src/services/postgres-provider.ts +++ b/packages/modules/index/src/services/postgres-provider.ts @@ -14,7 +14,11 @@ import { MedusaContext, toMikroORMEntity, } from "@medusajs/framework/utils" -import { EntityManager, SqlEntityManager } from "@mikro-orm/postgresql" +import { + EntityManager, + EntityRepository, + SqlEntityManager, +} from "@mikro-orm/postgresql" import { IndexData, IndexRelation } from "@models" import { createPartitions, QueryBuilder } from "../utils" import { flattenObjectKeys } from "../utils/flatten-object-keys" @@ -204,13 +208,14 @@ export class PostgresProvider implements IndexTypes.StorageProvider { } const { fields, alias } = schemaEntityObjectRepresentation - const { data: entityData } = await this.query_.graph({ + const graphResult = await this.query_.graph({ entity: alias, filters: { id: ids, }, fields: [...new Set(["id", ...fields])], }) + const { data: entityData } = graphResult const argument = { entity: schemaEntityObjectRepresentation.entity, @@ -340,7 +345,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { transactionManager: SqlEntityManager } const indexRepository = em.getRepository(toMikroORMEntity(IndexData)) - const indexRelationRepository = em.getRepository( + const indexRelationRepository: EntityRepository = em.getRepository( toMikroORMEntity(IndexRelation) ) @@ -369,6 +374,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { id: cleanedEntityData.id, name: entity, data: cleanedEntityData, + 
// stale: false, }) /** @@ -394,18 +400,29 @@ export class PostgresProvider implements IndexTypes.StorageProvider { id: (parentData_ as any).id, name: parentEntity, data: parentData_, + // stale: false, }) - const parentIndexRelationEntry = indexRelationRepository.create({ - parent_id: (parentData_ as any).id, - parent_name: parentEntity, - child_id: cleanedEntityData.id, - child_name: entity, - pivot: `${parentEntity}-${entity}`, - }) - indexRelationRepository - .getEntityManager() - .persist(parentIndexRelationEntry) + await indexRelationRepository.upsert( + { + parent_id: (parentData_ as any).id, + parent_name: parentEntity, + child_id: cleanedEntityData.id, + child_name: entity, + pivot: `${parentEntity}-${entity}`, + // stale: false, + }, + { + onConflictAction: "merge", + onConflictFields: [ + "pivot", + "parent_id", + "child_id", + "parent_name", + "child_name", + ], + } + ) } } } @@ -453,6 +470,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { acc[property] = entityData[property] return acc }, {}), + // stale: false, } }) ) @@ -608,6 +626,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { id: cleanedEntityData.id, name: entity, data: cleanedEntityData, + // stale: false, }) /** @@ -620,6 +639,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { child_id: cleanedEntityData.id, child_name: entity, pivot: `${parentEntityName}-${entity}`, + // stale: false, }) const childIndexRelationEntry = indexRelationRepository.create({ @@ -628,6 +648,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { child_id: entityData[childPropertyId] as string, child_name: childEntityName, pivot: `${entity}-${childEntityName}`, + // stale: false, }) indexRelationRepository diff --git a/packages/modules/index/src/utils/sync/data-synchronizer.ts b/packages/modules/index/src/utils/sync/data-synchronizer.ts new file mode 100644 index 0000000000..85739962d4 --- /dev/null +++ 
b/packages/modules/index/src/utils/sync/data-synchronizer.ts @@ -0,0 +1,137 @@ +import { + IndexTypes, + RemoteQueryFunction, + SchemaObjectEntityRepresentation, + Event, +} from "@medusajs/framework/types" +import { CommonEvents } from "@medusajs/framework/utils" + +export class DataSynchronizer { + #storageProvider: IndexTypes.StorageProvider + #schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation + #query: RemoteQueryFunction + + constructor({ + storageProvider, + schemaObjectRepresentation, + query, + }: { + storageProvider: IndexTypes.StorageProvider + schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation + query: RemoteQueryFunction + }) { + this.#storageProvider = storageProvider + this.#schemaObjectRepresentation = schemaObjectRepresentation + this.#query = query + } + + async sync({ + entityName, + pagination = {}, + ack, + }: { + entityName: string + pagination?: { + cursor?: string + updated_at?: string | Date + limit?: number + batchSize?: number + } + ack: (ack: { + lastCursor: string | null + done?: boolean + err?: Error + }) => Promise + }) { + const schemaEntityObjectRepresentation = this.#schemaObjectRepresentation[ + entityName + ] as SchemaObjectEntityRepresentation + + const { fields, alias, moduleConfig } = schemaEntityObjectRepresentation + + const entityPrimaryKey = fields.find( + (field) => !!moduleConfig.linkableKeys?.[field] + ) + + if (!entityPrimaryKey) { + void ack({ + lastCursor: pagination.cursor ?? null, + err: new Error( + `Entity ${entityName} does not have a linkable primary key` + ), + }) + return + } + + let processed = 0 + let currentCursor = pagination.cursor! + const batchSize = pagination.batchSize ?? 1000 + const limit = pagination.limit ?? 
Infinity + let done = false + let error = null + + while (processed < limit || !done) { + const filters: Record<string, any> = {} + + if (currentCursor) { + filters[entityPrimaryKey] = { $gt: currentCursor } + } + + if (pagination.updated_at) { + filters["updated_at"] = { $gt: pagination.updated_at } + } + + const { data } = await this.#query.graph({ + entity: alias, + fields: [entityPrimaryKey], + filters, + pagination: { + order: { + [entityPrimaryKey]: "asc", + }, + take: batchSize, + }, + }) + + done = !data.length + if (done) { + break + } + + const envelope: Event = { + data, + name: `*.${CommonEvents.CREATED}`, + } + + try { + await this.#storageProvider.consumeEvent( + schemaEntityObjectRepresentation + )(envelope) + currentCursor = data[data.length - 1][entityPrimaryKey] + processed += data.length + + void ack({ lastCursor: currentCursor }) + } catch (err) { + error = err + break + } + } + + let acknowledgement: { lastCursor: string; done?: boolean; err?: Error } = { + lastCursor: currentCursor, + done: true, + } + + if (error) { + acknowledgement = { + lastCursor: currentCursor, + err: error, + } + void ack(acknowledgement) + return acknowledgement + } + + void ack(acknowledgement) + return acknowledgement + } +}