feat(index): Add http/module api to interact with the index engine (#13869)

* feat(index): Add support to trigger sync manually

* feat(index): Add API route to interact with

* feat(index): Add API route to interact with

* feat(index): Add API route to interact with

* test(): Add http integration tests

* Create weak-elephants-reply.md
This commit is contained in:
Adrien de Peretti
2025-10-28 20:31:39 +01:00
committed by GitHub
parent 540ae996ff
commit 85b1f3d43a
19 changed files with 1061 additions and 7 deletions

View File

@@ -1,7 +1,12 @@
import { SqlEntityManager } from "@medusajs/framework/mikro-orm/postgresql"
import {
Constructor,
Context,
FilterQuery,
FindConfig,
IEventBusModuleService,
IndexTypes,
InferEntityType,
InternalModuleDeclaration,
Logger,
ModulesSdkTypes,
@@ -11,15 +16,21 @@ import {
MikroOrmBaseRepository as BaseRepository,
ContainerRegistrationKeys,
GraphQLUtils,
InjectManager,
MedusaContext,
Modules,
ModulesSdkUtils,
promiseAll,
toMikroORMEntity,
} from "@medusajs/framework/utils"
import { IndexData, IndexMetadata, IndexRelation, IndexSync } from "@models"
import { schemaObjectRepresentationPropertiesToOmit } from "@types"
import {
buildSchemaObjectRepresentation,
Configuration,
defaultSchema,
gqlSchemaToTypes,
IndexMetadataStatus,
} from "@utils"
import { baseGraphqlSchema } from "../utils/base-graphql-schema"
import { DataSynchronizer } from "./data-synchronizer"
@@ -42,6 +53,17 @@ export default class IndexModuleService
{
#isWorkerMode: boolean = false
private static readonly SyncSubscribersDescriptor = {
continueSync: {
eventName: "index.continue-sync",
methodName: "continueSync",
},
fullSync: { eventName: "index.full-sync", methodName: "fullSync" },
resetSync: { eventName: "index.reset-sync", methodName: "resetSync" },
} as const
private readonly baseRepository_: BaseRepository
private readonly container_: InjectedDependencies
private readonly moduleOptions_: IndexTypes.IndexModuleOptions
@@ -55,6 +77,8 @@ export default class IndexModuleService
protected storageProvider_: IndexTypes.StorageProvider
private configurationChecker_: Configuration
private get indexMetadataService_(): ModulesSdkTypes.IMedusaInternalService<any> {
return this.container_.indexMetadataService
}
@@ -77,12 +101,15 @@ export default class IndexModuleService
constructor(
container: InjectedDependencies,
moduleOptions: IndexTypes.IndexModuleOptions,
protected readonly moduleDeclaration: InternalModuleDeclaration
) {
super(...arguments)
this.baseRepository_ = container.baseRepository
this.container_ = container
this.moduleOptions_ = (moduleDeclaration.options ??
this.moduleOptions_ = (moduleOptions ??
moduleDeclaration.options ??
moduleDeclaration) as unknown as IndexTypes.IndexModuleOptions
this.#isWorkerMode = moduleDeclaration.worker_mode !== "server"
@@ -140,7 +167,7 @@ export default class IndexModuleService
storageProvider: this.storageProvider_,
})
const configurationChecker = new Configuration({
this.configurationChecker_ = new Configuration({
logger: this.logger_,
schemaObjectRepresentation: this.schemaObjectRepresentation_,
indexMetadataService: this.indexMetadataService_,
@@ -148,7 +175,7 @@ export default class IndexModuleService
dataSynchronizer: this.dataSynchronizer_,
})
const entitiesMetadataChanged =
await configurationChecker.checkChanges()
await this.configurationChecker_.checkChanges()
if (entitiesMetadataChanged.length) {
await this.dataSynchronizer_.syncEntities(entitiesMetadataChanged)
@@ -166,9 +193,14 @@ export default class IndexModuleService
}
protected registerListeners() {
if (!this.#isWorkerMode) {
return
}
const schemaObjectRepresentation = (this.schemaObjectRepresentation_ ??
{}) as IndexTypes.SchemaObjectRepresentation
// Register entity event listeners
for (const [entityName, schemaEntityObjectRepresentation] of Object.entries(
schemaObjectRepresentation
)) {
@@ -185,6 +217,16 @@ export default class IndexModuleService
)
})
}
// Register sync subscribers
for (const { eventName, methodName } of Object.values(
IndexModuleService.SyncSubscribersDescriptor
)) {
this.eventBusModuleService_.subscribe(
eventName,
this[methodName].bind(this)
)
}
}
private buildSchemaObjectRepresentation_():
@@ -204,4 +246,230 @@ export default class IndexModuleService
return executableSchema
}
/**
* Example output:
*
*
* ```json
* [
* {
* "id": "prod_123",
* "entity": "product",
* "status": "pending",
* "fields": ["id"],
* "updated_at": "<timestamp of last indexed data>",
* "last_synced_key": "prod_4321"
* },
* ...
* ]
* ```
* @returns Detailed index metadata with the last synced key for each entity
*/
@InjectManager()
async getInfo(
@MedusaContext() sharedContext?: Context
): Promise<IndexTypes.IndexInfo[]> {
const listArguments = [
{} as FilterQuery<InferEntityType<typeof IndexMetadata>>,
{} as FindConfig<InferEntityType<typeof IndexMetadata>>,
sharedContext,
]
const [indexMetadata, indexSync] = await promiseAll([
this.indexMetadataService_.list(...listArguments) as Promise<
InferEntityType<typeof IndexMetadata>[]
>,
this.indexSyncService_.list(...listArguments) as Promise<
InferEntityType<typeof IndexSync>[]
>,
])
const lastEntitySyncedKeyMap = new Map<string, string>(
indexSync
.filter((sync) => sync.last_key !== null)
.map((sync) => [sync.entity, sync.last_key!])
)
const indexInfo = indexMetadata.map((metadata) => {
return {
id: metadata.id,
entity: metadata.entity,
status: metadata.status,
fields: metadata.fields.split(","),
updated_at: metadata.updated_at,
last_synced_key: lastEntitySyncedKeyMap.get(metadata.entity) ?? null,
} satisfies IndexTypes.IndexInfo
})
return indexInfo
}
async sync({ strategy }: { strategy?: "full" | "reset" } = {}) {
if (strategy && !["full", "reset"].includes(strategy)) {
throw new Error(
`Invalid sync strategy: ${strategy}. Must be "full" or "reset"`
)
}
switch (strategy) {
case "full":
await this.fullSync()
break
case "reset":
await this.resetSync()
break
default:
await this.continueSync()
break
}
}
/**
* Continue the sync of the entities no matter their status
* @param sharedContext
* @returns
*/
private async continueSync() {
if (!this.#isWorkerMode) {
await this.baseRepository_.transaction(async (transactionManager) => {
await this.indexMetadataService_.update(
{
selector: {
status: [
IndexMetadataStatus.DONE,
IndexMetadataStatus.ERROR,
IndexMetadataStatus.PROCESSING,
],
},
data: {
status: IndexMetadataStatus.PENDING,
},
},
{ transactionManager }
)
this.eventBusModuleService_.emit({
name: IndexModuleService.SyncSubscribersDescriptor.continueSync
.eventName,
data: {},
options: {
internal: true,
},
})
})
return
}
try {
const entities = await this.configurationChecker_.checkChanges()
if (!entities.length) {
return
}
return await this.dataSynchronizer_.syncEntities(entities)
} catch (e) {
this.logger_.error(e)
throw new Error("[Index engine] Failed to continue sync")
}
}
private async fullSync() {
if (!this.#isWorkerMode) {
await this.baseRepository_.transaction(async (transactionManager) => {
await promiseAll([
this.indexMetadataService_.update(
{
selector: {
status: [
IndexMetadataStatus.DONE,
IndexMetadataStatus.ERROR,
IndexMetadataStatus.PROCESSING,
],
},
data: {
status: IndexMetadataStatus.PENDING,
},
},
{ transactionManager }
),
this.indexSyncService_.update(
{
selector: { last_key: { $ne: null } },
data: { last_key: null },
},
{ transactionManager }
),
])
await this.eventBusModuleService_.emit({
name: IndexModuleService.SyncSubscribersDescriptor.fullSync.eventName,
data: {},
options: {
internal: true,
},
})
})
return
}
try {
const entities = await this.configurationChecker_.checkChanges()
if (!entities.length) {
return
}
return await this.dataSynchronizer_.syncEntities(entities)
} catch (e) {
this.logger_.error(e)
throw new Error("[Index engine] Failed to full sync")
}
}
private async resetSync() {
if (!this.#isWorkerMode) {
await this.baseRepository_.transaction(
async (transactionManager: SqlEntityManager) => {
const truncableTables = [
toMikroORMEntity(IndexData).prototype,
toMikroORMEntity(IndexRelation).prototype,
toMikroORMEntity(IndexMetadata).prototype,
toMikroORMEntity(IndexSync).prototype,
].map((table) => table.__helper.__meta.collection)
await transactionManager.execute(
`TRUNCATE TABLE ${truncableTables.join(", ")} CASCADE`
)
await this.eventBusModuleService_.emit({
name: IndexModuleService.SyncSubscribersDescriptor.resetSync
.eventName,
data: {},
options: {
internal: true,
},
})
}
)
return
}
try {
const changes = await this.configurationChecker_.checkChanges()
if (!changes.length) {
return
}
await this.dataSynchronizer_.syncEntities(changes)
} catch (e) {
this.logger_.error(e)
throw new Error("[Index engine] Failed to reset sync")
}
}
}

View File

@@ -38,7 +38,7 @@ export class Configuration {
this.#logger.info("[Index engine] Checking for index changes")
const schemaObjectRepresentation = this.#schemaObjectRepresentation
const currentConfig = await this.#indexMetadataService.list()
const currentConfig = await this.#indexMetadataService.list({})
const currentConfigMap = new Map(
currentConfig.map((c) => [c.entity, c] as const)
)