diff --git a/.changeset/proud-carrots-punch.md b/.changeset/proud-carrots-punch.md new file mode 100644 index 0000000000..2c4e2101a1 --- /dev/null +++ b/.changeset/proud-carrots-punch.md @@ -0,0 +1,5 @@ +--- +"@medusajs/index": patch +--- + +chore(index): Few adjustments diff --git a/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts b/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts index d0a6446ae8..5b4282773a 100644 --- a/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts +++ b/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts @@ -1,5 +1,5 @@ import { asValue } from "awilix" -import { container } from "@medusajs/framework" +import { container, logger } from "@medusajs/framework" import type { IndexTypes } from "@medusajs/types" import { Orchestrator } from "@utils" @@ -12,7 +12,7 @@ function creatingFakeLockingModule() { } this.lockEntities.add(key) }, - release(key: string) { + async release(key: string) { this.lockEntities.delete(key) }, } @@ -55,6 +55,7 @@ describe("Orchestrator", () => { entities.map((e) => e.entity), { lockDuration: 60 * 1000, + logger: logger, } ) @@ -101,6 +102,7 @@ describe("Orchestrator", () => { entities.map((e) => e.entity), { lockDuration: 60 * 1000, + logger: logger, } ) @@ -150,6 +152,7 @@ describe("Orchestrator", () => { entityNames, { lockDuration: 60 * 1000, + logger: logger, } ) @@ -162,6 +165,7 @@ describe("Orchestrator", () => { entityNames, { lockDuration: 60 * 1000, + logger: logger, } ) @@ -169,16 +173,18 @@ describe("Orchestrator", () => { orchestrator.process(taskRunner), orchestrator1.process(taskRunner2), ]) - expect(processedEntities).toEqual([ - { - entity: "brand", - owner: "instance-1", - }, - { - entity: "product", - owner: "instance-2", - }, - ]) + expect(processedEntities).toEqual( + expect.arrayContaining([ + { + entity: "brand", + owner: "instance-1", + }, + { + entity: "product", + owner: "instance-2", + }, + ]) + ) expect(lockingModule.lockEntities.size).toEqual(0) }) @@ -221,6 +227,7 @@ describe("Orchestrator", () => { entities.map((e) => e.entity), { lockDuration: 60 * 1000, + logger: logger, } ) @@ -269,6 +276,7 @@ describe("Orchestrator", () => { entities.map((e) => e.entity), { lockDuration: 60 * 1000, + logger: logger, } ) diff --git a/packages/modules/index/src/services/data-synchronizer.ts b/packages/modules/index/src/services/data-synchronizer.ts index 4723c73862..cc4c94aeea 100644 --- a/packages/modules/index/src/services/data-synchronizer.ts +++ b/packages/modules/index/src/services/data-synchronizer.ts @@ -85,12 +85,13 @@ export class DataSynchronizer { fields: string fields_hash: string }[], - lockDuration: number = 1000 * 60 * 5 + lockDuration: number = 60 // 1 minute ) { this.#isReadyOrThrow() const entitiesToSync = entities.map((entity) => entity.entity) this.#orchestrator = new Orchestrator(this.#locking, entitiesToSync, { lockDuration, + logger: this.#logger, }) await this.#orchestrator.process(this.#taskRunner.bind(this)) } @@ -156,26 +157,23 @@ export class DataSynchronizer { ack: async (ack) => { const endTime = performance.now() const chunkElapsedTime = (endTime - chunkStartTime).toFixed(2) - const promises: Promise[] = [] if (ack.lastCursor) { this.#logger.debug( `[Index engine] syncing entity '${entity}' updating last cursor to ${ack.lastCursor} (+${chunkElapsedTime}ms)` ) - promises.push( - this.#indexSyncService.update({ - data: { - last_key: ack.lastCursor, - }, - selector: { - entity, - }, - }) - ) + await this.#indexSyncService.update({ + data: { + last_key: ack.lastCursor, + }, + selector: { + entity, + }, + }) if (!ack.done && !ack.err) { - promises.push(this.#orchestrator.renewLock(entity)) + await this.#orchestrator.renewLock(entity) } } @@ -192,8 +190,6 @@ export class DataSynchronizer { ) } - await promiseAll(promises) - chunkStartTime = performance.now() }, }) diff --git a/packages/modules/index/src/utils/sync/configuration.ts b/packages/modules/index/src/utils/sync/configuration.ts index dab634b389..b6fa7a4fe6 100644 --- a/packages/modules/index/src/utils/sync/configuration.ts +++ b/packages/modules/index/src/utils/sync/configuration.ts @@ -139,7 +139,11 @@ export class Configuration { } const changes = await this.#indexMetadataService.list({ - status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING], + status: [ + IndexMetadataStatus.PENDING, + IndexMetadataStatus.PROCESSING, + IndexMetadataStatus.ERROR, + ], }) this.#logger.info( diff --git a/packages/modules/index/src/utils/sync/orchestrator.ts b/packages/modules/index/src/utils/sync/orchestrator.ts index cdc63d080f..19cf1c39f5 100644 --- a/packages/modules/index/src/utils/sync/orchestrator.ts +++ b/packages/modules/index/src/utils/sync/orchestrator.ts @@ -1,4 +1,4 @@ -import { ILockingModule } from "@medusajs/types" +import { ILockingModule, Logger } from "@medusajs/types" export class Orchestrator { /** @@ -6,6 +6,11 @@ export class Orchestrator { */ #lockingModule: ILockingModule + /** + * Reference to the logger + */ + #logger: Logger + /** * Owner id when acquiring locks */ @@ -74,11 +79,13 @@ export class Orchestrator { entities: string[], options: { lockDuration: number + logger: Logger } ) { this.#lockingModule = lockingModule this.#entities = entities this.#options = options + this.#logger = options.logger } /** @@ -104,14 +111,14 @@ export class Orchestrator { } /** - * Processes the entity at a given index. If there are no entities + * Processes the entity. If there are no entities * left, the orchestrator state will be set to completed. * * - Task runner is the implementation function to execute a task. * Orchestrator has no inbuilt execution logic and it relies on * the task runner for the same. */ - async #processAtIndex( + async #processEntity( taskRunner: (entity: string) => Promise, entity: string ) { @@ -123,10 +130,20 @@ export class Orchestrator { this.#state = "error" throw error } finally { - await this.#lockingModule.release(entity, { - ownerId: this.#lockingOwner, - }) + await this.#lockingModule + .release(entity, { + ownerId: this.#lockingOwner, + }) + .catch(() => { + this.#logger.error( + `[Index engine] failed to release lock for entity '${entity}'` + ) + }) } + } else { + this.#logger.warn( + `[Index engine] failed to acquire lock for entity '${entity}' on pid ${process.pid}. It means another process is already processing this entity or a lock is still present in your locking provider.` + ) } } @@ -152,7 +169,7 @@ export class Orchestrator { break } - await this.#processAtIndex(taskRunner, entity) + await this.#processEntity(taskRunner, entity) } this.#state = "completed"