Feat/index sync data (#11169)

**what**
Synchronization process implementation for configured entities to be indexed
This commit is contained in:
Adrien de Peretti
2025-01-27 14:56:12 +01:00
committed by GitHub
parent 5093224914
commit ea402875a5
7 changed files with 685 additions and 33 deletions

View File

@@ -50,7 +50,7 @@ export type RemoteQueryInput<TEntry extends string> = {
/**
* The number of items to skip before retrieving the returned items.
*/
skip: number
skip?: number
/**
* The maximum number of items to return.
*/

View File

@@ -0,0 +1,486 @@
import {
configLoader,
container,
logger,
MedusaAppLoader,
} from "@medusajs/framework"
import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk"
import { IndexTypes, InferEntityType } from "@medusajs/framework/types"
import {
ContainerRegistrationKeys,
ModuleRegistrationName,
Modules,
toMikroORMEntity,
} from "@medusajs/framework/utils"
import { initDb, TestDatabaseUtils } from "@medusajs/test-utils"
import { asValue } from "awilix"
import * as path from "path"
import { DataSynchronizer } from "../../src/utils/sync/data-synchronizer"
import { EventBusServiceMock } from "../__fixtures__"
import { dbName } from "../__fixtures__/medusa-config"
import { EntityManager } from "@mikro-orm/postgresql"
import { IndexData, IndexRelation } from "@models"
// Mocked event bus swapped into the index module so no real events are emitted.
const eventBusMock = new EventBusServiceMock()
// Jest-mocked remote query; each test supplies its own `graph` implementation.
const queryMock = {
  graph: jest.fn(),
}
// Utilities to create/drop the dedicated test database.
const dbUtils = TestDatabaseUtils.dbTestUtilFactory()
jest.setTimeout(30000)
// Fixture ids: two products, each with one variant pointing back at its parent.
const testProductId = "test_prod_1"
const testProductId2 = "test_prod_2"
const testVariantId = "test_var_1"
const testVariantId2 = "test_var_2"
// Records served by the mocked `query.graph` during the sync tests.
// Index 0/1 are products, index 2/3 are variants referencing them via `product.id`.
const mockData = [
  {
    id: testProductId,
    title: "Test Product",
    updated_at: new Date(),
  },
  {
    id: testProductId2,
    title: "Test Product",
    updated_at: new Date(),
  },
  {
    id: testVariantId,
    title: "Test Variant",
    product: {
      id: testProductId,
    },
    updated_at: new Date(),
  },
  {
    id: testVariantId2,
    title: "Test Variant 2",
    product: {
      id: testProductId2,
    },
    updated_at: new Date(),
  },
]
// Populated by `beforeAll_` once the app is bootstrapped.
let medusaAppLoader!: MedusaAppLoader
let index!: IndexTypes.IIndexService
/**
 * Boots a full Medusa application against a freshly created test database.
 *
 * Statement order matters: config must be loaded before the database is
 * created, the container must hold the pg connection before migrations run,
 * and module instances are cleared before the final `load()` so nothing
 * partially initialized leaks into the app under test.
 *
 * Returns the loaded app so the caller can drive its shutdown hooks.
 */
const beforeAll_ = async () => {
  try {
    // Load the fixture medusa-config so all loaders resolve the test setup.
    await configLoader(
      path.join(__dirname, "./../__fixtures__"),
      "medusa-config"
    )
    console.log(`Creating database ${dbName}`)
    await dbUtils.create(dbName)
    dbUtils.pgConnection_ = await initDb()
    container.register({
      [ContainerRegistrationKeys.LOGGER]: asValue(logger),
      // QUERY is nulled here; the real query is replaced by `queryMock` below.
      [ContainerRegistrationKeys.QUERY]: asValue(null),
      [ContainerRegistrationKeys.PG_CONNECTION]: asValue(dbUtils.pgConnection_),
    })
    medusaAppLoader = new MedusaAppLoader(container as any)
    // Migrations
    await medusaAppLoader.runModulesMigrations()
    // Create and execute the module-link migration plan.
    const linkPlanner = await medusaAppLoader.getLinksExecutionPlanner()
    const plan = await linkPlanner.createPlan()
    await linkPlanner.executePlan(plan)
    // Clear partially loaded instances
    MedusaModule.clearInstances()
    // Bootstrap modules
    const globalApp = await medusaAppLoader.load()
    index = container.resolve(Modules.INDEX)
    // Mock event bus the index module
    ;(index as any).eventBusModuleService_ = eventBusMock
    await globalApp.onApplicationStart()
    // Swap the storage provider's query with the jest mock so tests control
    // every `graph` response.
    ;(index as any).storageProvider_.query_ = queryMock
    return globalApp
  } catch (error) {
    console.error("Error initializing", error?.message)
    throw error
  }
}
describe("DataSynchronizer", () => {
  let index: IndexTypes.IIndexService
  let dataSynchronizer: DataSynchronizer
  let medusaApp: MedusaAppOutput
  let onApplicationPrepareShutdown!: () => Promise<void>
  let onApplicationShutdown!: () => Promise<void>
  // MikroORM manager of the index module, used to assert on persisted rows.
  let manager: EntityManager

  beforeAll(async () => {
    medusaApp = await beforeAll_()
    onApplicationPrepareShutdown = medusaApp.onApplicationPrepareShutdown
    onApplicationShutdown = medusaApp.onApplicationShutdown
    // Reach into the index module container to grab its entity manager.
    manager = (
      medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any
    ).container_.manager as EntityManager
  })

  afterAll(async () => {
    await onApplicationPrepareShutdown()
    await onApplicationShutdown()
    await dbUtils.shutdown(dbName)
  })

  beforeEach(async () => {
    jest.clearAllMocks()
    index = container.resolve(Modules.INDEX)
    // Minimal schema representation: a product entity and a product_variant
    // entity whose parent is the product (linked through `product.id`).
    const productSchemaObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation =
      {
        fields: ["id", "title", "updated_at"],
        alias: "product",
        moduleConfig: {
          linkableKeys: {
            id: "Product",
            product_id: "Product",
            product_variant_id: "ProductVariant",
          },
        },
        entity: "Product",
        parents: [],
        listeners: ["product.created"],
      }
    const productVariantSchemaObjectRepresentation: IndexTypes.SchemaObjectEntityRepresentation =
      {
        fields: ["id", "title", "product.id", "updated_at"],
        alias: "product_variant",
        moduleConfig: {
          linkableKeys: {
            id: "ProductVariant",
            product_id: "Product",
            product_variant_id: "ProductVariant",
          },
        },
        entity: "ProductVariant",
        parents: [
          {
            ref: productSchemaObjectRepresentation,
            inSchemaRef: productSchemaObjectRepresentation,
            targetProp: "id",
          },
        ],
        listeners: ["product-variant.created"],
      }
    const mockSchemaRepresentation = {
      product: productSchemaObjectRepresentation,
      product_variant: productVariantSchemaObjectRepresentation,
    }
    // Fresh synchronizer per test, wired to the real storage provider but the
    // mocked query.
    dataSynchronizer = new DataSynchronizer({
      storageProvider: (index as any).storageProvider_,
      schemaObjectRepresentation: mockSchemaRepresentation,
      query: queryMock as any,
    })
  })

  describe("sync", () => {
    it("should sync products data correctly", async () => {
      // Mock query response for products
      // - array `id` filter: the storage provider hydrating a page
      // - empty filters: first pagination page
      // - `$gt` cursor filter: subsequent pagination page
      queryMock.graph.mockImplementation(async (config) => {
        if (Array.isArray(config.filters.id)) {
          if (config.filters.id.includes(testProductId)) {
            return {
              data: [mockData[0]],
            }
          } else if (config.filters.id.includes(testProductId2)) {
            return {
              data: [mockData[1]],
            }
          }
        }
        if (Object.keys(config.filters).length === 0) {
          return {
            data: [mockData[0]],
          }
        } else if (config.filters.id["$gt"] === mockData[0].id) {
          return {
            data: [mockData[1]],
          }
        }
        return {
          data: [],
        }
      })
      const ackMock = jest.fn()
      const result = await dataSynchronizer.sync({
        entityName: "product",
        ack: ackMock,
      })
      // First loop fetching products
      expect(queryMock.graph).toHaveBeenNthCalledWith(1, {
        entity: "product",
        fields: ["id"],
        filters: {},
        pagination: {
          order: {
            id: "asc",
          },
          take: 1000,
        },
      })
      // First time fetching product data for creation from the storage provider
      expect(queryMock.graph).toHaveBeenNthCalledWith(2, {
        entity: "product",
        filters: {
          id: [testProductId],
        },
        fields: ["id", "title", "updated_at"],
      })
      // Second loop fetching products
      expect(queryMock.graph).toHaveBeenNthCalledWith(3, {
        entity: "product",
        fields: ["id"],
        filters: {
          id: {
            $gt: testProductId,
          },
        },
        pagination: {
          order: {
            id: "asc",
          },
          take: 1000,
        },
      })
      // Second time fetching product data for creation from the storage provider
      expect(queryMock.graph).toHaveBeenNthCalledWith(4, {
        entity: "product",
        filters: {
          id: [testProductId2],
        },
        fields: ["id", "title", "updated_at"],
      })
      // One ack per processed page, then a final ack with `done: true`.
      expect(ackMock).toHaveBeenNthCalledWith(1, {
        lastCursor: testProductId,
      })
      expect(ackMock).toHaveBeenNthCalledWith(2, {
        lastCursor: testProductId2,
      })
      expect(ackMock).toHaveBeenNthCalledWith(3, {
        lastCursor: testProductId2,
        done: true,
      })
      expect(result).toEqual({
        lastCursor: testProductId2,
        done: true,
      })
      const indexData = await manager.find<InferEntityType<IndexData>>(
        toMikroORMEntity(IndexData),
        {}
      )
      const indexRelationData = await manager.find(
        toMikroORMEntity(IndexRelation),
        {}
      )
      // NOTE(review): `find` has no orderBy; the positional id assertions below
      // assume insertion order is preserved — confirm this is stable.
      expect(indexData).toHaveLength(2)
      expect(indexData[0].id).toEqual(testProductId)
      expect(indexData[1].id).toEqual(testProductId2)
      expect(indexRelationData).toHaveLength(0)
    })
  })

  // NOTE(review): this test is registered on the outer describe, not inside the
  // `describe("sync")` group above — it was likely meant to be grouped with it.
  it("should sync products and product variants data correctly", async () => {
    // Mock query response for products
    // Same three-way dispatch as the previous test, but branched per entity so
    // both the product and product_variant sync passes are served.
    queryMock.graph.mockImplementation(async (config) => {
      if (config.entity === "product") {
        if (Array.isArray(config.filters.id)) {
          if (config.filters.id.includes(testProductId)) {
            return {
              data: [mockData[0]],
            }
          } else if (config.filters.id.includes(testProductId2)) {
            return {
              data: [mockData[1]],
            }
          }
        }
        if (Object.keys(config.filters).length === 0) {
          return {
            data: [mockData[0]],
          }
        } else if (config.filters.id["$gt"] === mockData[0].id) {
          return {
            data: [mockData[1]],
          }
        }
      }
      if (config.entity === "product_variant") {
        if (Array.isArray(config.filters.id)) {
          if (config.filters.id.includes(testVariantId)) {
            return {
              data: [mockData[2]],
            }
          } else if (config.filters.id.includes(testVariantId2)) {
            return {
              data: [mockData[3]],
            }
          }
        }
        if (Object.keys(config.filters).length === 0) {
          return {
            data: [mockData[2]],
          }
        } else if (config.filters.id["$gt"] === mockData[2].id) {
          return {
            data: [mockData[3]],
          }
        }
      }
      return {
        data: [],
      }
    })
    const ackMock = jest.fn()
    // Sync products first so the variant pass can attach relations to them.
    await dataSynchronizer.sync({
      entityName: "product",
      ack: ackMock,
    })
    // Reset mocks so the nth-call assertions below only see the variant pass.
    jest.clearAllMocks()
    const result = await dataSynchronizer.sync({
      entityName: "product_variant",
      ack: ackMock,
    })
    // First loop fetching product variants
    expect(queryMock.graph).toHaveBeenNthCalledWith(1, {
      entity: "product_variant",
      fields: ["id"],
      filters: {},
      pagination: {
        order: {
          id: "asc",
        },
        take: 1000,
      },
    })
    // First time fetching product variant data for creation from the storage provider
    expect(queryMock.graph).toHaveBeenNthCalledWith(2, {
      entity: "product_variant",
      filters: {
        id: [testVariantId],
      },
      fields: ["id", "title", "product.id", "updated_at"],
    })
    // Second loop fetching product variants
    expect(queryMock.graph).toHaveBeenNthCalledWith(3, {
      entity: "product_variant",
      fields: ["id"],
      filters: {
        id: {
          $gt: testVariantId,
        },
      },
      pagination: {
        order: {
          id: "asc",
        },
        take: 1000,
      },
    })
    // Second time fetching product variant data for creation from the storage provider
    expect(queryMock.graph).toHaveBeenNthCalledWith(4, {
      entity: "product_variant",
      filters: {
        id: [testVariantId2],
      },
      fields: ["id", "title", "product.id", "updated_at"],
    })
    expect(ackMock).toHaveBeenNthCalledWith(1, {
      lastCursor: testVariantId,
    })
    expect(ackMock).toHaveBeenNthCalledWith(2, {
      lastCursor: testVariantId2,
    })
    expect(ackMock).toHaveBeenNthCalledWith(3, {
      lastCursor: testVariantId2,
      done: true,
    })
    expect(result).toEqual({
      lastCursor: testVariantId2,
      done: true,
    })
    const indexData = await manager.find<InferEntityType<IndexData>>(
      toMikroORMEntity(IndexData),
      {}
    )
    const indexRelationData = await manager.find<
      InferEntityType<IndexRelation>
    >(toMikroORMEntity(IndexRelation), {})
    // NOTE(review): positional assertions assume insertion order from `find`
    // without an explicit orderBy — confirm this is stable.
    expect(indexData).toHaveLength(4)
    expect(indexData[0].id).toEqual(testProductId)
    expect(indexData[1].id).toEqual(testProductId2)
    expect(indexData[2].id).toEqual(testVariantId)
    expect(indexData[3].id).toEqual(testVariantId2)
    // Each variant should yield exactly one Product-ProductVariant relation.
    expect(indexRelationData).toHaveLength(2)
    expect(indexRelationData[0]).toEqual(
      expect.objectContaining({
        parent_id: testProductId,
        child_id: testVariantId,
        parent_name: "Product",
        child_name: "ProductVariant",
        pivot: "Product-ProductVariant",
      })
    )
    expect(indexRelationData[1]).toEqual(
      expect.objectContaining({
        parent_id: testProductId2,
        child_id: testVariantId2,
        parent_name: "Product",
        child_name: "ProductVariant",
        pivot: "Product-ProductVariant",
      })
    )
  })
  // TODO: Add tests for errors handling and failure handling
})

View File

@@ -1,7 +1,5 @@
{
"namespaces": [
"public"
],
"namespaces": ["public"],
"name": "public",
"tables": [
{
@@ -108,10 +106,7 @@
},
{
"keyName": "index_data_pkey",
"columnNames": [
"id",
"name"
],
"columnNames": ["id", "name"],
"composite": true,
"constraint": true,
"primary": true,
@@ -168,12 +163,7 @@
"primary": false,
"nullable": false,
"default": "'pending'",
"enumItems": [
"pending",
"processing",
"done",
"error"
],
"enumItems": ["pending", "processing", "done", "error"],
"mappedType": "enum"
},
"created_at": {
@@ -232,9 +222,7 @@
},
{
"keyName": "index_metadata_pkey",
"columnNames": [
"id"
],
"columnNames": ["id"],
"composite": false,
"constraint": true,
"primary": true,
@@ -366,9 +354,7 @@
},
{
"keyName": "index_relation_pkey",
"columnNames": [
"id"
],
"columnNames": ["id"],
"composite": false,
"constraint": true,
"primary": true,

View File

@@ -0,0 +1,21 @@
import { Migration } from "@mikro-orm/migrations"
/**
 * Makes `index_relation.id` non-nullable and adds a unique constraint over the
 * full relation tuple (parent/child ids and names plus pivot), enabling upserts
 * keyed on that combination. `down` reverses both changes.
 */
export class Migration20250127105159 extends Migration {
  override async up(): Promise<void> {
    const statements = [
      `alter table if exists "index_relation" alter column "id" set not null;`,
      `alter table if exists "index_relation" add constraint "IDX_index_relation_id_pivot_parent_name_child_name_parent_id_child_id_unique" unique ("parent_id", "child_id", "child_name", "parent_name", "pivot");`,
    ]

    for (const sql of statements) {
      this.addSql(sql)
    }
  }

  override async down(): Promise<void> {
    const statements = [
      `alter table if exists "index_relation" drop constraint "IDX_index_relation_id_pivot_parent_name_child_name_parent_id_child_id_unique";`,
      `alter table if exists "index_relation" alter column "id" drop not null;`,
    ]

    for (const sql of statements) {
      this.addSql(sql)
    }
  }
}

View File

@@ -8,4 +8,5 @@ const IndexRelation = model.define("IndexRelation", {
child_name: model.text(),
child_id: model.text().index("IDX_index_relation_child_id"),
})
export default IndexRelation

View File

@@ -14,7 +14,11 @@ import {
MedusaContext,
toMikroORMEntity,
} from "@medusajs/framework/utils"
import { EntityManager, SqlEntityManager } from "@mikro-orm/postgresql"
import {
EntityManager,
EntityRepository,
SqlEntityManager,
} from "@mikro-orm/postgresql"
import { IndexData, IndexRelation } from "@models"
import { createPartitions, QueryBuilder } from "../utils"
import { flattenObjectKeys } from "../utils/flatten-object-keys"
@@ -204,13 +208,14 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
}
const { fields, alias } = schemaEntityObjectRepresentation
const { data: entityData } = await this.query_.graph({
const graphResult = await this.query_.graph({
entity: alias,
filters: {
id: ids,
},
fields: [...new Set(["id", ...fields])],
})
const { data: entityData } = graphResult
const argument = {
entity: schemaEntityObjectRepresentation.entity,
@@ -340,7 +345,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
transactionManager: SqlEntityManager
}
const indexRepository = em.getRepository(toMikroORMEntity(IndexData))
const indexRelationRepository = em.getRepository(
const indexRelationRepository: EntityRepository<any> = em.getRepository(
toMikroORMEntity(IndexRelation)
)
@@ -369,6 +374,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
id: cleanedEntityData.id,
name: entity,
data: cleanedEntityData,
// stale: false,
})
/**
@@ -394,18 +400,29 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
id: (parentData_ as any).id,
name: parentEntity,
data: parentData_,
// stale: false,
})
const parentIndexRelationEntry = indexRelationRepository.create({
parent_id: (parentData_ as any).id,
parent_name: parentEntity,
child_id: cleanedEntityData.id,
child_name: entity,
pivot: `${parentEntity}-${entity}`,
})
indexRelationRepository
.getEntityManager()
.persist(parentIndexRelationEntry)
await indexRelationRepository.upsert(
{
parent_id: (parentData_ as any).id,
parent_name: parentEntity,
child_id: cleanedEntityData.id,
child_name: entity,
pivot: `${parentEntity}-${entity}`,
// stale: false,
},
{
onConflictAction: "merge",
onConflictFields: [
"pivot",
"parent_id",
"child_id",
"parent_name",
"child_name",
],
}
)
}
}
}
@@ -453,6 +470,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
acc[property] = entityData[property]
return acc
}, {}),
// stale: false,
}
})
)
@@ -608,6 +626,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
id: cleanedEntityData.id,
name: entity,
data: cleanedEntityData,
// stale: false,
})
/**
@@ -620,6 +639,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
child_id: cleanedEntityData.id,
child_name: entity,
pivot: `${parentEntityName}-${entity}`,
// stale: false,
})
const childIndexRelationEntry = indexRelationRepository.create({
@@ -628,6 +648,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
child_id: entityData[childPropertyId] as string,
child_name: childEntityName,
pivot: `${entity}-${childEntityName}`,
// stale: false,
})
indexRelationRepository

View File

@@ -0,0 +1,137 @@
import {
IndexTypes,
RemoteQueryFunction,
SchemaObjectEntityRepresentation,
Event,
} from "@medusajs/framework/types"
import { CommonEvents } from "@medusajs/framework/utils"
/**
 * Synchronizes existing entity records into the index storage provider.
 *
 * It paginates over the source entity through the remote query (ordered by the
 * entity's linkable primary key) and replays each page into the storage
 * provider as a synthetic `*.created` event, acknowledging progress after
 * every page via the provided `ack` callback.
 */
export class DataSynchronizer {
  #storageProvider: IndexTypes.StorageProvider
  #schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation
  #query: RemoteQueryFunction

  constructor({
    storageProvider,
    schemaObjectRepresentation,
    query,
  }: {
    storageProvider: IndexTypes.StorageProvider
    schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation
    query: RemoteQueryFunction
  }) {
    this.#storageProvider = storageProvider
    this.#schemaObjectRepresentation = schemaObjectRepresentation
    this.#query = query
  }

  /**
   * Runs the synchronization loop for a single entity.
   *
   * @param entityName - alias of the entity in the schema object representation.
   * @param pagination - optional resume `cursor`, `updated_at` lower bound,
   *   `batchSize` per page (default 1000) and `limit` cap on total records.
   * @param ack - progress callback; receives the last processed cursor, a
   *   `done` flag on completion and an `err` on failure. Invoked
   *   fire-and-forget (not awaited).
   * @returns the final acknowledgement, or `undefined` when the entity has no
   *   linkable primary key (the error is reported through `ack` instead).
   */
  async sync({
    entityName,
    pagination = {},
    ack,
  }: {
    entityName: string
    pagination?: {
      cursor?: string
      updated_at?: string | Date
      limit?: number
      batchSize?: number
    }
    ack: (ack: {
      lastCursor: string | null
      done?: boolean
      err?: Error
    }) => Promise<void>
  }) {
    const schemaEntityObjectRepresentation = this.#schemaObjectRepresentation[
      entityName
    ] as SchemaObjectEntityRepresentation
    const { fields, alias, moduleConfig } = schemaEntityObjectRepresentation

    // The cursor field must be a linkable key so `$gt` pagination is stable.
    const entityPrimaryKey = fields.find(
      (field) => !!moduleConfig.linkableKeys?.[field]
    )

    if (!entityPrimaryKey) {
      void ack({
        lastCursor: pagination.cursor ?? null,
        err: new Error(
          `Entity ${entityName} does not have a linkable primary key`
        ),
      })
      return
    }

    let processed = 0
    let currentCursor = pagination.cursor!
    const batchSize = pagination.batchSize ?? 1000
    const limit = pagination.limit ?? Infinity
    let done = false
    let error: Error | null = null

    // Fix: was `processed < limit || !done`, which kept looping past `limit`
    // until the data source was exhausted — the cap was never enforced.
    while (processed < limit && !done) {
      const filters: Record<string, any> = {}

      if (currentCursor) {
        filters[entityPrimaryKey] = { $gt: currentCursor }
      }

      if (pagination.updated_at) {
        filters["updated_at"] = { $gt: pagination.updated_at }
      }

      // Fetch the next page of ids only; the storage provider re-queries the
      // full field set itself while consuming the event.
      const { data } = await this.#query.graph({
        entity: alias,
        fields: [entityPrimaryKey],
        filters,
        pagination: {
          order: {
            [entityPrimaryKey]: "asc",
          },
          take: batchSize,
        },
      })

      done = !data.length
      if (done) {
        break
      }

      const envelop: Event = {
        data,
        name: `*.${CommonEvents.CREATED}`,
      }

      try {
        await this.#storageProvider.consumeEvent(
          schemaEntityObjectRepresentation
        )(envelop)

        currentCursor = data[data.length - 1][entityPrimaryKey]
        processed += data.length
        void ack({ lastCursor: currentCursor })
      } catch (err) {
        error = err as Error
        break
      }
    }

    let acknowledgement: { lastCursor: string; done?: boolean; err?: Error } = {
      lastCursor: currentCursor,
      done: true,
    }

    if (error) {
      acknowledgement = {
        lastCursor: currentCursor,
        err: error,
      }
      void ack(acknowledgement)
      return acknowledgement
    }

    void ack(acknowledgement)
    return acknowledgement
  }
}