Chore(index): Sync logs management (#11522)

**What**
- Add index engine sync log information
- Ad `setTimeout(0)` to give breath to the event loop and ensuring not blocking the event loop and allow for other tasks queue execution to happen while syncing

here is an example:
**LOG_LEVEL=info**

![Screenshot 2025-02-19 at 10 09 25](https://github.com/user-attachments/assets/fc74dc32-1bc1-4123-9de3-f37817b7e783)

**LOG_LEVEL=debug**

![Screenshot 2025-02-19 at 10 10 35](https://github.com/user-attachments/assets/222a1ce1-9267-4cb0-9518-dc4c7aa2b6f4)
This commit is contained in:
Adrien de Peretti
2025-02-19 12:54:38 +01:00
committed by GitHub
parent 647077057c
commit 77f37c5f97
6 changed files with 57 additions and 9 deletions

View File

@@ -31,6 +31,7 @@ Object.keys(config.modules).forEach((key) => {
config.modules[Modules.INDEX] = {
resolve: "@medusajs/index",
dependencies: [
ContainerRegistrationKeys.LOGGER,
Modules.EVENT_BUS,
Modules.LOCKING,
ContainerRegistrationKeys.REMOTE_QUERY,

View File

@@ -133,7 +133,7 @@ describe("IndexModuleService syncIndexConfig", function () {
afterEach(afterEach_)
it.only("should full sync all entities when the config has changed", async () => {
it("should full sync all entities when the config has changed", async () => {
await setTimeout(1000)
const currentMetadata = await indexMetadataService.list()
@@ -191,6 +191,7 @@ describe("IndexModuleService syncIndexConfig", function () {
;(index as any).buildSchemaObjectRepresentation_()
let configurationChecker = new Configuration({
logger,
schemaObjectRepresentation: (index as any).schemaObjectRepresentation_,
indexMetadataService,
indexSyncService,
@@ -277,6 +278,7 @@ describe("IndexModuleService syncIndexConfig", function () {
)
configurationChecker = new Configuration({
logger,
schemaObjectRepresentation: (index as any).schemaObjectRepresentation_,
indexMetadataService,
indexSyncService,

View File

@@ -15,7 +15,7 @@ import {
SchemaObjectEntityRepresentation,
} from "@medusajs/types"
import { IndexMetadataStatus, Orchestrator } from "@utils"
import { setTimeout } from "timers/promises"
export class DataSynchronizer {
#container: Record<string, any>
#isReady: boolean = false
@@ -160,6 +160,8 @@ export class DataSynchronizer {
}
async #taskRunner(entity: string) {
this.#logger.info(`[Index engine] syncing entity '${entity}'`)
const [[lastCursor]] = await promiseAll([
this.#indexSyncService.list(
{
@@ -176,15 +178,24 @@ export class DataSynchronizer {
),
])
let startTime = performance.now()
let chunkStartTime = startTime
const finalAcknoledgement = await this.syncEntity({
entityName: entity,
pagination: {
cursor: lastCursor?.last_key,
},
ack: async (ack) => {
const endTime = performance.now()
const chunkElapsedTime = (endTime - chunkStartTime).toFixed(2)
const promises: Promise<any>[] = []
if (ack.lastCursor) {
this.#logger.debug(
`[Index engine] syncing entity '${entity}' updating last cursor to ${ack.lastCursor} (+${chunkElapsedTime}ms)`
)
promises.push(
this.#indexSyncService.update({
data: {
@@ -201,7 +212,22 @@ export class DataSynchronizer {
}
}
if (ack.err) {
this.#logger.error(
`[Index engine] syncing entity '${entity}' failed with error (+${chunkElapsedTime}ms):\n${ack.err.message}`
)
}
if (ack.done) {
const elapsedTime = (endTime - startTime).toFixed(2)
this.#logger.info(
`[Index engine] syncing entity '${entity}' done (+${elapsedTime}ms)`
)
}
await promiseAll(promises)
chunkStartTime = performance.now()
},
})
@@ -272,7 +298,7 @@ export class DataSynchronizer {
const acknoledgement = {
lastCursor: pagination.cursor ?? null,
err: new Error(
"Entity does not have a property 'id'. The 'id' must be provided and must be orderable (e.g ulid)"
`Entity ${entityName} does not have a property 'id'. The 'id' must be provided and must be orderable (e.g ulid)`
),
}
@@ -329,13 +355,11 @@ export class DataSynchronizer {
await ack({ lastCursor: currentCursor })
} catch (err) {
this.#logger.error(
`Index engine] sync failed for entity ${entityName}`,
err
)
error = err
break
}
await setTimeout(0)
}
let acknoledgement: { lastCursor: string; done?: boolean; err?: Error } = {

View File

@@ -130,6 +130,7 @@ export default class IndexModuleService
})
const configurationChecker = new Configuration({
logger: this.logger_,
schemaObjectRepresentation: this.schemaObjectRepresentation_,
indexMetadataService: this.indexMetadataService_,
indexSyncService: this.indexSyncService_,

View File

@@ -1,5 +1,5 @@
import { simpleHash } from "@medusajs/framework/utils"
import { IndexTypes, InferEntityType } from "@medusajs/types"
import { IndexTypes, InferEntityType, Logger } from "@medusajs/types"
import { IndexMetadata } from "@models"
import { schemaObjectRepresentationPropertiesToOmit } from "@types"
import { DataSynchronizer } from "../../services/data-synchronizer"
@@ -12,25 +12,32 @@ export class Configuration {
#indexMetadataService: IndexMetadataService
#indexSyncService: IndexSyncService
#dataSynchronizer: DataSynchronizer
#logger: Logger
constructor({
schemaObjectRepresentation,
indexMetadataService,
indexSyncService,
dataSynchronizer,
logger,
}: {
schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation
indexMetadataService: IndexMetadataService
indexSyncService: IndexSyncService
dataSynchronizer: DataSynchronizer
logger: Logger
}) {
this.#schemaObjectRepresentation = schemaObjectRepresentation ?? {}
this.#indexMetadataService = indexMetadataService
this.#indexSyncService = indexSyncService
this.#dataSynchronizer = dataSynchronizer
this.#logger = logger
}
async checkChanges(): Promise<InferEntityType<typeof IndexMetadata>[]> {
this.#logger.info(
"[Index engine] Checking for changes in the index configuration"
)
const schemaObjectRepresentation = this.#schemaObjectRepresentation
const currentConfig = await this.#indexMetadataService.list()
@@ -135,8 +142,16 @@ export class Configuration {
await this.#indexSyncService.upsert(idxSyncData)
}
return await this.#indexMetadataService.list({
const changes = await this.#indexMetadataService.list({
status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING],
})
this.#logger.info(
`[Index engine] Found ${changes.length} change${
changes.length > 1 ? "s" : ""
} in the index configuration that are either pending or processing`
)
return changes
}
}