diff --git a/.changeset/weak-elephants-reply.md b/.changeset/weak-elephants-reply.md new file mode 100644 index 0000000000..546ecc8932 --- /dev/null +++ b/.changeset/weak-elephants-reply.md @@ -0,0 +1,8 @@ +--- +"@medusajs/medusa": patch +"@medusajs/index": patch +"@medusajs/framework": patch +"@medusajs/types": patch +--- + +feat(index): Add support for API end points to interact with the index module diff --git a/integration-tests/http/__tests__/index/index.spec.ts b/integration-tests/http/__tests__/index/index.spec.ts new file mode 100644 index 0000000000..773ca92042 --- /dev/null +++ b/integration-tests/http/__tests__/index/index.spec.ts @@ -0,0 +1,162 @@ +import { medusaIntegrationTestRunner } from "@medusajs/test-utils" +import { + adminHeaders, + createAdminUser, +} from "../../../helpers/create-admin-user" +import { setTimeout } from "timers/promises" + +process.env.ENABLE_INDEX_MODULE = "true" + +jest.setTimeout(300000) + +medusaIntegrationTestRunner({ + testSuite: ({ dbConnection, getContainer, api }) => { + let container + + beforeEach(async () => { + container = getContainer() + await createAdminUser(dbConnection, adminHeaders, container) + }) + + afterAll(() => { + delete process.env.ENABLE_INDEX_MODULE + }) + + describe("Admin Index API", () => { + describe("GET /admin/index/details", () => { + it("should return index information with metadata", async () => { + await setTimeout(3000) + + const response = await api.get(`/admin/index/details`, adminHeaders) + + expect(response.status).toEqual(200) + expect(response.data).toHaveProperty("metadata") + expect(response.data.metadata.length).toBe(7) + + // Verify all expected entities are present with correct structure + const entities = [ + "Product", + "ProductVariant", + "LinkProductVariantPriceSet", + "Price", + "SalesChannel", + "LinkProductSalesChannel", + "PriceSet", + ] + + entities.forEach((entityName) => { + const entityMetadata = response.data.metadata.find( + (m) => m.entity === entityName + ) + expect(entityMetadata).toBeDefined() + expect(entityMetadata).toMatchObject({ + id: expect.any(String), + entity: entityName, + status: expect.stringMatching(/^(pending|processing|done)$/), + fields: expect.any(Array), + updated_at: expect.any(String), + }) + expect(entityMetadata).toHaveProperty("last_synced_key") + }) + + // Verify specific field structures for key entities + const productMetadata = response.data.metadata.find( + (m) => m.entity === "Product" + ) + expect(productMetadata.fields).toEqual( + expect.arrayContaining(["id", "title", "handle", "status"]) + ) + + const variantMetadata = response.data.metadata.find( + (m) => m.entity === "ProductVariant" + ) + expect(variantMetadata.fields).toEqual( + expect.arrayContaining(["id", "product_id", "sku"]) + ) + }) + + describe("POST /admin/index/sync", () => { + it("should trigger sync with default strategy (continue)", async () => { + const response = await api.post( + `/admin/index/sync`, + {}, + adminHeaders + ) + + expect(response.status).toEqual(200) + }) + + it("should trigger sync with full strategy", async () => { + const response = await api.post( + `/admin/index/sync`, + { strategy: "full" }, + adminHeaders + ) + + expect(response.status).toEqual(200) + }) + + it("should trigger sync with reset strategy", async () => { + const response = await api.post( + `/admin/index/sync`, + { strategy: "reset" }, + adminHeaders + ) + + expect(response.status).toEqual(200) + }) + + it("should reject invalid strategy", async () => { + const response = await api + .post(`/admin/index/sync`, { strategy: "invalid" }, adminHeaders) + .catch((e) => e) + + expect(response.response.status).toEqual(400) + }) + + it("should sync and reflect in metadata status", async () => { + const syncResponsePromise = api.post( + `/admin/index/sync`, + { strategy: "full" }, + adminHeaders + ) + const updatedResponse = await api.get( + `/admin/index/details`, + adminHeaders + ) + + const syncResponse = await syncResponsePromise + expect(syncResponse.status).toEqual(200) + expect(updatedResponse.status).toEqual(200) + + if (updatedResponse.data.metadata.length > 0) { + const hashedMetadataWithStatusPending = + updatedResponse.data.metadata.some( + (m) => m.status === "pending" + ) + expect(hashedMetadataWithStatusPending).toBe(true) + } + }) + + it("should reset index and clear all data", async () => { + const syncResponsePromise = api.post( + `/admin/index/sync`, + { strategy: "reset" }, + adminHeaders + ) + const response = await api.get(`/admin/index/details`, adminHeaders) + + const syncResponse = await syncResponsePromise + expect(syncResponse.status).toEqual(200) + expect(response.status).toEqual(200) + + const metadata = response.data.metadata + metadata.forEach((m) => { + expect(["pending", "processing"]).toContain(m.status) + }) + }) + }) + }) + }) + }, +}) diff --git a/integration-tests/http/medusa-config.js b/integration-tests/http/medusa-config.js index 37cd6baf2d..96ad5f9a41 100644 --- a/integration-tests/http/medusa-config.js +++ b/integration-tests/http/medusa-config.js @@ -30,6 +30,9 @@ module.exports = defineConfig({ jwtSecret: "test", }, }, + featureFlags: { + index_engine: process.env.ENABLE_INDEX_MODULE === "true", + }, modules: { [Modules.FULFILLMENT]: { /** @type {import('@medusajs/fulfillment').FulfillmentModuleOptions} */ @@ -71,5 +74,9 @@ module.exports = defineConfig({ ], }, }, + [Modules.INDEX]: { + resolve: "@medusajs/index", + disable: process.env.ENABLE_INDEX_MODULE !== "true", + }, }, }) diff --git a/packages/core/framework/src/types/container.ts b/packages/core/framework/src/types/container.ts index 3ff653324b..22e13d21d9 100644 --- a/packages/core/framework/src/types/container.ts +++ b/packages/core/framework/src/types/container.ts @@ -12,6 +12,7 @@ import { IEventBusModuleService, IFileModuleService, IFulfillmentModuleService, + IIndexService, IInventoryService, ILockingModule, INotificationModuleService, @@ -78,6 +79,7 @@ declare module "@medusajs/types" { [Modules.LOCKING]: ILockingModule [Modules.SETTINGS]: ISettingsModuleService [Modules.CACHING]: ICachingModuleService + [Modules.INDEX]: IIndexService } } diff --git a/packages/core/types/src/http/index.ts b/packages/core/types/src/http/index.ts index 6be5507109..bf16412062 100644 --- a/packages/core/types/src/http/index.ts +++ b/packages/core/types/src/http/index.ts @@ -46,3 +46,4 @@ export * from "./tax-region" export * from "./user" export * from "./view-configuration" export * from "./workflow-execution" +export * from "./index/index" diff --git a/packages/core/types/src/http/index/admin/payload.ts b/packages/core/types/src/http/index/admin/payload.ts new file mode 100644 index 0000000000..49691f31e9 --- /dev/null +++ b/packages/core/types/src/http/index/admin/payload.ts @@ -0,0 +1,3 @@ +export interface AdminIndexSyncPayload { + strategy: "full" | "reset" +} diff --git a/packages/core/types/src/http/index/admin/responses.ts b/packages/core/types/src/http/index/admin/responses.ts new file mode 100644 index 0000000000..b16cf96e43 --- /dev/null +++ b/packages/core/types/src/http/index/admin/responses.ts @@ -0,0 +1,5 @@ +import { IndexInfo } from "../../../index-data" + +export interface AdminIndexDetailsResponse { + metadata: IndexInfo[] +} diff --git a/packages/core/types/src/http/index/index.ts b/packages/core/types/src/http/index/index.ts new file mode 100644 index 0000000000..23e3636689 --- /dev/null +++ b/packages/core/types/src/http/index/index.ts @@ -0,0 +1,2 @@ +export * from "./admin/payload" +export * from "./admin/responses" diff --git a/packages/core/types/src/index-data/service.ts b/packages/core/types/src/index-data/service.ts index dc505806ed..99aab48ed9 100644 --- a/packages/core/types/src/index-data/service.ts +++ b/packages/core/types/src/index-data/service.ts @@ -13,8 +13,33 @@ export interface IndexModuleOptions { schema: string } +export interface IndexInfo { + id: string + entity: string + status: "pending" | "processing" | "done" | "error" + fields: string[] + updated_at: Date + last_synced_key: string | null +} + export interface IIndexService extends IModuleService { query( config: IndexQueryConfig ): Promise> + + /** + * Sync the index + * The sync strategy can be "full" meaning it will re sync the entire index, "reset" meaning it + * will reset the index data and start from scratch, or if not specified, it will continue the + * sync from the last sync point. + * + * @param strategy The sync strategy + */ + sync({ strategy }?: { strategy?: "full" | "reset" }): Promise + + /** + * Get the index information + * @returns The index information + */ + getInfo(): Promise } diff --git a/packages/medusa/src/api/admin/index/details/route.ts b/packages/medusa/src/api/admin/index/details/route.ts new file mode 100644 index 0000000000..fed374308c --- /dev/null +++ b/packages/medusa/src/api/admin/index/details/route.ts @@ -0,0 +1,19 @@ +import { AuthenticatedMedusaRequest, MedusaResponse } from "@medusajs/framework" +import { HttpTypes } from "@medusajs/framework/types" +import { Modules } from "@medusajs/framework/utils" + +/** + * Get the index information for all entities that are indexed and their sync state + * @param req + * @param res + */ +export const GET = async ( + req: AuthenticatedMedusaRequest, + res: MedusaResponse +) => { + const indexModuleService = req.scope.resolve(Modules.INDEX) + const indexInfo = await indexModuleService.getInfo() + res.json({ + metadata: indexInfo, + }) +} diff --git a/packages/medusa/src/api/admin/index/middlewares.ts b/packages/medusa/src/api/admin/index/middlewares.ts new file mode 100644 index 0000000000..adb292f4cc --- /dev/null +++ b/packages/medusa/src/api/admin/index/middlewares.ts @@ -0,0 +1,62 @@ +import { validateAndTransformBody } from "@medusajs/framework" +import { + AuthenticatedMedusaRequest, + MedusaNextFunction, + MedusaResponse, + MiddlewareRoute, +} from "@medusajs/framework/http" +import { Logger } from "@medusajs/framework/types" +import { + ContainerRegistrationKeys, + FeatureFlag, + Modules, +} from "@medusajs/framework/utils" +import IndexEngineFeatureFlag from "../../../feature-flags/index-engine" +import { authenticate } from "../../../utils/middlewares/authenticate-middleware" +import { AdminIndexSyncPayload } from "./validator" + +const isIndexEnabledMiddleware = ( + req: AuthenticatedMedusaRequest, + res: MedusaResponse, + next: MedusaNextFunction +) => { + const indexService = req.scope.resolve(Modules.INDEX, { + allowUnregistered: true, + }) + const logger = + req.scope.resolve(ContainerRegistrationKeys.LOGGER, { + allowUnregistered: true, + }) ?? (console as unknown as Logger) + + if ( + !indexService || + !FeatureFlag.isFeatureEnabled(IndexEngineFeatureFlag.key) + ) { + logger.warn( + "Trying to access '/admin/index/*' route but the index module is not configured" + ) + return res.status(404) + } + + return next() +} + +export const adminIndexRoutesMiddlewares: MiddlewareRoute[] = [ + { + method: ["GET"], + matcher: "/admin/index/details", + middlewares: [ + authenticate("user", ["session", "bearer", "api-key"]), + isIndexEnabledMiddleware, + ], + }, + { + method: ["POST"], + matcher: "/admin/index/sync", + middlewares: [ + authenticate("user", ["session", "bearer", "api-key"]), + isIndexEnabledMiddleware, + validateAndTransformBody(AdminIndexSyncPayload), + ], + }, +] diff --git a/packages/medusa/src/api/admin/index/sync/route.ts b/packages/medusa/src/api/admin/index/sync/route.ts new file mode 100644 index 0000000000..823a476cd5 --- /dev/null +++ b/packages/medusa/src/api/admin/index/sync/route.ts @@ -0,0 +1,15 @@ +import { AuthenticatedMedusaRequest, MedusaResponse } from "@medusajs/framework" +import { HttpTypes } from "@medusajs/framework/types" +import { Modules } from "@medusajs/framework/utils" + +export const POST = async ( + req: AuthenticatedMedusaRequest, + res: MedusaResponse +) => { + const indexService = req.scope.resolve(Modules.INDEX) + const strategy = req.validatedBody.strategy + + await indexService.sync({ strategy }) + + res.send(200) +} diff --git a/packages/medusa/src/api/admin/index/validator.ts b/packages/medusa/src/api/admin/index/validator.ts new file mode 100644 index 0000000000..2395f64f35 --- /dev/null +++ b/packages/medusa/src/api/admin/index/validator.ts @@ -0,0 +1,5 @@ +import z from "zod" + +export const AdminIndexSyncPayload = z.object({ + strategy: z.enum(["full", "reset"]).optional(), +}) diff --git a/packages/medusa/src/api/middlewares.ts b/packages/medusa/src/api/middlewares.ts index d98a560304..ec8f03f310 100644 --- a/packages/medusa/src/api/middlewares.ts +++ b/packages/medusa/src/api/middlewares.ts @@ -64,6 +64,7 @@ import { storeRegionRoutesMiddlewares } from "./store/regions/middlewares" import { storeReturnReasonRoutesMiddlewares } from "./store/return-reasons/middlewares" import { storeShippingOptionRoutesMiddlewares } from "./store/shipping-options/middlewares" import { adminShippingOptionTypeRoutesMiddlewares } from "./admin/shipping-option-types/middlewares" +import { adminIndexRoutesMiddlewares } from "./admin/index/middlewares" export default defineMiddlewares([ ...storeRoutesMiddlewares, @@ -132,4 +133,5 @@ export default defineMiddlewares([ ...adminPaymentCollectionsMiddlewares, ...viewConfigurationRoutesMiddlewares, ...columnRoutesMiddlewares, + ...adminIndexRoutesMiddlewares, ]) diff --git a/packages/modules/index/integration-tests/__tests__/index-engine-module-sync.spec.ts b/packages/modules/index/integration-tests/__tests__/index-engine-module-sync.spec.ts new file mode 100644 index 0000000000..2c8ba5800e --- /dev/null +++ b/packages/modules/index/integration-tests/__tests__/index-engine-module-sync.spec.ts @@ -0,0 +1,468 @@ +import { + configLoader, + container, + logger, + MedusaAppLoader, +} from "@medusajs/framework" +import { asValue } from "@medusajs/framework/awilix" +import { EntityManager } from "@medusajs/framework/mikro-orm/postgresql" +import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk" +import { IndexTypes } from "@medusajs/framework/types" +import { + ContainerRegistrationKeys, + Modules, + toMikroORMEntity, +} from "@medusajs/framework/utils" +import { initDb, TestDatabaseUtils } from "@medusajs/test-utils" +import { IndexData, IndexRelation, IndexMetadata, IndexSync } from "@models" +import { IndexMetadataStatus } from "@utils" +import * as path from "path" +import { setTimeout } from "timers/promises" +import { EventBusServiceMock } from "../__fixtures__" +import { dbName } from "../__fixtures__/medusa-config" + +const eventBusMock = new EventBusServiceMock() +const queryMock = { + graph: jest.fn(), +} + +const productId = "prod_1" +const productId2 = "prod_2" +const variantId = "var_1" +const variantId2 = "var_2" +const priceSetId = "price_set_1" +const priceId = "money_amount_1" +const linkId = "link_id_1" + +const dbUtils = TestDatabaseUtils.dbTestUtilFactory() + +jest.setTimeout(300000) + +let medusaAppLoader!: MedusaAppLoader +let index!: IndexTypes.IIndexService + +const beforeAll_ = async ({ + clearDatabase = true, +}: { clearDatabase?: boolean } = {}) => { + try { + const config = await configLoader( + path.join(__dirname, "./../__fixtures__"), + "medusa-config" + ) + + console.log(`Creating database ${dbName}`) + await dbUtils.create(dbName) + dbUtils.pgConnection_ = await initDb() + + container.register({ + [ContainerRegistrationKeys.LOGGER]: asValue(logger), + [ContainerRegistrationKeys.QUERY]: asValue(null), + [ContainerRegistrationKeys.PG_CONNECTION]: asValue(dbUtils.pgConnection_), + }) + + medusaAppLoader = new MedusaAppLoader(container as any) + + // Migrations + await medusaAppLoader.runModulesMigrations() + const linkPlanner = await medusaAppLoader.getLinksExecutionPlanner() + const plan = await linkPlanner.createPlan() + await linkPlanner.executePlan(plan) + + // Clear partially loaded instances + MedusaModule.clearInstances() + + // Bootstrap modules + const globalApp = await medusaAppLoader.load() + + index = container.resolve(Modules.INDEX) + + // Mock event bus the index module + ;(index as any).eventBusModuleService_ = eventBusMock + + await globalApp.onApplicationStart() + await setTimeout(3000) + ;(index as any).storageProvider_.query_ = queryMock + + if (clearDatabase) { + await afterEach_() + } + return globalApp + } catch (error) { + console.error("Error initializing", error?.message) + throw error + } +} + +const beforeEach_ = async () => { + jest.clearAllMocks() + + try { + await medusaAppLoader.runModulesLoader() + } catch (error) { + console.error("Error runner modules loaders", error?.message) + throw error + } +} + +const afterEach_ = async () => { + try { + await dbUtils.teardown({ schema: "public" }) + } catch (error) { + console.error("Error tearing down database:", error?.message) + throw error + } +} + +describe("sync management API", function () { + describe("server mode", function () { + let medusaApp: MedusaAppOutput + let onApplicationPrepareShutdown!: () => Promise + let onApplicationShutdown!: () => Promise + + beforeAll(async () => { + process.env.MEDUSA_WORKER_MODE = "server" + medusaApp = await beforeAll_() + onApplicationPrepareShutdown = medusaApp.onApplicationPrepareShutdown + onApplicationShutdown = medusaApp.onApplicationShutdown + }) + + afterAll(async () => { + if (onApplicationPrepareShutdown) { + await onApplicationPrepareShutdown() + } + if (onApplicationShutdown) { + await onApplicationShutdown() + } + await dbUtils.shutdown(dbName) + }) + + afterEach(afterEach_) + + let manager: EntityManager + + beforeEach(async () => { + await beforeEach_() + + manager = (medusaApp.sharedContainer!.resolve(Modules.INDEX) as any) + .container_.manager as EntityManager + }) + + afterEach(afterEach_) + + describe("getInfo", function () { + it("should return detailed index metadata with last synced keys", async () => { + const indexMetadataRepo = manager.getRepository( + toMikroORMEntity(IndexMetadata) + ) + const indexSyncRepo = manager.getRepository(toMikroORMEntity(IndexSync)) + + await indexMetadataRepo.upsertMany([ + { + id: "metadata_1", + entity: "product", + status: IndexMetadataStatus.DONE, + fields: ["id", "title"].sort().join(","), + fields_hash: "hash_1", + }, + { + id: "metadata_2", + entity: "product_variant", + status: IndexMetadataStatus.PENDING, + fields: ["id", "sku"].sort().join(","), + fields_hash: "hash_2", + }, + ]) + + await indexSyncRepo.upsertMany([ + { + id: "sync_1", + entity: "product", + last_key: "prod_123", + }, + { + id: "sync_2", + entity: "product_variant", + last_key: null, + }, + ]) + + const info = await index.getInfo() + + expect(info).toHaveLength(2) + expect(info).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + id: "metadata_1", + entity: "product", + status: IndexMetadataStatus.DONE, + fields: ["id", "title"], + last_synced_key: "prod_123", + }), + expect.objectContaining({ + id: "metadata_2", + entity: "product_variant", + status: IndexMetadataStatus.PENDING, + fields: ["id", "sku"], + last_synced_key: null, + }), + ]) + ) + }) + + it("should return empty array when no metadata exists", async () => { + const info = await index.getInfo() + expect(info).toBeDefined() + expect(info).toHaveLength(0) + }) + + it("should handle entities without sync records", async () => { + const indexMetadataRepo = manager.getRepository( + toMikroORMEntity(IndexMetadata) + ) + + await indexMetadataRepo.upsertMany([ + { + id: "metadata_test_1", + entity: "test_product", + status: IndexMetadataStatus.DONE, + fields: "id", + fields_hash: "hash_1", + }, + ]) + + const info = await index.getInfo() + + expect(info).toBeDefined() + expect(info).toHaveLength(1) + expect(info[0]).toMatchObject({ + entity: "test_product", + last_synced_key: null, + }) + }) + }) + + describe("sync with continue strategy", function () { + it("should emit continue-sync event in server mode", async () => { + jest.spyOn(eventBusMock, "emit") + + await index.sync({}) + + expect(eventBusMock.emit).toHaveBeenCalledWith( + expect.objectContaining({ + name: "index.continue-sync", + data: {}, + options: { internal: true }, + }) + ) + }) + }) + + describe("sync with full strategy", function () { + it("should reset metadata statuses and last_key, then emit event", async () => { + const indexMetadataRepo = manager.getRepository( + toMikroORMEntity(IndexMetadata) + ) + const indexSyncRepo = manager.getRepository(toMikroORMEntity(IndexSync)) + + await indexMetadataRepo.upsertMany([ + { + id: "test_metadata_1", + entity: "test_product_full", + status: IndexMetadataStatus.DONE, + fields: "id", + fields_hash: "hash_1", + }, + { + id: "test_metadata_2", + entity: "test_variant_full", + status: IndexMetadataStatus.ERROR, + fields: "id", + fields_hash: "hash_2", + }, + { + id: "test_metadata_3", + entity: "test_price_full", + status: IndexMetadataStatus.PROCESSING, + fields: "id", + fields_hash: "hash_3", + }, + ]) + + await indexSyncRepo.upsertMany([ + { + id: "test_sync_1", + entity: "test_product_full", + last_key: "prod_123", + }, + { + id: "test_sync_2", + entity: "test_variant_full", + last_key: "var_456", + }, + ]) + + jest.spyOn(eventBusMock, "emit") + + await index.sync({ strategy: "full" }) + + const testMetadata = (await indexMetadataRepo + .getEntityManager() + .fork() + .find(toMikroORMEntity(IndexMetadata), {})) as IndexMetadata[] + + expect(testMetadata).toHaveLength(3) + testMetadata.forEach((metadata) => { + expect(metadata.status).toBe(IndexMetadataStatus.PENDING) + }) + + const testSync = (await indexSyncRepo + .getEntityManager() + .fork() + .find(toMikroORMEntity(IndexSync), {})) as IndexSync[] + + testSync.forEach((sync) => { + expect(sync.last_key).toBeNull() + }) + + expect(eventBusMock.emit).toHaveBeenCalledWith( + expect.objectContaining({ + name: "index.full-sync", + data: {}, + options: { internal: true }, + }) + ) + }) + + it("should not reset PENDING status metadata", async () => { + const indexMetadataRepo = manager.getRepository( + toMikroORMEntity(IndexMetadata) + ) + + await indexMetadataRepo.upsertMany([ + { + id: "test_pending_metadata", + entity: "test_product_pending", + status: IndexMetadataStatus.PENDING, + fields: "id", + fields_hash: "hash_1", + }, + ]) + + await index.sync({ strategy: "full" }) + + const updatedMetadata = (await indexMetadataRepo + .getEntityManager() + .fork() + .find(toMikroORMEntity(IndexMetadata), {})) as IndexMetadata[] + + expect(updatedMetadata[0].status).toBe(IndexMetadataStatus.PENDING) + }) + }) + + describe("sync with reset strategy", function () { + it("should truncate all index tables and emit event", async () => { + const indexDataRepo = manager.getRepository(toMikroORMEntity(IndexData)) + const indexRelationRepo = manager.getRepository( + toMikroORMEntity(IndexRelation) + ) + const indexMetadataRepo = manager.getRepository( + toMikroORMEntity(IndexMetadata) + ) + const indexSyncRepo = manager.getRepository(toMikroORMEntity(IndexSync)) + + await indexDataRepo.upsertMany([ + { + id: productId, + name: "Product", + data: { id: productId }, + }, + ]) + + await indexMetadataRepo.upsertMany([ + { + id: "metadata_1", + entity: "product", + status: IndexMetadataStatus.DONE, + fields: ["id"], + fields_hash: "hash_1", + }, + ]) + + await indexSyncRepo.upsertMany([ + { + id: "sync_1", + entity: "product", + last_key: "prod_123", + }, + ]) + + jest.spyOn(eventBusMock, "emit") + + await index.sync({ strategy: "reset" }) + + const indexData = await indexDataRepo + .getEntityManager() + .fork() + .find(toMikroORMEntity(IndexData), {}) + const indexRelations = await indexRelationRepo + .getEntityManager() + .fork() + .find(toMikroORMEntity(IndexRelation), {}) + const indexMetadata = await indexMetadataRepo + .getEntityManager() + .fork() + .find(toMikroORMEntity(IndexMetadata), {}) + const indexSync = await indexSyncRepo + .getEntityManager() + .fork() + .find(toMikroORMEntity(IndexSync), {}) + + expect(indexData).toHaveLength(0) + expect(indexRelations).toHaveLength(0) + expect(indexMetadata).toHaveLength(0) + expect(indexSync).toHaveLength(0) + + expect(eventBusMock.emit).toHaveBeenCalledWith( + expect.objectContaining({ + name: "index.reset-sync", + data: {}, + options: { internal: true }, + }) + ) + }) + + it("should handle empty tables gracefully", async () => { + await expect(index.sync({ strategy: "reset" })).resolves.not.toThrow() + + const indexData = await manager.find(toMikroORMEntity(IndexData), {}) + expect(indexData).toHaveLength(0) + }) + }) + + describe("sync strategy parameter validation", function () { + it("should default to continue sync when no strategy provided", async () => { + jest.spyOn(eventBusMock, "emit") + + await index.sync({}) + + expect(eventBusMock.emit).toHaveBeenCalledWith( + expect.objectContaining({ + name: "index.continue-sync", + }) + ) + }) + + it("should handle undefined strategy", async () => { + jest.spyOn(eventBusMock, "emit") + + await index.sync({ strategy: undefined }) + + expect(eventBusMock.emit).toHaveBeenCalledWith( + expect.objectContaining({ + name: "index.continue-sync", + }) + ) + }) + }) + }) +}) diff --git a/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts b/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts index cde8232dbf..425c4baa9a 100644 --- a/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/index-engine-module.spec.ts @@ -4,6 +4,8 @@ import { logger, MedusaAppLoader, } from "@medusajs/framework" +import { asValue } from "@medusajs/framework/awilix" +import { EntityManager } from "@medusajs/framework/mikro-orm/postgresql" import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk" import { EventBusTypes, IndexTypes } from "@medusajs/framework/types" import { @@ -12,9 +14,7 @@ import { toMikroORMEntity, } from "@medusajs/framework/utils" import { initDb, TestDatabaseUtils } from "@medusajs/test-utils" -import { EntityManager } from "@medusajs/framework/mikro-orm/postgresql" import { IndexData, IndexRelation } from "@models" -import { asValue } from "@medusajs/framework/awilix" import * as path from "path" import { setTimeout } from "timers/promises" import { EventBusServiceMock } from "../__fixtures__" diff --git a/packages/modules/index/package.json b/packages/modules/index/package.json index d09329ad28..6a6608e790 100644 --- a/packages/modules/index/package.json +++ b/packages/modules/index/package.json @@ -29,7 +29,7 @@ "resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json", "build": "rimraf dist && tsc --build && npm run resolve:aliases", "test": "jest --passWithNoTests ./src", - "test:integration": "jest --forceExit -- integration-tests/__tests__/**/*.ts", + "test:integration": "jest --forceExit -- integration-tests/__tests__/**/*.spec.ts", "migration:initial": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create --initial", "migration:create": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:create", "migration:up": "MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts MIKRO_ORM_ALLOW_GLOBAL_CLI=true medusa-mikro-orm migration:up", diff --git a/packages/modules/index/src/services/index-module-service.ts b/packages/modules/index/src/services/index-module-service.ts index 17252c2bd2..13befd23dc 100644 --- a/packages/modules/index/src/services/index-module-service.ts +++ b/packages/modules/index/src/services/index-module-service.ts @@ -1,7 +1,12 @@ +import { SqlEntityManager } from "@medusajs/framework/mikro-orm/postgresql" import { Constructor, + Context, + FilterQuery, + FindConfig, IEventBusModuleService, IndexTypes, + InferEntityType, InternalModuleDeclaration, Logger, ModulesSdkTypes, @@ -11,15 +16,21 @@ import { MikroOrmBaseRepository as BaseRepository, ContainerRegistrationKeys, GraphQLUtils, + InjectManager, + MedusaContext, Modules, ModulesSdkUtils, + promiseAll, + toMikroORMEntity, } from "@medusajs/framework/utils" +import { IndexData, IndexMetadata, IndexRelation, IndexSync } from "@models" import { schemaObjectRepresentationPropertiesToOmit } from "@types" import { buildSchemaObjectRepresentation, Configuration, defaultSchema, gqlSchemaToTypes, + IndexMetadataStatus, } from "@utils" import { baseGraphqlSchema } from "../utils/base-graphql-schema" import { DataSynchronizer } from "./data-synchronizer" @@ -42,6 +53,17 @@ export default class IndexModuleService { #isWorkerMode: boolean = false + private static readonly SyncSubscribersDescriptor = { + continueSync: { + eventName: "index.continue-sync", + methodName: "continueSync", + }, + fullSync: { eventName: "index.full-sync", methodName: "fullSync" }, + resetSync: { eventName: "index.reset-sync", methodName: "resetSync" }, + } as const + + private readonly baseRepository_: BaseRepository + private readonly container_: InjectedDependencies private readonly moduleOptions_: IndexTypes.IndexModuleOptions @@ -55,6 +77,8 @@ export default class IndexModuleService protected storageProvider_: IndexTypes.StorageProvider + private configurationChecker_: Configuration + private get indexMetadataService_(): ModulesSdkTypes.IMedusaInternalService { return this.container_.indexMetadataService } @@ -77,12 +101,15 @@ export default class IndexModuleService constructor( container: InjectedDependencies, + moduleOptions: IndexTypes.IndexModuleOptions, protected readonly moduleDeclaration: InternalModuleDeclaration ) { super(...arguments) + this.baseRepository_ = container.baseRepository this.container_ = container - this.moduleOptions_ = (moduleDeclaration.options ?? + this.moduleOptions_ = (moduleOptions ?? + moduleDeclaration.options ?? moduleDeclaration) as unknown as IndexTypes.IndexModuleOptions this.#isWorkerMode = moduleDeclaration.worker_mode !== "server" @@ -140,7 +167,7 @@ export default class IndexModuleService storageProvider: this.storageProvider_, }) - const configurationChecker = new Configuration({ + this.configurationChecker_ = new Configuration({ logger: this.logger_, schemaObjectRepresentation: this.schemaObjectRepresentation_, indexMetadataService: this.indexMetadataService_, @@ -148,7 +175,7 @@ export default class IndexModuleService dataSynchronizer: this.dataSynchronizer_, }) const entitiesMetadataChanged = - await configurationChecker.checkChanges() + await this.configurationChecker_.checkChanges() if (entitiesMetadataChanged.length) { await this.dataSynchronizer_.syncEntities(entitiesMetadataChanged) @@ -166,9 +193,14 @@ export default class IndexModuleService } protected registerListeners() { + if (!this.#isWorkerMode) { + return + } + const schemaObjectRepresentation = (this.schemaObjectRepresentation_ ?? {}) as IndexTypes.SchemaObjectRepresentation + // Register entity event listeners for (const [entityName, schemaEntityObjectRepresentation] of Object.entries( schemaObjectRepresentation )) { @@ -185,6 +217,16 @@ export default class IndexModuleService ) }) } + + // Register sync subscribers + for (const { eventName, methodName } of Object.values( + IndexModuleService.SyncSubscribersDescriptor + )) { + this.eventBusModuleService_.subscribe( + eventName, + this[methodName].bind(this) + ) + } } private buildSchemaObjectRepresentation_(): @@ -204,4 +246,230 @@ export default class IndexModuleService return executableSchema } + + /** + * Example output: + * + * + * ```json + * [ + * { + * "id": "prod_123", + * "entity": "product", + * "status": "pending", + * "fields": ["id"], + * "updated_at": "", + * "last_synced_key": "prod_4321" + * }, + * ... + * ] + * ``` + * @returns Detailed index metadata with the last synced key for each entity + */ + @InjectManager() + async getInfo( + @MedusaContext() sharedContext?: Context + ): Promise { + const listArguments = [ + {} as FilterQuery>, + {} as FindConfig>, + sharedContext, + ] + + const [indexMetadata, indexSync] = await promiseAll([ + this.indexMetadataService_.list(...listArguments) as Promise< + InferEntityType[] + >, + this.indexSyncService_.list(...listArguments) as Promise< + InferEntityType[] + >, + ]) + + const lastEntitySyncedKeyMap = new Map( + indexSync + .filter((sync) => sync.last_key !== null) + .map((sync) => [sync.entity, sync.last_key!]) + ) + + const indexInfo = indexMetadata.map((metadata) => { + return { + id: metadata.id, + entity: metadata.entity, + status: metadata.status, + fields: metadata.fields.split(","), + updated_at: metadata.updated_at, + last_synced_key: lastEntitySyncedKeyMap.get(metadata.entity) ?? null, + } satisfies IndexTypes.IndexInfo + }) + + return indexInfo + } + + async sync({ strategy }: { strategy?: "full" | "reset" } = {}) { + if (strategy && !["full", "reset"].includes(strategy)) { + throw new Error( + `Invalid sync strategy: ${strategy}. Must be "full" or "reset"` + ) + } + + switch (strategy) { + case "full": + await this.fullSync() + break + case "reset": + await this.resetSync() + break + default: + await this.continueSync() + break + } + } + + /** + * Continue the sync of the entities no matter their status + * @param sharedContext + * @returns + */ + private async continueSync() { + if (!this.#isWorkerMode) { + await this.baseRepository_.transaction(async (transactionManager) => { + await this.indexMetadataService_.update( + { + selector: { + status: [ + IndexMetadataStatus.DONE, + IndexMetadataStatus.ERROR, + IndexMetadataStatus.PROCESSING, + ], + }, + data: { + status: IndexMetadataStatus.PENDING, + }, + }, + { transactionManager } + ) + + this.eventBusModuleService_.emit({ + name: IndexModuleService.SyncSubscribersDescriptor.continueSync + .eventName, + data: {}, + options: { + internal: true, + }, + }) + }) + + return + } + + try { + const entities = await this.configurationChecker_.checkChanges() + + if (!entities.length) { + return + } + + return await this.dataSynchronizer_.syncEntities(entities) + } catch (e) { + this.logger_.error(e) + throw new Error("[Index engine] Failed to continue sync") + } + } + + private async fullSync() { + if (!this.#isWorkerMode) { + await this.baseRepository_.transaction(async (transactionManager) => { + await promiseAll([ + this.indexMetadataService_.update( + { + selector: { + status: [ + IndexMetadataStatus.DONE, + IndexMetadataStatus.ERROR, + IndexMetadataStatus.PROCESSING, + ], + }, + data: { + status: IndexMetadataStatus.PENDING, + }, + }, + { transactionManager } + ), + this.indexSyncService_.update( + { + selector: { last_key: { $ne: null } }, + data: { last_key: null }, + }, + { transactionManager } + ), + ]) + + await this.eventBusModuleService_.emit({ + name: IndexModuleService.SyncSubscribersDescriptor.fullSync.eventName, + data: {}, + options: { + internal: true, + }, + }) + }) + + return + } + + try { + const entities = await this.configurationChecker_.checkChanges() + + if (!entities.length) { + return + } + + return await this.dataSynchronizer_.syncEntities(entities) + } catch (e) { + this.logger_.error(e) + throw new Error("[Index engine] Failed to full sync") + } + } + + private async resetSync() { + if (!this.#isWorkerMode) { + await this.baseRepository_.transaction( + async (transactionManager: SqlEntityManager) => { + const truncableTables = [ + toMikroORMEntity(IndexData).prototype, + toMikroORMEntity(IndexRelation).prototype, + toMikroORMEntity(IndexMetadata).prototype, + toMikroORMEntity(IndexSync).prototype, + ].map((table) => table.__helper.__meta.collection) + + await transactionManager.execute( + `TRUNCATE TABLE ${truncableTables.join(", ")} CASCADE` + ) + + await this.eventBusModuleService_.emit({ + name: IndexModuleService.SyncSubscribersDescriptor.resetSync + .eventName, + data: {}, + options: { + internal: true, + }, + }) + } + ) + + return + } + + try { + const changes = await this.configurationChecker_.checkChanges() + + if (!changes.length) { + return + } + + await this.dataSynchronizer_.syncEntities(changes) + } catch (e) { + this.logger_.error(e) + throw new Error("[Index engine] Failed to reset sync") + } + } } diff --git a/packages/modules/index/src/utils/sync/configuration.ts b/packages/modules/index/src/utils/sync/configuration.ts index b6fa7a4fe6..bbc94c1a63 100644 --- a/packages/modules/index/src/utils/sync/configuration.ts +++ b/packages/modules/index/src/utils/sync/configuration.ts @@ -38,7 +38,7 @@ export class Configuration { this.#logger.info("[Index engine] Checking for index changes") const schemaObjectRepresentation = this.#schemaObjectRepresentation - const currentConfig = await this.#indexMetadataService.list() + const currentConfig = await this.#indexMetadataService.list({}) const currentConfigMap = new Map( currentConfig.map((c) => [c.entity, c] as const) )