From 499381d1194b33c8212e6e995a3208364c3ab188 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Wed, 21 May 2025 17:06:35 +0200 Subject: [PATCH] chore(index): Few adjustments (#12557) **What** - Adjust lock duration, it is in seconds and not in ms - Log on lock release - Renew lock separately from the other promises - Add more logs - Log when lock can't be acquired. It can be expected in case two processes try to sync the same entity and in that case it can be ignored. But at least it gives some information in case it happens for another reason - Log when the release lock failed, the lock will remain in the locking provider for 1 minute before being removed. But it wont prevent other entities to be synced --- .changeset/proud-carrots-punch.md | 5 +++ .../__tests__/orchestrator.spec.ts | 32 ++++++++++++------- .../index/src/services/data-synchronizer.ts | 26 +++++++-------- .../index/src/utils/sync/configuration.ts | 6 +++- .../index/src/utils/sync/orchestrator.ts | 31 ++++++++++++++---- 5 files changed, 65 insertions(+), 35 deletions(-) create mode 100644 .changeset/proud-carrots-punch.md diff --git a/.changeset/proud-carrots-punch.md b/.changeset/proud-carrots-punch.md new file mode 100644 index 0000000000..2c4e2101a1 --- /dev/null +++ b/.changeset/proud-carrots-punch.md @@ -0,0 +1,5 @@ +--- +"@medusajs/index": patch +--- + +chore(index): Few adjustments diff --git a/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts b/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts index d0a6446ae8..5b4282773a 100644 --- a/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts @@ -1,5 +1,5 @@ import { asValue } from "awilix" -import { container } from "@medusajs/framework" +import { container, logger } from "@medusajs/framework" import type { IndexTypes } from "@medusajs/types" import { Orchestrator } from "@utils" @@ -12,7 +12,7 @@ function creatingFakeLockingModule() { } this.lockEntities.add(key) }, - release(key: string) { + async release(key: string) { this.lockEntities.delete(key) }, } @@ -55,6 +55,7 @@ describe("Orchestrator", () => { entities.map((e) => e.entity), { lockDuration: 60 * 1000, + logger: logger, } ) @@ -101,6 +102,7 @@ describe("Orchestrator", () => { entities.map((e) => e.entity), { lockDuration: 60 * 1000, + logger: logger, } ) @@ -150,6 +152,7 @@ describe("Orchestrator", () => { entityNames, { lockDuration: 60 * 1000, + logger: logger, } ) @@ -162,6 +165,7 @@ describe("Orchestrator", () => { entityNames, { lockDuration: 60 * 1000, + logger: logger, } ) @@ -169,16 +173,18 @@ describe("Orchestrator", () => { orchestrator.process(taskRunner), orchestrator1.process(taskRunner2), ]) - expect(processedEntities).toEqual([ - { - entity: "brand", - owner: "instance-1", - }, - { - entity: "product", - owner: "instance-2", - }, - ]) + expect(processedEntities).toEqual( + expect.arrayContaining([ + { + entity: "brand", + owner: "instance-1", + }, + { + entity: "product", + owner: "instance-2", + }, + ]) + ) expect(lockingModule.lockEntities.size).toEqual(0) }) @@ -221,6 +227,7 @@ describe("Orchestrator", () => { entities.map((e) => e.entity), { lockDuration: 60 * 1000, + logger: logger, } ) @@ -269,6 +276,7 @@ describe("Orchestrator", () => { entities.map((e) => e.entity), { lockDuration: 60 * 1000, + logger: logger, } ) diff --git a/packages/modules/index/src/services/data-synchronizer.ts b/packages/modules/index/src/services/data-synchronizer.ts index 4723c73862..cc4c94aeea 100644 --- a/packages/modules/index/src/services/data-synchronizer.ts +++ b/packages/modules/index/src/services/data-synchronizer.ts @@ -85,12 +85,13 @@ export class DataSynchronizer { fields: string fields_hash: string }[], - lockDuration: number = 1000 * 60 * 5 + lockDuration: number = 60 // 1 minute ) { this.#isReadyOrThrow() const entitiesToSync = entities.map((entity) => entity.entity) this.#orchestrator = new Orchestrator(this.#locking, entitiesToSync, { lockDuration, + logger: this.#logger, }) await this.#orchestrator.process(this.#taskRunner.bind(this)) } @@ -156,26 +157,23 @@ export class DataSynchronizer { ack: async (ack) => { const endTime = performance.now() const chunkElapsedTime = (endTime - chunkStartTime).toFixed(2) - const promises: Promise[] = [] if (ack.lastCursor) { this.#logger.debug( `[Index engine] syncing entity '${entity}' updating last cursor to ${ack.lastCursor} (+${chunkElapsedTime}ms)` ) - promises.push( - this.#indexSyncService.update({ - data: { - last_key: ack.lastCursor, - }, - selector: { - entity, - }, - }) - ) + await this.#indexSyncService.update({ + data: { + last_key: ack.lastCursor, + }, + selector: { + entity, + }, + }) if (!ack.done && !ack.err) { - promises.push(this.#orchestrator.renewLock(entity)) + await this.#orchestrator.renewLock(entity) } } @@ -192,8 +190,6 @@ export class DataSynchronizer { ) } - await promiseAll(promises) - chunkStartTime = performance.now() }, }) diff --git a/packages/modules/index/src/utils/sync/configuration.ts b/packages/modules/index/src/utils/sync/configuration.ts index dab634b389..b6fa7a4fe6 100644 --- a/packages/modules/index/src/utils/sync/configuration.ts +++ b/packages/modules/index/src/utils/sync/configuration.ts @@ -139,7 +139,11 @@ export class Configuration { } const changes = await this.#indexMetadataService.list({ - status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING], + status: [ + IndexMetadataStatus.PENDING, + IndexMetadataStatus.PROCESSING, + IndexMetadataStatus.ERROR, + ], }) this.#logger.info( diff --git a/packages/modules/index/src/utils/sync/orchestrator.ts b/packages/modules/index/src/utils/sync/orchestrator.ts index cdc63d080f..19cf1c39f5 100644 --- a/packages/modules/index/src/utils/sync/orchestrator.ts +++ b/packages/modules/index/src/utils/sync/orchestrator.ts @@ -1,4 +1,4 @@ -import { ILockingModule } from "@medusajs/types" +import { ILockingModule, Logger } from "@medusajs/types" export class Orchestrator { /** @@ -6,6 +6,11 @@ export class Orchestrator { */ #lockingModule: ILockingModule + /** + * Reference to the logger + */ + #logger: Logger + /** * Owner id when acquiring locks */ @@ -74,11 +79,13 @@ export class Orchestrator { entities: string[], options: { lockDuration: number + logger: Logger } ) { this.#lockingModule = lockingModule this.#entities = entities this.#options = options + this.#logger = options.logger } /** @@ -104,14 +111,14 @@ export class Orchestrator { } /** - * Processes the entity at a given index. If there are no entities + * Processes the entity. If there are no entities * left, the orchestrator state will be set to completed. * * - Task runner is the implementation function to execute a task. * Orchestrator has no inbuilt execution logic and it relies on * the task runner for the same. */ - async #processAtIndex( + async #processEntity( taskRunner: (entity: string) => Promise, entity: string ) { @@ -123,10 +130,20 @@ export class Orchestrator { this.#state = "error" throw error } finally { - await this.#lockingModule.release(entity, { - ownerId: this.#lockingOwner, - }) + await this.#lockingModule + .release(entity, { + ownerId: this.#lockingOwner, + }) + .catch(() => { + this.#logger.error( + `[Index engine] failed to release lock for entity '${entity}'` + ) + }) } + } else { + this.#logger.warn( + `[Index engine] failed to acquire lock for entity '${entity}' on pid ${process.pid}. It means another process is already processing this entity or a lock is still present in your locking provider.` + ) } } @@ -152,7 +169,7 @@ export class Orchestrator { break } - await this.#processAtIndex(taskRunner, entity) + await this.#processEntity(taskRunner, entity) } this.#state = "completed"