diff --git a/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts b/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts new file mode 100644 index 0000000000..b560d17aff --- /dev/null +++ b/packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts @@ -0,0 +1,244 @@ +import { asValue } from "awilix" +import { container } from "@medusajs/framework" +import type { IndexTypes } from "@medusajs/types" +import { Orchestrator } from "../../src/orchestrator" + +function creatingFakeLockingModule() { + return { + lockEntities: new Set(), + acquire(key: string) { + if (this.lockEntities.has(key)) { + throw new Error("Lock already exists") + } + this.lockEntities.add(key) + }, + release(key: string) { + this.lockEntities.delete(key) + }, + } +} + +describe("Orchestrator", () => { + test("process each entity via the task runner", async () => { + const processedEntities: string[] = [] + const lockingModule = creatingFakeLockingModule() + + const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [ + { + entity: "brand", + alias: "brand", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + { + entity: "product", + alias: "product", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + ] + + container.register({ + locking: asValue(lockingModule), + }) + + const orchestrator = new Orchestrator(container, entities, { + lockDuration: 60 * 1000, + async taskRunner(entity) { + expect(orchestrator.state).toEqual("processing") + processedEntities.push(entity.entity) + }, + }) + + await orchestrator.process() + expect(lockingModule.lockEntities.size).toEqual(0) + expect(orchestrator.state).toEqual("completed") + expect(processedEntities).toEqual(["brand", "product"]) + }) + + test("do not process tasks when unable to acquire lock", async () => { + const processedEntities: string[] = [] + const lockingModule = creatingFakeLockingModule() + + const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [ + { + entity: "brand", + alias: "brand", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + { + entity: "product", + alias: "product", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + ] + + container.register({ + locking: asValue({ + ...lockingModule, + acquire() { + throw new Error("Unable to acquire lock") + }, + }), + }) + + const orchestrator = new Orchestrator(container, entities, { + lockDuration: 60 * 1000, + async taskRunner(entity) { + processedEntities.push(entity.entity) + }, + }) + + await orchestrator.process() + expect(processedEntities).toEqual([]) + }) + + test("share tasks between multiple instances", async () => { + const processedEntities: { owner: string; entity: string }[] = [] + const lockingModule = creatingFakeLockingModule() + + const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [ + { + entity: "brand", + alias: "brand", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + { + entity: "product", + alias: "product", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + ] + + container.register({ + locking: asValue(lockingModule), + }) + + const orchestrator = new Orchestrator(container, entities, { + lockDuration: 60 * 1000, + async taskRunner(entity) { + processedEntities.push({ entity: entity.entity, owner: "instance-1" }) + }, + }) + const orchestrator1 = new Orchestrator(container, entities, { + lockDuration: 60 * 1000, + async taskRunner(entity) { + processedEntities.push({ entity: entity.entity, owner: "instance-2" }) + }, + }) + + await Promise.all([orchestrator.process(), orchestrator1.process()]) + expect(processedEntities).toEqual([ + { + entity: "brand", + owner: "instance-1", + }, + { + entity: "product", + owner: "instance-2", + }, + ]) + expect(lockingModule.lockEntities.size).toEqual(0) + }) + + test("stop processing when task runner throws error", async () => { + const processedEntities: string[] = [] + const lockingModule = creatingFakeLockingModule() + + const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [ + { + entity: "brand", + alias: "brand", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + { + entity: "product", + alias: "product", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + ] + + container.register({ + locking: asValue(lockingModule), + }) + + const orchestrator = new Orchestrator(container, entities, { + lockDuration: 60 * 1000, + async taskRunner(entity) { + if (entity.entity === "product") { + throw new Error("Cannot process") + } + processedEntities.push(entity.entity) + }, + }) + + await expect(orchestrator.process()).rejects.toThrow("Cannot process") + expect(orchestrator.state).toEqual("error") + expect(processedEntities).toEqual(["brand"]) + expect(lockingModule.lockEntities.size).toEqual(0) + }) + + test("throw error when the same instance is executed to process tasks parallely", async () => { + const processedEntities: string[] = [] + const lockingModule = creatingFakeLockingModule() + + const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [ + { + entity: "brand", + alias: "brand", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + { + entity: "product", + alias: "product", + fields: ["*"], + listeners: [], + moduleConfig: {}, + parents: [], + }, + ] + + container.register({ + locking: asValue(lockingModule), + }) + + const orchestrator = new Orchestrator(container, entities, { + lockDuration: 60 * 1000, + async taskRunner(entity) { + expect(orchestrator.state).toEqual("processing") + processedEntities.push(entity.entity) + }, + }) + + await expect( + Promise.all([orchestrator.process(), orchestrator.process()]) + ).rejects.toThrow("Cannot re-run an already running orchestrator instance") + + expect(lockingModule.lockEntities.size).toEqual(0) + }) +}) diff --git a/packages/modules/index/src/orchestrator/index.ts b/packages/modules/index/src/orchestrator/index.ts new file mode 100644 index 0000000000..ce9f796695 --- /dev/null +++ b/packages/modules/index/src/orchestrator/index.ts @@ -0,0 +1,149 @@ +import { ILockingModule, IndexTypes, MedusaContainer } from "@medusajs/types" + +export class Orchestrator { + /** + * Reference to the locking module + */ + #lockingModule: ILockingModule + + /** + * Owner id when acquiring locks + */ + #lockingOwner = `index-sync-${process.pid}` + + /** + * The current state of the orchestrator + * + * - In "idle" state, one can call the "run" method. + * - In "processing" state, the orchestrator is looping over the entities + * and processing them. + * - In "completed" state, the provided entities have been processed. + * - The "error" state is set when the task runner throws an error. + */ + #state: "idle" | "processing" | "completed" | "error" = "idle" + + /** + * Options for the locking module and the task runner to execute the + * task. + * + * - Lock duration is the maximum duration for which to hold the lock. + * After this the lock will be removed. + * + * - Task runner is the implementation function to execute a task. + * Orchestrator has no inbuilt execution logic and it relies on + * the task runner for the same. + * + * The entity is provided to the taskRunner only when the orchestrator + * is able to acquire a lock. + */ + #options: { + lockDuration: number + taskRunner: ( + entity: IndexTypes.SchemaObjectEntityRepresentation + ) => Promise + } + + /** + * Index of the entity that is currently getting processed. + */ + #currentIndex: number = 0 + + /** + * Collection of entities to process in sequence. A lock is obtained + * while an entity is getting synced to avoid multiple processes + * from syncing the same entity + */ + #entities: IndexTypes.SchemaObjectEntityRepresentation[] = [] + + /** + * The current state of the orchestrator + */ + get state() { + return this.#state + } + + /** + * Reference to the currently processed entity + */ + get current() { + return this.#entities[this.#currentIndex] + } + + /** + * Reference to the number of entities left for processing + */ + get remainingCount() { + return this.#entities.length - (this.#currentIndex + 1) + } + + constructor( + container: MedusaContainer, + entities: IndexTypes.SchemaObjectEntityRepresentation[], + options: { + lockDuration: number + taskRunner: ( + entity: IndexTypes.SchemaObjectEntityRepresentation + ) => Promise + } + ) { + this.#lockingModule = container.resolve("locking") + this.#entities = entities + this.#options = options + } + + /** + * Acquires using the lock module. + */ + async #acquireLock(forKey: string): Promise { + try { + await this.#lockingModule.acquire(forKey, { + expire: this.#options.lockDuration, + ownerId: this.#lockingOwner, + }) + return true + } catch { + return false + } + } + + /** + * Processes the entity at a given index. If there are no entities + * left, the orchestrator state will be set to completed. + */ + async #processAtIndex(index: number) { + const entity = this.#entities[index] + if (!entity) { + this.#state = "completed" + return + } + + this.#currentIndex = index + const lockAcquired = await this.#acquireLock(entity.entity) + if (lockAcquired) { + try { + await this.#options.taskRunner(entity) + } catch (error) { + this.#state = "error" + throw error + } finally { + await this.#lockingModule.release(entity.entity, { + ownerId: this.#lockingOwner, + }) + } + } + + return this.#processAtIndex(index + 1) + } + + /** + * Run the orchestrator to process the entities one by one. + */ + async process() { + if (this.state !== "idle") { + throw new Error("Cannot re-run an already running orchestrator instance") + } + + this.#state = "processing" + return this.#processAtIndex(0) + } +}