chore(index): config changes (#11121)

Closes FRMW-2884
This commit is contained in:
Carlos R. L. Rodrigues
2025-01-27 09:54:46 -03:00
committed by GitHub
parent 10962a5c54
commit 5093224914
9 changed files with 673 additions and 7 deletions

View File

@@ -0,0 +1,21 @@
export const updateRemovedSchema = `
type Product @Listeners(values: ["product.created", "product.updated", "product.deleted"]) {
id: String
title: String
handle: String
variants: [ProductVariant]
}
type ProductVariant @Listeners(values: ["variant.created", "variant.updated", "variant.deleted"]) {
id: String
product_id: String
sku: String
prices: [Price]
description: String
}
type Price @Listeners(values: ["price.created", "price.updated", "price.deleted"]) {
amount: Float
currency_code: String
}
`

View File

@@ -0,0 +1,30 @@
export const updatedSchema = `
type Product @Listeners(values: ["product.created", "product.updated", "product.deleted"]) {
id: String
title: String
handle: String
deep: InternalNested
variants: [ProductVariant]
}
type InternalNested {
a: Int
obj: InternalObject
}
type InternalObject {
b: Int
}
type ProductVariant @Listeners(values: ["variant.created", "variant.updated", "variant.deleted"]) {
id: String
product_id: String
sku: String
prices: [Price]
}
type Price @Listeners(values: ["price.created", "price.updated", "price.deleted"]) {
amount: Float
currency_code: String
}
`

View File

@@ -0,0 +1,298 @@
import {
configLoader,
container,
logger,
MedusaAppLoader,
} from "@medusajs/framework"
import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk"
import {
ContainerRegistrationKeys,
ModuleRegistrationName,
Modules,
} from "@medusajs/framework/utils"
import { initDb, TestDatabaseUtils } from "@medusajs/test-utils"
import { EntityManager } from "@mikro-orm/postgresql"
import { asValue } from "awilix"
import path from "path"
import { setTimeout } from "timers/promises"
import { EventBusServiceMock } from "../__fixtures__"
import { dbName } from "../__fixtures__/medusa-config"
import { updateRemovedSchema } from "../__fixtures__/update-removed-schema"
import { updatedSchema } from "../__fixtures__/updated-schema"
const eventBusMock = new EventBusServiceMock()
const queryMock = jest.fn().mockReturnValue({
graph: jest.fn(),
})
const dbUtils = TestDatabaseUtils.dbTestUtilFactory()
jest.setTimeout(300000)
let isFirstTime = true
let medusaAppLoader!: MedusaAppLoader
const beforeAll_ = async () => {
try {
await configLoader(
path.join(__dirname, "./../__fixtures__"),
"medusa-config"
)
console.log(`Creating database ${dbName}`)
await dbUtils.create(dbName)
dbUtils.pgConnection_ = await initDb()
container.register({
[ContainerRegistrationKeys.LOGGER]: asValue(logger),
[ContainerRegistrationKeys.QUERY]: asValue(null),
[ContainerRegistrationKeys.PG_CONNECTION]: asValue(dbUtils.pgConnection_),
})
medusaAppLoader = new MedusaAppLoader(container as any)
// Migrations
await medusaAppLoader.runModulesMigrations()
const linkPlanner = await medusaAppLoader.getLinksExecutionPlanner()
const plan = await linkPlanner.createPlan()
await linkPlanner.executePlan(plan)
// Clear partially loaded instances
MedusaModule.clearInstances()
// Bootstrap modules
const globalApp = await medusaAppLoader.load()
const index = container.resolve(Modules.INDEX)
// Mock event bus the index module
;(index as any).eventBusModuleService_ = eventBusMock
await globalApp.onApplicationStart()
;(index as any).storageProvider_.query_ = queryMock
return globalApp
} catch (error) {
console.error("Error initializing", error?.message)
throw error
}
}
const beforeEach_ = async () => {
jest.clearAllMocks()
if (isFirstTime) {
isFirstTime = false
return
}
try {
await medusaAppLoader.runModulesLoader()
} catch (error) {
console.error("Error runner modules loaders", error?.message)
throw error
}
}
const afterEach_ = async () => {
try {
await dbUtils.teardown({ schema: "public" })
} catch (error) {
console.error("Error tearing down database:", error?.message)
throw error
}
}
describe("IndexModuleService syncIndexConfig", function () {
let medusaApp: MedusaAppOutput
let module: any
let manager: EntityManager
let onApplicationPrepareShutdown!: () => Promise<void>
let onApplicationShutdown!: () => Promise<void>
beforeAll(async () => {
medusaApp = await beforeAll_()
onApplicationPrepareShutdown = medusaApp.onApplicationPrepareShutdown
onApplicationShutdown = medusaApp.onApplicationShutdown
})
afterAll(async () => {
await onApplicationPrepareShutdown()
await onApplicationShutdown()
await dbUtils.shutdown(dbName)
})
beforeEach(async () => {
await beforeEach_()
module = medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX)
manager = module.container_.manager as EntityManager
})
afterEach(afterEach_)
it("should full sync all entities when the config has changed", async () => {
await setTimeout(1000)
const currentMetadata = await module.listIndexMetadata()
expect(currentMetadata).toHaveLength(7)
expect(currentMetadata).toEqual(
expect.arrayContaining([
expect.objectContaining({
entity: "InternalObject",
fields: "b",
status: "done",
}),
expect.objectContaining({
entity: "Product",
fields: "id,title",
status: "done",
}),
expect.objectContaining({
entity: "InternalNested",
fields: "a",
status: "done",
}),
expect.objectContaining({
entity: "PriceSet",
fields: "id",
status: "done",
}),
expect.objectContaining({
entity: "Price",
fields: "amount,price_set.id",
status: "done",
}),
expect.objectContaining({
entity: "ProductVariant",
fields: "id,product.id,product_id,sku",
status: "done",
}),
expect.objectContaining({
entity: "LinkProductVariantPriceSet",
fields: "id,price_set_id,variant_id",
status: "done",
}),
])
)
// update config schema
module.schemaObjectRepresentation_ = null
module.moduleOptions_ ??= {}
module.moduleOptions_.schema = updatedSchema
module.buildSchemaObjectRepresentation_()
const syncRequired = await module.syncIndexConfig()
expect(syncRequired).toHaveLength(2)
expect(syncRequired).toEqual(
expect.arrayContaining([
expect.objectContaining({
entity: "Product",
fields: "handle,id,title",
status: "pending",
}),
expect.objectContaining({
entity: "Price",
fields: "amount,currency_code,price_set.id",
status: "pending",
}),
])
)
const updatedMetadata = await module.listIndexMetadata()
expect(updatedMetadata).toHaveLength(7)
expect(updatedMetadata).toEqual(
expect.arrayContaining([
expect.objectContaining({
entity: "InternalObject",
fields: "b",
status: "done",
}),
expect.objectContaining({
entity: "Product",
fields: "handle,id,title",
status: "pending",
}),
expect.objectContaining({
entity: "InternalNested",
fields: "a",
status: "done",
}),
expect.objectContaining({
entity: "PriceSet",
fields: "id",
status: "done",
}),
expect.objectContaining({
entity: "Price",
fields: "amount,currency_code,price_set.id",
status: "pending",
}),
expect.objectContaining({
entity: "ProductVariant",
fields: "id,product.id,product_id,sku",
status: "done",
}),
expect.objectContaining({
entity: "LinkProductVariantPriceSet",
fields: "id,price_set_id,variant_id",
status: "done",
}),
])
)
await module.syncEntities(syncRequired)
// Sync again removing entities not linked
module.schemaObjectRepresentation_ = null
module.moduleOptions_ ??= {}
module.moduleOptions_.schema = updateRemovedSchema
module.buildSchemaObjectRepresentation_()
const syncRequired2 = await module.syncIndexConfig()
expect(syncRequired2).toHaveLength(1)
expect(syncRequired2).toEqual(
expect.arrayContaining([
expect.objectContaining({
entity: "ProductVariant",
fields: "description,id,product.id,product_id,sku",
status: "pending",
}),
])
)
const updatedMetadata2 = await module.listIndexMetadata()
expect(updatedMetadata2).toHaveLength(5)
expect(updatedMetadata2).toEqual(
expect.arrayContaining([
expect.objectContaining({
entity: "Product",
fields: "handle,id,title",
status: "done",
}),
expect.objectContaining({
entity: "PriceSet",
fields: "id",
status: "done",
}),
expect.objectContaining({
entity: "Price",
fields: "amount,currency_code,price_set.id",
status: "done",
}),
expect.objectContaining({
entity: "ProductVariant",
fields: "description,id,product.id,product_id,sku",
status: "pending",
}),
expect.objectContaining({
entity: "LinkProductVariantPriceSet",
fields: "id,price_set_id,variant_id",
status: "done",
}),
])
)
})
})

