diff --git a/.changeset/honest-emus-smell.md b/.changeset/honest-emus-smell.md new file mode 100644 index 0000000000..3191ad17b4 --- /dev/null +++ b/.changeset/honest-emus-smell.md @@ -0,0 +1,7 @@ +--- +"@medusajs/modules-sdk": patch +"@medusajs/index": patch +"@medusajs/utils": patch +--- + +feat(indes): full sync diff --git a/integration-tests/modules/__tests__/index/sync.spec.ts b/integration-tests/modules/__tests__/index/sync.spec.ts new file mode 100644 index 0000000000..15e696346b --- /dev/null +++ b/integration-tests/modules/__tests__/index/sync.spec.ts @@ -0,0 +1,213 @@ +import { medusaIntegrationTestRunner } from "@medusajs/test-utils" +import { IndexTypes } from "@medusajs/types" +import { defaultCurrencies, Modules } from "@medusajs/utils" +import { setTimeout } from "timers/promises" +import { + adminHeaders, + createAdminUser, +} from "../../../helpers/create-admin-user" + +jest.setTimeout(300000) + +process.env.ENABLE_INDEX_MODULE = "true" + +async function populateData( + api: any, + { productCount = 1, variantCount = 1, priceCount = 1 } = {} +) { + const shippingProfile = ( + await api.post( + `/admin/shipping-profiles`, + { name: "Test", type: "default" }, + adminHeaders + ) + ).data.shipping_profile + + for (let i = 0; i < productCount; i++) { + const payload = { + title: "Test Giftcard " + i, + shipping_profile_id: shippingProfile.id, + description: "test-giftcard-description " + i, + options: [{ title: "Denominations", values: ["100"] }], + variants: new Array(variantCount).fill(0).map((_, j) => ({ + title: `Test variant ${i} ${j}`, + sku: `test-variant-${i}-${j}`, + prices: new Array(priceCount).fill(0).map((_, k) => ({ + currency_code: Object.values(defaultCurrencies)[k].code, + amount: 10 * k, + })), + options: { + Denominations: "100", + }, + })), + } + + await api.post("/admin/products", payload, adminHeaders) + } +} + +medusaIntegrationTestRunner({ + testSuite: ({ getContainer, dbConnection, api, dbConfig }) => { + let indexEngine: IndexTypes.IIndexService + let appContainer + + beforeAll(() => { + appContainer = getContainer() + indexEngine = appContainer.resolve(Modules.INDEX) + }) + + afterAll(() => { + process.env.ENABLE_INDEX_MODULE = "false" + }) + + beforeEach(async () => { + await createAdminUser(dbConnection, adminHeaders, appContainer) + }) + + describe("Index engine syncing", () => { + it("should sync the data to the index based on the indexation configuration", async () => { + console.info("[Index engine] Creating products") + + await populateData(api, { + productCount: 2, + variantCount: 2, + priceCount: 2, + }) + + console.info("[Index engine] Creating products done") + + await setTimeout(1000) + await dbConnection.raw('TRUNCATE TABLE "index_data";') + await dbConnection.raw('TRUNCATE TABLE "index_relation";') + await dbConnection.raw('TRUNCATE TABLE "index_metadata";') + await dbConnection.raw('TRUNCATE TABLE "index_sync";') + + const { data: indexedDataAfterCreation } = + await indexEngine.query<"product">({ + fields: [ + "product.*", + "product.variants.*", + "product.variants.prices.*", + ], + }) + + expect(indexedDataAfterCreation.length).toBe(0) + + // Prevent storage provider to be triggered though + ;(indexEngine as any).storageProvider_.onApplicationStart = jest.fn() + + console.info("[Index engine] Triggering sync") + // Trigger a sync + await (indexEngine as any).onApplicationStart_() + + console.info("[Index engine] Sync done") + + // 28 ms - 6511 records + const { data: results } = await indexEngine.query<"product">({ + fields: [ + "product.*", + "product.variants.*", + "product.variants.prices.*", + ], + }) + + expect(results.length).toBe(2) + for (const result of results) { + expect(result.variants.length).toBe(2) + for (const variant of result.variants) { + expect(variant.prices.length).toBe(2) + } + } + }) + }) + + it("should sync the data to the index based on the updated indexation configuration", async () => { + console.info("[Index engine] Creating products") + + await populateData(api) + + console.info("[Index engine] Creating products done") + + await setTimeout(1000) + await dbConnection.raw('TRUNCATE TABLE "index_data";') + await dbConnection.raw('TRUNCATE TABLE "index_relation";') + await dbConnection.raw('TRUNCATE TABLE "index_metadata";') + await dbConnection.raw('TRUNCATE TABLE "index_sync";') + + const { data: indexedDataAfterCreation } = + await indexEngine.query<"product">({ + fields: [ + "product.*", + "product.variants.*", + "product.variants.prices.*", + ], + }) + + expect(indexedDataAfterCreation.length).toBe(0) + + // Prevent storage provider to be triggered though + ;(indexEngine as any).storageProvider_.onApplicationStart = jest.fn() + + console.info("[Index engine] Triggering sync") + // Trigger a sync + await (indexEngine as any).onApplicationStart_() + + console.info("[Index engine] Sync done") + + const { data: results } = await indexEngine.query<"product">({ + fields: [ + "product.*", + "product.variants.*", + "product.variants.prices.*", + ], + }) + + expect(results.length).toBe(1) + expect(results[0].variants.length).toBe(1) + expect(results[0].variants[0].prices.length).toBe(1) + + // Manually change the indexation configuration + ;(indexEngine as any).schemaObjectRepresentation_ = null + ;(indexEngine as any).moduleOptions_ = { + ...(indexEngine as any).moduleOptions_, + schema: ` + type Product @Listeners(values: ["product.created", "product.updated", "product.deleted"]) { + id: String + title: String + handle: String + variants: [ProductVariant] + } + + type ProductVariant @Listeners(values: ["variant.created", "variant.updated", "variant.deleted"]) { + id: String + product_id: String + sku: String + description: String + } + `, + } + + // Trigger a sync + await (indexEngine as any).onApplicationStart_() + await setTimeout(1000) + + const { data: updatedResults } = await indexEngine.query<"product">({ + fields: ["product.*", "product.variants.*"], + }) + + expect(updatedResults.length).toBe(1) + expect(updatedResults[0].variants.length).toBe(1) + + let staledRaws = await dbConnection.raw( + 'SELECT * FROM "index_data" WHERE "staled_at" IS NOT NULL' + ) + + expect(staledRaws.rows.length).toBe(0) + + staledRaws = await dbConnection.raw( + 'SELECT * FROM "index_relation" WHERE "staled_at" IS NOT NULL' + ) + expect(staledRaws.rows.length).toBe(0) + }) + }, +}) diff --git a/packages/core/modules-sdk/src/definitions.ts b/packages/core/modules-sdk/src/definitions.ts index e429682597..2193a23167 100644 --- a/packages/core/modules-sdk/src/definitions.ts +++ b/packages/core/modules-sdk/src/definitions.ts @@ -271,6 +271,7 @@ export const ModulesDefinition: { isQueryable: false, dependencies: [ Modules.EVENT_BUS, + Modules.LOCKING, ContainerRegistrationKeys.LOGGER, ContainerRegistrationKeys.REMOTE_QUERY, ContainerRegistrationKeys.QUERY, diff --git a/packages/core/utils/src/modules-sdk/build-query.ts b/packages/core/utils/src/modules-sdk/build-query.ts index 3dbbd22924..cb66b6a604 100644 --- a/packages/core/utils/src/modules-sdk/build-query.ts +++ b/packages/core/utils/src/modules-sdk/build-query.ts @@ -68,6 +68,10 @@ function buildWhere( } if (["$or", "$and"].includes(prop)) { + if (!Array.isArray(value)) { + throw new Error(`Expected array for ${prop} but got ${value}`) + } + where[prop] = value.map((val) => { const deepWhere = {} buildWhere(val, deepWhere, flags) diff --git a/packages/core/utils/src/modules-sdk/medusa-internal-service.ts b/packages/core/utils/src/modules-sdk/medusa-internal-service.ts index d18fe53d29..a953f5c1da 100644 --- a/packages/core/utils/src/modules-sdk/medusa-internal-service.ts +++ b/packages/core/utils/src/modules-sdk/medusa-internal-service.ts @@ -273,6 +273,7 @@ export function MedusaInternalService< {}, sharedContext ) + // Create a pair of entity and data to update entitiesToUpdate.forEach((entity) => { toUpdateData.push({ @@ -345,6 +346,10 @@ export function MedusaInternalService< } } + if (!toUpdateData.length) { + return [] + } + return await this[propertyRepositoryName].update( toUpdateData, sharedContext @@ -458,6 +463,10 @@ export function MedusaInternalService< }) } + if (!deleteCriteria.$or.length) { + return + } + await this[propertyRepositoryName].delete(deleteCriteria, sharedContext) } diff --git a/packages/modules/index/integration-tests/__fixtures__/medusa-config.js b/packages/modules/index/integration-tests/__fixtures__/medusa-config.js index 81f729d5ce..d06a47164b 100644 --- a/packages/modules/index/integration-tests/__fixtures__/medusa-config.js +++ b/packages/modules/index/integration-tests/__fixtures__/medusa-config.js @@ -1,4 +1,8 @@ -const { defineConfig, Modules } = require("@medusajs/framework/utils") +const { + defineConfig, + Modules, + ContainerRegistrationKeys, +} = require("@medusajs/framework/utils") const { schema } = require("./schema") export const dbName = "medusa-index-integration-2024" @@ -26,13 +30,19 @@ Object.keys(config.modules).forEach((key) => { config.modules[Modules.INDEX] = { resolve: "@medusajs/index", - dependencies: [Modules.EVENT_BUS], + dependencies: [ + Modules.EVENT_BUS, + Modules.LOCKING, + ContainerRegistrationKeys.REMOTE_QUERY, + ContainerRegistrationKeys.QUERY, + ], options: { schema, }, } config.modules[Modules.PRODUCT] = true +config.modules[Modules.LOCKING] = true config.modules[Modules.PRICING] = true export default config diff --git a/packages/modules/index/integration-tests/__fixtures__/update-removed-schema.ts b/packages/modules/index/integration-tests/__fixtures__/update-removed-schema.ts index 5d177d32d0..038ab388f9 100644 --- a/packages/modules/index/integration-tests/__fixtures__/update-removed-schema.ts +++ b/packages/modules/index/integration-tests/__fixtures__/update-removed-schema.ts @@ -10,12 +10,6 @@ export const updateRemovedSchema = ` id: String product_id: String sku: String - prices: [Price] description: String } - - type Price @Listeners(values: ["price.created", "price.updated", "price.deleted"]) { - amount: Float - currency_code: String - } ` diff --git a/packages/modules/index/integration-tests/__tests__/config-sync.spec.ts b/packages/modules/index/integration-tests/__tests__/config-sync.spec.ts index 5a157b2a8d..259858357c 100644 --- a/packages/modules/index/integration-tests/__tests__/config-sync.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/config-sync.spec.ts @@ -5,13 +5,10 @@ import { MedusaAppLoader, } from "@medusajs/framework" import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk" -import { - ContainerRegistrationKeys, - ModuleRegistrationName, - Modules, -} from "@medusajs/framework/utils" +import { ContainerRegistrationKeys, Modules } from "@medusajs/framework/utils" import { initDb, TestDatabaseUtils } from "@medusajs/test-utils" -import { EntityManager } from "@mikro-orm/postgresql" +import { IndexTypes, ModulesSdkTypes } from "@medusajs/types" +import { Configuration } from "@utils" import { asValue } from "awilix" import path from "path" import { setTimeout } from "timers/promises" @@ -21,9 +18,9 @@ import { updateRemovedSchema } from "../__fixtures__/update-removed-schema" import { updatedSchema } from "../__fixtures__/updated-schema" const eventBusMock = new EventBusServiceMock() -const queryMock = jest.fn().mockReturnValue({ - graph: jest.fn(), -}) +const queryMock = { + graph: jest.fn().mockImplementation(async () => ({ data: [] })), +} const dbUtils = TestDatabaseUtils.dbTestUtilFactory() @@ -31,6 +28,7 @@ jest.setTimeout(300000) let isFirstTime = true let medusaAppLoader!: MedusaAppLoader +let index: IndexTypes.IIndexService const beforeAll_ = async () => { try { @@ -45,11 +43,10 @@ const beforeAll_ = async () => { container.register({ [ContainerRegistrationKeys.LOGGER]: asValue(logger), - [ContainerRegistrationKeys.QUERY]: asValue(null), [ContainerRegistrationKeys.PG_CONNECTION]: asValue(dbUtils.pgConnection_), }) - medusaAppLoader = new MedusaAppLoader(container as any) + medusaAppLoader = new MedusaAppLoader() // Migrations await medusaAppLoader.runModulesMigrations() @@ -62,14 +59,16 @@ const beforeAll_ = async () => { // Bootstrap modules const globalApp = await medusaAppLoader.load() + container.register({ + [ContainerRegistrationKeys.QUERY]: asValue(queryMock), + [ContainerRegistrationKeys.REMOTE_QUERY]: asValue(queryMock), + [Modules.EVENT_BUS]: asValue(eventBusMock), + }) - const index = container.resolve(Modules.INDEX) - - // Mock event bus the index module - ;(index as any).eventBusModuleService_ = eventBusMock + index = container.resolve(Modules.INDEX) await globalApp.onApplicationStart() - ;(index as any).storageProvider_.query_ = queryMock + await setTimeout(1000) return globalApp } catch (error) { @@ -105,8 +104,9 @@ const afterEach_ = async () => { describe("IndexModuleService syncIndexConfig", function () { let medusaApp: MedusaAppOutput - let module: any - let manager: EntityManager + let indexMetadataService: ModulesSdkTypes.IMedusaInternalService + let indexSyncService: ModulesSdkTypes.IMedusaInternalService + let dataSynchronizer: ModulesSdkTypes.IMedusaInternalService let onApplicationPrepareShutdown!: () => Promise let onApplicationShutdown!: () => Promise @@ -125,8 +125,10 @@ describe("IndexModuleService syncIndexConfig", function () { beforeEach(async () => { await beforeEach_() - module = medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) - manager = module.container_.manager as EntityManager + index = container.resolve(Modules.INDEX) + indexMetadataService = (index as any).indexMetadataService_ + indexSyncService = (index as any).indexSyncService_ + dataSynchronizer = (index as any).dataSynchronizer_ }) afterEach(afterEach_) @@ -134,7 +136,7 @@ describe("IndexModuleService syncIndexConfig", function () { it("should full sync all entities when the config has changed", async () => { await setTimeout(1000) - const currentMetadata = await module.listIndexMetadata() + const currentMetadata = await indexMetadataService.list() expect(currentMetadata).toHaveLength(7) expect(currentMetadata).toEqual( @@ -177,13 +179,25 @@ describe("IndexModuleService syncIndexConfig", function () { ]) ) - // update config schema - module.schemaObjectRepresentation_ = null - module.moduleOptions_ ??= {} - module.moduleOptions_.schema = updatedSchema - module.buildSchemaObjectRepresentation_() + let indexSync = await indexSyncService.list({ + last_key: null, + }) + expect(indexSync).toHaveLength(7) - const syncRequired = await module.syncIndexConfig() + // update config schema + ;(index as any).schemaObjectRepresentation_ = null + ;(index as any).moduleOptions_ ??= {} + ;(index as any).moduleOptions_.schema = updatedSchema + ;(index as any).buildSchemaObjectRepresentation_() + + let configurationChecker = new Configuration({ + schemaObjectRepresentation: (index as any).schemaObjectRepresentation_, + indexMetadataService, + indexSyncService, + dataSynchronizer, + }) + + const syncRequired = await configurationChecker.checkChanges() expect(syncRequired).toHaveLength(2) expect(syncRequired).toEqual( @@ -201,7 +215,12 @@ describe("IndexModuleService syncIndexConfig", function () { ]) ) - const updatedMetadata = await module.listIndexMetadata() + indexSync = await indexSyncService.list({ + last_key: null, + }) + expect(indexSync).toHaveLength(7) + + const updatedMetadata = await indexMetadataService.list() expect(updatedMetadata).toHaveLength(7) expect(updatedMetadata).toEqual( @@ -243,15 +262,28 @@ describe("IndexModuleService syncIndexConfig", function () { }), ]) ) - await module.syncEntities(syncRequired) + + await (index as any).dataSynchronizer_.syncEntities(syncRequired) // Sync again removing entities not linked - module.schemaObjectRepresentation_ = null - module.moduleOptions_ ??= {} - module.moduleOptions_.schema = updateRemovedSchema - module.buildSchemaObjectRepresentation_() + ;(index as any).schemaObjectRepresentation_ = null + ;(index as any).moduleOptions_ ??= {} + ;(index as any).moduleOptions_.schema = updateRemovedSchema + ;(index as any).buildSchemaObjectRepresentation_() - const syncRequired2 = await module.syncIndexConfig() + const spyDataSynchronizer_ = jest.spyOn( + (index as any).dataSynchronizer_, + "removeEntities" + ) + + configurationChecker = new Configuration({ + schemaObjectRepresentation: (index as any).schemaObjectRepresentation_, + indexMetadataService, + indexSyncService, + dataSynchronizer, + }) + + const syncRequired2 = await configurationChecker.checkChanges() expect(syncRequired2).toHaveLength(1) expect(syncRequired2).toEqual( expect.arrayContaining([ @@ -263,8 +295,8 @@ describe("IndexModuleService syncIndexConfig", function () { ]) ) - const updatedMetadata2 = await module.listIndexMetadata() - expect(updatedMetadata2).toHaveLength(5) + const updatedMetadata2 = await indexMetadataService.list() + expect(updatedMetadata2).toHaveLength(2) expect(updatedMetadata2).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -272,27 +304,14 @@ describe("IndexModuleService syncIndexConfig", function () { fields: "handle,id,title", status: "done", }), - expect.objectContaining({ - entity: "PriceSet", - fields: "id", - status: "done", - }), - expect.objectContaining({ - entity: "Price", - fields: "amount,currency_code,price_set.id", - status: "done", - }), expect.objectContaining({ entity: "ProductVariant", fields: "description,id,product.id,product_id,sku", status: "pending", }), - expect.objectContaining({ - entity: "LinkProductVariantPriceSet", - fields: "id,price_set_id,variant_id", - status: "done", - }), ]) ) + + expect(spyDataSynchronizer_).toHaveBeenCalledTimes(1) }) }) diff --git a/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts b/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts index c7669bdec1..684fd33af2 100644 --- a/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/data-synchronizer.spec.ts @@ -8,18 +8,18 @@ import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk" import { IndexTypes, InferEntityType } from "@medusajs/framework/types" import { ContainerRegistrationKeys, - ModuleRegistrationName, Modules, toMikroORMEntity, } from "@medusajs/framework/utils" import { initDb, TestDatabaseUtils } from "@medusajs/test-utils" -import { asValue } from "awilix" -import * as path from "path" -import { DataSynchronizer } from "../../src/utils/sync/data-synchronizer" -import { EventBusServiceMock } from "../__fixtures__" -import { dbName } from "../__fixtures__/medusa-config" import { EntityManager } from "@mikro-orm/postgresql" import { IndexData, IndexRelation } from "@models" +import { DataSynchronizer } from "@services" +import { asValue } from "awilix" +import * as path from "path" +import { setTimeout } from "timers/promises" +import { EventBusServiceMock } from "../__fixtures__" +import { dbName } from "../__fixtures__/medusa-config" const eventBusMock = new EventBusServiceMock() const queryMock = { @@ -28,7 +28,7 @@ const queryMock = { const dbUtils = TestDatabaseUtils.dbTestUtilFactory() -jest.setTimeout(30000) +jest.setTimeout(300000) const testProductId = "test_prod_1" const testProductId2 = "test_prod_2" @@ -80,11 +80,10 @@ const beforeAll_ = async () => { container.register({ [ContainerRegistrationKeys.LOGGER]: asValue(logger), - [ContainerRegistrationKeys.QUERY]: asValue(null), [ContainerRegistrationKeys.PG_CONNECTION]: asValue(dbUtils.pgConnection_), }) - medusaAppLoader = new MedusaAppLoader(container as any) + medusaAppLoader = new MedusaAppLoader() // Migrations await medusaAppLoader.runModulesMigrations() @@ -97,14 +96,16 @@ const beforeAll_ = async () => { // Bootstrap modules const globalApp = await medusaAppLoader.load() + container.register({ + [ContainerRegistrationKeys.QUERY]: asValue(queryMock), + [ContainerRegistrationKeys.REMOTE_QUERY]: asValue(queryMock), + [Modules.EVENT_BUS]: asValue(eventBusMock), + }) index = container.resolve(Modules.INDEX) - // Mock event bus the index module - ;(index as any).eventBusModuleService_ = eventBusMock - await globalApp.onApplicationStart() - ;(index as any).storageProvider_.query_ = queryMock + await setTimeout(1000) return globalApp } catch (error) { @@ -125,9 +126,6 @@ describe("DataSynchronizer", () => { medusaApp = await beforeAll_() onApplicationPrepareShutdown = medusaApp.onApplicationPrepareShutdown onApplicationShutdown = medusaApp.onApplicationShutdown - manager = ( - medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any - ).container_.manager as EntityManager }) afterAll(async () => { @@ -139,55 +137,9 @@ describe("DataSynchronizer", () => { beforeEach(async () => { jest.clearAllMocks() index = container.resolve(Modules.INDEX) + manager = (index as any).container_.manager as EntityManager - const productSchemaObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation = - { - fields: ["id", "title", "updated_at"], - alias: "product", - moduleConfig: { - linkableKeys: { - id: "Product", - product_id: "Product", - product_variant_id: "ProductVariant", - }, - }, - entity: "Product", - parents: [], - listeners: ["product.created"], - } - - const productVariantSchemaObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation = - { - fields: ["id", "title", "product.id", "updated_at"], - alias: "product_variant", - moduleConfig: { - linkableKeys: { - id: "ProductVariant", - product_id: "Product", - product_variant_id: "ProductVariant", - }, - }, - entity: "ProductVariant", - parents: [ - { - ref: productSchemaObjectRepresentation, - inSchemaRef: productSchemaObjectRepresentation, - targetProp: "id", - }, - ], - listeners: ["product-variant.created"], - } - - const mockSchemaRepresentation = { - product: productSchemaObjectRepresentation, - product_variant: productVariantSchemaObjectRepresentation, - } - - dataSynchronizer = new DataSynchronizer({ - storageProvider: (index as any).storageProvider_, - schemaObjectRepresentation: mockSchemaRepresentation, - query: queryMock as any, - }) + dataSynchronizer = (index as any).dataSynchronizer_ }) describe("sync", () => { @@ -223,8 +175,8 @@ describe("DataSynchronizer", () => { const ackMock = jest.fn() - const result = await dataSynchronizer.sync({ - entityName: "product", + const result = await dataSynchronizer.syncEntity({ + entityName: "Product", ack: ackMock, }) @@ -237,7 +189,7 @@ describe("DataSynchronizer", () => { order: { id: "asc", }, - take: 1000, + take: 100, }, }) @@ -247,7 +199,7 @@ describe("DataSynchronizer", () => { filters: { id: [testProductId], }, - fields: ["id", "title", "updated_at"], + fields: ["id", "title"], }) // Second loop fetching products @@ -263,7 +215,7 @@ describe("DataSynchronizer", () => { order: { id: "asc", }, - take: 1000, + take: 100, }, }) @@ -273,7 +225,7 @@ describe("DataSynchronizer", () => { filters: { id: [testProductId2], }, - fields: ["id", "title", "updated_at"], + fields: ["id", "title"], }) expect(ackMock).toHaveBeenNthCalledWith(1, { @@ -304,8 +256,16 @@ describe("DataSynchronizer", () => { ) expect(indexData).toHaveLength(2) - expect(indexData[0].id).toEqual(testProductId) - expect(indexData[1].id).toEqual(testProductId2) + expect(indexData).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + id: testProductId, + }), + expect.objectContaining({ + id: testProductId2, + }), + ]) + ) expect(indexRelationData).toHaveLength(0) }) @@ -369,15 +329,15 @@ describe("DataSynchronizer", () => { const ackMock = jest.fn() - await dataSynchronizer.sync({ - entityName: "product", + await dataSynchronizer.syncEntity({ + entityName: "Product", ack: ackMock, }) jest.clearAllMocks() - const result = await dataSynchronizer.sync({ - entityName: "product_variant", + const result = await dataSynchronizer.syncEntity({ + entityName: "ProductVariant", ack: ackMock, }) @@ -390,7 +350,7 @@ describe("DataSynchronizer", () => { order: { id: "asc", }, - take: 1000, + take: 100, }, }) @@ -400,7 +360,7 @@ describe("DataSynchronizer", () => { filters: { id: [testVariantId], }, - fields: ["id", "title", "product.id", "updated_at"], + fields: ["id", "product.id", "product_id", "sku"], }) // Second loop fetching product variants @@ -416,7 +376,7 @@ describe("DataSynchronizer", () => { order: { id: "asc", }, - take: 1000, + take: 100, }, }) @@ -426,7 +386,7 @@ describe("DataSynchronizer", () => { filters: { id: [testVariantId2], }, - fields: ["id", "title", "product.id", "updated_at"], + fields: ["id", "product.id", "product_id", "sku"], }) expect(ackMock).toHaveBeenNthCalledWith(1, { @@ -456,29 +416,33 @@ describe("DataSynchronizer", () => { >(toMikroORMEntity(IndexRelation), {}) expect(indexData).toHaveLength(4) - expect(indexData[0].id).toEqual(testProductId) - expect(indexData[1].id).toEqual(testProductId2) - expect(indexData[2].id).toEqual(testVariantId) - expect(indexData[3].id).toEqual(testVariantId2) + expect(indexData).toEqual( + expect.arrayContaining([ + expect.objectContaining({ id: testProductId }), + expect.objectContaining({ id: testProductId2 }), + expect.objectContaining({ id: testVariantId }), + expect.objectContaining({ id: testVariantId2 }), + ]) + ) expect(indexRelationData).toHaveLength(2) - expect(indexRelationData[0]).toEqual( - expect.objectContaining({ - parent_id: testProductId, - child_id: testVariantId, - parent_name: "Product", - child_name: "ProductVariant", - pivot: "Product-ProductVariant", - }) - ) - expect(indexRelationData[1]).toEqual( - expect.objectContaining({ - parent_id: testProductId2, - child_id: testVariantId2, - parent_name: "Product", - child_name: "ProductVariant", - pivot: "Product-ProductVariant", - }) + expect(indexRelationData).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + parent_id: testProductId, + child_id: testVariantId, + parent_name: "Product", + child_name: "ProductVariant", + pivot: "Product-ProductVariant", + }), + expect.objectContaining({ + parent_id: testProductId2, + child_id: testVariantId2, + parent_name: "Product", + child_name: "ProductVariant", + pivot: "Product-ProductVariant", + }), + ]) ) }) diff --git a/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts b/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts index 69c6396333..10b5a20c05 100644 --- a/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts @@ -8,7 +8,6 @@ import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk" import { EventBusTypes, IndexTypes } from "@medusajs/framework/types" import { ContainerRegistrationKeys, - ModuleRegistrationName, Modules, toMikroORMEntity, } from "@medusajs/framework/utils" @@ -17,6 +16,7 @@ import { EntityManager } from "@mikro-orm/postgresql" import { IndexData, IndexRelation } from "@models" import { asValue } from "awilix" import * as path from "path" +import { setTimeout } from "timers/promises" import { EventBusServiceMock } from "../__fixtures__" import { dbName } from "../__fixtures__/medusa-config" @@ -151,6 +151,7 @@ const beforeEach_ = async (eventDataToEmit) => { if (isFirstTime) { isFirstTime = false await sendEvents(eventDataToEmit) + return } @@ -241,11 +242,11 @@ describe("IndexModuleService", function () { ] beforeEach(async () => { + await setTimeout(1000) await beforeEach_(eventDataToEmit) - manager = ( - medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any - ).container_.manager as EntityManager + manager = (medusaApp.sharedContainer!.resolve(Modules.INDEX) as any) + .container_.manager as EntityManager }) afterEach(afterEach_) @@ -404,9 +405,8 @@ describe("IndexModuleService", function () { beforeEach(async () => { await beforeEach_(eventDataToEmit) - manager = ( - medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any - ).container_.manager as EntityManager + manager = (medusaApp.sharedContainer!.resolve(Modules.INDEX) as any) + .container_.manager as EntityManager }) afterEach(afterEach_) @@ -565,9 +565,8 @@ describe("IndexModuleService", function () { beforeEach(async () => { await beforeEach_(eventDataToEmit) - manager = ( - medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any - ).container_.manager as EntityManager + manager = (medusaApp.sharedContainer!.resolve(Modules.INDEX) as any) + .container_.manager as EntityManager await updateData(manager) @@ -686,9 +685,8 @@ describe("IndexModuleService", function () { beforeEach(async () => { await beforeEach_(eventDataToEmit) - manager = ( - medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any - ).container_.manager as EntityManager + manager = (medusaApp.sharedContainer!.resolve(Modules.INDEX) as any) + .container_.manager as EntityManager queryMock.graph = jest.fn().mockImplementation((query) => { const entity = query.entity diff --git a/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts b/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts index b560d17aff..d0a6446ae8 100644 --- a/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts @@ -1,7 +1,7 @@ import { asValue } from "awilix" import { container } from "@medusajs/framework" import type { IndexTypes } from "@medusajs/types" -import { Orchestrator } from "../../src/orchestrator" +import { Orchestrator } from "@utils" function creatingFakeLockingModule() { return { @@ -46,15 +46,19 @@ describe("Orchestrator", () => { locking: asValue(lockingModule), }) - const orchestrator = new Orchestrator(container, entities, { - lockDuration: 60 * 1000, - async taskRunner(entity) { - expect(orchestrator.state).toEqual("processing") - processedEntities.push(entity.entity) - }, - }) + async function taskRunner(entity: string) { + processedEntities.push(entity) + } - await orchestrator.process() + const orchestrator = new Orchestrator( + container.resolve("locking"), + entities.map((e) => e.entity), + { + lockDuration: 60 * 1000, + } + ) + + await orchestrator.process(taskRunner) expect(lockingModule.lockEntities.size).toEqual(0) expect(orchestrator.state).toEqual("completed") expect(processedEntities).toEqual(["brand", "product"]) @@ -92,14 +96,19 @@ describe("Orchestrator", () => { }), }) - const orchestrator = new Orchestrator(container, entities, { - lockDuration: 60 * 1000, - async taskRunner(entity) { - processedEntities.push(entity.entity) - }, - }) + const orchestrator = new Orchestrator( + container.resolve("locking"), + entities.map((e) => e.entity), + { + lockDuration: 60 * 1000, + } + ) - await orchestrator.process() + async function taskRunner(entity: string) { + processedEntities.push(entity) + } + + await orchestrator.process(taskRunner) expect(processedEntities).toEqual([]) }) @@ -130,20 +139,36 @@ describe("Orchestrator", () => { locking: asValue(lockingModule), }) - const orchestrator = new Orchestrator(container, entities, { - lockDuration: 60 * 1000, - async taskRunner(entity) { - processedEntities.push({ entity: entity.entity, owner: "instance-1" }) - }, - }) - const orchestrator1 = new Orchestrator(container, entities, { - lockDuration: 60 * 1000, - async taskRunner(entity) { - processedEntities.push({ entity: entity.entity, owner: "instance-2" }) - }, - }) + const entityNames = entities.map((e) => e.entity) - await Promise.all([orchestrator.process(), orchestrator1.process()]) + async function taskRunner(entity: string) { + processedEntities.push({ entity: entity, owner: "instance-1" }) + } + + const orchestrator = new Orchestrator( + container.resolve("locking"), + entityNames, + { + lockDuration: 60 * 1000, + } + ) + + async function taskRunner2(entity: string) { + processedEntities.push({ entity: entity, owner: "instance-2" }) + } + + const orchestrator1 = new Orchestrator( + container.resolve("locking"), + entityNames, + { + lockDuration: 60 * 1000, + } + ) + + await Promise.all([ + orchestrator.process(taskRunner), + orchestrator1.process(taskRunner2), + ]) expect(processedEntities).toEqual([ { entity: "brand", @@ -184,17 +209,24 @@ describe("Orchestrator", () => { locking: asValue(lockingModule), }) - const orchestrator = new Orchestrator(container, entities, { - lockDuration: 60 * 1000, - async taskRunner(entity) { - if (entity.entity === "product") { - throw new Error("Cannot process") - } - processedEntities.push(entity.entity) - }, - }) + async function taskRunner(entity: string) { + if (entity === "product") { + throw new Error("Cannot process") + } + processedEntities.push(entity) + } - await expect(orchestrator.process()).rejects.toThrow("Cannot process") + const orchestrator = new Orchestrator( + container.resolve("locking"), + entities.map((e) => e.entity), + { + lockDuration: 60 * 1000, + } + ) + + await expect(orchestrator.process(taskRunner)).rejects.toThrow( + "Cannot process" + ) expect(orchestrator.state).toEqual("error") expect(processedEntities).toEqual(["brand"]) expect(lockingModule.lockEntities.size).toEqual(0) @@ -227,16 +259,24 @@ describe("Orchestrator", () => { locking: asValue(lockingModule), }) - const orchestrator = new Orchestrator(container, entities, { - lockDuration: 60 * 1000, - async taskRunner(entity) { - expect(orchestrator.state).toEqual("processing") - processedEntities.push(entity.entity) - }, - }) + async function taskRunner(entity: string) { + expect(orchestrator.state).toEqual("processing") + processedEntities.push(entity) + } + + const orchestrator = new Orchestrator( + container.resolve("locking"), + entities.map((e) => e.entity), + { + lockDuration: 60 * 1000, + } + ) await expect( - Promise.all([orchestrator.process(), orchestrator.process()]) + Promise.all([ + orchestrator.process(taskRunner), + orchestrator.process(taskRunner), + ]) ).rejects.toThrow("Cannot re-run an already running orchestrator instance") expect(lockingModule.lockEntities.size).toEqual(0) diff --git a/packages/modules/index/integration-tests/__tests__/query-builder.spec.ts b/packages/modules/index/integration-tests/__tests__/query-builder.spec.ts index 7a5549637a..d1b945be83 100644 --- a/packages/modules/index/integration-tests/__tests__/query-builder.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/query-builder.spec.ts @@ -6,11 +6,7 @@ import { } from "@medusajs/framework" import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk" import { IndexTypes } from "@medusajs/framework/types" -import { - ContainerRegistrationKeys, - ModuleRegistrationName, - Modules, -} from "@medusajs/framework/utils" +import { ContainerRegistrationKeys, Modules } from "@medusajs/framework/utils" import { initDb, TestDatabaseUtils } from "@medusajs/test-utils" import { EntityManager } from "@mikro-orm/postgresql" import { IndexData, IndexRelation } from "@models" @@ -123,11 +119,11 @@ describe("IndexModuleService query", function () { beforeEach(async () => { await beforeEach_() - module = medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) + module = medusaApp.sharedContainer!.resolve(Modules.INDEX) const manager = ( - (medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any) - .container_.manager as EntityManager + (medusaApp.sharedContainer!.resolve(Modules.INDEX) as any).container_ + .manager as EntityManager ).fork() const indexRepository = manager.getRepository(IndexData) @@ -343,6 +339,65 @@ describe("IndexModuleService query", function () { ]) }) + it("should query all products ordered by sku DESC with specific fields", async () => { + const { data } = await module.query({ + fields: [ + "product.*", + "product.variants.sku", + "product.variants.prices.amount", + ], + pagination: { + order: { + product: { + variants: { + sku: "DESC", + }, + }, + }, + }, + }) + + expect(data).toEqual([ + { + id: "prod_2", + title: "Product 2 title", + deep: { + a: 1, + obj: { + b: 15, + }, + }, + variants: [], + }, + { + id: "prod_1", + variants: [ + { + id: "var_2", + sku: "sku 123", + prices: [ + { + id: "money_amount_2", + amount: 10, + }, + ], + }, + + { + id: "var_1", + sku: "aaa test aaa", + prices: [ + { + id: "money_amount_1", + amount: 100, + }, + ], + }, + ], + }, + ]) + }) + it("should query all products ordered by price", async () => { const { data } = await module.query({ fields: ["product.*", "product.variants.*", "product.variants.prices.*"], @@ -484,6 +539,35 @@ describe("IndexModuleService query", function () { ]) }) + it("should query products filtering by variant sku", async () => { + const { data } = await module.query({ + fields: ["product.*", "product.variants.*", "product.variants.prices.*"], + joinFilters: { + "product.variants.prices.amount": { $gt: 110 }, + }, + filters: { + product: { + variants: { + sku: { $like: "aaa%" }, + }, + }, + }, + }) + + expect(data).toEqual([ + { + id: "prod_1", + variants: [ + { + id: "var_1", + sku: "aaa test aaa", + prices: [], + }, + ], + }, + ]) + }) + it("should query products filtering by price and returning the complete entity", async () => { const { data } = await module.query({ fields: ["product.*", "product.variants.*", "product.variants.prices.*"], diff --git a/packages/modules/index/src/migrations/.snapshot-medusa-index.json b/packages/modules/index/src/migrations/.snapshot-medusa-index.json index 41169e8932..77519e8e48 100644 --- a/packages/modules/index/src/migrations/.snapshot-medusa-index.json +++ b/packages/modules/index/src/migrations/.snapshot-medusa-index.json @@ -1,5 +1,7 @@ { - "namespaces": ["public"], + "namespaces": [ + "public" + ], "name": "public", "tables": [ { @@ -32,6 +34,16 @@ "default": "'{}'", "mappedType": "json" }, + "staled_at": { + "name": "staled_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + }, "created_at": { "name": "created_at", "type": "timestamptz", @@ -77,36 +89,12 @@ "unique": false, "expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_data_deleted_at\" ON \"index_data\" (deleted_at) WHERE deleted_at IS NULL" }, - { - "keyName": "IDX_index_data_gin", - "columnNames": [], - "composite": false, - "constraint": false, - "primary": false, - "unique": false, - "expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_data_gin\" ON \"index_data\" USING GIN (data) WHERE deleted_at IS NULL" - }, - { - "keyName": "IDX_index_data_id", - "columnNames": [], - "composite": false, - "constraint": false, - "primary": false, - "unique": false, - "expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_data_id\" ON \"index_data\" (id) WHERE deleted_at IS NULL" - }, - { - "keyName": "IDX_index_data_name", - "columnNames": [], - "composite": false, - "constraint": false, - "primary": false, - "unique": false, - "expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_data_name\" ON \"index_data\" (name) WHERE deleted_at IS NULL" - }, { "keyName": "index_data_pkey", - "columnNames": ["id", "name"], + "columnNames": [ + "id", + "name" + ], "composite": true, "constraint": true, "primary": true, @@ -163,7 +151,12 @@ "primary": false, "nullable": false, "default": "'pending'", - "enumItems": ["pending", "processing", "done", "error"], + "enumItems": [ + "pending", + "processing", + "done", + "error" + ], "mappedType": "enum" }, "created_at": { @@ -222,7 +215,9 @@ }, { "keyName": "index_metadata_pkey", - "columnNames": ["id"], + "columnNames": [ + "id" + ], "composite": false, "constraint": true, "primary": true, @@ -289,6 +284,16 @@ "nullable": false, "mappedType": "text" }, + "staled_at": { + "name": "staled_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + }, "created_at": { "name": "created_at", "type": "timestamptz", @@ -325,24 +330,6 @@ "name": "index_relation", "schema": "public", "indexes": [ - { - "keyName": "IDX_index_relation_parent_id", - "columnNames": [], - "composite": false, - "constraint": false, - "primary": false, - "unique": false, - "expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_relation_parent_id\" ON \"index_relation\" (parent_id) WHERE deleted_at IS NULL" - }, - { - "keyName": "IDX_index_relation_child_id", - "columnNames": [], - "composite": false, - "constraint": false, - "primary": false, - "unique": false, - "expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_relation_child_id\" ON \"index_relation\" (child_id) WHERE deleted_at IS NULL" - }, { "keyName": "IDX_index_relation_deleted_at", "columnNames": [], @@ -354,7 +341,107 @@ }, { "keyName": "index_relation_pkey", - "columnNames": ["id"], + "columnNames": [ + "id" + ], + "composite": false, + "constraint": true, + "primary": true, + "unique": true + } + ], + "checks": [], + "foreignKeys": {}, + "nativeEnums": {} + }, + { + "columns": { + "id": { + "name": "id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "entity": { + "name": "entity", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "last_key": { + "name": "last_key", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "text" + }, + "created_at": { + "name": "created_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + } + }, + "name": "index_sync", + "schema": "public", + "indexes": [ + { + "keyName": "IDX_index_sync_deleted_at", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_sync_deleted_at\" ON \"index_sync\" (deleted_at) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_index_sync_entity", + "columnNames": [], + "composite": false, + "constraint": false, + "primary": false, + "unique": false, + "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_index_sync_entity\" ON \"index_sync\" (entity) WHERE deleted_at IS NULL" + }, + { + "keyName": "index_sync_pkey", + "columnNames": [ + "id" + ], "composite": false, "constraint": true, "primary": true, diff --git a/packages/modules/index/src/migrations/Migration20231019174230.ts b/packages/modules/index/src/migrations/Migration20231019174230.ts index ce109dafe5..5c49765a55 100644 --- a/packages/modules/index/src/migrations/Migration20231019174230.ts +++ b/packages/modules/index/src/migrations/Migration20231019174230.ts @@ -5,17 +5,9 @@ export class Migration20231019174230 extends Migration { this.addSql( `create table "index_data" ("id" text not null, "name" text not null, "data" jsonb not null default '{}', constraint "index_data_pkey" primary key ("id", "name")) PARTITION BY LIST("name");` ) - this.addSql(`create index "IDX_index_data_id" on "index_data" ("id");`) - this.addSql(`create index "IDX_index_data_name" on "index_data" ("name");`) - this.addSql( - `create index "IDX_index_data_gin" on "index_data" using GIN ("data");` - ) this.addSql( `create table "index_relation" ("id" bigserial, "pivot" text not null, "parent_id" text not null, "parent_name" text not null, "child_id" text not null, "child_name" text not null, constraint "index_relation_pkey" primary key ("id", "pivot")) PARTITION BY LIST("pivot");` ) - this.addSql( - `create index "IDX_index_relation_child_id" on "index_relation" ("child_id");` - ) } } diff --git a/packages/modules/index/src/migrations/Migration20250127144442.ts b/packages/modules/index/src/migrations/Migration20250127144442.ts new file mode 100644 index 0000000000..659d500072 --- /dev/null +++ b/packages/modules/index/src/migrations/Migration20250127144442.ts @@ -0,0 +1,23 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20250127144442 extends Migration { + override async up(): Promise { + this.addSql( + `alter table if exists "index_data" add column if not exists "staled_at" timestamptz null;` + ) + + this.addSql( + `alter table if exists "index_relation" add column if not exists "staled_at" timestamptz null;` + ) + } + + override async down(): Promise { + this.addSql( + `alter table if exists "index_data" drop column if exists "staled_at";` + ) + + this.addSql( + `alter table if exists "index_relation" drop column if exists "staled_at";` + ) + } +} diff --git a/packages/modules/index/src/migrations/Migration20250128132404.ts b/packages/modules/index/src/migrations/Migration20250128132404.ts new file mode 100644 index 0000000000..6339eeba6e --- /dev/null +++ b/packages/modules/index/src/migrations/Migration20250128132404.ts @@ -0,0 +1,19 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20250128132404 extends Migration { + override async up(): Promise { + this.addSql( + `create table if not exists "index_sync" ("id" text not null, "entity" text not null, "last_key" text null, "created_at" timestamptz not null default now(), "updated_at" timestamptz not null default now(), "deleted_at" timestamptz null, constraint "index_sync_pkey" primary key ("id"));` + ) + this.addSql( + `CREATE INDEX IF NOT EXISTS "IDX_index_sync_deleted_at" ON "index_sync" (deleted_at) WHERE deleted_at IS NULL;` + ) + this.addSql( + `CREATE UNIQUE INDEX IF NOT EXISTS "IDX_index_sync_entity" ON "index_sync" (entity) WHERE deleted_at IS NULL;` + ) + } + + override async down(): Promise { + this.addSql(`drop table if exists "index_sync" cascade;`) + } +} diff --git a/packages/modules/index/src/models/index-data.ts b/packages/modules/index/src/models/index-data.ts index 0326e34f14..580de3d93b 100644 --- a/packages/modules/index/src/models/index-data.ts +++ b/packages/modules/index/src/models/index-data.ts @@ -1,25 +1,10 @@ import { model } from "@medusajs/framework/utils" -const IndexData = model - .define("IndexData", { - id: model.text().primaryKey(), - name: model.text().primaryKey(), - data: model.json().default({}), - }) - .indexes([ - { - name: "IDX_index_data_gin", - type: "GIN", - on: ["data"], - }, - { - name: "IDX_index_data_id", - on: ["id"], - }, - { - name: "IDX_index_data_name", - on: ["name"], - }, - ]) +const IndexData = model.define("IndexData", { + id: model.text().primaryKey(), + name: model.text().primaryKey(), + data: model.json().default({}), + staled_at: model.dateTime().nullable(), +}) export default IndexData diff --git a/packages/modules/index/src/models/index-metadata.ts b/packages/modules/index/src/models/index-metadata.ts index 9964653c9c..d6db66f580 100644 --- a/packages/modules/index/src/models/index-metadata.ts +++ b/packages/modules/index/src/models/index-metadata.ts @@ -3,7 +3,7 @@ import { IndexMetadataStatus } from "../utils/index-metadata-status" const IndexMetadata = model .define("IndexMetadata", { - id: model.id().primaryKey(), + id: model.id({ prefix: "idxmeta" }).primaryKey(), entity: model.text(), fields: model.text(), fields_hash: model.text(), diff --git a/packages/modules/index/src/models/index-relation.ts b/packages/modules/index/src/models/index-relation.ts index f163c01b7b..40cb5b8894 100644 --- a/packages/modules/index/src/models/index-relation.ts +++ b/packages/modules/index/src/models/index-relation.ts @@ -4,9 +4,10 @@ const IndexRelation = model.define("IndexRelation", { id: model.autoincrement().primaryKey(), pivot: model.text(), parent_name: model.text(), - parent_id: model.text().index("IDX_index_relation_parent_id"), + parent_id: model.text(), child_name: model.text(), - child_id: model.text().index("IDX_index_relation_child_id"), + child_id: model.text(), + staled_at: model.dateTime().nullable(), }) export default IndexRelation diff --git a/packages/modules/index/src/models/index-sync.ts b/packages/modules/index/src/models/index-sync.ts new file mode 100644 index 0000000000..0a927d04fe --- /dev/null +++ b/packages/modules/index/src/models/index-sync.ts @@ -0,0 +1,17 @@ +import { model } from "@medusajs/framework/utils" + +const IndexSync = model + .define("IndexSync", { + id: model.id({ prefix: "idxsync" }).primaryKey(), + entity: model.text(), + last_key: model.text().nullable(), + }) + .indexes([ + { + name: "IDX_index_sync_entity", + on: ["entity"], + unique: true, + }, + ]) + +export default IndexSync diff --git a/packages/modules/index/src/models/index.ts b/packages/modules/index/src/models/index.ts index d111e288a5..b4f5f696df 100644 --- a/packages/modules/index/src/models/index.ts +++ b/packages/modules/index/src/models/index.ts @@ -1,3 +1,4 @@ export { default as IndexData } from "./index-data" export { default as IndexMetadata } from "./index-metadata" export { default as IndexRelation } from "./index-relation" +export { default as IndexSync } from "./index-sync" diff --git a/packages/modules/index/src/services/data-synchronizer.ts b/packages/modules/index/src/services/data-synchronizer.ts new file mode 100644 index 0000000000..e794f57bdb --- /dev/null +++ b/packages/modules/index/src/services/data-synchronizer.ts @@ -0,0 +1,356 @@ +import { + CommonEvents, + ContainerRegistrationKeys, + groupBy, + Modules, + promiseAll, +} from "@medusajs/framework/utils" +import { + Event, + ILockingModule, + IndexTypes, + Logger, + ModulesSdkTypes, + RemoteQueryFunction, + SchemaObjectEntityRepresentation, +} from "@medusajs/types" +import { IndexMetadataStatus, Orchestrator } from "@utils" + +export class DataSynchronizer { + #container: Record + #isReady: boolean = false + #schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation + #storageProvider: IndexTypes.StorageProvider + #orchestrator!: Orchestrator + + get #query() { + return this.#container[ + ContainerRegistrationKeys.QUERY + ] as RemoteQueryFunction + } + + get #locking() { + return this.#container[Modules.LOCKING] as ILockingModule + } + + get #indexMetadataService(): ModulesSdkTypes.IMedusaInternalService { + return this.#container.indexMetadataService + } + + get #indexSyncService(): ModulesSdkTypes.IMedusaInternalService { + return this.#container.indexSyncService + } + + get #indexDataService(): ModulesSdkTypes.IMedusaInternalService { + return this.#container.indexDataService + } + + // @ts-ignore + get #indexRelationService(): ModulesSdkTypes.IMedusaInternalService { + return this.#container.indexRelationService + } + + get #logger(): Logger { + try { + return this.#container.logger + } catch (err) { + return console as unknown as Logger + } + } + + constructor(container: Record) { + this.#container = container + } + + #isReadyOrThrow() { + if (!this.#isReady) { + throw new Error( + "DataSynchronizer is not ready. Call onApplicationStart first." + ) + } + } + + onApplicationStart({ + schemaObjectRepresentation, + storageProvider, + }: { + lockDuration?: number + schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation + storageProvider: IndexTypes.StorageProvider + }) { + this.#storageProvider = storageProvider + this.#schemaObjectRepresentation = schemaObjectRepresentation + + this.#isReady = true + } + + async syncEntities( + entities: { + entity: string + fields: string + fields_hash: string + }[], + lockDuration: number = 1000 * 60 * 5 + ) { + this.#isReadyOrThrow() + const entitiesToSync = entities.map((entity) => entity.entity) + this.#orchestrator = new Orchestrator(this.#locking, entitiesToSync, { + lockDuration, + }) + await this.#orchestrator.process(this.#taskRunner.bind(this)) + } + + async removeEntities(entities: string[], staleOnly: boolean = false) { + this.#isReadyOrThrow() + + const staleCondition = staleOnly ? { staled_at: { $ne: null } } : {} + + const dataToDelete = await this.#indexDataService.list({ + ...staleCondition, + name: entities, + }) + + const toDeleteByEntity = groupBy(dataToDelete, "name") + + for (const entity of toDeleteByEntity.keys()) { + const records = toDeleteByEntity.get(entity) + const ids = records?.map( + (record: { data: { id: string } }) => record.data.id + ) + if (!ids?.length) { + continue + } + + if (this.#schemaObjectRepresentation[entity]) { + // Here we assume that some data have been deleted from from the source and we are cleaning since they are still staled in the index and we remove them from the index + + // TODO: expand storage provider interface + await (this.#storageProvider as any).onDelete({ + entity, + data: ids, + schemaEntityObjectRepresentation: + this.#schemaObjectRepresentation[entity], + }) + } else { + // Here we assume that the entity is not indexed anymore as it is not part of the schema object representation and we are cleaning the index + await promiseAll([ + this.#indexDataService.delete({ + selector: { + name: entity, + }, + }), + this.#indexRelationService.delete({ + selector: { + $or: [{ parent_id: entity }, { child_id: entity }], + }, + }), + ]) + } + } + } + + async #updatedStatus(entity: string, status: IndexMetadataStatus) { + await this.#indexMetadataService.update({ + data: { + status, + }, + selector: { + entity, + }, + }) + } + + async #taskRunner(entity: string) { + const [[lastCursor]] = await promiseAll([ + this.#indexSyncService.list( + { + entity, + }, + { + select: ["last_key"], + } + ), + this.#updatedStatus(entity, IndexMetadataStatus.PROCESSING), + this.#indexDataService.update({ + data: { + staled_at: new Date(), + }, + selector: { + name: entity, + }, + }), + ]) + + const finalAcknoledgement = await this.syncEntity({ + entityName: entity, + pagination: { + cursor: lastCursor?.last_key, + }, + ack: async (ack) => { + const promises: Promise[] = [] + + if (ack.lastCursor) { + promises.push( + this.#indexSyncService.update({ + data: { + last_key: ack.lastCursor, + }, + selector: { + entity, + }, + }) + ) + + if (!ack.done && !ack.err) { + promises.push(this.#orchestrator.renewLock(entity)) + } + } + + await promiseAll(promises) + }, + }) + + if (finalAcknoledgement.done) { + await promiseAll([ + this.#updatedStatus(entity, IndexMetadataStatus.DONE), + this.#indexSyncService.update({ + data: { + last_key: finalAcknoledgement.lastCursor, + }, + selector: { + entity, + }, + }), + this.removeEntities([entity], true), + ]) + } + + if (finalAcknoledgement.err) { + await this.#updatedStatus(entity, IndexMetadataStatus.ERROR) + } + } + + async syncEntity({ + entityName, + pagination = {}, + ack, + }: { + entityName: string + pagination?: { + cursor?: string + updated_at?: string | Date + limit?: number + batchSize?: number + } + ack: (ack: { + lastCursor: string | null + done?: boolean + err?: Error + }) => Promise + }): Promise<{ + lastCursor: string | null + done?: boolean + err?: Error + }> { + this.#isReadyOrThrow() + + const schemaEntityObjectRepresentation = this.#schemaObjectRepresentation[ + entityName + ] as SchemaObjectEntityRepresentation + + const { fields, alias, moduleConfig } = schemaEntityObjectRepresentation + const isLink = !!moduleConfig?.isLink + + const entityPrimaryKey = fields.find( + (field) => !!moduleConfig?.primaryKeys?.includes(field) + ) + + if (!entityPrimaryKey) { + // TODO: for now these are skiped + const acknoledgement = { + lastCursor: pagination.cursor ?? null, + done: true, + } + + await ack(acknoledgement) + return acknoledgement + } + + let processed = 0 + let currentCursor = pagination.cursor! + const batchSize = Math.min(pagination.batchSize ?? 100, 100) + const limit = pagination.limit ?? Infinity + let done = false + let error = null + + while (processed < limit || !done) { + const filters: Record = {} + + if (currentCursor) { + filters[entityPrimaryKey] = { $gt: currentCursor } + } + + if (pagination.updated_at) { + filters["updated_at"] = { $gt: pagination.updated_at } + } + + const { data } = await this.#query.graph({ + entity: alias, + fields: [entityPrimaryKey], + filters, + pagination: { + order: { + [entityPrimaryKey]: "asc", + }, + take: batchSize, + }, + }) + + done = !data.length + if (done) { + break + } + + const envelop: Event = { + data, + name: !isLink + ? `*.${CommonEvents.CREATED}` + : `*.${CommonEvents.ATTACHED}`, + } + + try { + await this.#storageProvider.consumeEvent( + schemaEntityObjectRepresentation + )(envelop) + currentCursor = data[data.length - 1][entityPrimaryKey] + processed += data.length + + await ack({ lastCursor: currentCursor }) + } catch (err) { + this.#logger.error( + `Index engine] sync failed for entity ${entityName}`, + err + ) + error = err + break + } + } + + let acknoledgement: { lastCursor: string; done?: boolean; err?: Error } = { + lastCursor: currentCursor, + done: true, + } + + if (error) { + acknoledgement = { + lastCursor: currentCursor, + err: error, + } + await ack(acknoledgement) + return acknoledgement + } + + await ack(acknoledgement) + return acknoledgement + } +} diff --git a/packages/modules/index/src/services/index-data.ts b/packages/modules/index/src/services/index-data.ts new file mode 100644 index 0000000000..0f9cb46abd --- /dev/null +++ b/packages/modules/index/src/services/index-data.ts @@ -0,0 +1,4 @@ +import { MedusaInternalService } from "@medusajs/framework/utils" +import { IndexData } from "@models" + +export class IndexDataService extends MedusaInternalService(IndexData) {} diff --git a/packages/modules/index/src/services/index-metadata.ts b/packages/modules/index/src/services/index-metadata.ts new file mode 100644 index 0000000000..069c406cad --- /dev/null +++ b/packages/modules/index/src/services/index-metadata.ts @@ -0,0 +1,6 @@ +import { MedusaInternalService } from "@medusajs/framework/utils" +import { IndexMetadata } from "@models" + +export class IndexMetadataService extends MedusaInternalService( + IndexMetadata +) {} diff --git a/packages/modules/index/src/services/index-module-service.ts b/packages/modules/index/src/services/index-module-service.ts index 4c48d452c5..bf399176c4 100644 --- a/packages/modules/index/src/services/index-module-service.ts +++ b/packages/modules/index/src/services/index-module-service.ts @@ -3,6 +3,7 @@ import { IEventBusModuleService, IndexTypes, InternalModuleDeclaration, + Logger, ModulesSdkTypes, RemoteQueryFunction, } from "@medusajs/framework/types" @@ -11,28 +12,30 @@ import { ContainerRegistrationKeys, Modules, ModulesSdkUtils, - simpleHash, } from "@medusajs/framework/utils" -import { IndexMetadata } from "@models" import { schemaObjectRepresentationPropertiesToOmit } from "@types" -import { buildSchemaObjectRepresentation } from "../utils/build-config" -import { defaultSchema } from "../utils/default-schema" -import { gqlSchemaToTypes } from "../utils/gql-to-types" -import { IndexMetadataStatus } from "../utils/index-metadata-status" +import { + buildSchemaObjectRepresentation, + Configuration, + defaultSchema, + gqlSchemaToTypes, +} from "@utils" +import { DataSynchronizer } from "./data-synchronizer" type InjectedDependencies = { + logger: Logger [Modules.EVENT_BUS]: IEventBusModuleService storageProviderCtr: Constructor [ContainerRegistrationKeys.QUERY]: RemoteQueryFunction storageProviderCtrOptions: unknown baseRepository: BaseRepository indexMetadataService: ModulesSdkTypes.IMedusaInternalService + indexSyncService: ModulesSdkTypes.IMedusaInternalService + dataSynchronizer: DataSynchronizer } export default class IndexModuleService - extends ModulesSdkUtils.MedusaService({ - IndexMetadata, - }) + extends ModulesSdkUtils.MedusaService({}) implements IndexTypes.IIndexService { private readonly container_: InjectedDependencies @@ -48,7 +51,25 @@ export default class IndexModuleService protected storageProvider_: IndexTypes.StorageProvider - private indexMetadataService_: ModulesSdkTypes.IMedusaInternalService + private get indexMetadataService_(): ModulesSdkTypes.IMedusaInternalService { + return this.container_.indexMetadataService + } + + private get indexSyncService_(): ModulesSdkTypes.IMedusaInternalService { + return this.container_.indexSyncService + } + + private get dataSynchronizer_(): DataSynchronizer { + return this.container_.dataSynchronizer + } + + private get logger_(): Logger { + try { + return this.container_.logger + } catch (e) { + return console as unknown as Logger + } + } constructor( container: InjectedDependencies, @@ -62,7 +83,6 @@ export default class IndexModuleService const { [Modules.EVENT_BUS]: eventBusModuleService, - indexMetadataService, storageProviderCtr, storageProviderCtrOptions, } = container @@ -70,8 +90,6 @@ export default class IndexModuleService this.eventBusModuleService_ = eventBusModuleService this.storageProviderCtr_ = storageProviderCtr this.storageProviderCtrOptions_ = storageProviderCtrOptions - this.indexMetadataService_ = indexMetadataService - if (!this.eventBusModuleService_) { throw new Error( "EventBusModuleService is required for the IndexModule to work" @@ -106,124 +124,24 @@ export default class IndexModuleService await gqlSchemaToTypes(this.moduleOptions_.schema ?? defaultSchema) - const fullSyncRequired = await this.syncIndexConfig() - if (fullSyncRequired.length > 0) { - await this.syncEntities(fullSyncRequired) + this.dataSynchronizer_.onApplicationStart({ + schemaObjectRepresentation: this.schemaObjectRepresentation_, + storageProvider: this.storageProvider_, + }) + + const configurationChecker = new Configuration({ + schemaObjectRepresentation: this.schemaObjectRepresentation_, + indexMetadataService: this.indexMetadataService_, + indexSyncService: this.indexSyncService_, + dataSynchronizer: this.dataSynchronizer_, + }) + const entitiesMetadataChanged = await configurationChecker.checkChanges() + + if (entitiesMetadataChanged.length) { + await this.dataSynchronizer_.syncEntities(entitiesMetadataChanged) } } catch (e) { - console.log(e) - } - } - - private async syncIndexConfig() { - const schemaObjectRepresentation = (this.schemaObjectRepresentation_ ?? - {}) as IndexTypes.SchemaObjectRepresentation - - const currentConfig = await this.indexMetadataService_.list() - const currentConfigMap = new Map( - currentConfig.map((c) => [c.entity, c] as const) - ) - - type modifiedConfig = { - id?: string - entity: string - fields: string[] - fields_hash: string - status?: IndexMetadataStatus - }[] - const entityPresent = new Set() - const newConfig: modifiedConfig = [] - const updatedConfig: modifiedConfig = [] - const deletedConfig: { entity: string }[] = [] - - for (const [entityName, schemaEntityObjectRepresentation] of Object.entries( - schemaObjectRepresentation - )) { - if (schemaObjectRepresentationPropertiesToOmit.includes(entityName)) { - continue - } - - const entity = schemaEntityObjectRepresentation.entity - const fields = schemaEntityObjectRepresentation.fields.sort().join(",") - const fields_hash = simpleHash(fields) - - const existingEntityConfig = currentConfigMap.get(entity) - - entityPresent.add(entity) - if ( - !existingEntityConfig || - existingEntityConfig.fields_hash !== fields_hash - ) { - const meta = { - id: existingEntityConfig?.id, - entity, - fields, - fields_hash, - } - - if (!existingEntityConfig) { - newConfig.push(meta) - } else { - updatedConfig.push({ - ...meta, - status: IndexMetadataStatus.PENDING, - }) - } - } - } - - for (const [entity] of currentConfigMap) { - if (!entityPresent.has(entity)) { - deletedConfig.push({ entity }) - } - } - - if (newConfig.length > 0) { - await this.indexMetadataService_.create(newConfig) - } - if (updatedConfig.length > 0) { - await this.indexMetadataService_.update(updatedConfig) - } - if (deletedConfig.length > 0) { - await this.indexMetadataService_.delete(deletedConfig) - } - - return await this.indexMetadataService_.list({ - status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING], - }) - } - - private async syncEntities( - entities: { - entity: string - fields: string[] - fields_hash: string - }[] - ) { - const updatedStatus = async ( - entity: string, - status: IndexMetadataStatus - ) => { - await this.indexMetadataService_.update({ - data: { - status, - }, - selector: { - entity, - }, - }) - } - - for (const entity of entities) { - await updatedStatus(entity.entity, IndexMetadataStatus.PROCESSING) - - try { - // await this.syncEntity(entity) - - await updatedStatus(entity.entity, IndexMetadataStatus.DONE) - } catch (e) { - await updatedStatus(entity.entity, IndexMetadataStatus.ERROR) - } + this.logger_.error(e) } } diff --git a/packages/modules/index/src/services/index-relation.ts b/packages/modules/index/src/services/index-relation.ts new file mode 100644 index 0000000000..1ee0176e3c --- /dev/null +++ b/packages/modules/index/src/services/index-relation.ts @@ -0,0 +1,6 @@ +import { MedusaInternalService } from "@medusajs/framework/utils" +import { IndexRelation } from "@models" + +export class IndexRelationService extends MedusaInternalService( + IndexRelation +) {} diff --git a/packages/modules/index/src/services/index-sync.ts b/packages/modules/index/src/services/index-sync.ts new file mode 100644 index 0000000000..f839a4e710 --- /dev/null +++ b/packages/modules/index/src/services/index-sync.ts @@ -0,0 +1,4 @@ +import { MedusaInternalService } from "@medusajs/framework/utils" +import { IndexSync } from "@models" + +export class IndexSyncService extends MedusaInternalService(IndexSync) {} diff --git a/packages/modules/index/src/services/postgres-provider.ts b/packages/modules/index/src/services/postgres-provider.ts index 684db9928c..4b689f65ba 100644 --- a/packages/modules/index/src/services/postgres-provider.ts +++ b/packages/modules/index/src/services/postgres-provider.ts @@ -208,14 +208,13 @@ export class PostgresProvider implements IndexTypes.StorageProvider { } const { fields, alias } = schemaEntityObjectRepresentation - const graphResult = await this.query_.graph({ + const { data: entityData } = await this.query_.graph({ entity: alias, filters: { id: ids, }, fields: [...new Set(["id", ...fields])], }) - const { data: entityData } = graphResult const argument = { entity: schemaEntityObjectRepresentation.entity, @@ -327,9 +326,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { * @protected */ @InjectTransactionManager() - protected async onCreate< - TData extends { id: string; [key: string]: unknown } - >( + async onCreate( { entity, data, @@ -339,13 +336,11 @@ export class PostgresProvider implements IndexTypes.StorageProvider { data: TData | TData[] schemaEntityObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation }, - @MedusaContext() sharedContext: Context = {} + @MedusaContext() sharedContext: Context = {} ) { - const { transactionManager: em } = sharedContext as { - transactionManager: SqlEntityManager - } - const indexRepository = em.getRepository(toMikroORMEntity(IndexData)) - const indexRelationRepository: EntityRepository = em.getRepository( + const { transactionManager: em } = sharedContext + const indexRepository = em!.getRepository(toMikroORMEntity(IndexData)) + const indexRelationRepository: EntityRepository = em!.getRepository( toMikroORMEntity(IndexRelation) ) @@ -374,7 +369,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { id: cleanedEntityData.id, name: entity, data: cleanedEntityData, - // stale: false, + staled_at: null, }) /** @@ -400,7 +395,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { id: (parentData_ as any).id, name: parentEntity, data: parentData_, - // stale: false, + staled_at: null, }) await indexRelationRepository.upsert( @@ -410,7 +405,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { child_id: cleanedEntityData.id, child_name: entity, pivot: `${parentEntity}-${entity}`, - // stale: false, + staled_at: null, }, { onConflictAction: "merge", @@ -437,9 +432,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { * @protected */ @InjectTransactionManager() - protected async onUpdate< - TData extends { id: string; [key: string]: unknown } - >( + async onUpdate( { entity, data, @@ -449,12 +442,10 @@ export class PostgresProvider implements IndexTypes.StorageProvider { data: TData | TData[] schemaEntityObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation }, - @MedusaContext() sharedContext: Context = {} + @MedusaContext() sharedContext: Context = {} ) { - const { transactionManager: em } = sharedContext as { - transactionManager: SqlEntityManager - } - const indexRepository = em.getRepository(toMikroORMEntity(IndexData)) + const { transactionManager: em } = sharedContext + const indexRepository = em!.getRepository(toMikroORMEntity(IndexData)) const { data: data_, entityProperties } = PostgresProvider.parseData( data, @@ -470,7 +461,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { acc[property] = entityData[property] return acc }, {}), - // stale: false, + staled_at: null, } }) ) @@ -485,9 +476,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { * @protected */ @InjectTransactionManager() - protected async onDelete< - TData extends { id: string; [key: string]: unknown } - >( + async onDelete( { entity, data, @@ -497,13 +486,11 @@ export class PostgresProvider implements IndexTypes.StorageProvider { data: TData | TData[] schemaEntityObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation }, - @MedusaContext() sharedContext: Context = {} + @MedusaContext() sharedContext: Context = {} ) { - const { transactionManager: em } = sharedContext as { - transactionManager: SqlEntityManager - } - const indexRepository = em.getRepository(toMikroORMEntity(IndexData)) - const indexRelationRepository = em.getRepository( + const { transactionManager: em } = sharedContext + const indexRepository = em!.getRepository(toMikroORMEntity(IndexData)) + const indexRelationRepository = em!.getRepository( toMikroORMEntity(IndexRelation) ) @@ -541,9 +528,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { * @protected */ @InjectTransactionManager() - protected async onAttach< - TData extends { id: string; [key: string]: unknown } - >( + async onAttach( { entity, data, @@ -553,13 +538,11 @@ export class PostgresProvider implements IndexTypes.StorageProvider { data: TData | TData[] schemaEntityObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation }, - @MedusaContext() sharedContext: Context = {} + @MedusaContext() sharedContext: Context = {} ) { - const { transactionManager: em } = sharedContext as { - transactionManager: SqlEntityManager - } - const indexRepository = em.getRepository(toMikroORMEntity(IndexData)) - const indexRelationRepository = em.getRepository( + const { transactionManager: em } = sharedContext + const indexRepository = em!.getRepository(toMikroORMEntity(IndexData)) + const indexRelationRepository = em!.getRepository( toMikroORMEntity(IndexRelation) ) @@ -626,34 +609,54 @@ export class PostgresProvider implements IndexTypes.StorageProvider { id: cleanedEntityData.id, name: entity, data: cleanedEntityData, - // stale: false, + staled_at: null, }) /** * Create the index relation entries for the parent entity and the child entity */ - const parentIndexRelationEntry = indexRelationRepository.create({ - parent_id: entityData[parentPropertyId] as string, - parent_name: parentEntityName, - child_id: cleanedEntityData.id, - child_name: entity, - pivot: `${parentEntityName}-${entity}`, - // stale: false, - }) + await indexRelationRepository.upsert( + { + parent_id: entityData[parentPropertyId] as string, + parent_name: parentEntityName, + child_id: cleanedEntityData.id, + child_name: entity, + pivot: `${parentEntityName}-${entity}`, + staled_at: null, + }, + { + onConflictAction: "merge", + onConflictFields: [ + "pivot", + "parent_id", + "child_id", + "parent_name", + "child_name", + ], + } + ) - const childIndexRelationEntry = indexRelationRepository.create({ - parent_id: cleanedEntityData.id, - parent_name: entity, - child_id: entityData[childPropertyId] as string, - child_name: childEntityName, - pivot: `${entity}-${childEntityName}`, - // stale: false, - }) - - indexRelationRepository - .getEntityManager() - .persist([parentIndexRelationEntry, childIndexRelationEntry]) + await indexRelationRepository.upsert( + { + parent_id: cleanedEntityData.id, + parent_name: entity, + child_id: entityData[childPropertyId] as string, + child_name: childEntityName, + pivot: `${entity}-${childEntityName}`, + staled_at: null, + }, + { + onConflictAction: "merge", + onConflictFields: [ + "pivot", + "parent_id", + "child_id", + "parent_name", + "child_name", + ], + } + ) } } @@ -666,9 +669,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider { * @protected */ @InjectTransactionManager() - protected async onDetach< - TData extends { id: string; [key: string]: unknown } - >( + async onDetach( { entity, data, @@ -678,13 +679,11 @@ export class PostgresProvider implements IndexTypes.StorageProvider { data: TData | TData[] schemaEntityObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation }, - @MedusaContext() sharedContext: Context = {} + @MedusaContext() sharedContext: Context = {} ) { - const { transactionManager: em } = sharedContext as { - transactionManager: SqlEntityManager - } - const indexRepository = em.getRepository(toMikroORMEntity(IndexData)) - const indexRelationRepository = em.getRepository( + const { transactionManager: em } = sharedContext + const indexRepository = em!.getRepository(toMikroORMEntity(IndexData)) + const indexRelationRepository = em!.getRepository( toMikroORMEntity(IndexRelation) ) diff --git a/packages/modules/index/src/utils/create-partitions.ts b/packages/modules/index/src/utils/create-partitions.ts index 44f6930a2a..7fc21b6d13 100644 --- a/packages/modules/index/src/utils/create-partitions.ts +++ b/packages/modules/index/src/utils/create-partitions.ts @@ -37,6 +37,45 @@ export async function createPartitions( return } + await manager.execute(partitions.join("; ")) + + // Create indexes for each partition + const indexCreationCommands = Object.keys(schemaObjectRepresentation) + .filter( + (key) => + !schemaObjectRepresentationPropertiesToOmit.includes(key) && + schemaObjectRepresentation[key].listeners.length > 0 + ) + .map((key) => { + const cName = key.toLowerCase() + const part: string[] = [] + + part.push( + `CREATE INDEX CONCURRENTLY IF NOT EXISTS "IDX_cat_${cName}_data_gin" ON ${activeSchema}cat_${cName} USING GIN ("data" jsonb_path_ops)` + ) + + // create child id index on pivot partitions + for (const parent of schemaObjectRepresentation[key].parents) { + const pName = `${parent.ref.entity}${key}`.toLowerCase() + part.push( + `CREATE INDEX CONCURRENTLY IF NOT EXISTS "IDX_cat_pivot_${pName}_child_id" ON ${activeSchema}cat_pivot_${pName} ("child_id")` + ) + } + + return part + }) + .flat() + + // Execute index creation commands separately to avoid blocking + for (const cmd of indexCreationCommands) { + try { + await manager.execute(cmd) + } catch (error) { + // Log error but continue with other indexes + console.error(`Failed to create index: ${error.message}`) + } + } + partitions.push(`analyse ${activeSchema}index_data`) partitions.push(`analyse ${activeSchema}index_relation`) diff --git a/packages/modules/index/src/utils/default-schema.ts b/packages/modules/index/src/utils/default-schema.ts index b6c370a14b..313e996c78 100644 --- a/packages/modules/index/src/utils/default-schema.ts +++ b/packages/modules/index/src/utils/default-schema.ts @@ -5,6 +5,7 @@ export const defaultSchema = ` id: String title: String variants: [ProductVariant] + sales_channels: [SalesChannel] } type ProductVariant @Listeners(values: ["${Modules.PRODUCT}.product-variant.created", "${Modules.PRODUCT}.product-variant.updated", "${Modules.PRODUCT}.product-variant.deleted"]) { @@ -15,7 +16,13 @@ export const defaultSchema = ` } type Price @Listeners(values: ["${Modules.PRICING}.price.created", "${Modules.PRICING}.price.updated", "${Modules.PRICING}.price.deleted"]) { - amount: Int + id: String + amount: Float currency_code: String - } + } + + type SalesChannel @Listeners(values: ["${Modules.SALES_CHANNEL}.sales_channel.created", "${Modules.SALES_CHANNEL}.sales_channel.updated", "${Modules.SALES_CHANNEL}.sales_channel.deleted"]) { + id: String + is_disabled: Boolean + } ` diff --git a/packages/modules/index/src/utils/index.ts b/packages/modules/index/src/utils/index.ts index 783f01d37b..5690bf3e90 100644 --- a/packages/modules/index/src/utils/index.ts +++ b/packages/modules/index/src/utils/index.ts @@ -1,3 +1,8 @@ export * from "./query-builder" export * from "./create-partitions" export * from "./build-config" +export * from "./sync/orchestrator" +export * from "./sync/configuration" +export * from "./index-metadata-status" +export * from "./gql-to-types" +export * from "./default-schema" diff --git a/packages/modules/index/src/utils/query-builder.ts b/packages/modules/index/src/utils/query-builder.ts index b6cbec4fc9..15908f1e1c 100644 --- a/packages/modules/index/src/utils/query-builder.ts +++ b/packages/modules/index/src/utils/query-builder.ts @@ -23,6 +23,7 @@ export class QueryBuilder { private readonly selector: QueryFormat private readonly options?: QueryOptions private readonly schema: IndexTypes.SchemaObjectRepresentation + private readonly allSchemaFields: Set constructor(args: { schema: IndexTypes.SchemaObjectRepresentation @@ -37,14 +38,24 @@ export class QueryBuilder { this.options = args.options this.knex = args.knex this.structure = this.selector.select + this.allSchemaFields = new Set( + Object.values(this.schema).flatMap((entity) => entity.fields ?? []) + ) } private getStructureKeys(structure) { return Object.keys(structure ?? {}).filter((key) => key !== "entity") } - private getEntity(path): IndexTypes.SchemaPropertiesMap[0] { + private getEntity( + path, + throwWhenNotFound = true + ): IndexTypes.SchemaPropertiesMap[0] | undefined { if (!this.schema._schemaPropertiesMap[path]) { + if (!throwWhenNotFound) { + return + } + throw new Error(`Could not find entity for path: ${path}`) } @@ -52,7 +63,7 @@ export class QueryBuilder { } private getGraphQLType(path, field) { - const entity = this.getEntity(path)?.ref?.entity + const entity = this.getEntity(path)?.ref?.entity! const fieldRef = this.entityMap[entity]._fields[field] if (!fieldRef) { throw new Error(`Field ${field} not found in the entityMap.`) @@ -270,6 +281,18 @@ export class QueryBuilder { return builder } + private getShortAlias(aliasMapping, alias: string) { + aliasMapping.__aliasIndex ??= 0 + + if (aliasMapping[alias]) { + return aliasMapping[alias] + } + + aliasMapping[alias] = "t_" + aliasMapping.__aliasIndex++ + "_" + + return aliasMapping[alias] + } + private buildQueryParts( structure: Select, parentAlias: string, @@ -281,10 +304,17 @@ export class QueryBuilder { ): string[] { const currentAliasPath = [...aliasPath, parentProperty].join(".") - const entities = this.getEntity(currentAliasPath) + const isSelectableField = this.allSchemaFields.has(parentProperty) + const entities = this.getEntity(currentAliasPath, false) + if (isSelectableField || !entities) { + // We are currently selecting a specific field of the parent entity or the entity is not found on the index schema + // We don't need to build the query parts for this as there is no join + return [] + } const mainEntity = entities.ref.entity - const mainAlias = mainEntity.toLowerCase() + level + const mainAlias = + this.getShortAlias(aliasMapping, mainEntity.toLowerCase()) + level const allEntities: any[] = [] if (!entities.shortCutOf) { @@ -298,7 +328,13 @@ export class QueryBuilder { const intermediateAlias = entities.shortCutOf.split(".") for (let i = intermediateAlias.length - 1, x = 0; i >= 0; i--, x++) { - const intermediateEntity = this.getEntity(intermediateAlias.join(".")) + const intermediateEntity = this.getEntity( + intermediateAlias.join("."), + false + ) + if (!intermediateEntity) { + break + } intermediateAlias.pop() @@ -308,14 +344,24 @@ export class QueryBuilder { const parentIntermediateEntity = this.getEntity( intermediateAlias.join(".") - ) + )! const alias = - intermediateEntity.ref.entity.toLowerCase() + level + "_" + x + this.getShortAlias( + aliasMapping, + intermediateEntity.ref.entity.toLowerCase() + ) + + level + + "_" + + x + const parAlias = parentIntermediateEntity.ref.entity === parentEntity ? parentAlias - : parentIntermediateEntity.ref.entity.toLowerCase() + + : this.getShortAlias( + aliasMapping, + parentIntermediateEntity.ref.entity.toLowerCase() + ) + level + "_" + (x + 1) @@ -335,62 +381,68 @@ export class QueryBuilder { let queryParts: string[] = [] for (const join of allEntities) { + const joinBuilder = this.knex.queryBuilder() const { alias, entity, parEntity, parAlias } = join aliasMapping[currentAliasPath] = alias if (level > 0) { - const subQuery = this.knex.queryBuilder() - const knex = this.knex - subQuery - .select(`${alias}.id`, `${alias}.data`) - .from("index_data AS " + alias) - .join(`index_relation AS ${alias}_ref`, function () { - this.on( - `${alias}_ref.pivot`, - "=", - knex.raw("?", [`${parEntity}-${entity}`]) - ) - .andOn(`${alias}_ref.parent_id`, "=", `${parAlias}.id`) - .andOn(`${alias}.id`, "=", `${alias}_ref.child_id`) - }) - .where(`${alias}.name`, "=", knex.raw("?", [entity])) + const cName = entity.toLowerCase() + const pName = `${parEntity}${entity}`.toLowerCase() + + let joinTable = `cat_${cName} AS ${alias}` + + const pivotTable = `cat_pivot_${pName}` + joinBuilder.leftJoin( + `${pivotTable} AS ${alias}_ref`, + `${alias}_ref.parent_id`, + `${parAlias}.id` + ) + joinBuilder.leftJoin(joinTable, `${alias}.id`, `${alias}_ref.child_id`) const joinWhere = this.selector.joinWhere ?? {} const joinKey = Object.keys(joinWhere).find((key) => { const k = key.split(".") k.pop() - return k.join(".") === currentAliasPath + const curPath = k.join(".") + if (curPath === currentAliasPath) { + const relEntity = this.getEntity(curPath, false) + return relEntity?.ref?.entity === entity + } + + return false }) if (joinKey) { this.parseWhere( aliasMapping, { [joinKey]: joinWhere[joinKey] }, - subQuery + joinBuilder ) } - queryParts.push(`LEFT JOIN LATERAL ( - ${subQuery.toQuery()} - ) ${alias} ON TRUE`) + queryParts.push( + joinBuilder.toQuery().replace("select * ", "").replace("where", "and") + ) } } const children = this.getStructureKeys(structure) for (const child of children) { const childStructure = structure[child] as Select - queryParts = queryParts.concat( - this.buildQueryParts( - childStructure, - mainAlias, - mainEntity, - child, - aliasPath.concat(parentProperty), - level + 1, - aliasMapping + queryParts = queryParts + .concat( + this.buildQueryParts( + childStructure, + mainAlias, + mainEntity, + child, + aliasPath.concat(parentProperty), + level + 1, + aliasMapping + ) ) - ) + .filter(Boolean) } return queryParts @@ -404,7 +456,26 @@ export class QueryBuilder { selectParts: object = {} ): object { const currentAliasPath = [...aliasPath, parentProperty].join(".") + + const isSelectableField = this.allSchemaFields.has(parentProperty) + if (isSelectableField) { + // We are currently selecting a specific field of the parent entity + // Let's remove the parent alias from the select parts to not select everything entirely + // and add the specific field to the select parts + const parentAliasPath = aliasPath.join(".") + const alias = aliasMapping[parentAliasPath] + delete selectParts[parentAliasPath] + selectParts[currentAliasPath] = this.knex.raw( + `${alias}.data->'${parentProperty}'` + ) + return selectParts + } + const alias = aliasMapping[currentAliasPath] + // If the entity is not found in the schema (not indexed), we don't need to build the select parts + if (!alias) { + return selectParts + } selectParts[currentAliasPath] = `${alias}.data` selectParts[currentAliasPath + ".id"] = `${alias}.id` @@ -473,7 +544,8 @@ export class QueryBuilder { const rootKey = this.getStructureKeys(structure)[0] const rootStructure = structure[rootKey] as Select - const entity = this.getEntity(rootKey).ref.entity + + const entity = this.getEntity(rootKey)!.ref.entity const rootEntity = entity.toLowerCase() const aliasMapping: { [path: string]: string } = {} @@ -494,20 +566,23 @@ export class QueryBuilder { if (countAllResults) { selectParts["offset_"] = this.knex.raw( - `DENSE_RANK() OVER (ORDER BY ${rootEntity}0.id)` + `DENSE_RANK() OVER (ORDER BY ${this.getShortAlias( + aliasMapping, + rootEntity + )}.id)` ) } queryBuilder.select(selectParts) - queryBuilder.from(`index_data AS ${rootEntity}0`) + queryBuilder.from( + `cat_${rootEntity} AS ${this.getShortAlias(aliasMapping, rootEntity)}` + ) joinParts.forEach((joinPart) => { queryBuilder.joinRaw(joinPart) }) - queryBuilder.where(`${aliasMapping[rootEntity]}.name`, "=", entity) - // WHERE clause this.parseWhere(aliasMapping, filter, queryBuilder) @@ -559,6 +634,11 @@ export class QueryBuilder { const initializeMaps = (structure: Select, path: string[]) => { const currentPath = path.join(".") + const entity = this.getEntity(currentPath, false) + if (!entity) { + return + } + maps[currentPath] = {} if (path.length > 1) { @@ -566,9 +646,19 @@ export class QueryBuilder { const parents = path.slice(0, -1) const parentPath = parents.join(".") - isListMap[currentPath] = !!this.getEntity(currentPath).ref.parents.find( - (p) => p.targetProp === property - )?.isList + // In the case of specific selection + // We dont need to check if the property is a list + const isSelectableField = this.allSchemaFields.has(property) + if (isSelectableField) { + pathDetails[currentPath] = { property, parents, parentPath } + isListMap[currentPath] = false + return + } + + isListMap[currentPath] = !!this.getEntity( + currentPath, + false + )?.ref?.parents?.find((p) => p.targetProp === property)?.isList pathDetails[currentPath] = { property, parents, parentPath } } @@ -595,6 +685,20 @@ export class QueryBuilder { return key + id } + const columnMap = {} + const columnNames = Object.keys(resultSet[0] ?? {}) + for (const property of columnNames) { + const segments = property.split(".") + const field = segments.pop() + const parent = segments.join(".") + + columnMap[parent] ??= [] + columnMap[parent].push({ + field, + property, + }) + } + resultSet.forEach((row) => { for (const path in maps) { const id = row[`${path}.id`] @@ -603,6 +707,15 @@ export class QueryBuilder { if (!pathDetails[path]) { if (!maps[path][id]) { maps[path][id] = row[path] || undefined + + // If there is an id, but no object values, it means that specific fields were selected + // so we recompose the object with all selected fields. (id will always be selected) + if (!maps[path][id] && id) { + maps[path][id] = {} + for (const column of columnMap[path]) { + maps[path][id][column.field] = row[column.property] + } + } } continue } @@ -616,6 +729,15 @@ export class QueryBuilder { maps[path][id] = row[path] || undefined + // If there is an id, but no object values, it means that specific fields were selected + // so we recompose the object with all selected fields. (id will always be selected) + if (!maps[path][id] && id) { + maps[path][id] = {} + for (const column of columnMap[path]) { + maps[path][id][column.field] = row[column.property] + } + } + const parentObj = maps[parentPath][row[`${parentPath}.id`]] if (!parentObj) { @@ -623,8 +745,8 @@ export class QueryBuilder { } const isList = isListMap[parentPath + "." + property] - if (isList) { - parentObj[property] ??= [] + if (isList && !Array.isArray(parentObj[property])) { + parentObj[property] = [] } if (maps[path][id] !== undefined) { diff --git a/packages/modules/index/src/utils/sync/configuration.ts b/packages/modules/index/src/utils/sync/configuration.ts new file mode 100644 index 0000000000..6cda0a1d46 --- /dev/null +++ b/packages/modules/index/src/utils/sync/configuration.ts @@ -0,0 +1,142 @@ +import { simpleHash } from "@medusajs/framework/utils" +import { IndexTypes, InferEntityType } from "@medusajs/types" +import { IndexMetadata } from "@models" +import { schemaObjectRepresentationPropertiesToOmit } from "@types" +import { DataSynchronizer } from "../../services/data-synchronizer" +import { IndexMetadataService } from "../../services/index-metadata" +import { IndexSyncService } from "../../services/index-sync" +import { IndexMetadataStatus } from "../index-metadata-status" + +export class Configuration { + #schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation + #indexMetadataService: IndexMetadataService + #indexSyncService: IndexSyncService + #dataSynchronizer: DataSynchronizer + + constructor({ + schemaObjectRepresentation, + indexMetadataService, + indexSyncService, + dataSynchronizer, + }: { + schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation + indexMetadataService: IndexMetadataService + indexSyncService: IndexSyncService + dataSynchronizer: DataSynchronizer + }) { + this.#schemaObjectRepresentation = schemaObjectRepresentation ?? {} + this.#indexMetadataService = indexMetadataService + this.#indexSyncService = indexSyncService + this.#dataSynchronizer = dataSynchronizer + } + + async checkChanges(): Promise[]> { + const schemaObjectRepresentation = this.#schemaObjectRepresentation + + const currentConfig = await this.#indexMetadataService.list() + const currentConfigMap = new Map( + currentConfig.map((c) => [c.entity, c] as const) + ) + + type modifiedConfig = { + id?: string + entity: string + fields: string[] + fields_hash: string + status?: IndexMetadataStatus + }[] + + type dataSyncEntry = { + id?: string + entity: string + last_key: null + }[] + + const entityPresent = new Set() + const newConfig: modifiedConfig = [] + const updatedConfig: modifiedConfig = [] + const deletedConfig: { entity: string }[] = [] + const idxSyncData: dataSyncEntry = [] + + for (const [entityName, schemaEntityObjectRepresentation] of Object.entries( + schemaObjectRepresentation + )) { + if (schemaObjectRepresentationPropertiesToOmit.includes(entityName)) { + continue + } + + const entity = schemaEntityObjectRepresentation.entity + const fields = schemaEntityObjectRepresentation.fields.sort().join(",") + const fields_hash = simpleHash(fields) + + const existingEntityConfig = currentConfigMap.get(entity) + + entityPresent.add(entity) + if ( + !existingEntityConfig || + existingEntityConfig.fields_hash !== fields_hash + ) { + const meta = { + id: existingEntityConfig?.id, + entity, + fields, + fields_hash, + } + + if (!existingEntityConfig) { + newConfig.push(meta) + } else { + updatedConfig.push({ + ...meta, + status: IndexMetadataStatus.PENDING, + }) + } + + idxSyncData.push({ + entity, + last_key: null, + }) + } + } + + for (const [entity] of currentConfigMap) { + if (!entityPresent.has(entity)) { + deletedConfig.push({ entity }) + } + } + + if (newConfig.length > 0) { + await this.#indexMetadataService.create(newConfig) + } + if (updatedConfig.length > 0) { + await this.#indexMetadataService.update(updatedConfig) + } + + if (deletedConfig.length > 0) { + await this.#indexMetadataService.delete(deletedConfig) + await this.#dataSynchronizer.removeEntities( + deletedConfig.map((c) => c.entity) + ) + } + + if (idxSyncData.length > 0) { + if (updatedConfig.length > 0) { + const ids = await this.#indexSyncService.list({ + entity: updatedConfig.map((c) => c.entity), + }) + idxSyncData.forEach((sync) => { + const id = ids.find((i) => i.entity === sync.entity)?.id + if (id) { + sync.id = id + } + }) + } + + await this.#indexSyncService.upsert(idxSyncData) + } + + return await this.#indexMetadataService.list({ + status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING], + }) + } +} diff --git a/packages/modules/index/src/utils/sync/data-synchronizer.ts b/packages/modules/index/src/utils/sync/data-synchronizer.ts deleted file mode 100644 index 85739962d4..0000000000 --- a/packages/modules/index/src/utils/sync/data-synchronizer.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { - IndexTypes, - RemoteQueryFunction, - SchemaObjectEntityRepresentation, - Event, -} from "@medusajs/framework/types" -import { CommonEvents } from "@medusajs/framework/utils" - -export class DataSynchronizer { - #storageProvider: IndexTypes.StorageProvider - #schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation - #query: RemoteQueryFunction - - constructor({ - storageProvider, - schemaObjectRepresentation, - query, - }: { - storageProvider: IndexTypes.StorageProvider - schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation - query: RemoteQueryFunction - }) { - this.#storageProvider = storageProvider - this.#schemaObjectRepresentation = schemaObjectRepresentation - this.#query = query - } - - async sync({ - entityName, - pagination = {}, - ack, - }: { - entityName: string - pagination?: { - cursor?: string - updated_at?: string | Date - limit?: number - batchSize?: number - } - ack: (ack: { - lastCursor: string | null - done?: boolean - err?: Error - }) => Promise - }) { - const schemaEntityObjectRepresentation = this.#schemaObjectRepresentation[ - entityName - ] as SchemaObjectEntityRepresentation - - const { fields, alias, moduleConfig } = schemaEntityObjectRepresentation - - const entityPrimaryKey = fields.find( - (field) => !!moduleConfig.linkableKeys?.[field] - ) - - if (!entityPrimaryKey) { - void ack({ - lastCursor: pagination.cursor ?? null, - err: new Error( - `Entity ${entityName} does not have a linkable primary key` - ), - }) - return - } - - let processed = 0 - let currentCursor = pagination.cursor! - const batchSize = pagination.batchSize ?? 1000 - const limit = pagination.limit ?? Infinity - let done = false - let error = null - - while (processed < limit || !done) { - const filters: Record = {} - - if (currentCursor) { - filters[entityPrimaryKey] = { $gt: currentCursor } - } - - if (pagination.updated_at) { - filters["updated_at"] = { $gt: pagination.updated_at } - } - - const { data } = await this.#query.graph({ - entity: alias, - fields: [entityPrimaryKey], - filters, - pagination: { - order: { - [entityPrimaryKey]: "asc", - }, - take: batchSize, - }, - }) - - done = !data.length - if (done) { - break - } - - const envelop: Event = { - data, - name: `*.${CommonEvents.CREATED}`, - } - - try { - await this.#storageProvider.consumeEvent( - schemaEntityObjectRepresentation - )(envelop) - currentCursor = data[data.length - 1][entityPrimaryKey] - processed += data.length - - void ack({ lastCursor: currentCursor }) - } catch (err) { - error = err - break - } - } - - let acknoledgement: { lastCursor: string; done?: boolean; err?: Error } = { - lastCursor: currentCursor, - done: true, - } - - if (error) { - acknoledgement = { - lastCursor: currentCursor, - err: error, - } - void ack(acknoledgement) - return acknoledgement - } - - void ack(acknoledgement) - return acknoledgement - } -} diff --git a/packages/modules/index/src/orchestrator/index.ts b/packages/modules/index/src/utils/sync/orchestrator.ts similarity index 72% rename from packages/modules/index/src/orchestrator/index.ts rename to packages/modules/index/src/utils/sync/orchestrator.ts index ce9f796695..cdc63d080f 100644 --- a/packages/modules/index/src/orchestrator/index.ts +++ b/packages/modules/index/src/utils/sync/orchestrator.ts @@ -1,4 +1,4 @@ -import { ILockingModule, IndexTypes, MedusaContainer } from "@medusajs/types" +import { ILockingModule } from "@medusajs/types" export class Orchestrator { /** @@ -29,18 +29,11 @@ export class Orchestrator { * - Lock duration is the maximum duration for which to hold the lock. * After this the lock will be removed. * - * - Task runner is the implementation function to execute a task. - * Orchestrator has no inbuilt execution logic and it relies on - * the task runner for the same. - * * The entity is provided to the taskRunner only when the orchestrator * is able to acquire a lock. */ #options: { lockDuration: number - taskRunner: ( - entity: IndexTypes.SchemaObjectEntityRepresentation - ) => Promise } /** @@ -53,7 +46,7 @@ export class Orchestrator { * while an entity is getting synced to avoid multiple processes * from syncing the same entity */ - #entities: IndexTypes.SchemaObjectEntityRepresentation[] = [] + #entities: string[] = [] /** * The current state of the orchestrator @@ -77,16 +70,13 @@ export class Orchestrator { } constructor( - container: MedusaContainer, - entities: IndexTypes.SchemaObjectEntityRepresentation[], + lockingModule: ILockingModule, + entities: string[], options: { lockDuration: number - taskRunner: ( - entity: IndexTypes.SchemaObjectEntityRepresentation - ) => Promise } ) { - this.#lockingModule = container.resolve("locking") + this.#lockingModule = lockingModule this.#entities = entities this.#options = options } @@ -106,44 +96,65 @@ export class Orchestrator { } } + /** + * Acquires or renew the lock for a given key. + */ + async renewLock(forKey: string): Promise { + return this.#acquireLock(forKey) + } + /** * Processes the entity at a given index. If there are no entities * left, the orchestrator state will be set to completed. + * + * - Task runner is the implementation function to execute a task. + * Orchestrator has no inbuilt execution logic and it relies on + * the task runner for the same. */ - async #processAtIndex(index: number) { - const entity = this.#entities[index] - if (!entity) { - this.#state = "completed" - return - } - - this.#currentIndex = index - const lockAcquired = await this.#acquireLock(entity.entity) + async #processAtIndex( + taskRunner: (entity: string) => Promise, + entity: string + ) { + const lockAcquired = await this.#acquireLock(entity) if (lockAcquired) { try { - await this.#options.taskRunner(entity) + await taskRunner(entity) } catch (error) { this.#state = "error" throw error } finally { - await this.#lockingModule.release(entity.entity, { + await this.#lockingModule.release(entity, { ownerId: this.#lockingOwner, }) } } - - return this.#processAtIndex(index + 1) } /** * Run the orchestrator to process the entities one by one. + * + * - Task runner is the implementation function to execute a task. + * Orchestrator has no inbuilt execution logic and it relies on + * the task runner for the same. */ - async process() { + async process(taskRunner: (entity: string) => Promise) { if (this.state !== "idle") { throw new Error("Cannot re-run an already running orchestrator instance") } this.#state = "processing" - return this.#processAtIndex(0) + + for (let i = 0; i < this.#entities.length; i++) { + this.#currentIndex = i + const entity = this.#entities[i] + if (!entity) { + this.#state = "completed" + break + } + + await this.#processAtIndex(taskRunner, entity) + } + + this.#state = "completed" } }