diff --git a/.changeset/nice-seas-hunt.md b/.changeset/nice-seas-hunt.md new file mode 100644 index 0000000000..0693cda71e --- /dev/null +++ b/.changeset/nice-seas-hunt.md @@ -0,0 +1,5 @@ +--- +"@medusajs/index": patch +--- + +Chore(index): Sync logs management diff --git a/packages/modules/index/integration-tests/__fixtures__/medusa-config.js b/packages/modules/index/integration-tests/__fixtures__/medusa-config.js index d06a47164b..cd9de6aa02 100644 --- a/packages/modules/index/integration-tests/__fixtures__/medusa-config.js +++ b/packages/modules/index/integration-tests/__fixtures__/medusa-config.js @@ -31,6 +31,7 @@ Object.keys(config.modules).forEach((key) => { config.modules[Modules.INDEX] = { resolve: "@medusajs/index", dependencies: [ + ContainerRegistrationKeys.LOGGER, Modules.EVENT_BUS, Modules.LOCKING, ContainerRegistrationKeys.REMOTE_QUERY, diff --git a/packages/modules/index/integration-tests/__tests__/config-sync.spec.ts b/packages/modules/index/integration-tests/__tests__/config-sync.spec.ts index 6a6dd82747..52b7094a58 100644 --- a/packages/modules/index/integration-tests/__tests__/config-sync.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/config-sync.spec.ts @@ -133,7 +133,7 @@ describe("IndexModuleService syncIndexConfig", function () { afterEach(afterEach_) - it.only("should full sync all entities when the config has changed", async () => { + it("should full sync all entities when the config has changed", async () => { await setTimeout(1000) const currentMetadata = await indexMetadataService.list() @@ -191,6 +191,7 @@ describe("IndexModuleService syncIndexConfig", function () { ;(index as any).buildSchemaObjectRepresentation_() let configurationChecker = new Configuration({ + logger, schemaObjectRepresentation: (index as any).schemaObjectRepresentation_, indexMetadataService, indexSyncService, @@ -277,6 +278,7 @@ describe("IndexModuleService syncIndexConfig", function () { ) configurationChecker = new Configuration({ + logger, schemaObjectRepresentation: (index as any).schemaObjectRepresentation_, indexMetadataService, indexSyncService, diff --git a/packages/modules/index/src/services/data-synchronizer.ts b/packages/modules/index/src/services/data-synchronizer.ts index 837ab26e85..74d40f67ca 100644 --- a/packages/modules/index/src/services/data-synchronizer.ts +++ b/packages/modules/index/src/services/data-synchronizer.ts @@ -15,7 +15,7 @@ import { SchemaObjectEntityRepresentation, } from "@medusajs/types" import { IndexMetadataStatus, Orchestrator } from "@utils" - +import { setTimeout } from "timers/promises" export class DataSynchronizer { #container: Record #isReady: boolean = false @@ -160,6 +160,8 @@ export class DataSynchronizer { } async #taskRunner(entity: string) { + this.#logger.info(`[Index engine] syncing entity '${entity}'`) + const [[lastCursor]] = await promiseAll([ this.#indexSyncService.list( { @@ -176,15 +178,24 @@ export class DataSynchronizer { ), ]) + let startTime = performance.now() + let chunkStartTime = startTime + const finalAcknoledgement = await this.syncEntity({ entityName: entity, pagination: { cursor: lastCursor?.last_key, }, ack: async (ack) => { + const endTime = performance.now() + const chunkElapsedTime = (endTime - chunkStartTime).toFixed(2) const promises: Promise[] = [] if (ack.lastCursor) { + this.#logger.debug( + `[Index engine] syncing entity '${entity}' updating last cursor to ${ack.lastCursor} (+${chunkElapsedTime}ms)` + ) + promises.push( this.#indexSyncService.update({ data: { @@ -201,7 +212,22 @@ export class DataSynchronizer { } } + if (ack.err) { + this.#logger.error( + `[Index engine] syncing entity '${entity}' failed with error (+${chunkElapsedTime}ms):\n${ack.err.message}` + ) + } + + if (ack.done) { + const elapsedTime = (endTime - startTime).toFixed(2) + this.#logger.info( + `[Index engine] syncing entity '${entity}' done (+${elapsedTime}ms)` + ) + } + await promiseAll(promises) + + chunkStartTime = performance.now() }, }) @@ -272,7 +298,7 @@ export class DataSynchronizer { const acknoledgement = { lastCursor: pagination.cursor ?? null, err: new Error( - "Entity does not have a property 'id'. The 'id' must be provided and must be orderable (e.g ulid)" + `Entity ${entityName} does not have a property 'id'. The 'id' must be provided and must be orderable (e.g ulid)` ), } @@ -329,13 +355,11 @@ export class DataSynchronizer { await ack({ lastCursor: currentCursor }) } catch (err) { - this.#logger.error( - `Index engine] sync failed for entity ${entityName}`, - err - ) error = err break } + + await setTimeout(0) } let acknoledgement: { lastCursor: string; done?: boolean; err?: Error } = { diff --git a/packages/modules/index/src/services/index-module-service.ts b/packages/modules/index/src/services/index-module-service.ts index c35d4ec04e..a0369871a4 100644 --- a/packages/modules/index/src/services/index-module-service.ts +++ b/packages/modules/index/src/services/index-module-service.ts @@ -130,6 +130,7 @@ export default class IndexModuleService }) const configurationChecker = new Configuration({ + logger: this.logger_, schemaObjectRepresentation: this.schemaObjectRepresentation_, indexMetadataService: this.indexMetadataService_, indexSyncService: this.indexSyncService_, diff --git a/packages/modules/index/src/utils/sync/configuration.ts b/packages/modules/index/src/utils/sync/configuration.ts index 6cda0a1d46..e3fd2916e0 100644 --- a/packages/modules/index/src/utils/sync/configuration.ts +++ b/packages/modules/index/src/utils/sync/configuration.ts @@ -1,5 +1,5 @@ import { simpleHash } from "@medusajs/framework/utils" -import { IndexTypes, InferEntityType } from "@medusajs/types" +import { IndexTypes, InferEntityType, Logger } from "@medusajs/types" import { IndexMetadata } from "@models" import { schemaObjectRepresentationPropertiesToOmit } from "@types" import { DataSynchronizer } from "../../services/data-synchronizer" @@ -12,25 +12,32 @@ export class Configuration { #indexMetadataService: IndexMetadataService #indexSyncService: IndexSyncService #dataSynchronizer: DataSynchronizer + #logger: Logger constructor({ schemaObjectRepresentation, indexMetadataService, indexSyncService, dataSynchronizer, + logger, }: { schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation indexMetadataService: IndexMetadataService indexSyncService: IndexSyncService dataSynchronizer: DataSynchronizer + logger: Logger }) { this.#schemaObjectRepresentation = schemaObjectRepresentation ?? {} this.#indexMetadataService = indexMetadataService this.#indexSyncService = indexSyncService this.#dataSynchronizer = dataSynchronizer + this.#logger = logger } async checkChanges(): Promise[]> { + this.#logger.info( + "[Index engine] Checking for changes in the index configuration" + ) const schemaObjectRepresentation = this.#schemaObjectRepresentation const currentConfig = await this.#indexMetadataService.list() @@ -135,8 +142,16 @@ export class Configuration { await this.#indexSyncService.upsert(idxSyncData) } - return await this.#indexMetadataService.list({ + const changes = await this.#indexMetadataService.list({ status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING], }) + + this.#logger.info( + `[Index engine] Found ${changes.length} change${ + changes.length > 1 ? "s" : "" + } in the index configuration that are either pending or processing` + ) + + return changes } }