View File

@@ -74,6 +74,7 @@
"keyName": "IDX_index_data_deleted_at",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_data_deleted_at\" ON \"index_data\" (deleted_at) WHERE deleted_at IS NULL"
@@ -82,6 +83,7 @@
"keyName": "IDX_index_data_gin",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_data_gin\" ON \"index_data\" USING GIN (data) WHERE deleted_at IS NULL"
@@ -90,6 +92,7 @@
"keyName": "IDX_index_data_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_data_id\" ON \"index_data\" (id) WHERE deleted_at IS NULL"
@@ -98,6 +101,7 @@
"keyName": "IDX_index_data_name",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_data_name\" ON \"index_data\" (name) WHERE deleted_at IS NULL"
@@ -109,22 +113,147 @@
"name"
],
"composite": true,
"constraint": true,
"primary": true,
"unique": true
}
],
"checks": [],
"foreignKeys": {}
"foreignKeys": {},
"nativeEnums": {}
},
{
"columns": {
"id": {
"name": "id",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "text"
},
"entity": {
"name": "entity",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "text"
},
"fields": {
"name": "fields",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "text"
},
"fields_hash": {
"name": "fields_hash",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "text"
},
"status": {
"name": "status",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"default": "'pending'",
"enumItems": [
"pending",
"processing",
"done",
"error"
],
"mappedType": "enum"
},
"created_at": {
"name": "created_at",
"type": "timestamptz",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"length": 6,
"default": "now()",
"mappedType": "datetime"
},
"updated_at": {
"name": "updated_at",
"type": "timestamptz",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"length": 6,
"default": "now()",
"mappedType": "datetime"
},
"deleted_at": {
"name": "deleted_at",
"type": "timestamptz",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": true,
"length": 6,
"mappedType": "datetime"
}
},
"name": "index_metadata",
"schema": "public",
"indexes": [
{
"keyName": "IDX_index_metadata_deleted_at",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_metadata_deleted_at\" ON \"index_metadata\" (deleted_at) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_index_metadata_entity",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_index_metadata_entity\" ON \"index_metadata\" (entity) WHERE deleted_at IS NULL"
},
{
"keyName": "index_metadata_pkey",
"columnNames": [
"id"
],
"composite": false,
"constraint": true,
"primary": true,
"unique": true
}
],
"checks": [],
"foreignKeys": {},
"nativeEnums": {}
},
{
"columns": {
"id": {
"name": "id",
"type": "serial",
"unsigned": true,
"unsigned": false,
"autoincrement": true,
"primary": true,
"nullable": true,
"nullable": false,
"mappedType": "integer"
},
"pivot": {
@@ -212,6 +341,7 @@
"keyName": "IDX_index_relation_parent_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_relation_parent_id\" ON \"index_relation\" (parent_id) WHERE deleted_at IS NULL"
@@ -220,6 +350,7 @@
"keyName": "IDX_index_relation_child_id",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_relation_child_id\" ON \"index_relation\" (child_id) WHERE deleted_at IS NULL"
@@ -228,6 +359,7 @@
"keyName": "IDX_index_relation_deleted_at",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_index_relation_deleted_at\" ON \"index_relation\" (deleted_at) WHERE deleted_at IS NULL"
@@ -238,12 +370,15 @@
"id"
],
"composite": false,
"constraint": true,
"primary": true,
"unique": true
}
],
"checks": [],
"foreignKeys": {}
"foreignKeys": {},
"nativeEnums": {}
}
]
],
"nativeEnums": {}
}

View File

@@ -0,0 +1,19 @@
import { Migration } from "@mikro-orm/migrations"
export class Migration20250122154720 extends Migration {
override async up(): Promise<void> {
this.addSql(
`create table if not exists "index_metadata" ("id" text not null, "entity" text not null, "fields" text not null, "fields_hash" text not null, "status" text check ("status" in ('pending', 'processing', 'done', 'error')) not null default 'pending', "created_at" timestamptz not null default now(), "updated_at" timestamptz not null default now(), "deleted_at" timestamptz null, constraint "index_metadata_pkey" primary key ("id"));`
)
this.addSql(
`CREATE INDEX IF NOT EXISTS "IDX_index_metadata_deleted_at" ON "index_metadata" (deleted_at) WHERE deleted_at IS NULL;`
)
this.addSql(
`CREATE UNIQUE INDEX IF NOT EXISTS "IDX_index_metadata_entity" ON "index_metadata" (entity) WHERE deleted_at IS NULL;`
)
}
override async down(): Promise<void> {
this.addSql(`drop table if exists "index_metadata" cascade;`)
}
}

View File

@@ -0,0 +1,22 @@
import { model } from "@medusajs/framework/utils"
import { IndexMetadataStatus } from "../utils/index-metadata-status"
const IndexMetadata = model
.define("IndexMetadata", {
id: model.id().primaryKey(),
entity: model.text(),
fields: model.text(),
fields_hash: model.text(),
status: model
.enum(IndexMetadataStatus)
.default(IndexMetadataStatus.PENDING),
})
.indexes([
{
name: "IDX_index_metadata_entity",
on: ["entity"],
unique: true,
},
])
export default IndexMetadata

View File

@@ -1,2 +1,3 @@
export { default as IndexData } from "./index-data"
export { default as IndexMetadata } from "./index-metadata"
export { default as IndexRelation } from "./index-relation"

View File

@@ -3,17 +3,22 @@ import {
IEventBusModuleService,
IndexTypes,
InternalModuleDeclaration,
ModulesSdkTypes,
RemoteQueryFunction,
} from "@medusajs/framework/types"
import {
ContainerRegistrationKeys,
MikroOrmBaseRepository as BaseRepository,
ContainerRegistrationKeys,
Modules,
ModulesSdkUtils,
simpleHash,
} from "@medusajs/framework/utils"
import { IndexMetadata } from "@models"
import { schemaObjectRepresentationPropertiesToOmit } from "@types"
import { buildSchemaObjectRepresentation } from "../utils/build-config"
import { defaultSchema } from "../utils/default-schema"
import { gqlSchemaToTypes } from "../utils/gql-to-types"
import { IndexMetadataStatus } from "../utils/index-metadata-status"
type InjectedDependencies = {
[Modules.EVENT_BUS]: IEventBusModuleService
@@ -21,9 +26,15 @@ type InjectedDependencies = {
[ContainerRegistrationKeys.QUERY]: RemoteQueryFunction
storageProviderCtrOptions: unknown
baseRepository: BaseRepository
indexMetadataService: ModulesSdkTypes.IMedusaInternalService<any>
}
export default class IndexModuleService implements IndexTypes.IIndexService {
export default class IndexModuleService
extends ModulesSdkUtils.MedusaService({
IndexMetadata,
})
implements IndexTypes.IIndexService
{
private readonly container_: InjectedDependencies
private readonly moduleOptions_: IndexTypes.IndexModuleOptions
@@ -37,16 +48,21 @@ export default class IndexModuleService implements IndexTypes.IIndexService {
protected storageProvider_: IndexTypes.StorageProvider
private indexMetadataService_: ModulesSdkTypes.IMedusaInternalService<any>
constructor(
container: InjectedDependencies,
protected readonly moduleDeclaration: InternalModuleDeclaration
) {
super(...arguments)
this.container_ = container
this.moduleOptions_ = (moduleDeclaration.options ??
moduleDeclaration) as unknown as IndexTypes.IndexModuleOptions
const {
[Modules.EVENT_BUS]: eventBusModuleService,
indexMetadataService,
storageProviderCtr,
storageProviderCtrOptions,
} = container
@@ -54,6 +70,7 @@ export default class IndexModuleService implements IndexTypes.IIndexService {
this.eventBusModuleService_ = eventBusModuleService
this.storageProviderCtr_ = storageProviderCtr
this.storageProviderCtrOptions_ = storageProviderCtrOptions
this.indexMetadataService_ = indexMetadataService
if (!this.eventBusModuleService_) {
throw new Error(
@@ -88,11 +105,128 @@ export default class IndexModuleService implements IndexTypes.IIndexService {
}
await gqlSchemaToTypes(this.moduleOptions_.schema ?? defaultSchema)
const fullSyncRequired = await this.syncIndexConfig()
if (fullSyncRequired.length > 0) {
await this.syncEntities(fullSyncRequired)
}
} catch (e) {
console.log(e)
}
}
private async syncIndexConfig() {
const schemaObjectRepresentation = (this.schemaObjectRepresentation_ ??
{}) as IndexTypes.SchemaObjectRepresentation
const currentConfig = await this.indexMetadataService_.list()
const currentConfigMap = new Map(
currentConfig.map((c) => [c.entity, c] as const)
)
type modifiedConfig = {
id?: string
entity: string
fields: string[]
fields_hash: string
status?: IndexMetadataStatus
}[]
const entityPresent = new Set<string>()
const newConfig: modifiedConfig = []
const updatedConfig: modifiedConfig = []
const deletedConfig: { entity: string }[] = []
for (const [entityName, schemaEntityObjectRepresentation] of Object.entries(
schemaObjectRepresentation
)) {
if (schemaObjectRepresentationPropertiesToOmit.includes(entityName)) {
continue
}
const entity = schemaEntityObjectRepresentation.entity
const fields = schemaEntityObjectRepresentation.fields.sort().join(",")
const fields_hash = simpleHash(fields)
const existingEntityConfig = currentConfigMap.get(entity)
entityPresent.add(entity)
if (
!existingEntityConfig ||
existingEntityConfig.fields_hash !== fields_hash
) {
const meta = {
id: existingEntityConfig?.id,
entity,
fields,
fields_hash,
}
if (!existingEntityConfig) {
newConfig.push(meta)
} else {
updatedConfig.push({
...meta,
status: IndexMetadataStatus.PENDING,
})
}
}
}
for (const [entity] of currentConfigMap) {
if (!entityPresent.has(entity)) {
deletedConfig.push({ entity })
}
}
if (newConfig.length > 0) {
await this.indexMetadataService_.create(newConfig)
}
if (updatedConfig.length > 0) {
await this.indexMetadataService_.update(updatedConfig)
}
if (deletedConfig.length > 0) {
await this.indexMetadataService_.delete(deletedConfig)
}
return await this.indexMetadataService_.list({
status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING],
})
}
private async syncEntities(
entities: {
entity: string
fields: string[]
fields_hash: string
}[]
) {
const updatedStatus = async (
entity: string,
status: IndexMetadataStatus
) => {
await this.indexMetadataService_.update({
data: {
status,
},
selector: {
entity,
},
})
}
for (const entity of entities) {
await updatedStatus(entity.entity, IndexMetadataStatus.PROCESSING)
try {
// await this.syncEntity(entity)
await updatedStatus(entity.entity, IndexMetadataStatus.DONE)
} catch (e) {
await updatedStatus(entity.entity, IndexMetadataStatus.ERROR)
}
}
}
async query<const TEntry extends string>(
config: IndexTypes.IndexQueryConfig<TEntry>
): Promise<IndexTypes.QueryResultSet<TEntry>> {

View File

@@ -0,0 +1,6 @@
export enum IndexMetadataStatus {
PENDING = "pending",
PROCESSING = "processing",
DONE = "done",
ERROR = "error",
}